Passed
Branch master (ae31ea)
by Timon
04:39 queued 01:38
created

Connection::writeQuery()   C

Complexity

Conditions 8
Paths 8

Size

Total Lines 39
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 9.8645

Importance

Changes 0
Metric Value
dl 0
loc 39
ccs 18
cts 26
cp 0.6923
rs 5.3846
c 0
b 0
f 0
cc 8
eloc 28
nc 8
nop 2
crap 9.8645
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
8
use TBolier\RethinkQL\Connection\Socket\StreamHandlerInterface;
9
use TBolier\RethinkQL\Query\Expr;
10
use TBolier\RethinkQL\Query\Message;
11
use TBolier\RethinkQL\Query\MessageInterface;
12
use TBolier\RethinkQL\Query\Query;
13
use TBolier\RethinkQL\Types\Query\QueryType;
14
use TBolier\RethinkQL\Types\Response\ResponseType;
15
use TBolier\RethinkQL\Types\Term\TermType;
16
17
class Connection implements ConnectionInterface
18
{
19
    /**
     * Connection options. Not assigned or read by any method of this class.
     *
     * @var OptionsInterface
     */
    private $options;

    /**
     * Tokens that run() has flagged after a partial-success response.
     * The token is the key and the value is always true, so generateToken()
     * can detect collisions with isset().
     *
     * @var array<int, bool>
     */
    private $activeTokens = [];

    /**
     * Stream to the server; null until connect() has obtained one.
     *
     * @var StreamInterface|null
     */
    private $stream;

    /**
     * Name of the database sent in the 'db' option of every written query.
     *
     * @var string
     */
    private $dbName;

    /**
     * When true, run() returns right after writing the query and does not
     * read a response.
     *
     * @var bool
     */
    private $noReply = false;

    /**
     * Factory invoked by connect() to obtain the stream.
     *
     * @var \Closure
     */
    private $streamWrapper;

    /**
     * Handshake performed on a freshly obtained stream.
     *
     * @var HandshakeInterface
     */
    private $handshake;
53
54
    /**
     * Stores the collaborators; no stream is opened until connect() is called.
     *
     * @param \Closure $streamWrapper Factory that connect() invokes to obtain the stream.
     * @param HandshakeInterface $handshake Handshake run against the new stream.
     * @param string $dbName Database name sent along with every query.
     */
    public function __construct(\Closure $streamWrapper, HandshakeInterface $handshake, string $dbName)
    {
        $this->handshake = $handshake;
        $this->streamWrapper = $streamWrapper;
        $this->dbName = $dbName;
    }
65
66
    /**
     * Opens the stream and performs the handshake, unless a writable stream
     * is already present.
     *
     * @inheritdoc
     * @throws ConnectionException When obtaining the stream or the handshake throws.
     */
    public function connect(): self
    {
        if (!$this->isStreamOpen()) {
            try {
                $openStream = $this->streamWrapper;
                $this->stream = $openStream();
                $this->handshake->hello($this->stream);
            } catch (\Exception $failure) {
                throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
            }
        }

        return $this;
    }
85
86
    /**
     * Closes the stream, optionally sending a noreply-wait query first.
     *
     * @param bool $noReplyWait When truthy, noReplyWait() is called before closing.
     * @throws ConnectionException When the stream is not open or the wait fails.
     * @throws \Exception
     */
    public function close($noReplyWait = true): void
    {
        $connected = $this->isStreamOpen();
        if (!$connected) {
            throw new ConnectionException('Not connected.');
        }

        if ($noReplyWait) {
            $this->noReplyWait();
        }

        $this->stream->close();
    }
103
104
    /**
     * Sends a NOREPLY_WAIT query and reads its response, which must carry
     * response type 4.
     *
     * @throws ConnectionException When not connected, on an unexpected response
     *                             type, or when any step throws.
     * @throws \Exception
     */
    private function noReplyWait(): void
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $waitToken = $this->generateToken();
            $waitMessage = new Message(QueryType::NOREPLY_WAIT);

            $this->writeQuery($waitToken, $waitMessage);
            $reply = $this->receiveResponse($waitToken, $waitMessage);

            if (4 !== $reply['t']) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $failure) {
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
130
131
    /**
     * Writes the message to the server and returns the 'r' entry of the
     * decoded response. In no-reply mode nothing is read and an empty array
     * is returned.
     *
     * @param MessageInterface $message
     * @return array
     * @throws ConnectionException When not connected or when any step throws.
     */
    public function run(MessageInterface $message): array
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $queryToken = $this->generateToken();

            if ($message instanceof Query) {
                // NOTE(review): utf8Converter() is declared to take a MessageInterface
                // but receives the result of getQuery() here; confirm what getQuery() returns.
                $converted = $this->utf8Converter($message->getQuery());
                $message->setQuery($converted);
            }

            $this->writeQuery($queryToken, $message);

            if (!$this->noReply) {
                $reply = $this->receiveResponse($queryToken, $message);

                // Todo: support all response types, and decide what the return type should be.
                if (ResponseType::SUCCESS_PARTIAL === $reply['t']) {
                    // Keep the token reserved so generateToken() does not hand it out again.
                    $this->activeTokens[$queryToken] = true;
                }

                return $reply['r'];
            }

            return [];
        } catch (\Exception $failure) {
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
169
170
    /**
     * Runs the query in no-reply mode: the query is written but no response
     * is read.
     *
     * The no-reply flag is reset in a finally block so that a failing run()
     * cannot leave the connection stuck in no-reply mode.
     *
     * @param MessageInterface $query
     * @return array The result of run(), which is an empty array in no-reply mode.
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query): array
    {
        $this->noReply = true;

        try {
            return $this->run($query);
        } finally {
            $this->noReply = false;
        }
    }
181
182
    /**
     * Re-encodes every string inside the message's query data that is not
     * valid UTF-8, treating such strings as ISO-8859-1.
     *
     * Messages whose query is not an array (including null) are returned untouched.
     *
     * @param MessageInterface $message
     * @return MessageInterface The same message instance.
     */
    private function utf8Converter(MessageInterface $message): MessageInterface
    {
        // array_walk_recursive() takes its input by reference, so the query
        // has to live in a local variable first.
        $query = $message->getQuery();

        if (!\is_array($query)) {
            return $message;
        }

        array_walk_recursive($query, function (&$item) {
            if (\is_string($item) && !mb_detect_encoding($item, 'utf-8', true)) {
                $item = mb_convert_encoding($item, 'UTF-8', 'ISO-8859-1');
            }
        });

        // NOTE(review): assumes setQuery() accepts the array shape that getQuery() returned; confirm.
        $message->setQuery($query);

        return $message;
    }
200
201
    /**
     * Draws a random token in the range 0..2^30 that is not present in the
     * active token list, giving up after 1025 colliding draws.
     *
     * @return int
     * @throws \Exception
     */
    private function generateToken(): int
    {
        try {
            for ($attempt = 0; $attempt <= 1024; $attempt++) {
                $candidate = \random_int(0, 1 << 30);

                if (!isset($this->activeTokens[$candidate])) {
                    return $candidate;
                }
            }

            throw new \Exception('Unable to generate a unique token for the query.');
        } catch (\Exception $failure) {
            throw new ConnectionException('Generating the token failed.', $failure->getCode(), $failure);
        }
    }
223
224
    /**
     * Encodes the message as JSON and writes it to the stream, framed by the
     * query token and the payload length.
     *
     * The message's options are replaced with a 'db' option naming the
     * currently selected database before encoding.
     *
     * @param int $token
     * @param MessageInterface $message
     * @return int The value returned by the stream's write().
     * @throws ConnectionException When the message cannot be encoded as JSON.
     * @throws \Exception
     */
    private function writeQuery(int $token, MessageInterface $message): int
    {
        $message->setOptions([
            'db' => [
                TermType::DB,
                [$this->dbName],
                (object)[],
            ],
        ]);

        $request = json_encode($message);
        $jsonError = json_last_error();

        if ($request === false || $jsonError !== JSON_ERROR_NONE) {
            $knownErrors = [
                JSON_ERROR_DEPTH => 'JSON error: Maximum stack depth exceeded',
                JSON_ERROR_STATE_MISMATCH => 'JSON error: Underflow or the modes mismatch',
                JSON_ERROR_CTRL_CHAR => 'JSON error: Unexpected control character found',
                JSON_ERROR_SYNTAX => 'JSON error: Syntax error, malformed JSON.',
                JSON_ERROR_UTF8 => 'JSON error: Malformed UTF-8 characters, possibly incorrectly encoded.',
            ];

            // Any error code without a dedicated message falls back to the generic one.
            throw new ConnectionException(
                $knownErrors[$jsonError] ?? 'Failed to encode query as JSON: ' . $jsonError
            );
        }

        // Frame layout: token as two little-endian 32-bit words (the high word is
        // always zero), then the payload length as a little-endian 32-bit word.
        $binaryToken = pack('V', $token) . pack('V', 0);
        $requestSize = pack('V', \strlen($request));

        return $this->stream->write($binaryToken . $requestSize . $request);
    }
269
270
    /**
     * Reads one response frame from the stream, decodes its JSON body and
     * validates it against the token of the request.
     *
     * @param int $token Token of the request this response must answer.
     * @param MessageInterface $message The request, used in error messages.
     * @return array The decoded response.
     * @throws \RuntimeException
     * @throws ConnectionException When the header is incomplete, the body is not
     *                             a JSON object/array, or validation fails.
     */
    private function receiveResponse(int $token, MessageInterface $message): array
    {
        // Header: 8-byte token (two 32-bit words) followed by a 4-byte body size.
        $rawHeader = $this->stream->read(4 + 8);
        if (\strlen($rawHeader) !== 12) {
            throw new ConnectionException('Invalid response from server: Incomplete response header.');
        }

        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $rawHeader);
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseBuf = $this->stream->read($responseHeader['size']);
        $response = json_decode($responseBuf, true);

        // json_decode() yields null (or a scalar) for a truncated or malformed body;
        // report that as a connection error instead of a TypeError in validateResponse().
        if (!\is_array($response)) {
            throw new ConnectionException('Invalid response from server: Unable to decode the response body.');
        }

        $this->validateResponse($response, $responseHeader['token'], $token, $message);

        return $response;
    }
