Passed
Pull Request — master (#18)
by Timon
02:57
created

Connection::writeQuery()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0987

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 7
cts 9
cp 0.7778
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 10
nc 4
nop 2
crap 3.0987
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Message\ExprMessage;
11
use TBolier\RethinkQL\Message\Message;
12
use TBolier\RethinkQL\Message\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Response\Cursor;
15
use TBolier\RethinkQL\Response\Response;
16
use TBolier\RethinkQL\Response\ResponseInterface;
17
use TBolier\RethinkQL\Types\Query\QueryType;
18
use TBolier\RethinkQL\Types\Response\ResponseType;
19
20
class Connection implements ConnectionInterface, ConnectionCursorInterface
21
{
22
    /**
     * Tokens of queries that returned a partial response and have not been
     * completed or stopped yet, stored as token => true so that membership
     * can be tested with isset().
     *
     * @var bool[]
     */
    private $activeTokens = [];

    /**
     * Database name attached to outgoing queries by writeQuery().
     *
     * @var string
     */
    private $dbName;

    /**
     * Performs the handshake on a freshly created stream in connect().
     *
     * @var HandshakeInterface
     */
    private $handshake;

    /**
     * While true, run() returns null right after writing the query instead
     * of reading a response.
     *
     * @var bool
     */
    private $noReply = false;

    /**
     * Serializes outgoing messages to JSON.
     *
     * @var SerializerInterface
     */
    private $querySerializer;

    /**
     * Deserializes incoming JSON payloads into Response objects.
     *
     * @var SerializerInterface
     */
    private $responseSerializer;

    /**
     * The open stream; stays null until connect() has been called.
     *
     * @var StreamInterface|null
     */
    private $stream;

    /**
     * Factory invoked by connect() to obtain the stream.
     *
     * @var \Closure
     */
    private $streamWrapper;
54
55
    /**
     * Stores the collaborators. The stream factory is not invoked here;
     * that happens in connect().
     *
     * @param \Closure $streamWrapper Factory that returns the stream when invoked.
     * @param HandshakeInterface $handshake Handshake run against a new stream.
     * @param string $dbName Database name attached to outgoing queries.
     * @param SerializerInterface $querySerializer Serializer for outgoing messages.
     * @param SerializerInterface $responseSerializer Serializer for incoming responses.
     */
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        // Connection setup collaborators.
        $this->handshake = $handshake;
        $this->streamWrapper = $streamWrapper;

        // (De)serialization collaborators.
        $this->querySerializer = $querySerializer;
        $this->responseSerializer = $responseSerializer;

        // Query defaults.
        $this->dbName = $dbName;
    }
75
76
    /**
     * Not implemented yet: the body is empty and the query is ignored.
     *
     * @param MessageInterface $query Accepted but currently unused.
     * @return void
     */
    public function changes(MessageInterface $query): void
    {
        // TODO: Implement changes() method.
    }
84
85
    /**
     * Closes the underlying stream, optionally sending a noreply-wait query
     * first. Does nothing when no stream has been opened yet.
     *
     * @param bool $noreplyWait Whether to call noreplyWait() before closing.
     * @throws \Exception
     */
    public function close($noreplyWait = true): void
    {
        // connect() was never called: there is no stream to wait on or to
        // close, and dereferencing null here would be a fatal error.
        if ($this->stream === null) {
            return;
        }

        if ($noreplyWait) {
            $this->noreplyWait();
        }

        $this->stream->close();
    }
97
98
    /**
     * Opens the stream and performs the handshake, unless a writable stream
     * is already held, in which case nothing is done.
     *
     * @inheritdoc
     * @throws ConnectionException When creating the stream or the handshake fails.
     */
    public function connect(): self
    {
        $hasUsableStream = $this->stream !== null && $this->stream->isWritable();

        if (!$hasUsableStream) {
            try {
                $this->stream = ($this->streamWrapper)();
                $this->handshake->hello($this->stream);
            } catch (\Exception $failure) {
                // Re-throw as the connection-level exception, keeping the cause.
                throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
            }
        }

        return $this;
    }
117
118
    /**
     * Closes the current stream and then connects again.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function reconnect($noreplyWait = true): Connection
    {
        $this->close($noreplyWait);
        $reconnected = $this->connect();

        return $reconnected;
    }
128
129
    /**
     * Sends a CONTINUE query for the given token and returns the response.
     * The token is removed from the active set unless the response is
     * again a partial one.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function continueQuery(int $token): ResponseInterface
    {
        $continueMessage = new Message();
        $continueMessage = $continueMessage->setQuery([QueryType::CONTINUE]);

        $this->writeQuery($token, $continueMessage);

        // Block until the matching response has been read.
        $response = $this->receiveResponse($token, $continueMessage);

        $isPartial = $response->getType() === ResponseType::SUCCESS_PARTIAL;
        if (!$isPartial) {
            unset($this->activeTokens[$token]);
        }

        return $response;
    }
150
151
    /**
     * Runs the given expression as a START query and returns the response.
     *
     * @param string $string Expression passed to ExprMessage as its second argument.
     * @return ResponseInterface
     * @throws ConnectionException
     */
    public function expr(string $string): ResponseInterface
    {
        // Forward the caller's expression rather than a fixed placeholder.
        return $this->run(new ExprMessage(QueryType::START, $string));
    }
160
161
    /**
     * Runs the message again and returns the response itself.
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
    {
        // Passed as run()'s $raw flag, which makes run() return the response
        // as-is instead of wrapping it in a cursor.
        $asRawResponse = true;

        return $this->run($message, $asRawResponse);
    }
169
170
    /**
171
     * @inheritdoc
172
     * @throws ConnectionException
173
     */
174 20
    public function run(MessageInterface $message, $raw = false)
175
    {
176
        try {
177 20
            $token = $this->generateToken();
178
179 20
            $this->writeQuery($token, $message);
180
181 20
            if ($this->noReply) {
182 1
                return null;
183
            }
184
185 19
            $response = $this->receiveResponse($token, $message);
186
187 19
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
188 1
                $this->activeTokens[$token] = true;
189
            }
190
191 19
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
192 17
                return $response;
193
            }
194
195 3
            return $this->createCursorFromResponse($response, $token, $message);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->createCurs...nse, $token, $message); (TBolier\RethinkQL\Response\Cursor) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type Iterable|TBolier\Rethink...ponse\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
