Passed
Pull Request — master (#23)
by Marc
03:04
created

Connection::noreplyWait()   A

Complexity

Conditions 3
Paths 7

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0987

Importance

Changes 0
Metric Value
dl 0
loc 15
c 0
b 0
f 0
ccs 7
cts 9
cp 0.7778
rs 9.4285
cc 3
eloc 9
nc 7
nop 0
crap 3.0987
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Message\ExprMessage;
11
use TBolier\RethinkQL\Message\Message;
12
use TBolier\RethinkQL\Message\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Response\Cursor;
15
use TBolier\RethinkQL\Response\Response;
16
use TBolier\RethinkQL\Response\ResponseInterface;
17
use TBolier\RethinkQL\Types\Query\QueryType;
18
use TBolier\RethinkQL\Types\Response\ResponseType;
19
20
class Connection implements ConnectionInterface, ConnectionCursorInterface
21
{
22
    /**
23
     * @var int[]
24
     */
25
    private $activeTokens;
26
    /**
27
     * @var string
28
     */
29
    private $dbName;
30
    /**
31
     * @var HandshakeInterface
32
     */
33
    private $handshake;
34
    /**
35
     * @var bool
36
     */
37
    private $noReply = false;
38
    /**
39
     * @var SerializerInterface
40
     */
41
    private $querySerializer;
42
    /**
43
     * @var SerializerInterface
44
     */
45
    private $responseSerializer;
46
    /**
47
     * @var StreamInterface
48
     */
49
    private $stream;
50
    /**
51
     * @var \Closure
52
     */
53
    private $streamWrapper;
54
55
    /**
56
     * @param \Closure $streamWrapper
57
     * @param HandshakeInterface $handshake
58
     * @param string $dbName
59
     * @param SerializerInterface $querySerializer
60
     * @param SerializerInterface $responseSerializer
61
     */
62 32
    public function __construct(
63
        \Closure $streamWrapper,
64
        HandshakeInterface $handshake,
65
        string $dbName,
66
        SerializerInterface $querySerializer,
67
        SerializerInterface $responseSerializer
68
    ) {
69 32
        $this->streamWrapper = $streamWrapper;
70 32
        $this->dbName = $dbName;
71 32
        $this->handshake = $handshake;
72 32
        $this->querySerializer = $querySerializer;
73 32
        $this->responseSerializer = $responseSerializer;
74 32
    }
75
76
    /**
77
     * @param MessageInterface $query
78
     * @return void
79
     */
80 1
    public function changes(MessageInterface $query): void
81
    {
82
        // TODO: Implement changes() method.
83 1
    }
84
85
    /**
86
     * @param bool $noreplyWait
87
     * @throws \Exception
88
     */
89 2
    public function close($noreplyWait = true): void
90
    {
91 2
        if ($noreplyWait) {
92 1
            $this->noreplyWait();
93
        }
94
95 2
        $this->stream->close();
96 2
    }
97
98
    /**
99
     * @inheritdoc
100
     * @throws ConnectionException
101
     */
102 28
    public function connect(): self
103
    {
104 28
        if ($this->stream !== null && $this->stream->isWritable()) {
105 13
            return $this;
106
        }
107
108
        try {
109 28
            $this->stream = ($this->streamWrapper)();
110 28
            $this->handshake->hello($this->stream);
111 1
        } catch (\Exception $e) {
112 1
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
113
        }
114
115 27
        return $this;
116
    }
117
118
    /**
119
     * @inheritdoc
120
     * @throws \Exception
121
     */
122
    public function reconnect($noreplyWait = true): Connection
123
    {
124
        $this->close($noreplyWait);
125
126
        return $this->connect();
127
    }
128
129
    /**
130
     * @inheritdoc
131
     * @throws \Exception
132
     */
133 1
    public function continueQuery(int $token): ResponseInterface
134
    {
135 1
        $message = (new Message())->setQuery(
136 1
            [QueryType::CONTINUE]
137
        );
138
139 1
        $this->writeQuery($token, $message);
140
141
        // Await the response
142 1
        $response = $this->receiveResponse($token, $message);
143
144 1
        if ($response->getType() !== ResponseType::SUCCESS_PARTIAL) {
145 1
            unset($this->activeTokens[$token]);
146
        }
147
148 1
        return $response;
149
    }
150
151
    /**
152
     * @param string $string
153
     * @return ResponseInterface
154
     * @throws ConnectionException
155
     */
156 2
    public function expr(string $string): ResponseInterface
157
    {
158 2
        return $this->run(new ExprMessage(QueryType::START, 'foo'));
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->run(new TB...eryType::START, 'foo')) returns the type iterable which is incompatible with the type-hinted return TBolier\RethinkQL\Response\ResponseInterface.
Loading history...
159
    }
160
161
    /**
162
     * @inheritdoc
163
     * @throws ConnectionException
164
     */
165 1
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
166
    {
167 1
        return $this->run($message, true);
168
    }
169
170
    /**
171
     * @inheritdoc
172
     * @throws ConnectionException
173
     */
174 20
    public function run(MessageInterface $message, $raw = false)
175
    {
176
        try {
177 20
            $token = $this->generateToken();
178
179 20
            $this->writeQuery($token, $message);
180
181 20
            if ($this->noReply) {
182 1
                return null;
183
            }
184
185 19
            $response = $this->receiveResponse($token, $message);
186
187 19
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
188 1
                $this->activeTokens[$token] = true;
189
            }
190
191 19
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
192 17
                return $response;
193
            }
194
195 3
            return $this->createCursorFromResponse($response, $token, $message);
196
        } catch (\Exception $e) {
197
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
198
        }
