Passed
Pull Request — master (#18)
by Marc
03:37
created

Connection::writeQuery()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0987

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 7
cts 9
cp 0.7778
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 10
nc 4
nop 2
crap 3.0987
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Query\Expr;
11
use TBolier\RethinkQL\Query\Message;
12
use TBolier\RethinkQL\Query\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Response\Cursor;
15
use TBolier\RethinkQL\Response\Response;
16
use TBolier\RethinkQL\Response\ResponseInterface;
17
use TBolier\RethinkQL\Types\Query\QueryType;
18
use TBolier\RethinkQL\Types\Response\ResponseType;
19
20
class Connection implements ConnectionInterface, ConnectionCursorInterface
21
{
22
    /**
23
     * @var int[]
24
     */
25
    private $activeTokens;
26
    /**
27
     * @var string
28
     */
29
    private $dbName;
30
    /**
31
     * @var HandshakeInterface
32
     */
33
    private $handshake;
34
    /**
35
     * @var bool
36
     */
37
    private $noReply = false;
38
    /**
39
     * @var SerializerInterface
40
     */
41
    private $querySerializer;
42
    /**
43
     * @var SerializerInterface
44
     */
45
    private $responseSerializer;
46
    /**
47
     * @var StreamInterface
48
     */
49
    private $stream;
50
    /**
51
     * @var \Closure
52
     */
53
    private $streamWrapper;
54
55
    /**
     * Stores the collaborators used by this connection. No stream is opened
     * here; the stream factory is only invoked later by connect().
     *
     * @param \Closure $streamWrapper Factory called by connect() to obtain the stream.
     * @param HandshakeInterface $handshake Handshake performed on a newly created stream.
     * @param string $dbName Database name attached to outgoing queries by writeQuery().
     * @param SerializerInterface $querySerializer Serializer used for outgoing query messages.
     * @param SerializerInterface $responseSerializer Serializer used for incoming responses.
     */
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        // Serializers first, then the connection-level settings; the
        // assignments are independent of each other.
        $this->querySerializer = $querySerializer;
        $this->responseSerializer = $responseSerializer;
        $this->handshake = $handshake;
        $this->streamWrapper = $streamWrapper;
        $this->dbName = $dbName;
    }
75
76
    /**
     * Not implemented yet: the body is intentionally empty and the given
     * query is ignored.
     *
     * @param MessageInterface $query
     * @return void
     */
    public function changes(MessageInterface $query): void
    {
        // TODO: Implement changes() method.
    }
84
85
    /**
     * Closes the underlying stream, optionally waiting for outstanding
     * no-reply queries first.
     *
     * Calling this on a connection that was never connected is a no-op:
     * there is no stream to flush or close.
     *
     * @param bool $noreplyWait When true, noreplyWait() is called before the stream is closed.
     * @throws \Exception
     */
    public function close($noreplyWait = true): void
    {
        // connect() is the only place that assigns the stream; without it
        // both noreplyWait() and close() would dereference null.
        if ($this->stream === null) {
            return;
        }

        if ($noreplyWait) {
            $this->noreplyWait();
        }

        $this->stream->close();
    }
97
98
    /**
     * Ensures a writable stream exists. When the current stream is already
     * writable nothing happens; otherwise a new stream is obtained from the
     * stream factory and the handshake is run against it.
     *
     * @inheritdoc
     * @throws ConnectionException When creating the stream or the handshake throws.
     */
    public function connect(): self
    {
        $alreadyOpen = $this->stream !== null && $this->stream->isWritable();

        if (!$alreadyOpen) {
            try {
                $this->stream = ($this->streamWrapper)();
                $this->handshake->hello($this->stream);
            } catch (\Exception $failure) {
                // Re-throw as the connection-level exception, keeping the cause.
                throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
            }
        }

        return $this;
    }
117
118
    /**
     * Closes the connection and then connects again.
     *
     * @inheritdoc
     * @param bool $noreplyWait Forwarded unchanged to close().
     * @throws \Exception
     */
    public function reconnect($noreplyWait = true): Connection
    {
        $this->close($noreplyWait);
        $connection = $this->connect();

        return $connection;
    }
128
129
    /**
     * Sends a CONTINUE query for an existing token and returns the response.
     * The token is removed from the active set unless the response is a
     * partial success.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function continueQuery(int $token): ResponseInterface
    {
        $continue = new Message();
        $message = $continue->setQuery([QueryType::CONTINUE]);

        $this->writeQuery($token, $message);
        $response = $this->receiveResponse($token, $message);

        $isPartial = $response->getType() === ResponseType::SUCCESS_PARTIAL;
        if (!$isPartial) {
            unset($this->activeTokens[$token]);
        }

        return $response;
    }
150
151
    /**
     * Wraps the given string in an Expr, sends it as a START query and
     * returns whatever run() returns for it.
     *
     * @param string $string Value passed to the Expr constructor.
     * @return ResponseInterface
     * @throws ConnectionException
     */
    public function expr(string $string): ResponseInterface
    {
        $startMessage = new Message();
        $startMessage->setCommand(QueryType::START)->setQuery([new Expr($string)]);

        return $this->run($startMessage);
    }
164
165
    /**
     * Runs the message again in raw mode, so run() returns the response
     * itself rather than wrapping it in a cursor.
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
    {
        $rawResponse = $this->run($message, true);

        return $rawResponse;
    }
173
174
    /**
175
     * @inheritdoc
176
     * @throws ConnectionException
177
     */
178 19
    public function run(MessageInterface $message, $raw = false)
179
    {
180
        try {
181 19
            $token = $this->generateToken();
182
183 19
             $this->writeQuery($token, $message);
184
185 19
            if ($this->noReply) {
186 1
                return;
187
            }
188
189 18
            $response = $this->receiveResponse($token, $message);
190
191 18
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
192 1
                $this->activeTokens[$token] = true;
193
            }
194
195 18
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
196 16
                return $response;
197
            }
198
199 4
            return $this->createCursorFromResponse($response, $token, $message);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->createCurs...nse, $token, $message); (TBolier\RethinkQL\Response\Cursor) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type TBolier\RethinkQL\Response\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost, which is a sub-type of Post, instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but would ultimately fail when executing the strtoupper call in its body.

