Passed
Branch master (69e5ee)
by Timon
05:22
created

Connection::writeQuery()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 9
nc 2
nop 2
dl 0
loc 15
rs 9.4285
c 0
b 0
f 0
ccs 8
cts 8
cp 1
crap 2
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Query\Expr;
11
use TBolier\RethinkQL\Query\Message;
12
use TBolier\RethinkQL\Query\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Query\Query;
15
use TBolier\RethinkQL\Response\Cursor;
16
use TBolier\RethinkQL\Response\Response;
17
use TBolier\RethinkQL\Response\ResponseInterface;
18
use TBolier\RethinkQL\Types\Query\QueryType;
19
use TBolier\RethinkQL\Types\Response\ResponseType;
20
21
class Connection implements ConnectionInterface, ConnectionCursorInterface
22
{
23
    /**
24
     * @var int[]
25
     */
26
    private $activeTokens;
27
28
    /**
29
     * @var StreamInterface
30
     */
31
    private $stream;
32
33
    /**
34
     * @var string
35
     */
36
    private $dbName;
37
38
    /**
39
     * @var bool
40
     */
41
    private $noReply = false;
42
43
    /**
44
     * @var \Closure
45
     */
46
    private $streamWrapper;
47
48
    /**
49
     * @var HandshakeInterface
50
     */
51
    private $handshake;
52
53
    /**
54
     * @var SerializerInterface
55
     */
56
    private $querySerializer;
57
58
    /**
59
     * @var SerializerInterface
60
     */
61
    private $responseSerializer;
62
63
    /**
64
     * @param \Closure $streamWrapper
65
     * @param HandshakeInterface $handshake
66
     * @param string $dbName
67
     * @param SerializerInterface $querySerializer
68
     * @param SerializerInterface $responseSerializer
69
     */
70 36
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        // Only collaborators are stored here; the stream itself is created
        // later, when connect() invokes the stream wrapper closure.
        $this->querySerializer = $querySerializer;
        $this->responseSerializer = $responseSerializer;
        $this->handshake = $handshake;
        $this->streamWrapper = $streamWrapper;
        $this->dbName = $dbName;
    }
83
84
    /**
85
     * @inheritdoc
86
     * @throws ConnectionException
87
     */
88 30
    public function connect(): self
    {
        // An already open (writable) stream is reused as-is.
        if (!$this->isStreamOpen()) {
            try {
                // Create the stream through the injected closure, then let the
                // handshake object greet the server over it.
                $this->stream = ($this->streamWrapper)();
                $this->handshake->hello($this->stream);
            } catch (\Exception $failure) {
                throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
            }
        }

        return $this;
    }
103
104
    /**
105
     * @param bool $noReplyWait
106
     * @throws ConnectionException
107
     * @throws \Exception
108
     */
109 6
    public function close($noReplyWait = true): void
    {
        $connected = $this->isStreamOpen();
        if ($connected === false) {
            throw new ConnectionException('Not connected.');
        }

        // When requested, send a NOREPLY_WAIT query and await its response
        // before the stream is closed. A failure here propagates and leaves
        // the stream open.
        if ($noReplyWait) {
            $this->noReplyWait();
        }

        $this->stream->close();
    }
121
122
    /**
123
     * @throws ConnectionException
124
     * @throws \Exception
125
     */
