Passed
Pull Request — master (#18)
by Timon
02:50
created

Connection::writeQuery()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0987

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 7
cts 9
cp 0.7778
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 10
nc 4
nop 2
crap 3.0987
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Message\ExprMessage;
11
use TBolier\RethinkQL\Message\Message;
12
use TBolier\RethinkQL\Message\MessageInterface;
13
use TBolier\RethinkQL\Query\Expr;
14
use TBolier\RethinkQL\Query\Options as QueryOptions;
15
use TBolier\RethinkQL\Response\Cursor;
16
use TBolier\RethinkQL\Response\Response;
17
use TBolier\RethinkQL\Response\ResponseInterface;
18
use TBolier\RethinkQL\Types\Query\QueryType;
19
use TBolier\RethinkQL\Types\Response\ResponseType;
20
21
class Connection implements ConnectionInterface, ConnectionCursorInterface
22
{
23
    /**
24
     * @var int[]
25
     */
26
    private $activeTokens;
27
    /**
28
     * @var string
29
     */
30
    private $dbName;
31
    /**
32
     * @var HandshakeInterface
33
     */
34
    private $handshake;
35
    /**
36
     * @var bool
37
     */
38
    private $noReply = false;
39
    /**
40
     * @var SerializerInterface
41
     */
42
    private $querySerializer;
43
    /**
44
     * @var SerializerInterface
45
     */
46
    private $responseSerializer;
47
    /**
48
     * @var StreamInterface
49
     */
50
    private $stream;
51
    /**
52
     * @var \Closure
53
     */
54
    private $streamWrapper;
55
56
    /**
57
     * @param \Closure $streamWrapper
58
     * @param HandshakeInterface $handshake
59
     * @param string $dbName
60
     * @param SerializerInterface $querySerializer
61
     * @param SerializerInterface $responseSerializer
62
     */
63 32
    public function __construct(
64
        \Closure $streamWrapper,
65
        HandshakeInterface $handshake,
66
        string $dbName,
67
        SerializerInterface $querySerializer,
68
        SerializerInterface $responseSerializer
69
    ) {
70 32
        $this->streamWrapper = $streamWrapper;
71 32
        $this->dbName = $dbName;
72 32
        $this->handshake = $handshake;
73 32
        $this->querySerializer = $querySerializer;
74 32
        $this->responseSerializer = $responseSerializer;
75 32
    }
76
77
    /**
78
     * @param MessageInterface $query
79
     * @return void
80
     */
81 1
    public function changes(MessageInterface $query): void
82
    {
83
        // TODO: Implement changes() method.
84 1
    }
85
86
    /**
87
     * @param bool $noreplyWait
88
     * @throws \Exception
89
     */
90 2
    public function close($noreplyWait = true): void
91
    {
92 2
        if ($noreplyWait) {
93 1
            $this->noreplyWait();
94
        }
95
96 2
        $this->stream->close();
97 2
    }
98
99
    /**
100
     * @inheritdoc
101
     * @throws ConnectionException
102
     */
103 28
    public function connect(): self
104
    {
105 28
        if ($this->stream !== null && $this->stream->isWritable()) {
106 13
            return $this;
107
        }
108
109
        try {
110 28
            $this->stream = ($this->streamWrapper)();
111 28
            $this->handshake->hello($this->stream);
112 1
        } catch (\Exception $e) {
113 1
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
114
        }
115
116 27
        return $this;
117
    }
118
119
    /**
120
     * @inheritdoc
121
     * @throws \Exception
122
     */
123
    public function reconnect($noreplyWait = true): Connection
124
    {
125
        $this->close($noreplyWait);
126
127
        return $this->connect();
128
    }
129
130
    /**
131
     * @inheritdoc
132
     * @throws \Exception
133
     */
134 1
    public function continueQuery(int $token): ResponseInterface
135
    {
136 1
        $message = (new Message())->setQuery(
137 1
            [QueryType::CONTINUE]
138
        );
139
140 1
        $this->writeQuery($token, $message);
141
142
        // Await the response
143 1
        $response = $this->receiveResponse($token, $message);
144
145 1
        if ($response->getType() !== ResponseType::SUCCESS_PARTIAL) {
146 1
            unset($this->activeTokens[$token]);
147
        }
148
149 1
        return $response;
150
    }
151
152
    /**
153
     * @param string $string
154
     * @return ResponseInterface
155
     * @throws ConnectionException
156
     */
157 2
    public function expr(string $string): ResponseInterface
158
    {
159 2
        $message = new ExprMessage();
160 2
        $message->setCommand(QueryType::START)
161 2
            ->setQuery($string);
162
163 2
        return $this->run($message);
164
    }
165
166
    /**
167
     * @inheritdoc
168
     * @throws ConnectionException
169
     */
170 1
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
171
    {
172 1
        return $this->run($message, true);
173
    }
174
175
    /**
176
     * @inheritdoc
177
     * @throws ConnectionException
178
     */
179 20
    public function run(MessageInterface $message, $raw = false)
180
    {
181
        try {
182 20
            $token = $this->generateToken();
183
184 20
            $this->writeQuery($token, $message);
185
186 20
            if ($this->noReply) {
187 1
                return null;
188
            }
189
190 19
            $response = $this->receiveResponse($token, $message);
191
192 19
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
193 1
                $this->activeTokens[$token] = true;
194
            }
195
196 19
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
197 17
                return $response;
198
            }
199
200 3
            return $this->createCursorFromResponse($response, $token, $message);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->createCurs...nse, $token, $message); (TBolier\RethinkQL\Response\Cursor) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type Iterable|TBolier\Rethink...ponse\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
201
        } catch (\Exception $e) {
202
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
203
        }