Loading history...
200
        } catch (\Exception $e) {
201
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
202
        }
203
    }
204
205
    /**
     * Runs the query without waiting for a response.
     *
     * The noReply flag makes run() return right after the query is written,
     * so the result is whatever run() yields in that mode (no value). The
     * flag is always reset afterwards, even when run() throws; otherwise a
     * single failure would leave the connection permanently in no-reply mode
     * and every later run() would skip reading its response.
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query)
    {
        $this->noReply = true;

        try {
            return $this->run($query);
        } finally {
            $this->noReply = false;
        }
    }
217
218
    /**
     * Sends a SERVER_INFO query under a freshly generated token and returns
     * the response. Any exception raised along the way, including the
     * unexpected-type error below, is re-thrown as a ConnectionException
     * carrying the original as its previous exception.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function server(): ResponseInterface
    {
        try {
            $token = $this->generateToken();
            $serverInfoQuery = new Message(QueryType::SERVER_INFO);

            $this->writeQuery($token, $serverInfoQuery);
            $reply = $this->receiveResponse($token, $serverInfoQuery);

            // NOTE(review): 5 is presumably the server-info response type;
            // a ResponseType constant would be clearer - confirm it exists.
            if ($reply->getType() !== 5) {
                throw new ConnectionException('Unexpected response type for server query.');
            }

            return $reply;
        } catch (\Exception $failure) {
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
241
242
    /**
     * Sends a STOP query for an existing token, reads the response and then
     * removes the token from the active set regardless of the response type.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function stopQuery(int $token): ResponseInterface
    {
        $stop = new Message();
        $message = $stop->setQuery([QueryType::STOP]);

        $this->writeQuery($token, $message);
        $response = $this->receiveResponse($token, $message);

        unset($this->activeTokens[$token]);

        return $response;
    }
260
261
    /**
     * Replaces the database name that writeQuery() attaches to outgoing
     * queries.
     *
     * @inheritdoc
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }
268
269
    /**
     * Serializes the message to JSON and writes it to the stream as a single
     * frame, returning the value reported by the stream's write().
     *
     * When a database name is set, the message's options are replaced with a
     * new QueryOptions carrying that name before serialization.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function writeQuery(int $token, MessageInterface $message): int
    {
        if ($this->dbName) {
            $options = new QueryOptions();
            $message->setOptions($options->setDb($this->dbName));
        }

        try {
            $payload = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $failure) {
            throw new Exception('Serializing query message failed.', $failure->getCode(), $failure);
        }

        // Frame layout: token (32-bit LE), four zero bytes, payload byte
        // length (32-bit LE), then the JSON payload itself.
        $frame = pack('V', $token) . pack('V', 0) . pack('V', \strlen($payload)) . $payload;

        return $this->stream->write($frame);
    }
290
291
    /**
     * Sends a NOREPLY_WAIT query under a freshly generated token and checks
     * the type of the response. Any exception raised along the way,
     * including the unexpected-type error below, is re-thrown as a
     * ConnectionException carrying the original as its previous exception.
     *
     * @inheritdoc
     * @throws ConnectionException
     * @throws \Exception
     */
    public function noreplyWait(): void
    {
        try {
            $token = $this->generateToken();
            $waitQuery = new Message(QueryType::NOREPLY_WAIT);

            $this->writeQuery($token, $waitQuery);
            $reply = $this->receiveResponse($token, $waitQuery);

            // NOTE(review): 4 is presumably the wait-complete response type;
            // a ResponseType constant would be clearer - confirm it exists.
            $isExpectedType = $reply->getType() === 4;
            if (!$isExpectedType) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $failure) {
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
313
314
    /**
     * Builds a Cursor bound to this connection for the given token,
     * response and originating message.
     *
     * @param ResponseInterface $response
     * @param int $token
     * @param MessageInterface $message
     * @return Cursor
     */
    private function createCursorFromResponse(
        ResponseInterface $response,
        int $token,
        MessageInterface $message
    ): Cursor {
        $cursor = new Cursor($this, $token, $response, $message);

        return $cursor;
    }
327
328
    /**
     * Picks a random token in [0, 2^30] that is not currently registered in
     * the active-token set. Gives up after the initial draw plus 1024
     * retries.
     *
     * @return int
     * @throws \Exception ConnectionException wrapping either the exhaustion
     *                    error or an exception thrown by random_int().
     */
    private function generateToken(): int
    {
        try {
            // One initial draw plus up to 1024 retries on collision.
            for ($attempt = 0; $attempt <= 1024; $attempt++) {
                $candidate = random_int(0, 1 << 30);

                if (!isset($this->activeTokens[$candidate])) {
                    return $candidate;
                }
            }

            throw new \Exception('Unable to generate a unique token for the query.');
        } catch (\Exception $failure) {
            throw new ConnectionException('Generating the token failed.', $failure->getCode(), $failure);
        }
    }
350
351
    /**
     * Reads one response frame from the stream, deserializes its JSON body
     * into a Response and validates it against the expected token.
     *
     * The frame header is 12 bytes: the token (32-bit LE), a second 32-bit
     * word that must be zero, and the body size (32-bit LE).
     *
     * @param int $token Token the response is expected to carry.
     * @param MessageInterface $message Originating query, used in error messages.
     * @return ResponseInterface
     * @throws \RuntimeException
     * @throws ConnectionException When the header is empty, truncated or has
     *                             a non-zero second token word, or when
     *                             validation of the response fails.
     */
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        $headerLength = 4 + 8;

        $rawHeader = $this->stream->read($headerLength);
        if (empty($rawHeader)) {
            throw new ConnectionException('Empty response headers received from server.');
        }

        // A stream read may legally return fewer bytes than requested;
        // unpack() on a short header would fail with a warning and lead to a
        // TypeError further down instead of a meaningful connection error.
        if (\strlen($rawHeader) < $headerLength) {
            throw new ConnectionException('Incomplete response headers received from server.');
        }

        $header = unpack('Vtoken/Vtoken2/Vsize', $rawHeader);
        if ($header['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $body = $this->stream->read($header['size']);

        /** @var ResponseInterface $response */
        $response = $this->responseSerializer->deserialize($body, Response::class, 'json');
        $this->validateResponse($response, $header['token'], $token, $message);

        return $response;
    }
380
381
    /**
     * Throws when the response is unusable. The checks run in this order:
     * missing type, client error, token mismatch, compile error, runtime
     * error. Returns normally when none of them applies.
     *
     * @param ResponseInterface $response Deserialized response to inspect.
     * @param int $responseToken Token read from the response header.
     * @param int $token Token the request was sent with.
     * @param MessageInterface $message Originating query, JSON-encoded into
     *                                  compile/runtime error messages.
     * @throws ConnectionException
     */
    private function validateResponse(
        ResponseInterface $response,
        int $responseToken,
        int $token,
        MessageInterface $message
    ): void {
        if (!$response->getType()) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($response->getType() === ResponseType::CLIENT_ERROR) {
            $reason = 'Server says PHP-RQL is buggy: ' . $response->getData()[0];

            throw new ConnectionException($reason);
        }

        if ($responseToken !== $token) {
            $reason = 'Received wrong token. Response does not match the request. '
                . 'Expected ' . $token . ', received ' . $responseToken;

            throw new ConnectionException($reason);
        }

        if ($response->getType() === ResponseType::COMPILE_ERROR) {
            $reason = 'Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message);

            throw new ConnectionException($reason);
        }

        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
            $reason = 'Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message);

            throw new ConnectionException($reason);
        }
    }
417
}
418