Passed
Pull Request — master (#18)
by Timon
08:58
created

Connection::writeQuery()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0987

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 7
cts 9
cp 0.7778
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 10
nc 4
nop 2
crap 3.0987
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Message\ExprMessage;
11
use TBolier\RethinkQL\Message\Message;
12
use TBolier\RethinkQL\Message\MessageInterface;
13
use TBolier\RethinkQL\Query\Expr;
14
use TBolier\RethinkQL\Query\Options as QueryOptions;
15
use TBolier\RethinkQL\Response\Cursor;
16
use TBolier\RethinkQL\Response\Response;
17
use TBolier\RethinkQL\Response\ResponseInterface;
18
use TBolier\RethinkQL\Types\Query\QueryType;
19
use TBolier\RethinkQL\Types\Response\ResponseType;
20
21
class Connection implements ConnectionInterface, ConnectionCursorInterface
22
{
23
    /**
     * Tokens of queries whose result is still partial, stored as
     * token => true so collisions can be detected with isset().
     *
     * Initialised to an empty array so the property never relies on
     * PHP auto-vivifying a null value into an array.
     *
     * @var array<int, bool>
     */
    private $activeTokens = [];

    /**
     * Database name attached to every outgoing query as an option.
     *
     * @var string
     */
    private $dbName;

    /**
     * Performs the protocol handshake on a freshly opened stream.
     *
     * @var HandshakeInterface
     */
    private $handshake;

    /**
     * When true, run() returns right after writing the query and does
     * not read a response.
     *
     * @var bool
     */
    private $noReply = false;

    /**
     * Serializes outgoing messages to JSON.
     *
     * @var SerializerInterface
     */
    private $querySerializer;

    /**
     * Deserializes JSON response bodies into Response objects.
     *
     * @var SerializerInterface
     */
    private $responseSerializer;

    /**
     * The open stream; null until connect() has been called.
     *
     * @var StreamInterface|null
     */
    private $stream;

    /**
     * Factory invoked by connect() to obtain a stream.
     *
     * @var \Closure
     */
    private $streamWrapper;
55
56
    /**
     * Stores the collaborators; no stream is opened here.
     *
     * @param \Closure $streamWrapper Factory that yields the stream when invoked.
     * @param HandshakeInterface $handshake Handshake run on a newly opened stream.
     * @param string $dbName Database name sent along with queries.
     * @param SerializerInterface $querySerializer Serializer for outgoing messages.
     * @param SerializerInterface $responseSerializer Serializer for incoming responses.
     */
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        // Connection bootstrap.
        $this->streamWrapper = $streamWrapper;
        $this->handshake = $handshake;

        // Query defaults.
        $this->dbName = $dbName;

        // (De)serialization.
        $this->querySerializer = $querySerializer;
        $this->responseSerializer = $responseSerializer;
    }
76
77
    /**
     * Placeholder for change-feed support; currently a no-op.
     *
     * @param MessageInterface $query Ignored for now.
     * @return void
     */
    public function changes(MessageInterface $query): void
    {
        // TODO: Implement changes() method.
    }
85
86
    /**
     * Closes the underlying stream, optionally waiting for outstanding
     * no-reply queries first.
     *
     * Calling this on a connection that was never opened is a no-op
     * instead of a fatal "call to a member function on null". The stream
     * is closed even when the noreply-wait round trip fails; the failure
     * is still propagated to the caller.
     *
     * @param bool $noreplyWait Whether to issue noreplyWait() before closing.
     * @throws \Exception When noreplyWait() fails.
     */
    public function close($noreplyWait = true): void
    {
        if ($this->stream === null) {
            // Never connected: nothing to flush, nothing to close.
            return;
        }

        try {
            if ($noreplyWait) {
                $this->noreplyWait();
            }
        } finally {
            // Release the stream regardless of the wait outcome.
            $this->stream->close();
        }
    }
98
99
    /**
     * Opens the stream and performs the handshake, unless a writable
     * stream is already present.
     *
     * @inheritdoc
     * @throws ConnectionException When creating the stream or the handshake fails.
     */
    public function connect(): self
    {
        $alreadyConnected = $this->stream !== null && $this->stream->isWritable();

        if (!$alreadyConnected) {
            try {
                $this->stream = ($this->streamWrapper)();
                $this->handshake->hello($this->stream);
            } catch (\Exception $e) {
                // Surface every failure as a ConnectionException, keeping the cause.
                throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
            }
        }

        return $this;
    }
118
119
    /**
     * Closes the current connection and opens it again.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function reconnect($noreplyWait = true): Connection
    {
        $this->close($noreplyWait);
        $connection = $this->connect();

        return $connection;
    }
129
130
    /**
     * Sends a CONTINUE query for the given token and returns the response.
     * The token stops being tracked as active once the response is no
     * longer a partial success.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function continueQuery(int $token): ResponseInterface
    {
        $message = (new Message())->setQuery([QueryType::CONTINUE]);
        $this->writeQuery($token, $message);

        // Await the response
        $response = $this->receiveResponse($token, $message);

        $hasMore = $response->getType() === ResponseType::SUCCESS_PARTIAL;
        if (!$hasMore) {
            unset($this->activeTokens[$token]);
        }

        return $response;
    }
151
152
    /**
153
     * @param string $string
154
     * @return ResponseInterface
155
     * @throws ConnectionException
156
     */
157 2
    public function expr(string $string): ResponseInterface
158
    {
159 2
        $message = new ExprMessage();
160 2
        $message->setCommand(QueryType::START)
161 2
            ->setQuery(new Expr($string));
0 ignored issues
show
Documentation introduced by
new \TBolier\RethinkQL\Query\Expr($string) is of type object<TBolier\RethinkQL\Query\Expr>, but the function expects an array|string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, particularly when PHP’s automatic type juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest adding an explicit type cast, as in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
162
163 2
        return $this->run($message);
164
    }
165
166
    /**
     * Re-runs the message and returns the raw response (run() in raw mode).
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
    {
        $raw = true;

        return $this->run($message, $raw);
    }
174
175
    /**
176
     * @inheritdoc
177
     * @throws ConnectionException
178
     */
179 20
    public function run(MessageInterface $message, $raw = false)
180
    {
181
        try {
182 20
            $token = $this->generateToken();
183
184 20
            $this->writeQuery($token, $message);
185
186 20
            if ($this->noReply) {
187 1
                return;
188
            }
189
190 19
            $response = $this->receiveResponse($token, $message);
191
192 19
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
193 1
                $this->activeTokens[$token] = true;
194
            }
195
196 19
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
197 17
                return $response;
198
            }
199
200 4
            return $this->createCursorFromResponse($response, $token, $message);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->createCurs...nse, $token, $message); (TBolier\RethinkQL\Response\Cursor) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type TBolier\RethinkQL\Response\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type declared by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