294
295
    /**
     * Throws when the decoded response reports an error, lacks a type,
     * answers a different token, or carries a client, compile or runtime
     * error type. Returns silently otherwise.
     *
     * @param array $response Decoded response body.
     * @param int $responseToken Token read from the response header.
     * @param int $token Token of the request.
     * @param MessageInterface $message The request, JSON-encoded into error messages.
     * @throws ConnectionException
     */
    private function validateResponse(array $response, int $responseToken, int $token, MessageInterface $message): void
    {
        if (isset($response['error'])) {
            throw new ConnectionException($response['error'], $response['error_code']);
        }

        if (!isset($response['t'])) {
            throw new ConnectionException('Response message has no type.');
        }

        $type = $response['t'];

        if ($type === ResponseType::CLIENT_ERROR) {
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response['r'][0]);
        }

        if ($responseToken !== $token) {
            throw new ConnectionException(
                'Received wrong token. Response does not match the request. '
                . 'Expected ' . $token . ', received ' . $responseToken
            );
        }

        $isCompileError = $type === ResponseType::COMPILE_ERROR;
        if ($isCompileError || $type === ResponseType::RUNTIME_ERROR) {
            $label = $isCompileError ? 'Compile error: ' : 'Runtime error: ';

            throw new ConnectionException($label . $response['r'][0] . ', jsonQuery: ' . json_encode($message));
        }
    }