126 2
    private function noReplyWait(): void
    {
        if ($this->isStreamOpen() === false) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();
            $waitMessage = new Message(QueryType::NOREPLY_WAIT);

            $this->writeQuery($token, $waitMessage);
            $reply = $this->receiveResponse($token, $waitMessage);

            // NOTE(review): 4 is presumably the "wait complete" response type;
            // confirm against ResponseType and replace the literal if a constant exists.
            $expectedType = 4;
            if ($reply->getType() !== $expectedType) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $failure) {
            // Every failure, including the one thrown just above, is re-wrapped
            // with the same message and code.
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
148
149
    /**
150
     * @param MessageInterface $message
151
     * @param bool $raw
152
     * @return ResponseInterface|Cursor
153
     * @throws ConnectionException
154
     */
155 24
    public function run(MessageInterface $message, $raw = false)
    {
        if ($this->isStreamOpen() === false) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();
            $this->writeQuery($token, $message);

            // In no-reply mode nothing is read back and the caller receives null.
            if ($this->noReply) {
                return null;
            }

            $response = $this->receiveResponse($token, $message);

            // A partial result keeps its token registered so that
            // generateToken() will not hand the same token out again.
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
                $this->activeTokens[$token] = true;
            }

            // Raw mode always yields the plain response.
            if ($raw) {
                return $response;
            }

            // Atom results are returned directly as well.
            if ($response->getType() === ResponseType::SUCCESS_ATOM) {
                return $response;
            }

            // NOTE(review): static analysis reports that returning a Cursor here is
            // incompatible with ConnectionInterface::run(), which it says declares
            // ResponseInterface as its return type (Liskov substitution principle).
            return $this->createCursorFromResponse($response, $token, $message);
        } catch (\Exception $failure) {
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
185
186
    /**
187
     * @inheritdoc
188
     * @throws ConnectionException
189
     */
190 1
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
    {
        // Re-run the message in raw mode, so run() hands back the plain
        // response instead of wrapping it in a new Cursor.
        $response = $this->run($message, true);

        return $response;
    }
194
195
    /**
196
     * @param ResponseInterface $response
197
     * @param int $token
198
     * @param MessageInterface $message
199
     * @return Cursor
200
     */
201 3
    private function createCursorFromResponse(
        ResponseInterface $response,
        int $token,
        MessageInterface $message
    ): Cursor {
        // The cursor is given this connection, the query token, the first
        // response and the originating message.
        $cursor = new Cursor($this, $token, $response, $message);

        return $cursor;
    }
205
206
    /**
207
     * @param MessageInterface $query
208
     * @return ResponseInterface|Cursor
209
     * @throws ConnectionException
210
     */
211 1
    public function runNoReply(MessageInterface $query)
    {
        // While this flag is set, run() writes the query and returns without
        // reading a response.
        $this->noReply = true;

        try {
            // NOTE(review): static analysis reports that the Cursor type run() may
            // return is incompatible with the return type declared by
            // ConnectionInterface::runNoReply().
            return $this->run($query);
        } finally {
            // Always restore the flag. Without this, an exception thrown by run()
            // would leave the connection stuck in no-reply mode and every later
            // run() call would silently skip reading its response.
            $this->noReply = false;
        }
    }
219
220
    /**
221
     * @return int
222
     * @throws \Exception
223
     */
224 27
    private function generateToken(): int
    {
        $upperBound = 1 << 30;

        try {
            $retries = 0;

            while (true) {
                $candidate = random_int(0, $upperBound);

                // A token is usable as soon as it is not registered as active.
                if (!isset($this->activeTokens[$candidate])) {
                    return $candidate;
                }

                // Give up once the retry budget of 1024 is exhausted.
                if ($retries++ >= 1024) {
                    throw new \Exception('Unable to generate a unique token for the query.');
                }
            }
        } catch (\Exception $failure) {
            // Covers both the exhaustion error above and failures of random_int().
            throw new ConnectionException('Generating the token failed.', $failure->getCode(), $failure);
        }
    }
242
243
    /**
244
     * @inheritdoc
245
     * @throws \Exception
246
     */
247 25
    public function writeQuery(int $token, MessageInterface $message): int
    {
        // Every outgoing message gets fresh options carrying the currently
        // selected database name.
        $options = new QueryOptions();
        $message->setOptions($options->setDb($this->dbName));

        try {
            $payload = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $failure) {
            throw new Exception('Serializing query message failed.', $failure->getCode(), $failure);
        }

        // Frame layout, all little-endian unsigned 32-bit words: the token,
        // a zero word, the payload length in bytes, then the payload itself.
        $frame = pack('VVV', $token, 0, \strlen($payload)) . $payload;

        return $this->stream->write($frame);
    }
262
263
    /**
264
     * @inheritdoc
265
     * @throws \Exception
266
     */
267
    public function continueQuery(int $token): ResponseInterface
    {
        $continueMessage = new Message();
        $continueMessage = $continueMessage->setQuery(new Query([QueryType::CONTINUE]));

        $this->writeQuery($token, $continueMessage);
        $response = $this->receiveResponse($token, $continueMessage);

        // Another partial batch keeps the token registered as active.
        if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
            return $response;
        }

        // Any other response type releases the token.
        unset($this->activeTokens[$token]);

        return $response;
    }
284
285
    /**
286
     * @inheritdoc
287
     * @throws \Exception
288
     */
289
    public function stopQuery(int $token): ResponseInterface
    {
        $stopMessage = new Message();
        $stopMessage = $stopMessage->setQuery(new Query([QueryType::STOP]));

        $this->writeQuery($token, $stopMessage);
        $response = $this->receiveResponse($token, $stopMessage);

        // The token is released whatever the response type is. If reading or
        // validating the response throws, this line is not reached.
        unset($this->activeTokens[$token]);

        return $response;
    }
303
304
    /**
305
     * @param int $token
306
     * @param MessageInterface $message
307
     * @return ResponseInterface
308
     * @throws \RuntimeException
309
     * @throws ConnectionException
310
     */