201
        } catch (\Exception $e) {
202
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
203
        }
204
    }
205
206
    /**
     * Runs the query with the no-reply flag raised, so run() returns
     * right after the query has been written.
     *
     * The flag is reset in a finally block: if run() throws, the
     * connection must not stay stuck in no-reply mode, otherwise every
     * later run() call would skip reading its response.
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query)
    {
        $this->noReply = true;

        try {
            return $this->run($query);
        } finally {
            $this->noReply = false;
        }
    }
218
219
    /**
     * Sends a SERVER_INFO query and returns the server's response.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function server(): ResponseInterface
    {
        // NOTE(review): 5 is presumably the SERVER_INFO response type - confirm against ResponseType.
        $expectedType = 5;

        try {
            $token = $this->generateToken();
            $message = new Message(QueryType::SERVER_INFO);

            $this->writeQuery($token, $message);
            $response = $this->receiveResponse($token, $message);

            if ($response->getType() !== $expectedType) {
                throw new ConnectionException('Unexpected response type for server query.');
            }

            return $response;
        } catch (\Exception $e) {
            // Every failure, including the type check above, is re-wrapped.
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }
242
243
    /**
     * Sends a STOP query for the given token, returns the response and
     * stops tracking the token as active.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function stopQuery(int $token): ResponseInterface
    {
        $message = (new Message())->setQuery([QueryType::STOP]);
        $this->writeQuery($token, $message);

        $response = $this->receiveResponse($token, $message);

        // The token is released whatever the response type is.
        unset($this->activeTokens[$token]);

        return $response;
    }
261
262
    /**
     * Switches the database name attached to subsequent queries.
     *
     * @inheritdoc
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }
269
270
    /**
     * Serializes the message to JSON and writes it to the stream, framed
     * as: token (two little-endian 32-bit words, the second always 0),
     * payload length (little-endian 32-bit), payload.
     *
     * @inheritdoc
     * @return int Whatever the stream's write() call returns.
     * @throws \Exception When the message cannot be serialized.
     */
    public function writeQuery(int $token, MessageInterface $message): int
    {
        if ($this->dbName) {
            // NOTE(review): whether setOptions() replaces options already set
            // on the message is not visible here - confirm.
            $options = new QueryOptions();
            $message->setOptions($options->setDb($this->dbName));
        }

        try {
            $payload = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $e) {
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
        }

        $header = pack('VVV', $token, 0, \strlen($payload));

        return $this->stream->write($header . $payload);
    }
