1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
namespace TBolier\RethinkQL\Connection; |
5
|
|
|
|
6
|
|
|
use Psr\Http\Message\StreamInterface; |
7
|
|
|
use Symfony\Component\Serializer\SerializerInterface; |
8
|
|
|
use TBolier\RethinkQL\Connection\Socket\Exception; |
9
|
|
|
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface; |
10
|
|
|
use TBolier\RethinkQL\Message\ExprMessage; |
11
|
|
|
use TBolier\RethinkQL\Message\Message; |
12
|
|
|
use TBolier\RethinkQL\Message\MessageInterface; |
13
|
|
|
use TBolier\RethinkQL\Query\Options as QueryOptions; |
14
|
|
|
use TBolier\RethinkQL\Response\Cursor; |
15
|
|
|
use TBolier\RethinkQL\Response\Response; |
16
|
|
|
use TBolier\RethinkQL\Response\ResponseInterface; |
17
|
|
|
use TBolier\RethinkQL\Types\Query\QueryType; |
18
|
|
|
use TBolier\RethinkQL\Types\Response\ResponseType; |
19
|
|
|
|
20
|
|
|
class Connection implements ConnectionInterface, ConnectionCursorInterface |
21
|
|
|
{ |
22
|
|
|
/**
 * Tokens of queries that returned a partial result and may still be continued.
 * Stored as token => true; entries are removed by continueQuery() and stopQuery().
 *
 * @var array<int, bool>
 */
private $activeTokens = [];

/**
 * Database name attached to the options of every written query (when non-empty).
 *
 * @var string
 */
private $dbName;

/**
 * Handshake performed on a freshly created stream in connect().
 *
 * @var HandshakeInterface
 */
private $handshake;

/**
 * While true, run() returns null right after writing the query instead of reading a response.
 *
 * @var bool
 */
private $noReply = false;

/**
 * Serializer used to turn query messages into JSON.
 *
 * @var SerializerInterface
 */
private $querySerializer;

/**
 * Serializer used to turn JSON response payloads into Response objects.
 *
 * @var SerializerInterface
 */
private $responseSerializer;

/**
 * Stream used for all reads and writes; null until connect() has created it.
 *
 * @var StreamInterface|null
 */
private $stream;

/**
 * Factory invoked without arguments by connect() to create the stream.
 *
 * @var \Closure
 */
private $streamWrapper;
54
|
|
|
|
55
|
|
|
/**
 * Stores the collaborators of the connection. No stream is created here;
 * that happens in connect().
 *
 * @param \Closure            $streamWrapper      Factory called without arguments to create the stream.
 * @param HandshakeInterface  $handshake          Handshake run against a newly created stream.
 * @param string              $dbName             Database name added to the options of written queries.
 * @param SerializerInterface $querySerializer    Serializes query messages to JSON.
 * @param SerializerInterface $responseSerializer Deserializes JSON payloads into responses.
 */
public function __construct(
    \Closure $streamWrapper,
    HandshakeInterface $handshake,
    string $dbName,
    SerializerInterface $querySerializer,
    SerializerInterface $responseSerializer
) {
    $this->handshake = $handshake;
    $this->streamWrapper = $streamWrapper;

    $this->responseSerializer = $responseSerializer;
    $this->querySerializer = $querySerializer;

    $this->dbName = $dbName;
}
75
|
|
|
|
76
|
|
|
/**
 * Placeholder: accepts a query but currently does nothing with it.
 *
 * @param MessageInterface $query Unused until the method is implemented.
 *
 * @return void
 */
public function changes(MessageInterface $query): void
{
    // TODO: Implement changes() method.
}
84
|
|
|
|
85
|
|
|
/**
 * Closes the stream, optionally waiting for outstanding noreply queries first.
 *
 * When no stream has been created yet (connect() was never called) there is
 * nothing to close and the method returns immediately.
 *
 * @param bool $noreplyWait When truthy, noreplyWait() is called before the stream is closed.
 *
 * @throws \Exception When noreplyWait() fails.
 */
public function close($noreplyWait = true): void
{
    // Guard against a call on a connection that was never opened; without it
    // both noreplyWait() and close() would dereference a null stream.
    if ($this->stream === null) {
        return;
    }

    if ($noreplyWait) {
        $this->noreplyWait();
    }

    $this->stream->close();
}
97
|
|
|
|
98
|
|
|
/**
 * Creates the stream and performs the handshake, unless a writable stream is already held.
 *
 * The new stream is stored only after the handshake has succeeded, so a failed
 * attempt is never mistaken for an established connection by a later call.
 *
 * @return self
 *
 * @throws ConnectionException When creating the stream or the handshake throws.
 */
public function connect(): self
{
    if ($this->stream !== null && $this->stream->isWritable()) {
        return $this;
    }

    try {
        $stream = ($this->streamWrapper)();
        $this->handshake->hello($stream);
    } catch (\Exception $e) {
        throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
    }

    // Only keep the stream once the handshake went through.
    $this->stream = $stream;

    return $this;
}
117
|
|
|
|
118
|
|
|
/**
 * Closes the current stream and connects again.
 *
 * @param bool $noreplyWait Forwarded to close().
 *
 * @return Connection This connection, as returned by connect().
 *
 * @throws \Exception When closing or connecting fails.
 */
public function reconnect($noreplyWait = true): Connection
{
    $this->close($noreplyWait);

    $reopened = $this->connect();

    return $reopened;
}
128
|
|
|
|
129
|
|
|
/**
 * Sends a CONTINUE query for the given token and returns the response.
 *
 * The token stays registered as active only while the server keeps answering
 * with partial results.
 *
 * @param int $token Token of the query to continue.
 *
 * @return ResponseInterface
 *
 * @throws \Exception When writing the query or receiving the response fails.
 */
public function continueQuery(int $token): ResponseInterface
{
    $blank = new Message();
    $continuation = $blank->setQuery([QueryType::CONTINUE]);

    $this->writeQuery($token, $continuation);

    // Block until the response for this token has been read.
    $batch = $this->receiveResponse($token, $continuation);

    if ($batch->getType() === ResponseType::SUCCESS_PARTIAL) {
        return $batch;
    }

    // Anything other than a partial result ends the query.
    unset($this->activeTokens[$token]);

    return $batch;
}
150
|
|
|
|
151
|
|
|
/**
 * Runs the given expression as a START query and returns the result of run().
 *
 * @param string $string Expression passed to the ExprMessage.
 *
 * @return ResponseInterface
 *
 * @throws ConnectionException When run() fails.
 */
public function expr(string $string): ResponseInterface
{
    // Pass the caller's expression; it used to be replaced by a hard-coded 'foo'.
    return $this->run(new ExprMessage(QueryType::START, $string));
}
160
|
|
|
|
161
|
|
|
/**
 * Re-runs the message in raw mode, so run() hands back the response itself
 * instead of wrapping it in a cursor.
 *
 * @param MessageInterface $message Message to run again.
 *
 * @return ResponseInterface
 *
 * @throws ConnectionException When run() fails.
 */
public function rewindFromCursor(MessageInterface $message): ResponseInterface
{
    $rawResponse = $this->run($message, true);

    return $rawResponse;
}
169
|
|
|
|
170
|
|
|
/**
 * Writes the message under a fresh token and returns its result.
 *
 * Returns null when the connection is in noReply mode, the response itself when
 * $raw is truthy or the response is a SUCCESS_ATOM, and a cursor otherwise.
 * A SUCCESS_PARTIAL response registers the token as active.
 *
 * @param MessageInterface $message Message to send.
 * @param bool             $raw     When truthy, never wrap the response in a cursor.
 *
 * @return ResponseInterface|iterable|null
 *
 * @throws ConnectionException Wraps any exception raised while running the query.
 */
public function run(MessageInterface $message, $raw = false)
{
    try {
        $token = $this->generateToken();
        $this->writeQuery($token, $message);

        // Fire-and-forget: nothing is read back from the stream.
        if ($this->noReply) {
            return null;
        }

        $response = $this->receiveResponse($token, $message);

        if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
            $this->activeTokens[$token] = true;
        }

        if (!$raw && $response->getType() !== ResponseType::SUCCESS_ATOM) {
            return $this->createCursorFromResponse($response, $token, $message);
        }

        return $response;
    } catch (\Exception $e) {
        throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
    }
}
200
|
|
|
|
201
|
|
|
/**
 * Runs the query without reading a response from the server.
 *
 * The noReply flag is switched on only for the duration of the run() call and
 * is always switched off again, even when run() throws.
 *
 * @param MessageInterface $query Query to send.
 *
 * @return void
 *
 * @throws ConnectionException When run() fails.
 */