199
    }
200
201
    /**
202
     * @inheritdoc
203
     * @throws ConnectionException
204
     */
205 1
    public function runNoReply(MessageInterface $query): void
206
    {
207 1
        $this->noReply = true;
208 1
        $this->run($query);
209 1
        $this->noReply = false;
210 1
    }
211
212
    /**
213
     * @inheritdoc
214
     * @throws \Exception
215
     */
216 2
    public function server(): ResponseInterface
217
    {
218
        try {
219 2
            $token = $this->generateToken();
220
221 2
            $query = new Message(QueryType::SERVER_INFO);
222 2
            $this->writeQuery($token, $query);
223
224 2
            $response = $this->receiveResponse($token, $query);
225
226 2
            if ($response->getType() !== 5) {
227 2
                throw new ConnectionException('Unexpected response type for server query.');
228
            }
229
        } catch (\Exception $e) {
230
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
231
        }
232
233 2
        return $response;
234
    }
235
236
    /**
237
     * @inheritdoc
238
     * @throws \Exception
239
     */
240 1
    public function stopQuery(int $token): ResponseInterface
241
    {
242 1
        $message = (new Message())->setQuery(
243 1
            [QueryType::STOP]
244
        );
245
246 1
        $this->writeQuery($token, $message);
247
248 1
        $response = $this->receiveResponse($token, $message);
249
250 1
        unset($this->activeTokens[$token]);
251
252 1
        return $response;
253
    }
254
255
    /**
256
     * @inheritdoc
257
     */
258 14
    public function use(string $name): void
259
    {
260 14
        $this->dbName = $name;
261 14
    }
262
263
    /**
264
     * @inheritdoc
265
     * @throws \Exception
266
     */
267 26
    public function writeQuery(int $token, MessageInterface $message): int
268
    {
269 26
        if ($this->dbName) {
270 26
            $message->setOptions((new QueryOptions())->setDb($this->dbName));
271
        }
272
273
        try {
274 26
            $request = $this->querySerializer->serialize($message, 'json');
275
        } catch (\Exception $e) {
276
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
277
        }
278
279 26
        $requestSize = pack('V', \strlen($request));
280 26
        $binaryToken = pack('V', $token) . pack('V', 0);
281
282 26
        return $this->stream->write($binaryToken . $requestSize . $request);
283
    }
