Passed
Push — master ( ae31ea...e44bbb )
by Michel
02:38
created

Connection::writeQuery()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2.0625

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 6
cts 8
cp 0.75
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 9
nc 2
nop 2
crap 2.0625
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Query\Expr;
11
use TBolier\RethinkQL\Query\Message;
12
use TBolier\RethinkQL\Query\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Response\Response;
15
use TBolier\RethinkQL\Response\ResponseInterface;
16
use TBolier\RethinkQL\Types\Query\QueryType;
17
use TBolier\RethinkQL\Types\Response\ResponseType;
18
19
class Connection implements ConnectionInterface
20
{
21
    /**
22
     * @var int[]
23
     */
24
    private $activeTokens;
25
26
    /**
27
     * @var StreamInterface
28
     */
29
    private $stream;
30
31
    /**
32
     * @var string
33
     */
34
    private $dbName;
35
36
    /**
37
     * @var bool
38
     */
39
    private $noReply = false;
40
41
    /**
42
     * @var \Closure
43
     */
44
    private $streamWrapper;
45
46
    /**
47
     * @var HandshakeInterface
48
     */
49
    private $handshake;
50
51
    /**
52
     * @var SerializerInterface
53
     */
54
    private $querySerializer;
55
56
    /**
57
     * @var SerializerInterface
58
     */
59
    private $responseSerializer;
60
61
    /**
62
     * @param \Closure $streamWrapper
63
     * @param HandshakeInterface $handshake
64
     * @param string $dbName
65
     * @param SerializerInterface $querySerializer
66
     * @param SerializerInterface $responseSerializer
67
     */
68 13
    public function __construct(
69
        \Closure $streamWrapper,
70
        HandshakeInterface $handshake,
71
        string $dbName,
72
        SerializerInterface $querySerializer,
73
        SerializerInterface $responseSerializer
74
    ) {
75 13
        $this->streamWrapper = $streamWrapper;
76 13
        $this->dbName = $dbName;
77 13
        $this->handshake = $handshake;
78 13
        $this->querySerializer = $querySerializer;
79 13
        $this->responseSerializer = $responseSerializer;
80 13
    }
81
82
    /**
83
     * @inheritdoc
84
     * @throws ConnectionException
85
     */
86 13
    public function connect(): self
87
    {
88 13
        if ($this->isStreamOpen()) {
89 11
            return $this;
90
        }
91
92
        try {
93 13
            $this->stream = ($this->streamWrapper)();
94 13
            $this->handshake->hello($this->stream);
95
        } catch (\Exception $e) {
96
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
97
        }
98
99 13
        return $this;
100
    }
101
102
    /**
103
     * @param bool $noReplyWait
104
     * @throws ConnectionException
105
     * @throws \Exception
106
     */
107
    public function close($noReplyWait = true): void
108
    {
109
        if (!$this->isStreamOpen()) {
110
            throw new ConnectionException('Not connected.');
111
        }
112
113
        if ($noReplyWait) {
114
            $this->noReplyWait();
115
        }
116
117
        $this->stream->close();
118
    }
119
120
    /**
121
     * @throws ConnectionException
122
     * @throws \Exception
123
     */
124
    private function noReplyWait(): void
125
    {
126
        if (!$this->isStreamOpen()) {
127
            throw new ConnectionException('Not connected.');
128
        }
129
130
        try {
131
            $token = $this->generateToken();
132
133
            $query = new Message(QueryType::NOREPLY_WAIT);
134
            $this->writeQuery($token, $query);
135
136
            // Await the response
137
            $response = $this->receiveResponse($token, $query);
138
139
            if ($response->getType() !== 4) {
140
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
141
            }
142
        } catch (\Exception $e) {
143
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
144
        }
145
    }
146
147
    /**
148
     * @param MessageInterface $message
149
     * @return array
150
     * @throws ConnectionException
151
     */
152 12
    public function run(MessageInterface $message)
153
    {
154 12
        if (!$this->isStreamOpen()) {
155
            throw new ConnectionException('Not connected.');
156
        }
157
158
        try {
159 12
            $token = $this->generateToken();
160
161 12
            $this->writeQuery($token, $message);
162
163 12
            if ($this->noReply) {
164
                return [];
0 ignored issues
show
Bug Best Practice introduced by
The return type of return array(); (array) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connec...onnectionInterface::run of type TBolier\RethinkQL\Response\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type f.e. an interface, or abstract method. This is more formally defined by the Lizkov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangably. This principle also belongs to the SOLID principles for object oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
165
            }
166
167
            // Await the response
168 12
            $response = $this->receiveResponse($token, $message);
169
170 12
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
171
                $this->activeTokens[$token] = true;
172
            }
173
174 12
            if ($response->getType() === ResponseType::SUCCESS_ATOM) {
175 12
                return $response;
176
            } else {
177
                // @TODO: create cursor with response.
178 1
                return $response;
179
            }
180
        } catch (\Exception $e) {
181
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
182
        }
183
    }
184
185
    /**
186
     * @param MessageInterface $query
187
     * @return array
188
     * @throws ConnectionException
189
     */
190
    public function runNoReply(MessageInterface $query): array
