Passed
Pull Request — master (#11)
by Michel
02:36
created

Connection::writeQuery()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 9
nc 2
nop 2
dl 0
loc 15
rs 9.4285
c 0
b 0
f 0
ccs 8
cts 8
cp 1
crap 2
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Query\Expr;
11
use TBolier\RethinkQL\Query\Message;
12
use TBolier\RethinkQL\Query\MessageInterface;
13
use TBolier\RethinkQL\Query\Options as QueryOptions;
14
use TBolier\RethinkQL\Response\Response;
15
use TBolier\RethinkQL\Response\ResponseInterface;
16
use TBolier\RethinkQL\Types\Query\QueryType;
17
use TBolier\RethinkQL\Types\Response\ResponseType;
18
19
/**
 * A single connection to a RethinkDB server.
 *
 * The connection lazily opens a stream through the injected stream factory,
 * performs the handshake, and then exchanges length-prefixed JSON messages:
 * each request/response is framed as an 8-byte token followed by a 4-byte
 * little-endian payload size and the JSON payload itself.
 */
class Connection implements ConnectionInterface
{
    /**
     * Size in bytes of a response header: 8-byte token (read as two
     * little-endian uint32 halves) followed by a 4-byte payload length.
     */
    private const RESPONSE_HEADER_SIZE = 12;

    /**
     * Tokens recorded for queries that answered with SUCCESS_PARTIAL, stored
     * as token => true so generateToken() can skip them with isset().
     *
     * @var bool[]
     */
    private $activeTokens = [];

    /**
     * The open stream, or null until connect() has been called.
     *
     * @var StreamInterface|null
     */
    private $stream;

    /**
     * Name of the database sent along with every query.
     *
     * @var string
     */
    private $dbName;

    /**
     * When true, run() returns right after writing the query instead of
     * waiting for the response. Only toggled by runNoReply().
     *
     * @var bool
     */
    private $noReply = false;

    /**
     * Factory that returns a fresh StreamInterface when invoked.
     *
     * @var \Closure
     */
    private $streamWrapper;

    /**
     * @var HandshakeInterface
     */
    private $handshake;

    /**
     * Serializer used to turn outgoing messages into JSON.
     *
     * @var SerializerInterface
     */
    private $querySerializer;

    /**
     * Serializer used to turn incoming JSON into Response objects.
     *
     * @var SerializerInterface
     */
    private $responseSerializer;

    /**
     * @param \Closure $streamWrapper
     * @param HandshakeInterface $handshake
     * @param string $dbName
     * @param SerializerInterface $querySerializer
     * @param SerializerInterface $responseSerializer
     */
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        $this->streamWrapper = $streamWrapper;
        $this->dbName = $dbName;
        $this->handshake = $handshake;
        $this->querySerializer = $querySerializer;
        $this->responseSerializer = $responseSerializer;
    }

    /**
     * Opens the stream and performs the handshake, unless already connected.
     *
     * @inheritdoc
     * @throws ConnectionException When creating the stream or the handshake fails.
     */
    public function connect(): self
    {
        if ($this->isStreamOpen()) {
            return $this;
        }

        try {
            $this->stream = ($this->streamWrapper)();
            $this->handshake->hello($this->stream);
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }

        return $this;
    }

    /**
     * Closes the stream, optionally sending a NOREPLY_WAIT query first.
     *
     * @param bool $noReplyWait Whether to issue a NOREPLY_WAIT query before closing.
     * @throws ConnectionException When not connected or the NOREPLY_WAIT exchange fails.
     * @throws \Exception
     */
    public function close($noReplyWait = true): void
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        if ($noReplyWait) {
            $this->noReplyWait();
        }

        $this->stream->close();
    }

    /**
     * Sends a NOREPLY_WAIT query and blocks until the server acknowledges it.
     *
     * @throws ConnectionException
     * @throws \Exception
     */
    private function noReplyWait(): void
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            $query = new Message(QueryType::NOREPLY_WAIT);
            $this->writeQuery($token, $query);

            // Await the response
            $response = $this->receiveResponse($token, $query);

            // 4 is the WAIT_COMPLETE response type in RethinkDB's wire protocol.
            // NOTE(review): replace with a ResponseType constant if one exists.
            if ($response->getType() !== 4) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Sends a query and, unless in no-reply mode, returns the server's response.
     *
     * @param MessageInterface $message
     * @return ResponseInterface|array The response, or an empty array in no-reply mode.
     * @throws ConnectionException When not connected or any step of the exchange fails.
     */
    public function run(MessageInterface $message)
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            $this->writeQuery($token, $message);

            if ($this->noReply) {
                // NOTE(review): static analysis flags this array return as
                // incompatible with the ResponseInterface return type declared
                // by ConnectionInterface::run() (a Liskov substitution
                // violation). It is kept because runNoReply() is declared to
                // return array and forwards this value.
                return [];
            }

            // Await the response
            $response = $this->receiveResponse($token, $message);

            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
                $this->activeTokens[$token] = true;
            }

            // @TODO: create cursor with response for non-atom results.
            return $response;
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Sends a query without waiting for the server's response.
     *
     * @param MessageInterface $query
     * @return array Always the empty array returned by run() in no-reply mode.
     * @throws ConnectionException
     */
    public function runNoReply(MessageInterface $query): array
    {
        $this->noReply = true;

        try {
            return $this->run($query);
        } finally {
            // Always leave no-reply mode, even when run() throws; otherwise
            // every later run() would skip reading its response.
            $this->noReply = false;
        }
    }

    /**
     * Picks a random query token that is not currently recorded as active.
     *
     * @return int
     * @throws ConnectionException When random_int() fails or no free token is found.
     */
    private function generateToken(): int
    {
        try {
            $tries = 0;
            $maxToken = 1 << 30;
            do {
                $token = random_int(0, $maxToken);
                $haveCollision = isset($this->activeTokens[$token]);
            } while ($haveCollision && $tries++ < 1024);

            if ($haveCollision) {
                throw new \Exception('Unable to generate a unique token for the query.');
            }
        } catch (\Exception $e) {
            // Both a random_int() failure and token exhaustion surface as the
            // same ConnectionException; the cause is kept as "previous".
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
        }

        return $token;
    }

    /**
     * Serializes the message and writes it to the stream as one framed request.
     *
     * @param int $token
     * @param MessageInterface $message
     * @return int The value returned by the stream's write() call.
     * @throws Exception When serializing the message fails.
     */
    private function writeQuery(int $token, MessageInterface $message): int
    {
        $message->setOptions((new QueryOptions())->setDb($this->dbName));

        try {
            $request = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $e) {
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
        }

        // Frame layout: token (uint32 LE) + zero upper half (uint32 LE)
        // + payload size (uint32 LE) + JSON payload.
        $requestSize = pack('V', \strlen($request));
        $binaryToken = pack('V', $token) . pack('V', 0);

        return $this->stream->write($binaryToken . $requestSize . $request);
    }

    /**
     * Reads one framed response from the stream, deserializes and validates it.
     *
     * @param int $token Token of the request this response must belong to.
     * @param MessageInterface $message The request, used for error reporting.
     * @return ResponseInterface
     * @throws ConnectionException When the frame is incomplete or the response is invalid.
     */
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        $responseHeader = unpack(
            'Vtoken/Vtoken2/Vsize',
            $this->readExactly(self::RESPONSE_HEADER_SIZE)
        );
        $responseToken = $responseHeader['token'];
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseBuf = $this->readExactly($responseHeader['size']);

        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
        $this->validateResponse($response, $responseToken, $token, $message);

        return $response;
    }

    /**
     * Reads from the stream until at least $length bytes have been collected.
     *
     * A single read() call may return fewer bytes than requested, so the
     * call is repeated until the requested amount has arrived.
     *
     * @param int $length Number of bytes to read.
     * @return string
     * @throws ConnectionException When the stream stops delivering data early.
     */
    private function readExactly(int $length): string
    {
        $buffer = '';

        while (\strlen($buffer) < $length) {
            $chunk = $this->stream->read($length - \strlen($buffer));

            // An empty (or non-string) read means no more data is coming;
            // bail out instead of spinning forever.
            if (!\is_string($chunk) || $chunk === '') {
                throw new ConnectionException('Invalid response from server: Unexpected end of stream.');
            }

            $buffer .= $chunk;
        }

        return $buffer;
    }

    /**
     * Throws when the response is untyped, reports an error, or carries the wrong token.
     *
     * @param ResponseInterface $response
     * @param int $responseToken Token read from the response header.
     * @param int $token Token that was sent with the request.
     * @param MessageInterface $message The request, JSON-encoded into error messages.
     * @throws ConnectionException
     */
    private function validateResponse(
        ResponseInterface $response,
        int $responseToken,
        int $token,
        MessageInterface $message
    ): void {
        $type = $response->getType();

        if (!$type) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($type === ResponseType::CLIENT_ERROR) {
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response->getData()[0]);
        }

        if ($responseToken !== $token) {
            throw new ConnectionException(
                'Received wrong token. Response does not match the request. '
                . 'Expected ' . $token . ', received ' . $responseToken
            );
        }

        if ($type === ResponseType::COMPILE_ERROR) {
            throw new ConnectionException(
                'Compile error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
            );
        }

        if ($type === ResponseType::RUNTIME_ERROR) {
            throw new ConnectionException(
                'Runtime error: ' . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
            );
        }
    }

    /**
     * Switches the database name sent with subsequent queries.
     *
     * @inheritdoc
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }

    /**
     * Tells whether a stream has been opened and is still writable.
     *
     * @inheritdoc
     */
    public function isStreamOpen(): bool
    {
        return ($this->stream !== null && $this->stream->isWritable());
    }

    /**
     * Not implemented yet.
     *
     * @param MessageInterface $query
     * @return array
     * @throws \LogicException Always, until the method is implemented.
     */
    public function changes(MessageInterface $query): array
    {
        // TODO: Implement changes() method.
        // Fail explicitly: falling off the end of a method declared ": array"
        // would raise an opaque TypeError instead.
        throw new \LogicException('Connection::changes() is not implemented yet.');
    }

    /**
     * Queries the server for information about itself.
     *
     * @return ResponseInterface
     * @throws ConnectionException When not connected or the exchange fails.
     * @throws \Exception
     */
    public function server(): ResponseInterface
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $token = $this->generateToken();

            $query = new Message(QueryType::SERVER_INFO);
            $this->writeQuery($token, $query);

            // Await the response
            $response = $this->receiveResponse($token, $query);

            // 5 is the SERVER_INFO response type in RethinkDB's wire protocol.
            // NOTE(review): replace with a ResponseType constant if one exists.
            if ($response->getType() !== 5) {
                throw new ConnectionException('Unexpected response type for server query.');
            }
        } catch (\Exception $e) {
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }

        return $response;
    }

    /**
     * Runs a START query wrapping the given string in an Expr term.
     *
     * @param string $string
     * @return ResponseInterface
     * @throws ConnectionException
     */
    public function expr(string $string): ResponseInterface
    {
        $message = new Message();
        $message->setQueryType(QueryType::START)
            ->setQuery(new Expr($string));

        return $this->run($message);
    }
}
373