1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
namespace TBolier\RethinkQL\Connection; |
5
|
|
|
|
6
|
|
|
use Psr\Http\Message\StreamInterface; |
7
|
|
|
use Symfony\Component\Serializer\SerializerInterface; |
8
|
|
|
use TBolier\RethinkQL\Connection\Socket\Exception; |
9
|
|
|
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface; |
10
|
|
|
use TBolier\RethinkQL\Query\Expr; |
11
|
|
|
use TBolier\RethinkQL\Query\Message; |
12
|
|
|
use TBolier\RethinkQL\Query\MessageInterface; |
13
|
|
|
use TBolier\RethinkQL\Query\Options as QueryOptions; |
14
|
|
|
use TBolier\RethinkQL\Query\Query; |
15
|
|
|
use TBolier\RethinkQL\Response\Cursor; |
16
|
|
|
use TBolier\RethinkQL\Response\Response; |
17
|
|
|
use TBolier\RethinkQL\Response\ResponseInterface; |
18
|
|
|
use TBolier\RethinkQL\Types\Query\QueryType; |
19
|
|
|
use TBolier\RethinkQL\Types\Response\ResponseType; |
20
|
|
|
|
21
|
|
|
class Connection implements ConnectionInterface, ConnectionCursorInterface
{
    /**
     * Size in bytes of a response header as decoded by receiveResponse():
     * an 8-byte token (two little-endian 32-bit halves) followed by a
     * 4-byte little-endian payload length.
     */
    private const RESPONSE_HEADER_SIZE = 12;

    /**
     * Tokens of queries that still have pending (partial) results,
     * stored as token => true so membership can be tested with isset().
     *
     * @var bool[]
     */
    private $activeTokens = [];

    /**
     * The stream of the current connection, or null when connect() has not
     * yet succeeded.
     *
     * @var StreamInterface|null
     */
    private $stream;

    /**
     * Name of the database that is attached to every outgoing query.
     *
     * @var string
     */
    private $dbName;

    /**
     * When true, run() writes the query and returns without reading a response.
     *
     * @var bool
     */
    private $noReply = false;

    /**
     * Factory that is invoked without arguments to create the stream.
     *
     * @var \Closure
     */
    private $streamWrapper;

    /**
     * @var HandshakeInterface
     */
    private $handshake;

    /**
     * Serializer used to encode outgoing query messages as JSON.
     *
     * @var SerializerInterface
     */
    private $querySerializer;

    /**
     * Serializer used to decode incoming JSON payloads into Response objects.
     *
     * @var SerializerInterface
     */
    private $responseSerializer;