196
        } catch (\Exception $e) {
197
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
198
        }
199
    }
200
201
    /**
     * Runs the query without reading a response: the noReply flag is raised
     * for the duration of the run() call and always lowered afterwards.
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query): void
    {
        $this->noReply = true;

        try {
            $this->run($query);
        } finally {
            // Restore the flag even when run() throws; otherwise every later
            // run() call would return null without reading its response.
            $this->noReply = false;
        }
    }
211
212
    /**
     * Sends a SERVER_INFO query under a fresh token and returns the response.
     * Any exception raised along the way, including the unexpected-type
     * error below, is re-thrown as a ConnectionException.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function server(): ResponseInterface
    {
        try {
            $token = $this->generateToken();
            $infoQuery = new Message(QueryType::SERVER_INFO);

            $this->writeQuery($token, $infoQuery);
            $response = $this->receiveResponse($token, $infoQuery);

            // NOTE(review): 5 is presumably the SERVER_INFO response type of
            // the RethinkDB protocol - confirm against ResponseType.
            $hasExpectedType = $response->getType() === 5;
            if (!$hasExpectedType) {
                throw new ConnectionException('Unexpected response type for server query.');
            }

            return $response;
        } catch (\Exception $failure) {
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
235
236
    /**
     * Sends a STOP query for the given token, reads the response and removes
     * the token from the active set.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function stopQuery(int $token): ResponseInterface
    {
        $stopMessage = new Message();
        $stopMessage = $stopMessage->setQuery([QueryType::STOP]);

        $this->writeQuery($token, $stopMessage);
        $response = $this->receiveResponse($token, $stopMessage);

        // The token is dropped regardless of the response type.
        unset($this->activeTokens[$token]);

        return $response;
    }
254
255
    /**
     * Replaces the database name that writeQuery() attaches to outgoing
     * queries.
     *
     * @inheritdoc
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }
262
263
    /**
     * Serializes the message to JSON and writes it to the stream, prefixed
     * with a 12-byte header: the token, a zero word and the payload length,
     * each packed as an unsigned 32-bit little-endian integer.
     *
     * When a database name is set, the message options are replaced with
     * options carrying that name before serialization.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function writeQuery(int $token, MessageInterface $message): int
    {
        if ($this->dbName) {
            $options = new QueryOptions();
            $message->setOptions($options->setDb($this->dbName));
        }

        try {
            $payload = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $failure) {
            throw new Exception('Serializing query message failed.', $failure->getCode(), $failure);
        }

        // Token, zero word, payload size - in that order.
        $frameHeader = pack('VVV', $token, 0, \strlen($payload));

        return $this->stream->write($frameHeader . $payload);
    }
284
285
    /**
     * Sends a NOREPLY_WAIT query under a fresh token and checks the type of
     * the response. Any exception raised along the way, including the
     * unexpected-type error below, is re-thrown as a ConnectionException.
     *
     * @inheritdoc
     * @throws ConnectionException
     * @throws \Exception
     */
    public function noreplyWait(): void
    {
        try {
            $token = $this->generateToken();
            $waitQuery = new Message(QueryType::NOREPLY_WAIT);

            $this->writeQuery($token, $waitQuery);
            $response = $this->receiveResponse($token, $waitQuery);

            // NOTE(review): 4 is presumably the WAIT_COMPLETE response type of
            // the RethinkDB protocol - confirm against ResponseType.
            $hasExpectedType = $response->getType() === 4;
            if (!$hasExpectedType) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $failure) {
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
307
308
    /**
     * Wraps a response in a Cursor bound to this connection, the query token
     * and the originating message.
     *
     * The declared return type matches the documented and actual one
     * (Cursor) instead of the looser iterable.
     *
     * @param ResponseInterface $response
     * @param int $token
     * @param MessageInterface $message
     * @return Cursor
     */
    private function createCursorFromResponse(
        ResponseInterface $response,
        int $token,
        MessageInterface $message
    ): Cursor {
        return new Cursor($this, $token, $response, $message);
    }
321
322
    /**
     * Draws random tokens in the range 0..2^30 until one is found that is
     * not in the active set. Gives up after 1025 colliding draws.
     *
     * @return int
     * @throws \Exception
     */
    private function generateToken(): int
    {
        try {
            $upperBound = 1 << 30;
            $retries = 0;

            while (true) {
                $candidate = random_int(0, $upperBound);

                if (!isset($this->activeTokens[$candidate])) {
                    return $candidate;
                }

                // Up to 1024 retries are allowed after the first draw.
                if ($retries++ >= 1024) {
                    throw new \Exception('Unable to generate a unique token for the query.');
                }
            }
        } catch (\Exception $failure) {
            throw new ConnectionException('Generating the token failed.', $failure->getCode(), $failure);
        }
    }
344
345
    /**
     * Reads one response from the stream: a 12-byte header (two 32-bit token
     * words and a 32-bit payload size, little-endian) followed by the JSON
     * payload, which is deserialized and validated against the request.
     *
     * @param int $token Token the response is expected to carry.
     * @param MessageInterface $message Request message, used in error reports.
     * @return ResponseInterface
     * @throws \RuntimeException
     * @throws ConnectionException
     */
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        $rawHeader = $this->stream->read(4 + 8);
        if (empty($rawHeader)) {
            throw new ConnectionException('Empty response headers received from server.');
        }

        $header = unpack('Vtoken/Vtoken2/Vsize', $rawHeader);
        $receivedToken = $header['token'];

        // The second token word is required to be zero.
        if ($header['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $payload = $this->stream->read($header['size']);

        /** @var ResponseInterface $response */
        $response = $this->responseSerializer->deserialize($payload, Response::class, 'json');
        $this->validateResponse($response, $receivedToken, $token, $message);

        return $response;
    }
374
375
    /**
     * Rejects responses that have no type, report an error, or carry a token
     * other than the requested one. Checks run in this order: missing type,
     * client error, token mismatch, compile error, runtime error.
     *
     * @param ResponseInterface $response Response to validate.
     * @param int $responseToken Token read from the response header.
     * @param int $token Token that was sent with the request.
     * @param MessageInterface $message Request message, JSON-encoded into error texts.
     * @throws ConnectionException
     */
    private function validateResponse(
        ResponseInterface $response,
        int $responseToken,
        int $token,
        MessageInterface $message
    ): void {
        if (!$response->getType()) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($response->getType() === ResponseType::CLIENT_ERROR) {
            $reason = 'Client error: ' . $response->getData()[0];

            throw new ConnectionException($reason . ' jsonQuery: ' . json_encode($message));
        }

        if ($responseToken !== $token) {
            $mismatch = 'Received wrong token. Response does not match the request. ';
            $mismatch .= 'Expected ' . $token . ', received ' . $responseToken;

            throw new ConnectionException($mismatch);
        }

        if ($response->getType() === ResponseType::COMPILE_ERROR) {
            $reason = 'Compile error: ' . $response->getData()[0];

            throw new ConnectionException($reason . ', jsonQuery: ' . json_encode($message));
        }

        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
            $reason = 'Runtime error: ' . $response->getData()[0];

            throw new ConnectionException($reason . ', jsonQuery: ' . json_encode($message));
        }
    }
411
}
412