204
    }
205
206
    /**
207
     * @inheritdoc
208
     * @throws ConnectionException
209
     */
210 1
    public function runNoReply(MessageInterface $query): void
211
    {
212 1
        $this->noReply = true;
213 1
        $this->run($query);
214 1
        $this->noReply = false;
215 1
    }
216
217
    /**
218
     * @inheritdoc
219
     * @throws \Exception
220
     */
221 2
    public function server(): ResponseInterface
222
    {
223
        try {
224 2
            $token = $this->generateToken();
225
226 2
            $query = new Message(QueryType::SERVER_INFO);
227 2
            $this->writeQuery($token, $query);
228
229 2
            $response = $this->receiveResponse($token, $query);
230
231 2
            if ($response->getType() !== 5) {
232 2
                throw new ConnectionException('Unexpected response type for server query.');
233
            }
234
        } catch (\Exception $e) {
235
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
236
        }
237
238 2
        return $response;
239
    }
240
241
    /**
242
     * @inheritdoc
243
     * @throws \Exception
244
     */
245 1
    public function stopQuery(int $token): ResponseInterface
246
    {
247 1
        $message = (new Message())->setQuery(
248 1
            [QueryType::STOP]
249
        );
250
251 1
        $this->writeQuery($token, $message);
252
253 1
        $response = $this->receiveResponse($token, $message);
254
255 1
        unset($this->activeTokens[$token]);
256
257 1
        return $response;
258
    }
259
260
    /**
261
     * @inheritdoc
262
     */
263 14
    public function use(string $name): void
264
    {
265 14
        $this->dbName = $name;
266 14
    }
267
268
    /**
269
     * @inheritdoc
270
     * @throws \Exception
271
     */
272 26
    public function writeQuery(int $token, MessageInterface $message): int
273
    {
274 26
        if ($this->dbName) {
275 26
            $message->setOptions((new QueryOptions())->setDb($this->dbName));
276
        }
277
278
        try {
279 26
            $request = $this->querySerializer->serialize($message, 'json');
280
        } catch (\Exception $e) {
281
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
282
        }
283
284 26
        $requestSize = pack('V', \strlen($request));
285 26
        $binaryToken = pack('V', $token) . pack('V', 0);
286
287 26
        return $this->stream->write($binaryToken . $requestSize . $request);
288
    }