291
292
    /**
     * Sends a NOREPLY_WAIT query and checks that the server answers with
     * the expected response type.
     *
     * @inheritdoc
     * @throws ConnectionException
     * @throws \Exception
     */
    public function noreplyWait(): void
    {
        // NOTE(review): 4 is presumably the WAIT_COMPLETE response type - confirm against ResponseType.
        $expectedType = 4;

        try {
            $token = $this->generateToken();
            $message = new Message(QueryType::NOREPLY_WAIT);

            $this->writeQuery($token, $message);
            $response = $this->receiveResponse($token, $message);

            $typeMatches = $response->getType() === $expectedType;
            if (!$typeMatches) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }
314
315
    /**
     * Wraps a response in a Cursor bound to this connection.
     *
     * @param ResponseInterface $response First response of the result.
     * @param int $token Token of the query that produced the response.
     * @param MessageInterface $message The message that was sent.
     * @return Cursor
     */
    private function createCursorFromResponse(
        ResponseInterface $response,
        int $token,
        MessageInterface $message
    ): Cursor {
        $cursor = new Cursor($this, $token, $response, $message);

        return $cursor;
    }
328
329
    /**
     * Draws a random token in [0, 2^30] that is not currently tracked as
     * active. Gives up after 1025 draws that all collide.
     *
     * @return int
     * @throws \Exception Always a ConnectionException wrapping the cause.
     */
    private function generateToken(): int
    {
        $maxToken = 1 << 30;

        try {
            // Attempts 0..1024 inclusive: one initial draw plus 1024 retries.
            for ($attempt = 0; $attempt <= 1024; ++$attempt) {
                $candidate = random_int(0, $maxToken);

                if (!isset($this->activeTokens[$candidate])) {
                    return $candidate;
                }
            }

            throw new \Exception('Unable to generate a unique token for the query.');
        } catch (\Exception $e) {
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
        }
    }
351
352
    /**
     * Reads one response frame from the stream, deserializes it and
     * validates it against the request.
     *
     * The frame is a 12-byte header (token as two little-endian 32-bit
     * words, then a little-endian 32-bit body size) followed by the JSON
     * body. Both parts are read with readExactly(), because a single
     * stream read may return fewer bytes than requested; a short header
     * or body is reported as a ConnectionException instead of being
     * passed on to unpack() or the deserializer.
     *
     * @param int $token Token the response is expected to carry.
     * @param MessageInterface $message The request, used in error messages.
     * @return ResponseInterface
     * @throws \RuntimeException
     * @throws ConnectionException
     */
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        $headerLength = 4 + 8;

        $rawHeader = $this->readExactly($headerLength);
        if ($rawHeader === '') {
            throw new ConnectionException('Empty response headers received from server.');
        }
        if (\strlen($rawHeader) !== $headerLength) {
            throw new ConnectionException('Incomplete response headers received from server.');
        }

        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $rawHeader);
        $responseToken = $responseHeader['token'];
        // The upper token word is always written as 0 by writeQuery().
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseSize = $responseHeader['size'];
        $responseBuf = $this->readExactly($responseSize);
        if (\strlen($responseBuf) !== $responseSize) {
            throw new ConnectionException('Incomplete response body received from server.');
        }

        /** @var ResponseInterface $response */
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
        $this->validateResponse($response, $responseToken, $token, $message);

        return $response;
    }

    /**
     * Reads from the stream until $length bytes are collected or the
     * stream stops yielding data.
     *
     * @param int $length Number of bytes wanted.
     * @return string The bytes read; shorter than $length if the stream ran dry.
     * @throws \RuntimeException When the stream read fails.
     */
    private function readExactly(int $length): string
    {
        $buffer = '';

        while (($remaining = $length - \strlen($buffer)) > 0) {
            $chunk = $this->stream->read($remaining);

            // An empty or non-string read means no more data is coming.
            if (!\is_string($chunk) || $chunk === '') {
                break;
            }

            $buffer .= $chunk;
        }

        return $buffer;
    }
381
382
    /**
     * Rejects responses that have no type, report an error, or carry a
     * token different from the request's.
     *
     * Check order: missing type, client error, token mismatch, compile
     * error, runtime error.
     *
     * @param ResponseInterface $response Deserialized response.
     * @param int $responseToken Token read from the response header.
     * @param int $token Token that was sent with the request.
     * @param MessageInterface $message The request, JSON-encoded into error messages.
     * @throws ConnectionException
     */
    private function validateResponse(
        ResponseInterface $response,
        int $responseToken,
        int $token,
        MessageInterface $message
    ): void {
        $type = $response->getType();

        if (!$type) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($type === ResponseType::CLIENT_ERROR) {
            throw new ConnectionException(
                'Client error: ' . $response->getData()[0] . ' jsonQuery: ' . json_encode($message)
            );
        }

        if ($responseToken !== $token) {
            throw new ConnectionException(
                'Received wrong token. Response does not match the request. '
                . 'Expected ' . $token . ', received ' . $responseToken
            );
        }

        $serverErrors = [
            'Compile error: ' => ResponseType::COMPILE_ERROR,
            'Runtime error: ' => ResponseType::RUNTIME_ERROR,
        ];

        foreach ($serverErrors as $label => $errorType) {
            if ($type === $errorType) {
                throw new ConnectionException(
                    $label . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
                );
            }
        }
    }
418
}
419