Connection::continueQuery()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 13
rs 10
c 0
b 0
f 0
ccs 0
cts 3
cp 0
cc 2
nc 2
nop 1
crap 6
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Message\ExprMessage;
11
use TBolier\RethinkQL\Message\Message;
12
use TBolier\RethinkQL\Message\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Response\Cursor;
15
use TBolier\RethinkQL\Response\Response;
16
use TBolier\RethinkQL\Response\ResponseInterface;
17
use TBolier\RethinkQL\Types\Query\QueryType;
18
use TBolier\RethinkQL\Types\Response\ResponseType;
19
20
class Connection implements ConnectionInterface, ConnectionCursorInterface
21
{
22
    /**
23
     * @var int[]
24
     */
25
    private $activeTokens;
26
27
    /**
28
     * @var string
29
     */
30
    private $dbName;
31
32
    /**
33
     * @var HandshakeInterface
34
     */
35
    private $handshake;
36
37
    /**
38
     * @var bool
39
     */
40
    private $noReply = false;
41
42
    /**
43
     * @var SerializerInterface
44
     */
45
    private $querySerializer;
46
47
    /**
48
     * @var SerializerInterface
49
     */
50
    private $responseSerializer;
51
52
    /**
53
     * @var StreamInterface
54
     */
55
    private $stream;
56
57
    /**
58
     * @var \Closure
59
     */
60
    private $streamWrapper;
61
62 33
    public function __construct(
63
        \Closure $streamWrapper,
64
        HandshakeInterface $handshake,
65
        string $dbName,
66
        SerializerInterface $querySerializer,
67
        SerializerInterface $responseSerializer
68
    ) {
69 33
        $this->streamWrapper = $streamWrapper;
70 33
        $this->dbName = $dbName;
71 33
        $this->handshake = $handshake;
72 33
        $this->querySerializer = $querySerializer;
73 33
        $this->responseSerializer = $responseSerializer;
74 33
    }
75
76
    /**
77
     * @throws \Exception
78
     */
79
    public function close($noreplyWait = true): void
80 1
    {
81
        if ($noreplyWait) {
82
            $this->noreplyWait();
83 1
        }
84
85
        $this->stream->close();
86
    }
87
88
    /**
89 2
     * @throws ConnectionException
90
     */
91 2
    public function connect(): Connection
92 1
    {
93
        if ($this->stream !== null && $this->stream->isWritable()) {
94
            return $this;
95 2
        }
96 2
97
        try {
98
            $this->stream = ($this->streamWrapper)();
99
            $this->handshake->hello($this->stream);
100
        } catch (\Exception $e) {
101
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
102 29
        }
103
104 29
        return $this;
105 14
    }
106
107
    /**
108
     * @throws \Exception
109 29
     */
110 29
    public function reconnect($noreplyWait = true): Connection
111 1
    {
112 1
        $this->close($noreplyWait);
113
114
        return $this->connect();
115 28
    }
116
117
    /**
118
     * @throws \Exception
119
     */
120
    public function continueQuery(int $token): ResponseInterface
121
    {
122
        $message = new Message(QueryType::CONTINUE);
123
        $this->writeQuery($token, $message);
124
125
        // Await the response
126
        $response = $this->receiveResponse($token, $message);
127
128
        if ($response->getType() !== ResponseType::SUCCESS_PARTIAL) {
129
            unset($this->activeTokens[$token]);
130
        }
131
132
        return $response;
133 1
    }
134
135 1
    /**
136 1
     * @throws ConnectionException
137
     */
138
    public function expr(string $string): ResponseInterface
139 1
    {
140
        $response = $this->run(new ExprMessage(QueryType::START, 'foo'));
141
142 1
        if ($response instanceof ResponseInterface) {
143
            return $response;
144 1
        }
145 1
146
        return new Response();
147
    }
148 1
149
    /**
150
     * @throws ConnectionException
151
     */
152
    public function rewindFromCursor(MessageInterface $message): ResponseInterface
153
    {
154
        return $this->run($message, true);
155
    }
156 2
157
    /**
158 2
     * @throws ConnectionException
159
     */
160
    public function run(MessageInterface $message, $raw = false)
161
    {
162
        try {
163
            $token = $this->generateToken();
164
165 1
            $this->writeQuery($token, $message);
166
167 1
            if ($this->noReply) {
168
                return new Response();
169
            }
170
171
            $response = $this->receiveResponse($token, $message);
172
173
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
174 21
                $this->activeTokens[$token] = true;
175
            }
176
177 21
            if ($raw || $response->getType() === ResponseType::SUCCESS_ATOM) {
178
                return $response;
179 21
            }
180
181 21
            return $this->createCursorFromResponse($response, $token, $message);
182 1
        } catch (\Exception $e) {
183
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
184
        }
185 20
    }
186
187 20
    /**
188 1
     * @throws ConnectionException
189
     */
190
    public function runNoReply(MessageInterface $query): void
191 20
    {
192 18
        $this->noReply = true;
193
        $this->run($query);
194
        $this->noReply = false;
195 3
    }
196
197
    /**
198
     * @throws \Exception
199
     */
200
    public function server(): ResponseInterface