284
285
    /**
286
     * @inheritdoc
287
     * @throws ConnectionException
288
     * @throws \Exception
289
     */
290 1
    public function noreplyWait(): void
291
    {
292
        try {
293 1
            $token = $this->generateToken();
294
295 1
            $query = new Message(QueryType::NOREPLY_WAIT);
296 1
            $this->writeQuery($token, $query);
297
298 1
            $response = $this->receiveResponse($token, $query);
299
300 1
            if ($response->getType() !== 4) {
301 1
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
302
            }
303
        } catch (\Exception $e) {
304
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
305
        }
306 1
    }
307
308
    /**
309
     * @param ResponseInterface $response
310
     * @param int $token
311
     * @param MessageInterface $message
312
     * @return Iterable
313
     */
314 3
    private function createCursorFromResponse(
315
        ResponseInterface $response,
316
        int $token,
317
        MessageInterface $message
318
    ): Iterable {
319 3
        return new Cursor($this, $token, $response, $message);
320
    }
321
322
    /**
323
     * @return int
324
     * @throws \Exception
325
     */
326 23
    private function generateToken(): int
327
    {
328
        try {
329 23
            $tries = 0;
330 23
            $maxToken = 1 << 30;
331
            do {
332 23
                $token = random_int(0, $maxToken);
333 23
                $haveCollision = isset($this->activeTokens[$token]);
334 23
            } while ($haveCollision && $tries++ < 1024);
335 23
            if ($haveCollision) {
336 23
                throw new \Exception('Unable to generate a unique token for the query.');
337
            }
338
        } catch (\Exception $e) {
339
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
340
        }
341
342 23
        return $token;
343
    }
344
345
    /**
346
     * @param int $token
347
     * @param MessageInterface $message
348
     * @return ResponseInterface
349
     * @throws \RuntimeException
350
     * @throws ConnectionException
351
     */
352 24
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
353
    {
354 24
        $responseHeader = $this->stream->read(4 + 8);
355 24
        if (empty($responseHeader)) {
356
            throw new ConnectionException('Empty response headers received from server.');
357
        }
358
359 24
        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
360 24
        $responseToken = $responseHeader['token'];
361 24
        if ($responseHeader['token2'] !== 0) {
362
            throw new ConnectionException('Invalid response from server: Invalid token.');
363
        }
364
365 24
        $responseSize = $responseHeader['size'];
366 24
        $responseBuf = $this->stream->read($responseSize);
367
368
        /** @var ResponseInterface $response */
369 24
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
370 24
        $this->validateResponse($response, $responseToken, $token, $message);
371
372 24
        return $response;
373
    }
374
375
    /**
376
     * @param ResponseInterface $response
377
     * @param int $responseToken
378
     * @param int $token
379
     * @param MessageInterface $message
380
     * @throws ConnectionException
381
     */
382 24
    private function validateResponse(
383
        ResponseInterface $response,
384
        int $responseToken,
385
        int $token,
386
        MessageInterface $message
387
    ): void {
388 24
        if (!$response->getType()) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $response->getType() of type null|integer is loosely compared to false; this is ambiguous if the integer can be 0. You might want to explicitly use === null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
389
            throw new ConnectionException('Response message has no type.');
390
        }
391
392 24
        if ($response->getType() === ResponseType::CLIENT_ERROR) {
393
            throw new ConnectionException('Client error: ' . $response->getData()[0] . ' jsonQuery: ' . json_encode($message));
394
        }
395
396 24
        if ($responseToken !== $token) {
397
            throw new ConnectionException(
398
                'Received wrong token. Response does not match the request. '
399
                . 'Expected ' . $token . ', received ' . $responseToken
400
            );
401
        }
402
403 24
        if ($response->getType() === ResponseType::COMPILE_ERROR) {
404
            throw new ConnectionException('Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
405
        }
406
407 24
        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
408
            throw new ConnectionException('Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
409
        }
410 24
    }
411
}
412