191
    {
192
        $this->noReply = true;
193
        $this->run($query);
194
        $this->noReply = false;
195
    }
196
197
    /**
198
     * @return int
199
     * @throws \Exception
200
     */
201 13
    private function generateToken(): int
202
    {
203
        try {
204 13
            $tries = 0;
205 13
            $maxToken = 1 << 30;
206
            do {
207 13
                $token = \random_int(0, $maxToken);
208 13
                $haveCollision = isset($this->activeTokens[$token]);
209 13
            } while ($haveCollision && $tries++ < 1024);
210 13
            if ($haveCollision) {
211 13
                throw new \Exception('Unable to generate a unique token for the query.');
212
            }
213
        } catch (\Exception $e) {
214
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
215
        }
216
217 13
        return $token;
218
    }
219
220
    /**
221
     * @param int $token
222
     * @param MessageInterface $message
223
     * @return int
224
     * @throws \Exception
225
     */
226 13
    private function writeQuery(int $token, MessageInterface $message): int
227
    {
228 13
        $message->setOptions((new QueryOptions())->setDb($this->dbName));
229
230
        try {
231 13
            $request = $this->querySerializer->serialize($message, 'json');
232
        } catch (\Exception $e) {
233
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
234
        }
235
236 13
        $requestSize = pack('V', \strlen($request));
237 13
        $binaryToken = pack('V', $token) . pack('V', 0);
238
239 13
        return $this->stream->write($binaryToken . $requestSize . $request);
240
    }
241
242
    /**
243
     * @param int $token
244
     * @param MessageInterface $message
245
     * @return ResponseInterface
246
     * @throws ConnectionException
247
     */
248 13
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
249
    {
250 13
        $responseHeader = $this->stream->read(4 + 8);
251 13
        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $responseHeader);
252 13
        $responseToken = $responseHeader['token'];
253 13
        if ($responseHeader['token2'] !== 0) {
254
            throw new ConnectionException('Invalid response from server: Invalid token.');
255
        }
256
257 13
        $responseSize = $responseHeader['size'];
258 13
        $responseBuf = $this->stream->read($responseSize);
259
260 13
        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
261 13
        $this->validateResponse($response, $responseToken, $token, $message);
262
263 13
        return $response;
264
    }
265
266
    /**
267
     * @param ResponseInterface $response
268
     * @param int $responseToken
269
     * @param int $token
270
     * @param MessageInterface $message
271
     * @throws ConnectionException
272
     */
273 13
    private function validateResponse(
274
        ResponseInterface $response,
275
        int $responseToken,
276
        int $token,
277
        MessageInterface $message
278
    ): void {
279 13
        if (!$response->getType()) {
280
            throw new ConnectionException('Response message has no type.');
281
        }
282
283 13
        if ($response->getType() === ResponseType::CLIENT_ERROR) {
284
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response->getData()[0]);
285
        }
286
287 13
        if ($responseToken !== $token) {
288
            throw new ConnectionException(
289
                'Received wrong token. Response does not match the request. '
290
                . 'Expected ' . $token . ', received ' . $responseToken
291
            );
292
        }
293
294 13
        if ($response->getType() === ResponseType::COMPILE_ERROR) {
295
            throw new ConnectionException('Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
296
        }
297
298 13
        if ($response->getType() === ResponseType::RUNTIME_ERROR) {
299
            throw new ConnectionException('Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message));
300
        }
301 13
    }
302
303
    /**
304
     * @inheritdoc
305
     */
306 11
    public function use(string $name): void
307
    {
308 11
        $this->dbName = $name;
309 11
    }
310
311
    /**
312
     * @inheritdoc
313
     */
314 13
    public function isStreamOpen(): bool
315
    {
316 13
        return ($this->stream !== null && $this->stream->isWritable());
317
    }
318
319
    /**
320
     * @param MessageInterface $query
321
     * @return array
322
     */
323
    public function changes(MessageInterface $query): array
324
    {
325
        // TODO: Implement changes() method.
326
    }
327
328
    /**
329
     * @return ResponseInterface
330
     * @throws \Exception
331
     */
332 1
    public function server(): ResponseInterface
333
    {
334 1
        if (!$this->isStreamOpen()) {
335
            throw new ConnectionException('Not connected.');
336
        }
337
338
        try {
339 1
            $token = $this->generateToken();
340
341 1
            $query = new Message(QueryType::SERVER_INFO);
342 1
            $this->writeQuery($token, $query);
343
344
            // Await the response
345 1
            $response = $this->receiveResponse($token, $query);
346
347 1
            if ($response->getType() !== 5) {
348 1
                throw new ConnectionException('Unexpected response type for server query.');
349
            }
350
        } catch (\Exception $e) {
351
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
352
        }
353
354 1
        return $response;
355
    }
356
357
    /**
358
     * @param string $string
359
     * @return ResponseInterface
360
     * @throws ConnectionException
361
     */
362 1
    public function expr(string $string): ResponseInterface
363
    {
364 1
        $message = new Message();
365 1
        $message->setQueryType(QueryType::START)
366 1
            ->setQuery(new Expr($string));
367
368 1
        return $this->run($message);
369
    }
370
}
371