Passed
Push — master ( 395e3e...1ef20b )
by Timon
02:23
created

Connection::isStreamOpen()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 2
nop 0
crap 6
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Query\Expr;
11
use TBolier\RethinkQL\Query\Message;
12
use TBolier\RethinkQL\Query\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Query\Query;
15
use TBolier\RethinkQL\Response\Cursor;
16
use TBolier\RethinkQL\Response\Response;
17
use TBolier\RethinkQL\Response\ResponseInterface;
18
use TBolier\RethinkQL\Types\Query\QueryType;
19
use TBolier\RethinkQL\Types\Response\ResponseType;
20
21
class Connection implements ConnectionInterface, ConnectionCursorInterface
22
{
23
    /**
24
     * @var int[]
25
     */
26
    private $activeTokens;
27
    /**
28
     * @var string
29
     */
30
    private $dbName;
31
    /**
32
     * @var HandshakeInterface
33
     */
34
    private $handshake;
35
    /**
36
     * @var bool
37
     */
38
    private $noReply = false;
39
    /**
40
     * @var SerializerInterface
41
     */
42
    private $querySerializer;
43
    /**
44
     * @var SerializerInterface
45
     */
46
    private $responseSerializer;
47
    /**
48
     * @var StreamInterface
49
     */
50
    private $stream;
51
    /**
52
     * @var \Closure
53
     */
54
    private $streamWrapper;
55
56
    /**
57
     * @param \Closure $streamWrapper
58
     * @param HandshakeInterface $handshake
59
     * @param string $dbName
60
     * @param SerializerInterface $querySerializer
61
     * @param SerializerInterface $responseSerializer
62
     */
63 31
    public function __construct(
64
        \Closure $streamWrapper,
65
        HandshakeInterface $handshake,
66
        string $dbName,
67
        SerializerInterface $querySerializer,
68
        SerializerInterface $responseSerializer
69
    ) {
70 31
        $this->streamWrapper = $streamWrapper;
71 31
        $this->dbName = $dbName;
72 31
        $this->handshake = $handshake;
73 31
        $this->querySerializer = $querySerializer;
74 31
        $this->responseSerializer = $responseSerializer;
75 31
    }
76
77
    /**
78
     * @param MessageInterface $query
79
     * @return void
80
     */
81 1
    public function changes(MessageInterface $query): void
82
    {
83
        // TODO: Implement changes() method.
84 1
    }
85
86
    /**
87
     * @param bool $noreplyWait
88
     * @throws \Exception
89
     */
90 2
    public function close($noreplyWait = true): void
91
    {
92 2
        if ($noreplyWait) {
93 1
            $this->noreplyWait();
94
        }
95
96 2
        $this->stream->close();
97 2
    }
98
99
    /**
100
     * @inheritdoc
101
     * @throws ConnectionException
102
     */
103 27
    public function connect(): self
104
    {
105 27
        if ($this->stream !== null && $this->stream->isWritable()) {
106 12
            return $this;
107
        }
108
109
        try {
110 27
            $this->stream = ($this->streamWrapper)();
111 27
            $this->handshake->hello($this->stream);
112 1
        } catch (\Exception $e) {
113 1
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
114
        }
115
116 26
        return $this;
117
    }
118
119
    /**
120
     * @inheritdoc
121
     * @throws \Exception
122
     */
123
    public function reconnect($noreplyWait = true): Connection
124
    {
125
        $this->close($noreplyWait);
126
127
        return $this->connect();
128
    }
129
130
    /**
131
     * @inheritdoc
132
     * @throws \Exception
133
     */
134 1
    public function continueQuery(int $token): ResponseInterface
135
    {
136 1
        $message = (new Message())->setQuery(
137 1
            new Query([QueryType::CONTINUE])
138
        );
139
140 1
        $this->writeQuery($token, $message);
141
142
        // Await the response
143 1
        $response = $this->receiveResponse($token, $message);
144
145 1
        if ($response->getType() !== ResponseType::SUCCESS_PARTIAL) {
146 1
            unset($this->activeTokens[$token]);
147
        }
148
149 1
        return $response;
150
    }
151
152
    /**
153
     * @param string $string
154
     * @return ResponseInterface
155
     * @throws ConnectionException
156
     */
157 2
    public function expr(string $string): ResponseInterface
158
    {
159 2
        $message = new Message();
160 2
        $message->setQueryType(QueryType::START)
161 2
            ->setQuery(new Expr($string));
162
163 2
        return $this->run($message);
164
    }
165
166
    /**
167
     * @inheritdoc
168
     * @throws ConnectionException
169
     */
170 2
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
171
    {
172 2
        return $this->run($message, true);
173
    }
174
175
    /**
176
     * @inheritdoc
177
     * @throws ConnectionException
178
     */
179 19
    public function run(MessageInterface $message, $raw = false)
180
    {
181
        try {
182 19
            $token = $this->generateToken();
183
184 19
            $this->writeQuery($token, $message);
185
186 19
            if ($this->noReply) {
187 1
                return;
188
            }
189
190 18
            $response = $this->receiveResponse($token, $message);
191
192 18
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
193 1
                $this->activeTokens[$token] = true;
194
            }
195
196 18
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
197 16
                return $response;
198
            }
199
200 4
            return $this->createCursorFromResponse($response, $token, $message);
0 ignored issues
show
Bug Best Practice introduced by
The return type of return $this->createCurs...nse, $token, $message); (TBolier\RethinkQL\Response\Cursor) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type TBolier\RethinkQL\Response\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
201
        } catch (\Exception $e) {
202
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
203
        }
