Passed
Pull Request — master (#18)
by Timon
04:12
created

Connection::writeQuery()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0987

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 7
cts 9
cp 0.7778
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 10
nc 4
nop 2
crap 3.0987
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Message\ExprMessage;
11
use TBolier\RethinkQL\Message\Message;
12
use TBolier\RethinkQL\Message\MessageInterface;
13
use TBolier\RethinkQL\Query\Expr;
14
use TBolier\RethinkQL\Query\Options as QueryOptions;
15
use TBolier\RethinkQL\Response\Cursor;
16
use TBolier\RethinkQL\Response\Response;
17
use TBolier\RethinkQL\Response\ResponseInterface;
18
use TBolier\RethinkQL\Types\Query\QueryType;
19
use TBolier\RethinkQL\Types\Response\ResponseType;
20
21
class Connection implements ConnectionInterface, ConnectionCursorInterface
22
{
23
    /**
     * Tokens of queries that still have results pending, stored as
     * token => true so membership can be tested with isset().
     *
     * @var array<int, bool>
     */
    private $activeTokens = [];

    /**
     * Name of the database attached to outgoing queries.
     *
     * @var string
     */
    private $dbName;

    /**
     * Handshake invoked with the freshly created stream in connect().
     *
     * @var HandshakeInterface
     */
    private $handshake;

    /**
     * When true, run() returns null right after writing the query instead
     * of reading a response.
     *
     * @var bool
     */
    private $noReply = false;

    /**
     * Serializer used to encode outgoing messages as JSON.
     *
     * @var SerializerInterface
     */
    private $querySerializer;

    /**
     * Serializer used to decode JSON response bodies into Response objects.
     *
     * @var SerializerInterface
     */
    private $responseSerializer;

    /**
     * Stream created by connect(); null until the first connect() call.
     *
     * @var StreamInterface|null
     */
    private $stream;

    /**
     * Factory closure that is invoked by connect() to create the stream.
     *
     * @var \Closure
     */
    private $streamWrapper;
55
56
    /**
     * Stores the collaborators; no stream is opened until connect() is called.
     *
     * @param \Closure $streamWrapper Factory invoked by connect() to create the stream.
     * @param HandshakeInterface $handshake Handshake performed on the new stream.
     * @param string $dbName Database name attached to outgoing queries.
     * @param SerializerInterface $querySerializer Encodes outgoing messages.
     * @param SerializerInterface $responseSerializer Decodes incoming responses.
     */
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        // Connection bootstrap collaborators.
        $this->streamWrapper = $streamWrapper;
        $this->handshake = $handshake;

        // Query/response encoding.
        $this->querySerializer = $querySerializer;
        $this->responseSerializer = $responseSerializer;

        // Default database for queries.
        $this->dbName = $dbName;
    }
76
77
    /**
     * Placeholder for changefeed support; currently performs no work and
     * ignores the given query.
     *
     * @param MessageInterface $query
     * @return void
     */
    public function changes(MessageInterface $query): void
    {
        // TODO: Implement changes() method.
    }
85
86
    /**
     * Closes the underlying stream, optionally waiting for outstanding
     * no-reply queries first.
     *
     * Calling this before connect() has created a stream is a no-op.
     *
     * @param bool $noreplyWait When true, noreplyWait() is called before the stream is closed.
     * @throws \Exception
     */
    public function close($noreplyWait = true): void
    {
        // The stream only exists after connect(); without it there is nothing
        // to wait on or close, and dereferencing it would be a fatal error.
        if ($this->stream === null) {
            return;
        }

        if ($noreplyWait) {
            $this->noreplyWait();
        }

        $this->stream->close();
    }
98
99
    /**
     * Opens the stream and performs the handshake unless a writable stream
     * already exists.
     *
     * @inheritdoc
     * @throws ConnectionException When creating the stream or the handshake fails.
     */
    public function connect(): self
    {
        $alreadyConnected = $this->stream !== null && $this->stream->isWritable();

        if (!$alreadyConnected) {
            try {
                $this->stream = ($this->streamWrapper)();
                $this->handshake->hello($this->stream);
            } catch (\Exception $failure) {
                // Surface every failure as a ConnectionException, keeping the cause.
                throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
            }
        }

        return $this;
    }
118
119
    /**
     * Closes the current connection and then connects again.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function reconnect($noreplyWait = true): Connection
    {
        // Tear down first; $noreplyWait is forwarded unchanged to close().
        $this->close($noreplyWait);

        $connection = $this->connect();

        return $connection;
    }
129
130
    /**
     * Sends a CONTINUE query for the given token and returns the response.
     *
     * The token is dropped from the active-token map unless the response is
     * again a partial success.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function continueQuery(int $token): ResponseInterface
    {
        $continueMessage = (new Message())->setQuery([QueryType::CONTINUE]);

        $this->writeQuery($token, $continueMessage);

        // Block until the server answers for this token.
        $reply = $this->receiveResponse($token, $continueMessage);

        if ($reply->getType() === ResponseType::SUCCESS_PARTIAL) {
            // More data may follow, so the token stays registered.
            return $reply;
        }

        unset($this->activeTokens[$token]);

        return $reply;
    }
151
152
    /**
     * Runs a START query built from the given string.
     *
     * @param string $string Value handed to ExprMessage as the query payload.
     * @return ResponseInterface
     * @throws ConnectionException
     */
    public function expr(string $string): ResponseInterface
    {
        // Forward the caller's value; previously a hard-coded 'foo' was sent
        // and the argument was silently ignored.
        return $this->run(new ExprMessage(QueryType::START, $string));
    }
161
162
    /**
     * Re-runs the given message in raw mode, so run() hands back the
     * response itself rather than wrapping it in a cursor.
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
    {
        $rawResponse = $this->run($message, true);

        return $rawResponse;
    }
170
171
    /**
172
     * @inheritdoc
173
     * @throws ConnectionException
174
     */
175 20
    public function run(MessageInterface $message, $raw = false)
176
    {
177
        try {
178 20
            $token = $this->generateToken();
179
180 20
            $this->writeQuery($token, $message);
181
182 20
            if ($this->noReply) {
183 1
                return null;
184
            }
185
186 19
            $response = $this->receiveResponse($token, $message);
187
188 19
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
189 1
                $this->activeTokens[$token] = true;
190
            }
191
192 19
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
193 17
                return $response;
194
            }
195
196 3
            return $this->createCursorFromResponse($response, $token, $message);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->createCurs...nse, $token, $message); (TBolier\RethinkQL\Response\Cursor) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type Iterable|TBolier\Rethink...ponse\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