201
    {
202
        try {
203
            $token = $this->generateToken();
204
205 1
            $message = new Message(QueryType::SERVER_INFO);
206
            $this->writeQuery($token, $message);
207 1
208 1
            $response = $this->receiveResponse($token, $message);
209 1
210 1
            if ($response->getType() !== 5) {
211
                throw new ConnectionException('Unexpected response type for server query.');
212
            }
213
        } catch (\Exception $e) {
214
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
215
        }
216 2
217
        return $response;
218
    }
219 2
220
    /**
221 2
     * @throws \Exception
222 2
     */
223
    public function stopQuery(int $token): ResponseInterface
224 2
    {
225
        $message = new Message(QueryType::STOP);
226 2
227 2
        $this->writeQuery($token, $message);
228
229
        $response = $this->receiveResponse($token, $message);
230
231
        unset($this->activeTokens[$token]);
232
233 2
        return $response;
234
    }
235
236
    public function use(string $name): void
237
    {
238
        $this->dbName = $name;
239
    }
240 1
241
    /**
242 1
     * @throws \Exception
243 1
     */
244
    public function writeQuery(int $token, MessageInterface $message): int
245
    {
246 1
        if (!$this->stream) {
247
            throw new ConnectionException('No open stream, please connect first');
248 1
        }
249
250 1
        if ($this->dbName) {
251
            $message->setOptions((new QueryOptions())->setDb($this->dbName));
252 1
        }
253
254
        try {
255
            $request = $this->querySerializer->serialize($message, 'json');
256
        } catch (\Exception $e) {
257
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
258 15
        }
259
260 15
        $requestSize = pack('V', \strlen($request));
261 15
        $binaryToken = pack('V', $token) . pack('V', 0);
262
263
        return $this->stream->write($binaryToken . $requestSize . $request);
264
    }
265
266
    /**
267 27
     * @throws ConnectionException
268
     * @throws \Exception
269 27
     */
270 27
    public function noreplyWait(): void
271
    {
272
        try {
273
            $token = $this->generateToken();
274 27
275
            $message = new Message(QueryType::NOREPLY_WAIT);
276
            $this->writeQuery($token, $message);
277
278
            $response = $this->receiveResponse($token, $message);
279 27
280 27
            if ($response->getType() !== 4) {
281
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
282 27
            }
283
        } catch (\Exception $e) {
284
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
285
        }
286
    }
287
288
    private function createCursorFromResponse(
289
        ResponseInterface $response,
290 1
        int $token,
291
        MessageInterface $message
292
    ): Iterable {
293 1
        return new Cursor($this, $token, $response, $message);
294
    }
295 1
296 1
    /**
297
     * @throws \Exception
298 1
     */
299
    private function generateToken(): int
300 1
    {
301 1
        try {
302
            $tries = 0;
303
            $maxToken = 1 << 30;
304
            do {
305
                $token = random_int(0, $maxToken);
306 1
                $haveCollision = isset($this->activeTokens[$token]);
307
            } while ($haveCollision && $tries++ < 1024);
308
            if ($haveCollision) {
309
                throw new \Exception('Unable to generate a unique token for the query.');
310
            }
311
        } catch (\Exception $e) {
312
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
313
        }
314 3
315
        return $token;
316
    }
317
318
    /**
319 3
     * @throws \RuntimeException
320
     * @throws ConnectionException
321
     */
322
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
323
    {
324
        $responseHeader = $this->stream->read(4 + 8);
325
        if (empty($responseHeader)) {
326 24
            throw new ConnectionException('Empty response headers received from server.');
327
        }
328
329 24
        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
330 24
        $responseToken = $responseHeader['token'];
331
        if ($responseHeader['token2'] !== 0) {
332 24
            throw new ConnectionException('Invalid response from server: Invalid token.');
333 24
        }
334 24
335 24
        $responseSize = $responseHeader['size'];
336 24
        $responseBuf = $this->stream->read($responseSize);
337
338
        /** @var ResponseInterface $response */
339
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
340
        $this->validateResponse($response, $responseToken, $token, $message);
341
342 24
        return $response;
343
    }
344
345
    /**
346
     * @throws ConnectionException
347
     */
348
    private function validateResponse(
349
        ResponseInterface $response,
350
        int $responseToken,
351
        int $token,
352 25
        MessageInterface $message
353
    ): void {
354 25
        if ($response->getType() === null) {
355 25
            throw new ConnectionException('Response message has no type.');
356
        }
357
358
        if ($response->getType() === ResponseType::CLIENT_ERROR) {
359 25
            throw new ConnectionException(
360 25
                'Client error: ' . $response->getData()[0] . ' jsonQuery: ' . json_encode($message)
361 25
            );
362
        }
363
364
        if ($responseToken !== $token) {
365 25
            throw new ConnectionException(
366 25
                'Received wrong token. Response does not match the request. '
367
                . 'Expected ' . $token . ', received ' . $responseToken
368
            );
369 25
        }
370 25
371
        if ($response->getType() === ResponseType::COMPILE_ERROR) {
372 25
            throw new ConnectionException(
373
                'Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
374
            );
375
        }
376
377
        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
378
            throw new ConnectionException(
379
                'Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
380
            );
381
        }
382 25
    }
383
}
384