1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
namespace TBolier\RethinkQL\Connection; |
5
|
|
|
|
6
|
|
|
use Psr\Http\Message\StreamInterface; |
7
|
|
|
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface; |
8
|
|
|
use TBolier\RethinkQL\Connection\Socket\StreamHandlerInterface; |
9
|
|
|
use TBolier\RethinkQL\Query\Expr; |
10
|
|
|
use TBolier\RethinkQL\Query\Message; |
11
|
|
|
use TBolier\RethinkQL\Query\MessageInterface; |
12
|
|
|
use TBolier\RethinkQL\Query\Query; |
13
|
|
|
use TBolier\RethinkQL\Types\Query\QueryType; |
14
|
|
|
use TBolier\RethinkQL\Types\Response\ResponseType; |
15
|
|
|
use TBolier\RethinkQL\Types\Term\TermType; |
16
|
|
|
|
17
|
|
|
/**
 * Connection to a RethinkDB server over a stream.
 *
 * The stream is created lazily by the injected closure when connect() is
 * called. Queries are framed as: 4-byte little-endian token, 4 zero bytes,
 * 4-byte little-endian payload size, JSON payload (see writeQuery()).
 */
class Connection implements ConnectionInterface
{
    /**
     * Not assigned or read anywhere inside this class.
     *
     * @var OptionsInterface
     */
    private $options;

    /**
     * Tokens whose last response was SUCCESS_PARTIAL, stored as token => true.
     * generateToken() avoids handing out any token that is a key of this map.
     *
     * @var array<int, bool>
     */
    private $activeTokens = [];

    /**
     * @var StreamInterface
     */
    private $stream;

    /**
     * Name of the database sent as the "db" option with every query.
     *
     * @var string
     */
    private $dbname;

    /**
     * While true, run() returns right after writing the query instead of reading a response.
     *
     * @var bool
     */
    private $noReply = false;

    /**
     * Factory that is invoked without arguments by connect() to create the stream.
     *
     * @var \Closure
     */
    private $streamWrapper;

    /**
     * @var HandshakeInterface
     */
    private $handshake;

    /**
     * @param \Closure $streamWrapper Factory returning the stream to use
     * @param HandshakeInterface $handshake Handshake performed on the stream right after it is created
     * @param string $dbName Database used for queries until use() is called
     */
    public function __construct(\Closure $streamWrapper, HandshakeInterface $handshake, string $dbName)
    {
        $this->streamWrapper = $streamWrapper;
        // The property is declared (and read by writeQuery()) as "dbname"; writing to
        // "dbName" created an undeclared property and the configured name was never sent.
        $this->dbname = $dbName;
        $this->handshake = $handshake;
    }

    /**
     * Opens the stream and performs the handshake. Does nothing when the stream is already open.
     *
     * @inheritdoc
     * @throws ConnectionException When creating the stream or the handshake throws
     */
    public function connect(): self
    {
        if ($this->isOpenStream()) {
            return $this;
        }

        try {
            $this->stream = ($this->streamWrapper)();
            $this->handshake->helo($this->stream);
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }

        return $this;
    }

    /**
     * Closes the stream.
     *
     * @param bool $noReplyWait When true, a NOREPLY_WAIT query is sent and awaited before closing
     * @throws ConnectionException When not connected, or when the NOREPLY_WAIT round trip fails
     * @throws \Exception
     */
    public function close($noReplyWait = true): void
    {
        if (!$this->isOpenStream()) {
            throw new ConnectionException('Not connected.');
        }

        if ($noReplyWait) {
            $this->noReplyWait();
        }

        $this->stream->close();
    }