197
        } catch (\Exception $e) {
198
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
199
        }
200
    }
201
202
    /**
     * Runs the query without waiting for a response.
     *
     * The no-reply flag is raised only for the duration of the run() call.
     *
     * @inheritdoc
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query): void
    {
        $this->noReply = true;

        try {
            $this->run($query);
        } finally {
            // Always restore the flag: if run() throws and the flag stayed
            // set, every later run() would return null without reading its
            // response.
            $this->noReply = false;
        }
    }
212
213
    /**
     * Sends a SERVER_INFO query and returns the server's response.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function server(): ResponseInterface
    {
        try {
            $infoToken = $this->generateToken();
            $infoMessage = new Message(QueryType::SERVER_INFO);

            $this->writeQuery($infoToken, $infoMessage);
            $reply = $this->receiveResponse($infoToken, $infoMessage);

            // NOTE(review): 5 is presumably the SERVER_INFO response type;
            // confirm against ResponseType before replacing it with a constant.
            $isServerInfo = $reply->getType() === 5;
            if (!$isServerInfo) {
                throw new ConnectionException('Unexpected response type for server query.');
            }

            return $reply;
        } catch (\Exception $failure) {
            // Everything raised above, including the exception thrown in the
            // try block itself, is re-wrapped as a ConnectionException.
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
236
237
    /**
     * Sends a STOP query for the given token and forgets the token.
     *
     * @inheritdoc
     * @throws \Exception
     */
    public function stopQuery(int $token): ResponseInterface
    {
        $stopMessage = (new Message())->setQuery([QueryType::STOP]);

        $this->writeQuery($token, $stopMessage);
        $reply = $this->receiveResponse($token, $stopMessage);

        // The query is finished, so its token is no longer tracked as active.
        unset($this->activeTokens[$token]);

        return $reply;
    }
255
256
    /**
     * Switches the database name that writeQuery() attaches to later queries.
     *
     * @inheritdoc
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }
263
264
    /**
     * Serializes the message and writes it to the stream, framed as an
     * 8-byte token (the token followed by four zero bytes), a 4-byte
     * payload size, then the JSON payload.
     *
     * @inheritdoc
     * @return int The value returned by the stream's write() call.
     * @throws \Exception When the message cannot be serialized.
     */
    public function writeQuery(int $token, MessageInterface $message): int
    {
        // Compare against the empty string instead of relying on truthiness:
        // PHP treats '0' as falsy, which would skip a database named "0".
        if ($this->dbName !== '') {
            $message->setOptions((new QueryOptions())->setDb($this->dbName));
        }

        try {
            $request = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $e) {
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
        }

        // 'V' packs an unsigned 32-bit little-endian integer.
        $requestSize = pack('V', \strlen($request));
        $binaryToken = pack('V', $token) . pack('V', 0);

        return $this->stream->write($binaryToken . $requestSize . $request);
    }