204
    }
205
206
    /**
207
     * @inheritdoc
208
     * @throws ConnectionException
209
     */
210 1
    public function runNoReply(MessageInterface $query)
211
    {
212 1
        $this->noReply = true;
213 1
        $result = $this->run($query);
214 1
        $this->noReply = false;
215
216 1
        return $result;
217
    }
218
219
    /**
220
     * @inheritdoc
221
     * @throws \Exception
222
     */
223 2
    public function server(): ResponseInterface
224
    {
225
        try {
226 2
            $token = $this->generateToken();
227
228 2
            $query = new Message(QueryType::SERVER_INFO);
229 2
            $this->writeQuery($token, $query);
230
231 2
            $response = $this->receiveResponse($token, $query);
232
233 2
            if ($response->getType() !== 5) {
234 2
                throw new ConnectionException('Unexpected response type for server query.');
235
            }
236
        } catch (\Exception $e) {
237
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
238
        }
239
240 2
        return $response;
241
    }
242
243
    /**
244
     * @inheritdoc
245
     * @throws \Exception
246
     */
247 1
    public function stopQuery(int $token): ResponseInterface
248
    {
249 1
        $message = (new Message())->setQuery(
250 1
            new Query([QueryType::STOP])
251
        );
252
253 1
        $this->writeQuery($token, $message);
254
255 1
        $response = $this->receiveResponse($token, $message);
256
257 1
        unset($this->activeTokens[$token]);
258
259 1
        return $response;
260
    }
261
262
    /**
263
     * @inheritdoc
264
     */
265 13
    public function use(string $name): void
266
    {
267 13
        $this->dbName = $name;
268 13
    }
269
270
    /**
271
     * @inheritdoc
272
     * @throws \Exception
273
     */
274 25
    public function writeQuery(int $token, MessageInterface $message): int
275
    {
276 25
        $message->setOptions((new QueryOptions())->setDb($this->dbName));
277
278
        try {
279 25
            $request = $this->querySerializer->serialize($message, 'json');
280
        } catch (\Exception $e) {
281
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
282
        }
283
284 25
        $requestSize = pack('V', \strlen($request));
285 25
        $binaryToken = pack('V', $token) . pack('V', 0);
286
287 25
        return $this->stream->write($binaryToken . $requestSize . $request);
288
    }