public function runNoReply(MessageInterface $query): void
{
    $this->noReply = true;

    try {
        $this->run($query);
    } finally {
        // Without this reset a failed run() would leave every later query in noReply mode.
        $this->noReply = false;
    }
}
211
|
|
|
|
212
|
|
|
/**
 * Sends a SERVER_INFO query and returns the response.
 *
 * @return ResponseInterface
 *
 * @throws \Exception A ConnectionException wrapping any failure, including an
 *                    unexpected response type.
 */
public function server(): ResponseInterface
{
    try {
        $infoToken = $this->generateToken();

        $infoQuery = new Message(QueryType::SERVER_INFO);
        $this->writeQuery($infoToken, $infoQuery);

        $info = $this->receiveResponse($infoToken, $infoQuery);

        // NOTE(review): 5 is presumably the server-info response type; confirm against ResponseType.
        if ($info->getType() === 5) {
            return $info;
        }

        throw new ConnectionException('Unexpected response type for server query.');
    } catch (\Exception $e) {
        throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
    }
}
235
|
|
|
|
236
|
|
|
/**
 * Sends a STOP query for the given token, then removes the token from the
 * active set and returns the response.
 *
 * @param int $token Token of the query to stop.
 *
 * @return ResponseInterface
 *
 * @throws \Exception When writing the query or receiving the response fails.
 */
