Passed
Push — master ( e44bbb...5bf3f3 )
by Timon
02:17
created

Connection::stopQuery()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 14
ccs 0
cts 7
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 1
crap 2
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Query\Expr;
11
use TBolier\RethinkQL\Query\Message;
12
use TBolier\RethinkQL\Query\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Query\Query;
15
use TBolier\RethinkQL\Response\Cursor;
16
use TBolier\RethinkQL\Response\Response;
17
use TBolier\RethinkQL\Response\ResponseInterface;
18
use TBolier\RethinkQL\Types\Query\QueryType;
19
use TBolier\RethinkQL\Types\Response\ResponseType;
20
21
class Connection implements ConnectionInterface, ConnectionCursorInterface
22
{
23
    /**
24
     * @var int[]
25
     */
26
    private $activeTokens;
27
28
    /**
29
     * @var StreamInterface
30
     */
31
    private $stream;
32
33
    /**
34
     * @var string
35
     */
36
    private $dbName;
37
38
    /**
39
     * @var bool
40
     */
41
    private $noReply = false;
42
43
    /**
44
     * @var \Closure
45
     */
46
    private $streamWrapper;
47
48
    /**
49
     * @var HandshakeInterface
50
     */
51
    private $handshake;
52
53
    /**
54
     * @var SerializerInterface
55
     */
56
    private $querySerializer;
57
58
    /**
59
     * @var SerializerInterface
60
     */
61
    private $responseSerializer;
62
63
    /**
64
     * @param \Closure $streamWrapper
65
     * @param HandshakeInterface $handshake
66
     * @param string $dbName
67
     * @param SerializerInterface $querySerializer
68
     * @param SerializerInterface $responseSerializer
69
     */
70 14
    public function __construct(
71
        \Closure $streamWrapper,
72
        HandshakeInterface $handshake,
73
        string $dbName,
74
        SerializerInterface $querySerializer,
75
        SerializerInterface $responseSerializer
76
    ) {
77 14
        $this->streamWrapper = $streamWrapper;
78 14
        $this->dbName = $dbName;
79 14
        $this->handshake = $handshake;
80 14
        $this->querySerializer = $querySerializer;
81 14
        $this->responseSerializer = $responseSerializer;
82 14
    }
83
84
    /**
85
     * @inheritdoc
86
     * @throws ConnectionException
87
     */
88 14
    public function connect(): self
89
    {
90 14
        if ($this->isStreamOpen()) {
91 12
            return $this;
92
        }
93
94
        try {
95 14
            $this->stream = ($this->streamWrapper)();
96 14
            $this->handshake->hello($this->stream);
97
        } catch (\Exception $e) {
98
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
99
        }
100
101 14
        return $this;
102
    }
103
104
    /**
105
     * @param bool $noReplyWait
106
     * @throws ConnectionException
107
     * @throws \Exception
108
     */
109
    public function close($noReplyWait = true): void
110
    {
111
        if (!$this->isStreamOpen()) {
112
            throw new ConnectionException('Not connected.');
113
        }
114
115
        if ($noReplyWait) {
116
            $this->noReplyWait();
117
        }
118
119
        $this->stream->close();
120
    }
121
122
    /**
123
     * @throws ConnectionException
124
     * @throws \Exception
125
     */
126
    private function noReplyWait(): void
127
    {
128
        if (!$this->isStreamOpen()) {
129
            throw new ConnectionException('Not connected.');
130
        }
131
132
        try {
133
            $token = $this->generateToken();
134
135
            $query = new Message(QueryType::NOREPLY_WAIT);
136
            $this->writeQuery($token, $query);
137
138
            // Await the response
139
            $response = $this->receiveResponse($token, $query);
140
141
            if ($response->getType() !== 4) {
142
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
143
            }
144
        } catch (\Exception $e) {
145
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
146
        }
147
    }
148
149
    /**
150
     * @param MessageInterface $message
151
     * @param bool $raw
152
     * @return ResponseInterface|Cursor
153
     * @throws ConnectionException
154
     */
155 13
    public function run(MessageInterface $message, $raw = false)
156
    {
157 13
        if (!$this->isStreamOpen()) {
158
            throw new ConnectionException('Not connected.');
159
        }
160
161
        try {
162 13
            $token = $this->generateToken();
163
164 13
            $this->writeQuery($token, $message);
165
166 13
            if ($this->noReply) {
167
                return;
168
            }
169
170 13
            $response = $this->receiveResponse($token, $message);
171
172 13
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
173
                $this->activeTokens[$token] = true;
174
            }
175
176 13
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
177 13
                return $response;
178
            }
179
180 2
            return $this->createCursorFromResponse($response, $token, $message);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->createCurs...nse, $token, $message); (TBolier\RethinkQL\Response\Cursor) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type TBolier\RethinkQL\Response\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