311 22
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        // Header: two little-endian 32-bit token words followed by the
        // little-endian 32-bit size of the payload (12 bytes in total).
        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $this->readExactly(4 + 8));
        $responseToken = $responseHeader['token'];

        // The second token word is expected to be zero.
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseBuf = $this->readExactly($responseHeader['size']);

        /** @var ResponseInterface $response */
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
        $this->validateResponse($response, $responseToken, $token, $message);

        return $response;
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * A single StreamInterface::read() call may legitimately return fewer bytes
     * than requested, so reading is repeated until the buffer is complete.
     *
     * @param int $length
     * @return string
     * @throws ConnectionException when the stream yields no more data before $length bytes arrived
     */
    private function readExactly(int $length): string
    {
        $buffer = '';

        while (($missing = $length - \strlen($buffer)) > 0) {
            $chunk = $this->stream->read($missing);

            // An empty or non-string chunk means no progress is possible;
            // fail instead of looping forever or parsing a truncated frame.
            if (!\is_string($chunk) || $chunk === '') {
                throw new ConnectionException(
                    'Invalid response from server: Expected ' . $length . ' bytes, received ' . \strlen($buffer) . '.'
                );
            }

            $buffer .= $chunk;
        }

        return $buffer;
    }
329
330
    /**
331
     * @param ResponseInterface $response
332
     * @param int $responseToken
333
     * @param int $token
334
     * @param MessageInterface $message
335
     * @throws ConnectionException
336
     */
337 21
    private function validateResponse(
        ResponseInterface $response,
        int $responseToken,
        int $token,
        MessageInterface $message
    ): void {
        // The checks run in a fixed order; the first failing one throws.
        if (!$response->getType()) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($response->getType() === ResponseType::CLIENT_ERROR) {
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response->getData()[0]);
        }

        if ($responseToken !== $token) {
            $detail = 'Expected ' . $token . ', received ' . $responseToken;

            throw new ConnectionException('Received wrong token. Response does not match the request. ' . $detail);
        }

        // Compile and runtime errors share one message layout and differ only in
        // the prefix; the failing query is appended as JSON.
        $queryErrors = [
            [ResponseType::COMPILE_ERROR, 'Compile error: '],
            [ResponseType::RUNTIME_ERROR, 'Runtime error: '],
        ];

        foreach ($queryErrors as [$errorType, $prefix]) {
            if ($response->getType() === $errorType) {
                throw new ConnectionException(
                    $prefix . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
                );
            }
        }
    }
366
367
    /**
368
     * @inheritdoc
369
     */
370 13
    public function use(string $name): void
    {
        // writeQuery() attaches this database name to every message it sends
        // from now on.
        $this->dbName = $name;
    }
374
375
    /**
376
     * @inheritdoc
377
     */
378 33
    public function isStreamOpen(): bool
    {
        // No stream exists until connect() has invoked the stream wrapper.
        if ($this->stream === null) {
            return false;
        }

        // An existing stream counts as open only while it is writable.
        return (bool) $this->stream->isWritable();
    }
382
383
    /**
384
     * @param MessageInterface $query
385
     * @return array
386
     */
387
    public function changes(MessageInterface $query): array
    {
        // TODO: Implement changes() method.
        // Until then, fail explicitly. With an empty body the declared array
        // return type made every call die with an opaque TypeError instead.
        throw new \BadMethodCallException('Connection::changes() is not implemented.');
    }
391
392
    /**
393
     * @return ResponseInterface
394
     * @throws \Exception
395
     */
396 3
    public function server(): ResponseInterface
    {
        if ($this->isStreamOpen() === false) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();
            $infoMessage = new Message(QueryType::SERVER_INFO);

            $this->writeQuery($token, $infoMessage);
            $response = $this->receiveResponse($token, $infoMessage);

            // NOTE(review): 5 is presumably the "server info" response type;
            // confirm against ResponseType and replace the literal if a constant exists.
            $expectedType = 5;
            if ($response->getType() !== $expectedType) {
                throw new ConnectionException('Unexpected response type for server query.');
            }

            return $response;
        } catch (\Exception $failure) {
            // Every failure, including the one thrown just above, is re-wrapped
            // with the same message and code.
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
420
421
    /**
422
     * @param string $string
423
     * @return ResponseInterface
424
     * @throws ConnectionException
425
     */
426 1
    public function expr(string $string): ResponseInterface
    {
        // Wrap the string in an Expr query and send it as a START message.
        $expression = new Expr($string);

        $message = new Message();
        $message->setQueryType(QueryType::START)->setQuery($expression);

        return $this->run($message);
    }
434
}
435