Passed
Pull Request — master (#14)
by Timon
02:48
created

Connection::writeQuery()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2.0625

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 6
cts 8
cp 0.75
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 9
nc 2
nop 2
crap 2.0625
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Query\Expr;
11
use TBolier\RethinkQL\Query\Message;
12
use TBolier\RethinkQL\Query\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Query\Query;
15
use TBolier\RethinkQL\Response\Cursor;
16
use TBolier\RethinkQL\Response\Response;
17
use TBolier\RethinkQL\Response\ResponseInterface;
18
use TBolier\RethinkQL\Types\Query\QueryType;
19
use TBolier\RethinkQL\Types\Response\ResponseType;
20
21
class Connection implements ConnectionInterface, ConnectionCursorInterface
22
{
23
    /**
24
     * @var array<int, bool> Tokens of queries whose last response was partial, used as a set (token => true).
25
     */
26
    private $activeTokens;
27
    /**
28
     * @var string Database name attached to the options of every written query; changed via use().
29
     */
30
    private $dbName;
31
    /**
32
     * @var HandshakeInterface Handshake run by connect() on each newly created stream.
33
     */
34
    private $handshake;
35
    /**
36
     * @var bool While true, run() returns right after writing the query without reading a response.
37
     */
38
    private $noReply = false;
39
    /**
40
     * @var SerializerInterface Serializes outgoing messages to JSON.
41
     */
42
    private $querySerializer;
43
    /**
44
     * @var SerializerInterface Deserializes JSON response bodies into Response objects.
45
     */
46
    private $responseSerializer;
47
    /**
48
     * @var StreamInterface|null Stream to the server; stays null until connect() has created it.
49
     */
50
    private $stream;
51
    /**
52
     * @var \Closure Factory invoked without arguments by connect() to create the stream.
53
     */
54
    private $streamWrapper;
55
56
    /**
     * Stores the collaborators; no stream is opened until connect() is called.
     *
     * @param \Closure $streamWrapper Factory invoked by connect() to create the stream.
     * @param HandshakeInterface $handshake Handshake performed on each newly created stream.
     * @param string $dbName Database name attached to the options of every written query.
     * @param SerializerInterface $querySerializer Serializer used to encode outgoing messages.
     * @param SerializerInterface $responseSerializer Serializer used to decode incoming responses.
     */
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        // Serializers first, then the connection-level settings.
        $this->responseSerializer = $responseSerializer;
        $this->querySerializer = $querySerializer;

        $this->handshake = $handshake;
        $this->dbName = $dbName;
        $this->streamWrapper = $streamWrapper;
    }
76
77
    /**
     * Not implemented yet: accepts a query message and does nothing with it.
     *
     * @param MessageInterface $query
     * @return void
     */
    public function changes(MessageInterface $query): void
    {
        // TODO: Implement changes() method.
    }
85
86
    /**
     * Closes the stream, optionally waiting first for outstanding no-reply queries.
     *
     * Does nothing when no stream was ever created (connect() not called or failed).
     * The stream is closed even when the no-reply wait fails, so a failing wait
     * cannot leave the stream open.
     *
     * @param bool $noReplyWait When true, send a NOREPLY_WAIT query and await its response before closing.
     * @throws ConnectionException When the no-reply wait fails.
     * @throws \Exception
     */
    public function close($noReplyWait = true): void
    {
        // Without a stream there is nothing to wait on or close; calling into it would be a fatal error.
        if ($this->stream === null) {
            return;
        }

        try {
            if ($noReplyWait) {
                $this->noReplyWait();
            }
        } finally {
            // Release the stream regardless of whether the wait succeeded.
            $this->stream->close();
        }
    }
99
100
    /**
     * @inheritdoc
     *
     * Reuses the current stream when one exists and is writable; otherwise creates a
     * new stream through the stream wrapper and performs the handshake on it.
     *
     * @throws ConnectionException When creating the stream or the handshake throws.
     */
    public function connect(): self
    {
        $alreadyUsable = $this->stream !== null && $this->stream->isWritable();

        if (!$alreadyUsable) {
            try {
                $createStream = $this->streamWrapper;
                $this->stream = $createStream();
                $this->handshake->hello($this->stream);
            } catch (\Exception $e) {
                // Surface every setup failure as a connection problem, keeping the cause attached.
                throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
            }
        }

        return $this;
    }
119
120
    /**
     * @inheritdoc
     *
     * Writes a CONTINUE query for the given token and waits for the matching response.
     * The token is removed from the active set unless the response is partial again.
     *
     * @throws \Exception
     */
    public function continueQuery(int $token): ResponseInterface
    {
        $envelope = new Message();
        $message = $envelope->setQuery(new Query([QueryType::CONTINUE]));

        $this->writeQuery($token, $message);
        $response = $this->receiveResponse($token, $message);

        $expectsMore = $response->getType() === ResponseType::SUCCESS_PARTIAL;
        if (!$expectsMore) {
            // Any non-partial response ends the sequence, so the token is no longer active.
            unset($this->activeTokens[$token]);
        }

        return $response;
    }
141
142
    /**
     * Wraps the given string in an Expr query of type START and runs it.
     *
     * @param string $string Value handed to the Expr query.
     * @return ResponseInterface
     * @throws ConnectionException
     */
    public function expr(string $string): ResponseInterface
    {
        $message = new Message();

        $typed = $message->setQueryType(QueryType::START);
        $typed->setQuery(new Expr($string));

        return $this->run($message);
    }
155
156
    /**
     * @inheritdoc
     *
     * True only when a stream has been created and it reports itself as writable.
     */
    public function isStreamOpen(): bool
    {
        if ($this->stream === null) {
            return false;
        }

        return (bool) $this->stream->isWritable();
    }
163
164
    /**
     * @inheritdoc
     *
     * Runs the message again in raw mode, so run() hands back the response itself
     * instead of wrapping it in a cursor.
     *
     * @throws ConnectionException
     */
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
    {
        $raw = true;

        return $this->run($message, $raw);
    }
172
173
    /**
174
     * @param MessageInterface $message
175
     * @param bool $raw
176
     * @return ResponseInterface|Cursor
177
     * @throws ConnectionException
178
     */
179 19
    public function run(MessageInterface $message, $raw = false)
180
    {
181
        try {
182 19
            $token = $this->generateToken();
183
184 19
            $this->writeQuery($token, $message);
185
186 19
            if ($this->noReply) {
187 1
                return;
188
            }
189
190 18
            $response = $this->receiveResponse($token, $message);
191
192 18
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
193 1
                $this->activeTokens[$token] = true;
194
            }
195
196 18
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
197 16
                return $response;
198
            }
199
200 4
            return $this->createCursorFromResponse($response, $token, $message);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->createCurs...nse, $token, $message); (TBolier\RethinkQL\Response\Cursor) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type TBolier\RethinkQL\Response\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