public function stopQuery(int $token): ResponseInterface
{
    $blank = new Message();
    $stop = $blank->setQuery([QueryType::STOP]);

    $this->writeQuery($token, $stop);
    $acknowledgement = $this->receiveResponse($token, $stop);

    unset($this->activeTokens[$token]);

    return $acknowledgement;
}
254
|
|
|
|
255
|
|
|
/**
 * Switches the database name that writeQuery() attaches to subsequent queries.
 *
 * @param string $name Database name to use from now on.
 *
 * @return void
 */
public function use(string $name): void
{
    $this->dbName = $name;
}
262
|
|
|
|
263
|
|
|
/**
 * Serializes the message to JSON and writes it to the stream as one frame.
 *
 * The frame consists of three little-endian unsigned 32-bit integers (the token,
 * a zero word and the payload length in bytes) followed by the JSON payload.
 * When a database name is set, it is put on the message options first.
 *
 * @param int              $token   Token identifying the query.
 * @param MessageInterface $message Message to send.
 *
 * @return int The value returned by the stream's write().
 *
 * @throws \Exception When serializing the message fails.
 */
public function writeQuery(int $token, MessageInterface $message): int
{
    if ($this->dbName) {
        $options = new QueryOptions();
        $message->setOptions($options->setDb($this->dbName));
    }

    try {
        $payload = $this->querySerializer->serialize($message, 'json');
    } catch (\Exception $e) {
        throw new Exception('Serializing query message failed.', $e->getCode(), $e);
    }

    // token, zero word, payload size: each packed as 'V' (32-bit little endian).
    $frameHeader = pack('VVV', $token, 0, \strlen($payload));

    return $this->stream->write($frameHeader . $payload);
}
284
|
|
|
|
285
|
|
|
/**
 * Sends a NOREPLY_WAIT query and checks that the server answers with the
 * expected response type.
 *
 * @return void
 *
 * @throws ConnectionException Wraps any failure, including an unexpected response type.
 * @throws \Exception
 */
public function noreplyWait(): void
{
    try {
        $waitToken = $this->generateToken();

        $waitQuery = new Message(QueryType::NOREPLY_WAIT);
        $this->writeQuery($waitToken, $waitQuery);

        $acknowledgement = $this->receiveResponse($waitToken, $waitQuery);

        // NOTE(review): 4 is presumably the wait-complete response type; confirm against ResponseType.
        if ($acknowledgement->getType() === 4) {
            return;
        }

        throw new ConnectionException('Unexpected response type for noreplyWait query.');
    } catch (\Exception $e) {
        throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
    }
}
307
|
|
|
|
308
|
|
|
/**
 * Wraps a response in a Cursor bound to this connection, the query token and
 * the original message.
 *
 * @param ResponseInterface $response Response to wrap.
 * @param int               $token    Token of the query that produced the response.
 * @param MessageInterface  $message  Message that was sent for the query.
 *
 * @return iterable A Cursor instance.
 */