181
        } catch (\Exception $e) {
182
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
183
        }
184
    }
185
186
    /**
187
     * @inheritdoc
188
     * @throws ConnectionException
189
     */
190 1
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
191
    {
192 1
        return $this->run($message, true);
193
    }
194
195
    /**
196
     * @param ResponseInterface $response
197
     * @param int $token
198
     * @param MessageInterface $message
199
     * @return Cursor
200
     */
201 2
    private function createCursorFromResponse(ResponseInterface $response, int $token, MessageInterface $message): Cursor
202
    {
203 2
        return new Cursor($this, $token, $response, $message);
204
    }
205
206
    /**
207
     * @param MessageInterface $query
208
     * @return array
209
     * @throws ConnectionException
210
     */
211
    public function runNoReply(MessageInterface $query): array
212
    {
213
        $this->noReply = true;
214
        $this->run($query);
215
        $this->noReply = false;
216
    }
217
218
    /**
219
     * @return int
220
     * @throws \Exception
221
     */
222 14
    private function generateToken(): int
223
    {
224
        try {
225 14
            $tries = 0;
226 14
            $maxToken = 1 << 30;
227
            do {
228 14
                $token = \random_int(0, $maxToken);
229 14
                $haveCollision = isset($this->activeTokens[$token]);
230 14
            } while ($haveCollision && $tries++ < 1024);
231 14
            if ($haveCollision) {
232 14
                throw new \Exception('Unable to generate a unique token for the query.');
233
            }
234
        } catch (\Exception $e) {
235
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
236
        }
237
238 14
        return $token;
239
    }
240
241
    /**
242
     * @inheritdoc
243
     * @throws \Exception
244
     */
245 14
    public function writeQuery(int $token, MessageInterface $message): int
246
    {
247 14
        $message->setOptions((new QueryOptions())->setDb($this->dbName));
248
249
        try {
250 14
            $request = $this->querySerializer->serialize($message, 'json');
251
        } catch (\Exception $e) {
252
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
253
        }
254
255 14
        $requestSize = pack('V', \strlen($request));
256 14
        $binaryToken = pack('V', $token) . pack('V', 0);
257
258 14
        return $this->stream->write($binaryToken . $requestSize . $request);
259
    }
260
261
    /**
262
     * @inheritdoc
263
     * @throws \Exception
264
     */
265
    public function continueQuery(int $token): ResponseInterface
266
    {
267
        $message = (new Message())->setQuery(
268
            new Query([QueryType::CONTINUE])
269
        );
270
271
        $this->writeQuery($token, $message);
272
273
        // Await the response
274
        $response = $this->receiveResponse($token, $message);
275
276
        if ($response->getType() !== ResponseType::SUCCESS_PARTIAL) {
277
            unset($this->activeTokens[$token]);
278
        }
279
280
        return $response;
281
    }
282
283
    /**
284
     * @inheritdoc
285
     * @throws \Exception
286
     */
287
    public function stopQuery(int $token): ResponseInterface