201
        } catch (\Exception $e) {
202
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
203
        }
204
    }
205
206
    /**
     * Runs a query without waiting for the server's response.
     *
     * The no-reply flag is raised only for the duration of this call. It is reset
     * in a finally block so that a failing run() cannot leave the connection stuck
     * in no-reply mode for all later queries.
     *
     * @param MessageInterface $query
     * @return ResponseInterface|Cursor|null In practice null: run() returns without a value while the flag is set.
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query)
    {
        $this->noReply = true;

        try {
            return $this->run($query);
        } finally {
            // Always restore normal request/response behaviour, even when run() throws.
            $this->noReply = false;
        }
    }
219
220
    /**
     * Sends a SERVER_INFO query and returns the server's response.
     *
     * Every failure, including an unexpected response type, leaves this method
     * as a ConnectionException that carries the original exception as its cause.
     *
     * @return ResponseInterface
     * @throws \Exception
     */
    public function server(): ResponseInterface
    {
        try {
            $token = $this->generateToken();
            $query = new Message(QueryType::SERVER_INFO);

            $this->writeQuery($token, $query);
            $response = $this->receiveResponse($token, $query);

            // NOTE(review): 5 is presumably the SERVER_INFO response type - confirm against ResponseType.
            $isServerInfo = $response->getType() === 5;
            if (!$isServerInfo) {
                throw new ConnectionException('Unexpected response type for server query.');
            }

            return $response;
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }
244
245
    /**
     * @inheritdoc
     *
     * Writes a STOP query for the given token, waits for the response and then
     * removes the token from the active set.
     *
     * @throws \Exception
     */
    public function stopQuery(int $token): ResponseInterface
    {
        $envelope = new Message();
        $message = $envelope->setQuery(new Query([QueryType::STOP]));

        $this->writeQuery($token, $message);
        $response = $this->receiveResponse($token, $message);

        // The query is finished on our side once the stop request was answered.
        unset($this->activeTokens[$token]);

        return $response;
    }
263
264
    /**
     * @inheritdoc
     *
     * Replaces the database name that writeQuery() attaches to subsequent queries.
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }
271
272
    /**
     * @inheritdoc
     *
     * Attaches the current database to the message options, serializes the message
     * to JSON and writes it to the stream framed as:
     * 4-byte token + 4 zero bytes + 4-byte payload length (all little-endian) + payload.
     *
     * @return int Whatever the stream's write() reports for the framed request.
     * @throws ConnectionException When no stream exists yet because connect() has not succeeded.
     * @throws Exception When serializing the message fails (this is the imported Socket\Exception).
     * @throws \Exception
     */
    public function writeQuery(int $token, MessageInterface $message): int
    {
        // Fail with a catchable exception instead of a fatal "member function on null" error.
        if ($this->stream === null) {
            throw new ConnectionException('Not connected: call connect() before writing a query.');
        }

        $message->setOptions((new QueryOptions())->setDb($this->dbName));

        try {
            $request = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $e) {
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
        }

        // The length prefix counts bytes, not characters, hence strlen().
        $requestSize = pack('V', \strlen($request));
        // The token is written as two 32-bit halves; the upper half is always zero.
        $binaryToken = pack('V', $token) . pack('V', 0);

        return $this->stream->write($binaryToken . $requestSize . $request);
    }
291
292
    /**
     * Builds a cursor bound to this connection for the given response.
     *
     * @param ResponseInterface $response Response the cursor starts from.
     * @param int $token Token of the query that produced the response.
     * @param MessageInterface $message Message that was sent for the query.
     * @return Cursor
     */
    private function createCursorFromResponse(
        ResponseInterface $response,
        int $token,
        MessageInterface $message
    ): Cursor {
        $cursor = new Cursor($this, $token, $response, $message);

        return $cursor;
    }
305
306
    /**
     * Picks a random token in [0, 2^30] that is not in the active-token set.
     *
     * Draws again after a collision, giving up once 1024 retries have been used.
     * Any failure (including the random source throwing) is reported as a
     * ConnectionException that carries the original exception as its cause.
     *
     * @return int
     * @throws \Exception
     */
    private function generateToken(): int
    {
        try {
            $maxToken = 1 << 30;
            $tries = 0;

            while (true) {
                $token = random_int(0, $maxToken);

                if (!isset($this->activeTokens[$token])) {
                    return $token;
                }

                // Collision: retry while the retry budget lasts, otherwise give up.
                if ($tries++ < 1024) {
                    continue;
                }

                throw new \Exception('Unable to generate a unique token for the query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
        }
    }
328
329
    /**
     * Sends a NOREPLY_WAIT query and blocks until its response has been received.
     *
     * Every failure, including an unexpected response type, leaves this method
     * as a ConnectionException that carries the original exception as its cause.
     *
     * @throws ConnectionException
     * @throws \Exception
     */
    private function noReplyWait(): void
    {
        try {
            $token = $this->generateToken();
            $query = new Message(QueryType::NOREPLY_WAIT);

            $this->writeQuery($token, $query);
            $response = $this->receiveResponse($token, $query);

            // NOTE(review): 4 is presumably the WAIT_COMPLETE response type - confirm against ResponseType.
            $waitCompleted = $response->getType() === 4;
            if (!$waitCompleted) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }
351
352
    /**
     * Reads one response frame from the stream, decodes it and validates it.
     *
     * Frame layout as read here: a 12-byte header (two little-endian 32-bit token
     * halves followed by a little-endian 32-bit body size) and then the JSON body.
     *
     * @param int $token Token of the request the response must belong to.
     * @param MessageInterface $message Request message, only used for error reporting.
     * @return ResponseInterface
     * @throws \RuntimeException
     * @throws ConnectionException When the header is empty, truncated or carries a non-zero upper
     *                             token half, or when validation of the decoded response fails.
     */
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        $headerLength = 4 + 8;

        $responseHeader = $this->stream->read($headerLength);
        if (empty($responseHeader)) {
            throw new ConnectionException('Empty response headers received from server.');
        }

        // A short header would make unpack() warn and return false, which then shows up
        // as a misleading "Invalid token" error; report the truncation explicitly instead.
        if (\strlen($responseHeader) < $headerLength) {
            throw new ConnectionException('Incomplete response headers received from server.');
        }

        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
        $responseToken = $responseHeader['token'];
        // Tokens are generated below 2^31, so the upper 32 bits must always be zero.
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseSize = $responseHeader['size'];
        // NOTE(review): assumes the stream returns the whole body in a single read() - confirm
        // against the stream implementation in use.
        $responseBuf = $this->stream->read($responseSize);

        /** @var ResponseInterface $response */
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
        $this->validateResponse($response, $responseToken, $token, $message);

        return $response;
    }
381
382
    /**
     * Rejects responses that are untyped, report an error, or answer a different request.
     *
     * Checks run in this order: missing type, client error, token mismatch,
     * compile error, runtime error. The first failing check throws.
     *
     * @param ResponseInterface $response Decoded response to inspect.
     * @param int $responseToken Token found in the response header.
     * @param int $token Token of the request that was sent.
     * @param MessageInterface $message Request message, JSON-encoded into compile/runtime error texts.
     * @throws ConnectionException
     */
    private function validateResponse(
        ResponseInterface $response,
        int $responseToken,
        int $token,
        MessageInterface $message
    ): void {
        if (!$response->getType()) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($response->getType() === ResponseType::CLIENT_ERROR) {
            $reason = $response->getData()[0];

            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $reason);
        }

        if ($responseToken !== $token) {
            $detail = 'Received wrong token. Response does not match the request. '
                . 'Expected ' . $token . ', received ' . $responseToken;

            throw new ConnectionException($detail);
        }

        if ($response->getType() === ResponseType::COMPILE_ERROR) {
            $reason = $response->getData()[0];

            throw new ConnectionException('Compile error: ' . $reason . ', jsonQuery: ' . json_encode($message));
        }

        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
            $reason = $response->getData()[0];

            throw new ConnectionException('Runtime error: ' . $reason . ', jsonQuery: ' . json_encode($message));
        }
    }
418
}
419