289
290
    /**
291
     * @inheritdoc
292
     * @throws ConnectionException
293
     * @throws \Exception
294
     */
295 1
    public function noreplyWait(): void
296
    {
297
        try {
298 1
            $token = $this->generateToken();
299
300 1
            $query = new Message(QueryType::NOREPLY_WAIT);
301 1
            $this->writeQuery($token, $query);
302
303 1
            $response = $this->receiveResponse($token, $query);
304
305 1
            if ($response->getType() !== 4) {
306 1
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
307
            }
308
        } catch (\Exception $e) {
309
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
310
        }
311 1
    }
312
313
    /**
314
     * @param ResponseInterface $response
315
     * @param int $token
316
     * @param MessageInterface $message
317
     * @return Cursor
318
     */
319 3
    private function createCursorFromResponse(
320
        ResponseInterface $response,
321
        int $token,
322
        MessageInterface $message
323
    ): Cursor {
324 3
        return new Cursor($this, $token, $response, $message);
325
    }
326
327
    /**
328
     * @return int
329
     * @throws \Exception
330
     */
331 23
    private function generateToken(): int
332
    {
333
        try {
334 23
            $tries = 0;
335 23
            $maxToken = 1 << 30;
336
            do {
337 23
                $token = random_int(0, $maxToken);
338 23
                $haveCollision = isset($this->activeTokens[$token]);
339 23
            } while ($haveCollision && $tries++ < 1024);
340 23
            if ($haveCollision) {
341 23
                throw new \Exception('Unable to generate a unique token for the query.');
342
            }
343
        } catch (\Exception $e) {
344
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
345
        }
346
347 23
        return $token;
348
    }
349
350
    /**
351
     * @param int $token
352
     * @param MessageInterface $message
353
     * @return ResponseInterface
354
     * @throws \RuntimeException
355
     * @throws ConnectionException
356
     */
357 24
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
358
    {
359 24
        $responseHeader = $this->stream->read(4 + 8);
360 24
        if (empty($responseHeader)) {
361
            throw new ConnectionException('Empty response headers received from server.');
362
        }
363
364 24
        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
365 24
        $responseToken = $responseHeader['token'];
366 24
        if ($responseHeader['token2'] !== 0) {
367
            throw new ConnectionException('Invalid response from server: Invalid token.');
368
        }
369
370 24
        $responseSize = $responseHeader['size'];
371 24
        $responseBuf = $this->stream->read($responseSize);
372
373
        /** @var ResponseInterface $response */
374 24
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
375 24
        $this->validateResponse($response, $responseToken, $token, $message);
376
377 24
        return $response;
378
    }
379
380
    /**
381
     * @param ResponseInterface $response
382
     * @param int $responseToken
383
     * @param int $token
384
     * @param MessageInterface $message
385
     * @throws ConnectionException
386
     */
387 24
    private function validateResponse(
388
        ResponseInterface $response,
389
        int $responseToken,
390
        int $token,
391
        MessageInterface $message
392
    ): void {
393 24
        if (!$response->getType()) {
394
            throw new ConnectionException('Response message has no type.');
395
        }
396
397 24
        if ($response->getType() === ResponseType::CLIENT_ERROR) {
398
            throw new ConnectionException('Client error: ' . $response->getData()[0] . ' jsonQuery: ' . json_encode($message));
399
        }
400
401 24
        if ($responseToken !== $token) {
402
            throw new ConnectionException(
403
                'Received wrong token. Response does not match the request. '
404
                . 'Expected ' . $token . ', received ' . $responseToken
405
            );
406
        }
407
408 24
        if ($response->getType() === ResponseType::COMPILE_ERROR) {
409
            throw new ConnectionException('Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
410
        }
411
412 24
        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
413
            throw new ConnectionException('Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
414
        }
415 24
    }
416
}
417