    /**
     * @param \Closure $streamWrapper
     * @param HandshakeInterface $handshake
     * @param string $dbName
     * @param SerializerInterface $querySerializer
     * @param SerializerInterface $responseSerializer
     */
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        $this->streamWrapper = $streamWrapper;
        $this->dbName = $dbName;
        $this->handshake = $handshake;
        $this->querySerializer = $querySerializer;
        $this->responseSerializer = $responseSerializer;
    }

    /**
     * Opens the stream and performs the handshake, unless a writable stream
     * is already present.
     *
     * @inheritdoc
     * @throws ConnectionException When creating the stream or the handshake fails.
     */
    public function connect(): self
    {
        if ($this->isStreamOpen()) {
            return $this;
        }

        try {
            $stream = ($this->streamWrapper)();
            $this->handshake->hello($stream);
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }

        // Only keep the stream once the handshake has succeeded. Storing it
        // earlier would make isStreamOpen() report true after a failed
        // handshake, so later calls would skip connecting and use a stream
        // on which the handshake never completed.
        $this->stream = $stream;

        return $this;
    }

    /**
     * Closes the stream, optionally waiting first for outstanding no-reply
     * queries to be processed.
     *
     * @param bool $noReplyWait
     * @throws ConnectionException When not connected or when the wait query fails.
     * @throws \Exception
     */
    public function close($noReplyWait = true): void
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        if ($noReplyWait) {
            $this->noReplyWait();
        }

        $this->stream->close();
    }

    /**
     * Sends a NOREPLY_WAIT query and blocks until its response arrives.
     *
     * @throws ConnectionException
     * @throws \Exception
     */
    private function noReplyWait(): void
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            $query = new Message(QueryType::NOREPLY_WAIT);
            $this->writeQuery($token, $query);

            // Await the response
            $response = $this->receiveResponse($token, $query);

            // 4 corresponds to WAIT_COMPLETE in RethinkDB's ql2.proto.
            // NOTE(review): prefer a ResponseType constant if one exists.
            if ($response->getType() !== 4) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Writes a query and, unless no-reply mode is active, reads its response.
     *
     * Atom responses (and any response when $raw is true) are returned as-is;
     * every other response is wrapped in a Cursor. A partial response marks
     * the token as active so it is not reused while results are pending.
     *
     * @param MessageInterface $message
     * @param bool $raw
     * @return ResponseInterface|Cursor|null Null when the query is sent in no-reply mode.
     * @throws ConnectionException
     */
    public function run(MessageInterface $message, $raw = false)
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            $this->writeQuery($token, $message);

            if ($this->noReply) {
                return null;
            }

            $response = $this->receiveResponse($token, $message);

            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
                $this->activeTokens[$token] = true;
            }

            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
                return $response;
            }

            return $this->createCursorFromResponse($response, $token, $message);
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Re-runs the given message and returns the raw response (never a Cursor).
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
    {
        return $this->run($message, true);
    }

    /**
     * @param ResponseInterface $response
     * @param int $token
     * @param MessageInterface $message
     * @return Cursor
     */
    private function createCursorFromResponse(ResponseInterface $response, int $token, MessageInterface $message): Cursor
    {
        return new Cursor($this, $token, $response, $message);
    }

    /**
     * Runs a query without waiting for its response.
     *
     * @param MessageInterface $query
     * @return ResponseInterface|Cursor|null
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query)
    {
        $this->noReply = true;

        try {
            return $this->run($query);
        } finally {
            // Always leave no-reply mode, even when run() throws; otherwise
            // every later run() call would silently skip reading its response.
            $this->noReply = false;
        }
    }

    /**
     * Picks a random token that is not in use by a query with pending results.
     *
     * @return int
     * @throws \Exception
     */
    private function generateToken(): int
    {
        try {
            $tries = 0;
            $maxToken = 1 << 30;
            do {
                $token = random_int(0, $maxToken);
                $haveCollision = isset($this->activeTokens[$token]);
            } while ($haveCollision && $tries++ < 1024);
            if ($haveCollision) {
                throw new \Exception('Unable to generate a unique token for the query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
        }

        return $token;
    }

    /**
     * Serializes the message and writes it to the stream.
     *
     * The frame is: the token as 8 little-endian bytes (low 32 bits hold the
     * token, high 32 bits are zero), the payload length as 4 little-endian
     * bytes, then the JSON payload. The message's options are replaced with a
     * fresh options object carrying the current database name.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function writeQuery(int $token, MessageInterface $message): int
    {
        $message->setOptions((new QueryOptions())->setDb($this->dbName));

        try {
            $request = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $e) {
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
        }

        $requestSize = pack('V', \strlen($request));
        $binaryToken = pack('V', $token) . pack('V', 0);

        return $this->stream->write($binaryToken . $requestSize . $request);
    }

    /**
     * Requests the next batch of results for the query identified by $token.
     * The token is released once the response is no longer partial.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function continueQuery(int $token): ResponseInterface
    {
        $message = (new Message())->setQuery(
            new Query([QueryType::CONTINUE])
        );

        $this->writeQuery($token, $message);

        // Await the response
        $response = $this->receiveResponse($token, $message);

        if ($response->getType() !== ResponseType::SUCCESS_PARTIAL) {
            unset($this->activeTokens[$token]);
        }

        return $response;
    }

    /**
     * Stops the query identified by $token and releases the token.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function stopQuery(int $token): ResponseInterface
    {
        $message = (new Message())->setQuery(
            new Query([QueryType::STOP])
        );

        $this->writeQuery($token, $message);

        $response = $this->receiveResponse($token, $message);

        unset($this->activeTokens[$token]);

        return $response;
    }

    /**
     * Reads one response frame from the stream, deserializes and validates it.
     *
     * @param int $token
     * @param MessageInterface $message
     * @return ResponseInterface
     * @throws \RuntimeException
     * @throws ConnectionException
     */
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        $rawHeader = $this->stream->read(self::RESPONSE_HEADER_SIZE);

        // A short read would make unpack() emit a warning and return false;
        // fail with an explicit message instead of a misleading token error.
        if (\strlen($rawHeader) < self::RESPONSE_HEADER_SIZE) {
            throw new ConnectionException('Invalid response from server: Incomplete response header.');
        }

        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $rawHeader);
        $responseToken = $responseHeader['token'];

        // Tokens are generated below 2^31, so the high 32 bits must be zero.
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseSize = $responseHeader['size'];
        $responseBuf = $this->stream->read($responseSize);

        /** @var ResponseInterface $response */
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
        $this->validateResponse($response, $responseToken, $token, $message);

        return $response;
    }

    /**
     * Throws when the response has no type, reports an error type, or carries
     * a token other than the one the request was sent with.
     *
     * @param ResponseInterface $response
     * @param int $responseToken
     * @param int $token
     * @param MessageInterface $message
     * @throws ConnectionException
     */
    private function validateResponse(
        ResponseInterface $response,
        int $responseToken,
        int $token,
        MessageInterface $message
    ): void {
        if (!$response->getType()) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($response->getType() === ResponseType::CLIENT_ERROR) {
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response->getData()[0]);
        }

        if ($responseToken !== $token) {
            throw new ConnectionException(
                'Received wrong token. Response does not match the request. '
                . 'Expected ' . $token . ', received ' . $responseToken
            );
        }

        if ($response->getType() === ResponseType::COMPILE_ERROR) {
            throw new ConnectionException('Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
        }

        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
            throw new ConnectionException('Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
        }
    }

    /**
     * Selects the database that is attached to subsequent queries.
     *
     * @inheritdoc
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }

    /**
     * Reports whether a stream exists and is writable.
     *
     * @inheritdoc
     */
    public function isStreamOpen(): bool
    {
        return ($this->stream !== null && $this->stream->isWritable());
    }

    /**
     * Not implemented yet.
     *
     * @param MessageInterface $query
     * @return array
     * @throws \BadMethodCallException Always, until the method is implemented.
     */
    public function changes(MessageInterface $query): array
    {
        // TODO: Implement changes() method.
        // An empty body would violate the declared array return type and fail
        // with an opaque TypeError; fail with an explicit message instead.
        throw new \BadMethodCallException('changes() is not implemented.');
    }

    /**
     * Sends a SERVER_INFO query and returns the server's response.
     *
     * @return ResponseInterface
     * @throws \Exception
     */
    public function server(): ResponseInterface
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            $query = new Message(QueryType::SERVER_INFO);
            $this->writeQuery($token, $query);

            // Await the response
            $response = $this->receiveResponse($token, $query);

            // 5 corresponds to SERVER_INFO in RethinkDB's ql2.proto.
            // NOTE(review): prefer a ResponseType constant if one exists.
            if ($response->getType() !== 5) {
                throw new ConnectionException('Unexpected response type for server query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }

        return $response;
    }

    /**
     * Runs a START query wrapping the given string in an Expr.
     *
     * @param string $string
     * @return ResponseInterface
     * @throws ConnectionException
     */
    public function expr(string $string): ResponseInterface
    {
        $message = new Message();
        $message->setQueryType(QueryType::START)
            ->setQuery(new Expr($string));

        return $this->run($message);
    }
}
435
|
|
|
|
If you return a value from a function or method, it should be a sub-type of the type declared by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle is also one of the SOLID principles for object-oriented design.
Let’s take a look at an example:
Our function `my_function` expects a `Post` object, and outputs the author of the post. The base class `Post` returns a simple string, and outputting a simple string will work just fine. However, the child class `BlogPost`, which is a sub-type of `Post`, instead decided to return an object, and is therefore violating the SOLID principles. If a `BlogPost` were passed to `my_function`, PHP would not complain, but would ultimately fail when executing the `strtoupper` call in its body.