289
290
    /**
291
     * @inheritdoc
292
     * @throws ConnectionException
293
     * @throws \Exception
294
     */
295 1
    public function noreplyWait(): void
296
    {
297
        try {
298 1
            $token = $this->generateToken();
299
300 1
            $query = new Message(QueryType::NOREPLY_WAIT);
301 1
            $this->writeQuery($token, $query);
302
303 1
            $response = $this->receiveResponse($token, $query);
304
305 1
            if ($response->getType() !== 4) {
306 1
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
307
            }
308
        } catch (\Exception $e) {
309
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
310
        }
311 1
    }
312
313
    /**
314
     * @param ResponseInterface $response
315
     * @param int $token
316
     * @param MessageInterface $message
317
     * @return Cursor
318
     */
319 4
    private function createCursorFromResponse(
320
        ResponseInterface $response,
321
        int $token,
322
        MessageInterface $message
323
    ): Cursor {
324 4
        return new Cursor($this, $token, $response, $message);
325
    }
326
327
    /**
328
     * @return int
329
     * @throws \Exception
330
     */
331 22
    private function generateToken(): int
332
    {
333
        try {
334 22
            $tries = 0;
335 22
            $maxToken = 1 << 30;
336
            do {
337 22
                $token = random_int(0, $maxToken);
338 22
                $haveCollision = isset($this->activeTokens[$token]);
339 22
            } while ($haveCollision && $tries++ < 1024);
340 22
            if ($haveCollision) {
341 22
                throw new \Exception('Unable to generate a unique token for the query.');
342
            }
343
        } catch (\Exception $e) {
344
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
345
        }
346
347 22
        return $token;
348
    }
349
350
    /**
351
     * @param int $token
352
     * @param MessageInterface $message
353
     * @return ResponseInterface
354
     * @throws \RuntimeException
355
     * @throws ConnectionException
356
     */
357 23
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
358
    {
359 23
        $responseHeader = $this->stream->read(4 + 8);
360 23
        if (empty($responseHeader)) {
361
            throw new ConnectionException('Empty response headers received from server.');
362
        }
363
364 23
        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
365 23
        $responseToken = $responseHeader['token'];
366 23
        if ($responseHeader['token2'] !== 0) {
367
            throw new ConnectionException('Invalid response from server: Invalid token.');
368
        }
369
370 23
        $responseSize = $responseHeader['size'];
371 23
        $responseBuf = $this->stream->read($responseSize);
372
373
        /** @var ResponseInterface $response */
374 23
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
375 23
        $this->validateResponse($response, $responseToken, $token, $message);
376
377 23
        return $response;
378
    }
379
380
    /**
381
     * @param ResponseInterface $response
382
     * @param int $responseToken
383
     * @param int $token
384
     * @param MessageInterface $message
385
     * @throws ConnectionException
386
     */
387 23
    private function validateResponse(
388
        ResponseInterface $response,
389
        int $responseToken,
390
        int $token,
391
        MessageInterface $message
392
    ): void {
393 23
        if (!$response->getType()) {
394
            throw new ConnectionException('Response message has no type.');
395
        }
396
397 23
        if ($response->getType() === ResponseType::CLIENT_ERROR) {
398
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response->getData()[0]);
399
        }
400
401 23
        if ($responseToken !== $token) {
402
            throw new ConnectionException(
403
                'Received wrong token. Response does not match the request. '
404
                . 'Expected ' . $token . ', received ' . $responseToken
405
            );
406
        }
407
408 23
        if ($response->getType() === ResponseType::COMPILE_ERROR) {
409
            throw new ConnectionException('Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
410
        }
411
412 23
        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
413
            throw new ConnectionException('Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
414
        }
415 23
    }
416
}
417