    /**
     * Sends a NOREPLY_WAIT query and blocks until the server answers it.
     *
     * @throws ConnectionException
     * @throws \Exception
     */
    private function noReplyWait(): void
    {
        if (!$this->isOpenStream()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            $query = new Message(QueryType::NOREPLY_WAIT);
            $this->writeQuery($token, $query);

            // Await the response
            $response = $this->receiveResponse($token, $query);

            // 4 is the WAIT_COMPLETE response type of the RethinkDB wire protocol
            // (presumably also exposed as a ResponseType constant; verify before replacing the literal).
            if ($response['t'] !== 4) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Sends a message to the server and returns the "r" part of the response.
     *
     * When the noReply flag is set, nothing is read from the stream and an empty array is returned.
     *
     * @param MessageInterface $message
     * @return array
     * @throws ConnectionException When not connected, or when sending/receiving/validating fails
     */
    public function run(MessageInterface $message): array
    {
        if (!$this->isOpenStream()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            if ($message instanceof Query) {
                $message->setQuery($this->utf8Converter($message->getQuery()));
            }

            $this->writeQuery($token, $message);

            if ($this->noReply) {
                return [];
            }

            // Await the response
            $response = $this->receiveResponse($token, $message);

            // Todo: support all response types, and decide what the return type should be.
            if ($response['t'] === ResponseType::SUCCESS_PARTIAL) {
                // Remember the token so generateToken() does not reuse it.
                $this->activeTokens[$token] = true;
            }

            return $response['r'];
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Sends a message without reading the server's response.
     *
     * @param MessageInterface $query
     * @return array Always the empty array returned by run() in no-reply mode
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query): array
    {
        $this->noReply = true;

        try {
            // The declared return type is array, so the result of run() has to be returned.
            return $this->run($query);
        } finally {
            // Reset even when run() throws; otherwise every later run() would skip its response.
            $this->noReply = false;
        }
    }

    /**
     * Intended to re-encode non UTF-8 scalars of the message's query as UTF-8.
     *
     * NOTE(review): the guard below looks inverted. The message is returned untouched
     * whenever its query is NOT null, so the walk is only reached with a null query.
     * The proper fix depends on the types accepted/returned by getQuery()/setQuery(),
     * which are not visible here, so the behaviour is deliberately left unchanged.
     *
     * @param MessageInterface $message
     * @return MessageInterface The same message instance
     */
    private function utf8Converter(MessageInterface $message): MessageInterface
    {
        if (null !== $message->getQuery()) {
            return $message;
        }

        array_walk_recursive($message->getQuery(), function (&$item) {
            if (is_scalar($item) && !mb_detect_encoding((string)$item, 'utf-8', true)) {
                $item = utf8_encode($item);
            }
        });

        return $message;
    }

    /**
     * Picks a random token in [0, 2^30] that is not a key of $activeTokens.
     *
     * @return int
     * @throws \Exception
     */
    private function generateToken(): int
    {
        try {
            $tries = 0;
            $maxToken = 1 << 30;
            do {
                $token = \random_int(0, $maxToken);
                $haveCollision = isset($this->activeTokens[$token]);
            } while ($haveCollision && $tries++ < 1024);
            if ($haveCollision) {
                throw new \Exception('Unable to generate a unique token for the query.');
            }
        } catch (\Exception $e) {
            // Covers both a failing random source and the exhausted retry budget above.
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
        }

        return $token;
    }

    /**
     * Adds the "db" option to the message, encodes it as JSON and writes the framed request.
     *
     * @param int $token
     * @param MessageInterface $message
     * @return int The value returned by the stream's write()
     * @throws \Exception
     */
    private function writeQuery(int $token, MessageInterface $message): int
    {
        $message->setOptions([
            'db' => [
                TermType::DB,
                [$this->dbname],
                (object)[],
            ],
        ]);

        $request = json_encode($message);

        switch (json_last_error()) {
            case JSON_ERROR_DEPTH:
                throw new ConnectionException('JSON error: Maximum stack depth exceeded');
            case JSON_ERROR_STATE_MISMATCH:
                throw new ConnectionException('JSON error: Underflow or the modes mismatch');
            case JSON_ERROR_CTRL_CHAR:
                throw new ConnectionException('JSON error: Unexpected control character found');
            case JSON_ERROR_SYNTAX:
                throw new ConnectionException('JSON error: Syntax error, malformed JSON.');
            case JSON_ERROR_UTF8:
                throw new ConnectionException('JSON error: Malformed UTF-8 characters, possibly incorrectly encoded.');
            case JSON_ERROR_NONE:
                break;
            default:
                throw new ConnectionException('Failed to encode query as JSON: ' . json_last_error());
        }

        if ($request === false) {
            throw new ConnectionException('Failed to encode query as JSON: ' . json_last_error());
        }

        // Frame: token (32-bit LE) + 32 zero bits, then payload size (32-bit LE), then the payload.
        $requestSize = pack('V', \strlen($request));
        $binaryToken = pack('V', $token) . pack('V', 0);

        return $this->stream->write($binaryToken . $requestSize . $request);
    }

    /**
     * Reads one response (12-byte header followed by a JSON body) and validates it.
     *
     * @param int $token Token the response is expected to carry
     * @param MessageInterface $message Message the response belongs to; only used for error reporting
     * @return array The decoded response
     * @throws \RuntimeException
     * @throws ConnectionException
     */
    private function receiveResponse(int $token, MessageInterface $message): array
    {
        $rawHeader = $this->stream->read(4 + 8);
        if (\strlen($rawHeader) !== 12) {
            // unpack() would only emit a warning and return false on a short header.
            throw new ConnectionException('Invalid response from server: Incomplete response header.');
        }

        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $rawHeader);
        $responseToken = $responseHeader['token'];
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseSize = $responseHeader['size'];
        $responseBuf = $this->stream->read($responseSize);

        $response = json_decode($responseBuf, true);
        if (!\is_array($response)) {
            // Without this guard a TypeError (not an \Exception) would be raised by
            // validateResponse() and escape the catch blocks of the callers.
            throw new ConnectionException('Invalid response from server: Unable to decode the response body.');
        }

        $this->validateResponse($response, $responseToken, $token, $message);

        return $response;
    }

    /**
     * Throws when the response reports an error or does not belong to the request.
     *
     * @param array $response Decoded response body
     * @param int $responseToken Token read from the response header
     * @param int $token Token that was sent with the request
     * @param MessageInterface $message Request message, JSON-encoded into compile/runtime error messages
     * @throws ConnectionException
     */
    private function validateResponse(array $response, int $responseToken, int $token, MessageInterface $message): void
    {
        if (isset($response['error'])) {
            // A missing error_code must not turn into a TypeError under strict types.
            throw new ConnectionException($response['error'], (int)($response['error_code'] ?? 0));
        }

        if (!isset($response['t'])) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($response['t'] === ResponseType::CLIENT_ERROR) {
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response['r'][0]);
        }

        if ($responseToken !== $token) {
            throw new ConnectionException(
                'Received wrong token. Response does not match the request. '
                . 'Expected ' . $token . ', received ' . $responseToken
            );
        }

        if ($response['t'] === ResponseType::COMPILE_ERROR) {
            throw new ConnectionException('Compile error: ' . $response['r'][0] . ', jsonQuery: ' . json_encode($message));
        }

        if ($response['t'] === ResponseType::RUNTIME_ERROR) {
            throw new ConnectionException('Runtime error: ' . $response['r'][0] . ', jsonQuery: ' . json_encode($message));
        }
    }

    /**
     * Switches the database that is sent with subsequent queries.
     *
     * @inheritdoc
     */
    public function use(string $name): void
    {
        $this->dbname = $name;
    }

    /**
     * Tells whether a stream has been created and reports itself as writable.
     *
     * @inheritdoc
     */
    public function isOpenStream(): bool
    {
        return ($this->stream !== null && $this->stream->isWritable());
    }

    /**
     * Not implemented yet.
     *
     * @param MessageInterface $query
     * @return array
     * @throws \BadMethodCallException Always; an empty body would fail the array return type with a TypeError
     */
    public function changes(MessageInterface $query): array
    {
        // TODO: Implement changes() method.
        throw new \BadMethodCallException('Connection::changes() is not implemented yet.');
    }

    /**
     * Sends a SERVER_INFO query and returns the complete decoded response.
     *
     * @return array
     * @throws \Exception
     */
    public function server(): array
    {
        if (!$this->isOpenStream()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            $query = new Message(QueryType::SERVER_INFO);
            $this->writeQuery($token, $query);

            // Await the response
            $response = $this->receiveResponse($token, $query);

            // 5 is the SERVER_INFO response type of the RethinkDB wire protocol
            // (presumably also exposed as a ResponseType constant; verify before replacing the literal).
            if ($response['t'] !== 5) {
                throw new ConnectionException('Unexpected response type for server query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }

        return $response;
    }

    /**
     * Wraps the string in an Expr, sends it as a START query and returns the result of run().
     *
     * @param string $string
     * @return array
     * @throws ConnectionException
     */
    public function expr(string $string): array
    {
        $message = new Message();
        $message->setQueryType(QueryType::START)
            ->setQuery(new Expr($string));

        return $this->run($message);
    }
}
400
|
|
|
|
An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name.
If you really want to allow access to undefined properties, you can define magic methods to allow access. See the PHP core documentation on Overloading.