288
    {
289
        $message = (new Message())->setQuery(
290
            new Query([QueryType::STOP])
291
        );
292
293
        $this->writeQuery($token, $message);
294
295
        $response = $this->receiveResponse($token, $message);
296
297
        unset($this->activeTokens[$token]);
298
299
        return $response;
300
    }
301
302
    /**
303
     * @param int $token
304
     * @param MessageInterface $message
305
     * @return ResponseInterface
306
     * @throws \RuntimeException
307
     * @throws ConnectionException
308
     */
309 14
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
310
    {
311 14
        $responseHeader = $this->stream->read(4 + 8);
312 14
        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
313 14
        $responseToken = $responseHeader['token'];
314 14
        if ($responseHeader['token2'] !== 0) {
315
            throw new ConnectionException('Invalid response from server: Invalid token.');
316
        }
317
318 14
        $responseSize = $responseHeader['size'];
319 14
        $responseBuf = $this->stream->read($responseSize);
320
321
        /** @var ResponseInterface $response */
322 14
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
323 14
        $this->validateResponse($response, $responseToken, $token, $message);
324
325 14
        return $response;
326
    }
327
328
    /**
329
     * @param ResponseInterface $response
330
     * @param int $responseToken
331
     * @param int $token
332
     * @param MessageInterface $message
333
     * @throws ConnectionException
334
     */
335 14
    private function validateResponse(
336
        ResponseInterface $response,
337
        int $responseToken,
338
        int $token,
339
        MessageInterface $message
340
    ): void {
341 14
        if (!$response->getType()) {
342
            throw new ConnectionException('Response message has no type.');
343
        }
344
345 14
        if ($response->getType() === ResponseType::CLIENT_ERROR) {
346
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response->getData()[0]);
347
        }
348
349 14
        if ($responseToken !== $token) {
350
            throw new ConnectionException(
351
                'Received wrong token. Response does not match the request. '
352
                . 'Expected ' . $token . ', received ' . $responseToken
353
            );
354
        }
355
356 14
        if ($response->getType() === ResponseType::COMPILE_ERROR) {
357
            throw new ConnectionException('Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
358
        }
359
360 14
        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
361
            throw new ConnectionException('Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
362
        }
363 14
    }
364
365
    /**
366
     * @inheritdoc
367
     */
368 12
    public function use(string $name): void
369
    {
370 12
        $this->dbName = $name;
371 12
    }
372
373
    /**
374
     * @inheritdoc
375
     */
376 14
    public function isStreamOpen(): bool
377
    {
378 14
        return ($this->stream !== null && $this->stream->isWritable());
379
    }
380
381
    /**
382
     * @param MessageInterface $query
383
     * @return array
384
     */
385
    public function changes(MessageInterface $query): array
386
    {
387
        // TODO: Implement changes() method.
388
    }
389
390
    /**
391
     * @return ResponseInterface
392
     * @throws \Exception
393
     */
394 1
    public function server(): ResponseInterface
395
    {
396 1
        if (!$this->isStreamOpen()) {
397
            throw new ConnectionException('Not connected.');
398
        }
399
400
        try {
401 1
            $token = $this->generateToken();
402
403 1
            $query = new Message(QueryType::SERVER_INFO);
404 1
            $this->writeQuery($token, $query);
405
406
            // Await the response
407 1
            $response = $this->receiveResponse($token, $query);
408
409 1
            if ($response->getType() !== 5) {
410 1
                throw new ConnectionException('Unexpected response type for server query.');
411
            }
412
        } catch (\Exception $e) {
413
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
414
        }
415
416 1
        return $response;
417
    }
418
419
    /**
420
     * @param string $string
421
     * @return ResponseInterface
422
     * @throws ConnectionException
423
     */
424 1
    public function expr(string $string): ResponseInterface
425
    {
426 1
        $message = new Message();
427 1
        $message->setQueryType(QueryType::START)
428 1
            ->setQuery(new Expr($string));
429
430 1
        return $this->run($message);
431
    }
432
}
433