285
286
    /**
     * Sends a NOREPLY_WAIT query and blocks until the server answers it.
     *
     * @inheritdoc
     * @throws ConnectionException
     * @throws \Exception
     */
    public function noreplyWait(): void
    {
        try {
            $waitToken = $this->generateToken();
            $waitMessage = new Message(QueryType::NOREPLY_WAIT);

            $this->writeQuery($waitToken, $waitMessage);
            $reply = $this->receiveResponse($waitToken, $waitMessage);

            // NOTE(review): 4 is presumably the WAIT_COMPLETE response type;
            // confirm against ResponseType before replacing it with a constant.
            $isWaitComplete = $reply->getType() === 4;
            if (!$isWaitComplete) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $failure) {
            // Everything raised above, including the exception thrown in the
            // try block itself, is re-wrapped as a ConnectionException.
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
308
309
    /**
     * Wraps a response in a Cursor bound to this connection.
     *
     * The declared return type is the concrete Cursor class, matching both
     * this docblock and the object that is actually constructed.
     *
     * @param ResponseInterface $response Response to wrap.
     * @param int $token Token of the query that produced the response.
     * @param MessageInterface $message Message that was sent for the query.
     * @return Cursor
     */
    private function createCursorFromResponse(
        ResponseInterface $response,
        int $token,
        MessageInterface $message
    ): Cursor {
        return new Cursor($this, $token, $response, $message);
    }
322
323
    /**
     * Draws a random token in [0, 2^30] that is not in the active-token map.
     *
     * @return int
     * @throws ConnectionException When no free token is found or the random source fails.
     */
    private function generateToken(): int
    {
        $upperBound = 1 << 30;

        try {
            // One initial draw plus up to 1024 retries on collision.
            for ($attempt = 0; $attempt <= 1024; $attempt++) {
                $candidate = random_int(0, $upperBound);

                if (!isset($this->activeTokens[$candidate])) {
                    return $candidate;
                }
            }

            throw new \Exception('Unable to generate a unique token for the query.');
        } catch (\Exception $failure) {
            throw new ConnectionException('Generating the token failed.', $failure->getCode(), $failure);
        }
    }
345
346
    /**
     * Reads one framed response from the stream, deserializes it and
     * validates it against the request.
     *
     * The frame is a 12-byte header (three unsigned 32-bit little-endian
     * integers: token, a second token word that must be zero, body size)
     * followed by a JSON body of the announced size.
     *
     * @param int $token Token of the request the response must belong to.
     * @param MessageInterface $message Request message, used in error reporting.
     * @return ResponseInterface
     * @throws \RuntimeException
     * @throws ConnectionException When the header or body is missing, truncated or invalid.
     */
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        $headerLength = 4 + 8;

        $responseHeader = $this->stream->read($headerLength);
        if (empty($responseHeader)) {
            throw new ConnectionException('Empty response headers received from server.');
        }

        // unpack() fails on short input, so reject a truncated header with a
        // clear error instead.
        if (\strlen($responseHeader) < $headerLength) {
            throw new ConnectionException('Incomplete response headers received from server.');
        }

        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
        $responseToken = $responseHeader['token'];
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseSize = $responseHeader['size'];
        $responseBuf = $this->stream->read($responseSize);

        // A short read would otherwise only surface as a deserialization
        // failure on truncated JSON.
        if (\strlen($responseBuf) !== $responseSize) {
            throw new ConnectionException('Incomplete response body received from server.');
        }

        /** @var ResponseInterface $response */
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
        $this->validateResponse($response, $responseToken, $token, $message);

        return $response;
    }
375
376
    /**
     * Rejects responses that are untyped, report an error, or carry a token
     * other than the one that was sent.
     *
     * Checks run in a fixed order: missing type, client error, token
     * mismatch, compile error, runtime error.
     *
     * @param ResponseInterface $response Deserialized response.
     * @param int $responseToken Token read from the response header.
     * @param int $token Token that was sent with the request.
     * @param MessageInterface $message Request message, JSON-encoded into error messages.
     * @throws ConnectionException
     */
    private function validateResponse(
        ResponseInterface $response,
        int $responseToken,
        int $token,
        MessageInterface $message
    ): void {
        $type = $response->getType();

        if (!$type) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($type === ResponseType::CLIENT_ERROR) {
            throw new ConnectionException(
                'Client error: ' . $response->getData()[0] . ' jsonQuery: ' . json_encode($message)
            );
        }

        if ($responseToken !== $token) {
            throw new ConnectionException(
                'Received wrong token. Response does not match the request. '
                . 'Expected ' . $token . ', received ' . $responseToken
            );
        }

        if ($type === ResponseType::COMPILE_ERROR) {
            throw new ConnectionException(
                'Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
            );
        }

        if ($type === ResponseType::RUNTIME_ERROR) {
            throw new ConnectionException(
                'Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
            );
        }
    }
412
}
413