331
332
    /**
     * Selects the database whose name is sent with every subsequent query.
     *
     * @inheritdoc
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }
339
340
    /**
     * Reports whether a stream has been obtained and is writable.
     *
     * @inheritdoc
     */
    public function isStreamOpen(): bool
    {
        if ($this->stream === null) {
            return false;
        }

        return (bool) $this->stream->isWritable();
    }
347
348
    /**
     * Not implemented yet.
     *
     * The method used to fall off the end of an empty body, which under the
     * declared array return type ends every call in a TypeError. It now fails
     * with an explicit exception instead.
     *
     * @param MessageInterface $query
     * @return array
     * @throws \BadMethodCallException Always.
     */
    public function changes(MessageInterface $query): array
    {
        // TODO: Implement changes() method.
        throw new \BadMethodCallException('Connection::changes() is not implemented yet.');
    }
356
357
    /**
     * Sends a SERVER_INFO query and returns the whole decoded response,
     * which must carry response type 5.
     *
     * @return array
     * @throws \Exception
     */
    public function server(): array
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $infoToken = $this->generateToken();
            $infoMessage = new Message(QueryType::SERVER_INFO);

            $this->writeQuery($infoToken, $infoMessage);
            $reply = $this->receiveResponse($infoToken, $infoMessage);

            if (5 !== $reply['t']) {
                throw new ConnectionException('Unexpected response type for server query.');
            }

            return $reply;
        } catch (\Exception $failure) {
            throw new ConnectionException($failure->getMessage(), $failure->getCode(), $failure);
        }
    }
385
386
    /**
     * Wraps the string in an Expr, sends it as a START query and returns
     * the result of run().
     *
     * @param string $string
     * @return array
     * @throws ConnectionException
     */
    public function expr(string $string): array
    {
        $startMessage = new Message();
        $typedMessage = $startMessage->setQueryType(QueryType::START);
        $typedMessage->setQuery(new Expr($string));

        return $this->run($startMessage);
    }
399
}
400