private function createCursorFromResponse(
    ResponseInterface $response,
    int $token,
    MessageInterface $message
): iterable {
    return new Cursor($this, $token, $response, $message);
}
321
|
|
|
|
322
|
|
|
/**
 * Picks a random token between 0 and 2^30 (inclusive) that is not registered as active.
 *
 * Gives up after 1025 colliding candidates.
 *
 * @return int
 *
 * @throws \Exception A ConnectionException when no free token is found or the
 *                    random source fails.
 */
private function generateToken(): int
{
    try {
        $upperBound = 1 << 30;
        $retries = 0;

        while (true) {
            $candidate = random_int(0, $upperBound);

            if (!isset($this->activeTokens[$candidate])) {
                return $candidate;
            }

            // Collision: allow 1024 retries after the first attempt.
            if ($retries++ >= 1024) {
                throw new \Exception('Unable to generate a unique token for the query.');
            }
        }
    } catch (\Exception $e) {
        throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
    }
}
344
|
|
|
|
345
|
|
|
/**
 * Reads one response frame from the stream, deserializes it and validates it.
 *
 * The 12-byte header is unpacked as three little-endian unsigned 32-bit
 * integers: the token, a second token word that must be zero, and the size of
 * the JSON payload that follows.
 *
 * @param int              $token   Token the response is expected to carry.
 * @param MessageInterface $message Message the response belongs to (used in error messages).
 *
 * @return ResponseInterface
 *
 * @throws \RuntimeException
 * @throws ConnectionException When the header is empty, truncated or invalid, or
 *                             when validateResponse() rejects the response.
 */
private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
{
    $headerLength = 4 + 8;

    $responseHeader = $this->stream->read($headerLength);
    if (empty($responseHeader)) {
        throw new ConnectionException('Empty response headers received from server.');
    }

    // A short read would make unpack() emit a warning and return false; fail explicitly instead.
    if (\strlen($responseHeader) !== $headerLength) {
        throw new ConnectionException('Incomplete response headers received from server.');
    }

    $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
    $responseToken = $responseHeader['token'];
    if ($responseHeader['token2'] !== 0) {
        throw new ConnectionException('Invalid response from server: Invalid token.');
    }

    $responseSize = $responseHeader['size'];
    $responseBuf = $this->stream->read($responseSize);

    /** @var ResponseInterface $response */
    $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
    $this->validateResponse($response, $responseToken, $token, $message);

    return $response;
}
374
|
|
|
|
375
|
|
|
/**
 * Rejects responses that have no type, report an error, or carry a token other
 * than the one that was sent.
 *
 * Checks run in this order: missing type, client error, token mismatch,
 * compile error, runtime error.
 *
 * @param ResponseInterface $response      Deserialized response.
 * @param int               $responseToken Token read from the response header.
 * @param int               $token         Token that was sent with the query.
 * @param MessageInterface  $message       Message that was sent; JSON-encoded into error messages.
 *
 * @throws ConnectionException On the first failing check.
 */
private function validateResponse(
    ResponseInterface $response,
    int $responseToken,
    int $token,
    MessageInterface $message
): void {
    if (!$response->getType()) {
        throw new ConnectionException('Response message has no type.');
    }

    if ($response->getType() === ResponseType::CLIENT_ERROR) {
        $details = 'Client error: ' . $response->getData()[0] . ' jsonQuery: ' . json_encode($message);

        throw new ConnectionException($details);
    }

    if ($responseToken !== $token) {
        $details = 'Received wrong token. Response does not match the request. '
            . 'Expected ' . $token . ', received ' . $responseToken;

        throw new ConnectionException($details);
    }

    if ($response->getType() === ResponseType::COMPILE_ERROR) {
        $details = 'Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message);

        throw new ConnectionException($details);
    }

    if ($response->getType() === ResponseType::RUNTIME_ERROR) {
        $details = 'Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message);

        throw new ConnectionException($details);
    }
}
411
|
|
|
} |
412
|
|
|
|
If you return a value from a function or method, it should be a sub-type of the type declared by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle is also one of the SOLID principles of object-oriented design.
Let’s take a look at an example:
Our function `my_function` expects a `Post` object, and outputs the author of the post. The base class `Post` returns a simple string, and outputting a simple string works just fine. However, the child class `BlogPost`, which is a sub-type of `Post`, instead returns an `object`, and is therefore violating the SOLID principles. If a `BlogPost` were passed to `my_function`, PHP would not complain, but would ultimately fail when executing the `strtoupper` call in its body.