Passed
Pull Request — master (#9)
by Timon
09:13
created

Connection::writeQuery()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2.0625

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 6
cts 8
cp 0.75
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 9
nc 2
nop 2
crap 2.0625
1
<?php
2
declare(strict_types=1);
3
4
namespace TBolier\RethinkQL\Connection;
5
6
use Psr\Http\Message\StreamInterface;
7
use Symfony\Component\Serializer\SerializerInterface;
8
use TBolier\RethinkQL\Connection\Socket\Exception;
9
use TBolier\RethinkQL\Connection\Socket\HandshakeInterface;
10
use TBolier\RethinkQL\Query\Cursor;
11
use TBolier\RethinkQL\Query\Expr;
12
use TBolier\RethinkQL\Query\Message;
13
use TBolier\RethinkQL\Query\MessageInterface;
14
use TBolier\RethinkQL\Query\Options as QueryOptions;
15
use TBolier\RethinkQL\Response\Response;
16
use TBolier\RethinkQL\Response\ResponseInterface;
17
use TBolier\RethinkQL\Types\Query\QueryType;
18
use TBolier\RethinkQL\Types\Response\ResponseType;
19
20
class Connection implements ConnectionInterface
21
{
22
    /**
     * Tokens of queries that are still being tracked, stored as token => true.
     *
     * run() adds an entry when a response is only partial, and generateToken()
     * consults the map to avoid handing out a token that is still in use.
     *
     * @var array<int, bool>
     */
    private $activeTokens = [];

    /**
     * The open stream; stays null until connect() has created it.
     *
     * @var StreamInterface|null
     */
    private $stream;

    /**
     * Name of the database attached to the options of every outgoing query.
     *
     * @var string
     */
    private $dbName;

    /**
     * When true, run() returns right after writing the query and does not
     * read a response.
     *
     * @var bool
     */
    private $noReply = false;

    /**
     * Factory that connect() invokes to obtain the stream.
     *
     * @var \Closure
     */
    private $streamWrapper;

    /**
     * Handshake executed on the freshly created stream by connect().
     *
     * @var HandshakeInterface
     */
    private $handshake;

    /**
     * Serializer used to turn outgoing query messages into JSON.
     *
     * @var SerializerInterface
     */
    private $querySerializer;

    /**
     * Serializer used to turn JSON response bodies into Response objects.
     *
     * @var SerializerInterface
     */
    private $responseSerializer;

    /**
     * Stores the collaborators; no stream is opened until connect() is called.
     *
     * @param \Closure $streamWrapper Factory invoked by connect() to create the stream.
     * @param HandshakeInterface $handshake Handshake run against the new stream.
     * @param string $dbName Database name sent along with every query.
     * @param SerializerInterface $querySerializer Serializes query messages to JSON.
     * @param SerializerInterface $responseSerializer Deserializes JSON responses.
     */
    public function __construct(
        \Closure $streamWrapper,
        HandshakeInterface $handshake,
        string $dbName,
        SerializerInterface $querySerializer,
        SerializerInterface $responseSerializer
    ) {
        $this->streamWrapper = $streamWrapper;
        $this->dbName = $dbName;
        $this->handshake = $handshake;
        $this->querySerializer = $querySerializer;
        $this->responseSerializer = $responseSerializer;
    }
81
82
    /**
     * Creates the stream and runs the handshake, unless a writable stream
     * already exists, in which case nothing happens.
     *
     * @inheritdoc
     * @throws ConnectionException When creating the stream or the handshake fails.
     */
    public function connect(): self
    {
        if (!$this->isStreamOpen()) {
            try {
                $openStream = $this->streamWrapper;
                $this->stream = $openStream();
                $this->handshake->hello($this->stream);
            } catch (\Exception $e) {
                // Surface every failure as a ConnectionException, keeping the cause.
                throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
            }
        }

        return $this;
    }
101
102
    /**
     * Closes the stream, optionally sending a NOREPLY_WAIT query first.
     *
     * @param bool $noReplyWait Whether to call noReplyWait() before closing.
     * @throws ConnectionException When no writable stream is open, or when the wait query fails.
     * @throws \Exception
     */
    public function close($noReplyWait = true): void
    {
        $connected = $this->isStreamOpen();
        if (!$connected) {
            throw new ConnectionException('Not connected.');
        }

        // The wait query has to go out while the stream is still usable.
        if ($noReplyWait) {
            $this->noReplyWait();
        }

        $this->stream->close();
    }
119
120
    /**
     * Sends a NOREPLY_WAIT query and blocks until its response has been read.
     *
     * @throws ConnectionException When not connected, when the exchange fails,
     *                             or when the response type is not 4.
     * @throws \Exception
     */
    private function noReplyWait(): void
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $waitToken = $this->generateToken();
            $waitMessage = new Message(QueryType::NOREPLY_WAIT);

            $this->writeQuery($waitToken, $waitMessage);
            $reply = $this->receiveResponse($waitToken, $waitMessage);

            // NOTE(review): 4 is presumably WAIT_COMPLETE in the RethinkDB
            // wire protocol - confirm against ResponseType.
            $isExpectedType = $reply->getType() === 4;
            if (!$isExpectedType) {
                throw new ConnectionException('Unexpected response type for noreplyWait query.');
            }
        } catch (\Exception $e) {
            // Note that this also re-wraps the ConnectionException thrown just above.
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }
146
147
    /**
148
     * @param MessageInterface $message
149
     * @return array
150
     * @throws ConnectionException
151
     */
152 12
    public function run(MessageInterface $message)
153
    {
154 12
        if (!$this->isStreamOpen()) {
155
            throw new ConnectionException('Not connected.');
156
        }
157
158
        try {
159 12
            $token = $this->generateToken();
160
161 12
            $this->writeQuery($token, $message);
162
163 12
            if ($this->noReply) {
164
                return [];
0 ignored issues
show
Bug Best Practice introduced by
The return type of `return array();` (array) is incompatible with the return type declared by the interface TBolier\RethinkQL\Connection\ConnectionInterface::run, which is TBolier\RethinkQL\Response\ResponseInterface.

If you return a value from a function or method, it should be a sub-type of the type that is given by the parent type, e.g. an interface or abstract method. This is more formally defined by the Liskov substitution principle, and guarantees that classes that depend on the parent type can use any instance of a child type interchangeably. This principle also belongs to the SOLID principles for object-oriented design.

Let’s take a look at an example:

class Author {
    private $name;

    public function __construct($name) {
        $this->name = $name;
    }

    public function getName() {
        return $this->name;
    }
}

abstract class Post {
    public function getAuthor() {
        return 'Johannes';
    }
}

class BlogPost extends Post {
    public function getAuthor() {
        return new Author('Johannes');
    }
}

class ForumPost extends Post { /* ... */ }

function my_function(Post $post) {
    echo strtoupper($post->getAuthor());
}

Our function my_function expects a Post object, and outputs the author of the post. The base class Post returns a simple string and outputting a simple string will work just fine. However, the child class BlogPost which is a sub-type of Post instead decided to return an object, and is therefore violating the SOLID principles. If a BlogPost were passed to my_function, PHP would not complain, but ultimately fail when executing the strtoupper call in its body.

Loading history...
165
            }
166
167
            // Await the response
168 12
            $response = $this->receiveResponse($token, $message);
169
170 12
            if ($response->getType() === ResponseType::SUCCESS_PARTIAL) {
171
                $this->activeTokens[$token] = true;
172
            }
173
174 12
            if ($response->getType() === ResponseType::SUCCESS_ATOM) {
175 12
                return $response;
176
            } else {
177
                // @TODO: create cursor with response.
178 1
                return $response;
179
            }
180
        } catch (\Exception $e) {
181
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
182
        }
183
    }
184
185
    /**
     * Runs a query without waiting for (or reading) its response.
     *
     * The noReply flag is switched on only for the duration of the run() call;
     * it is reset in a finally block so that a failing query cannot leave the
     * connection permanently in no-reply mode.
     *
     * @param MessageInterface $query The query message to send.
     * @return array Always an empty array, since no response is read.
     * @throws ConnectionException When not connected or when sending the query fails.
     */
    public function runNoReply(MessageInterface $query): array
    {
        $this->noReply = true;

        try {
            $this->run($query);
        } finally {
            $this->noReply = false;
        }

        // The declared return type requires an explicit array; falling off the
        // end of the method would raise a TypeError.
        return [];
    }
196
197
    /**
     * Picks a random query token that is not present in $activeTokens.
     *
     * Draws from the range [0, 2^30] and retries on collision, giving up after
     * 1025 draws in total.
     *
     * @return int A token that is currently not marked as active.
     * @throws ConnectionException When no free token was found or the random
     *                             source failed; the cause is attached as previous.
     */
    private function generateToken(): int
    {
        $upperBound = 1 << 30;

        try {
            // One initial draw plus up to 1024 retries.
            for ($attempt = 0; $attempt <= 1024; $attempt++) {
                $candidate = \random_int(0, $upperBound);

                if (!isset($this->activeTokens[$candidate])) {
                    return $candidate;
                }
            }

            throw new \Exception('Unable to generate a unique token for the query.');
        } catch (\Exception $e) {
            throw new ConnectionException('Generating the token failed.', $e->getCode(), $e);
        }
    }
219
220
    /**
     * Serializes a query message and writes it to the stream as one frame.
     *
     * The message's options are replaced with a fresh options object carrying
     * the current database name before serialization.
     *
     * @param int $token Token identifying this query.
     * @param MessageInterface $message The query message to send.
     * @return int Whatever the stream's write() call returns.
     * @throws Exception When serializing the message fails (this is the imported
     *                   TBolier\RethinkQL\Connection\Socket\Exception).
     */
    private function writeQuery(int $token, MessageInterface $message): int
    {
        $options = new QueryOptions();
        $message->setOptions($options->setDb($this->dbName));

        try {
            $payload = $this->querySerializer->serialize($message, 'json');
        } catch (\Exception $e) {
            throw new Exception('Serializing query message failed.', $e->getCode(), $e);
        }

        // Example payload:
        // [1,[56,[[15,["tabletest"]],[98,["{\"documentId\":1,...}"]]]],{"db":[14,["test"]]}]
        //
        // Frame layout, all integers 32-bit little-endian:
        // token, a zero word, payload length in bytes, then the payload itself.
        $frame = pack('VVV', $token, 0, \strlen($payload)) . $payload;

        return $this->stream->write($frame);
    }
243
244
    /**
     * Reads one response frame from the stream, deserializes and validates it.
     *
     * A frame starts with a 12-byte header (two 32-bit little-endian token words
     * followed by a 32-bit little-endian body size), then the JSON body.
     *
     * @param int $token Token of the query this response must belong to.
     * @param MessageInterface $message The query that was sent; used in error messages.
     * @return ResponseInterface The deserialized, validated response.
     * @throws \RuntimeException
     * @throws ConnectionException When the header or body is truncated, the second
     *                             token word is not zero, or validation fails.
     */
    private function receiveResponse(int $token, MessageInterface $message): ResponseInterface
    {
        $headerSize = 4 + 8;

        $rawHeader = $this->stream->read($headerSize);
        // A short read would make unpack() fail, so reject it explicitly.
        if (\strlen($rawHeader) !== $headerSize) {
            throw new ConnectionException('Invalid response from server: Incomplete header.');
        }

        $responseHeader = unpack('Vtoken/Vtoken2/Vsize', $rawHeader);
        $responseToken = $responseHeader['token'];
        if ($responseHeader['token2'] !== 0) {
            throw new ConnectionException('Invalid response from server: Invalid token.');
        }

        $responseSize = $responseHeader['size'];
        $responseBuf = $this->stream->read($responseSize);
        // A truncated body would otherwise only surface as an opaque deserialization error.
        if (\strlen($responseBuf) !== $responseSize) {
            throw new ConnectionException('Invalid response from server: Incomplete body.');
        }

        $response = $this->responseSerializer->deserialize($responseBuf, Response::class, 'json');
        $this->validateResponse($response, $responseToken, $token, $message);

        return $response;
    }
268
269
    /**
     * Rejects responses that are untyped, report an error, or carry the wrong token.
     *
     * Checks run in this order: missing type, client error, token mismatch,
     * compile error, runtime error.
     *
     * @param ResponseInterface $response The deserialized response.
     * @param int $responseToken Token read from the response header.
     * @param int $token Token that was sent with the query.
     * @param MessageInterface $message The query; JSON-encoded into error messages.
     * @throws ConnectionException On the first check that fails.
     */
    private function validateResponse(ResponseInterface $response, int $responseToken, int $token, MessageInterface $message): void
    {
        $type = $response->getType();

        if (!$type) {
            throw new ConnectionException('Response message has no type.');
        }

        if ($type === ResponseType::CLIENT_ERROR) {
            throw new ConnectionException('Server says PHP-RQL is buggy: ' . $response->getData()[0]);
        }

        if ($responseToken !== $token) {
            throw new ConnectionException(sprintf(
                'Received wrong token. Response does not match the request. Expected %d, received %d',
                $token,
                $responseToken
            ));
        }

        // Compile and runtime errors share one message format and differ only in the prefix.
        $errorPrefix = null;
        if ($type === ResponseType::COMPILE_ERROR) {
            $errorPrefix = 'Compile error: ';
        } elseif ($type === ResponseType::RUNTIME_ERROR) {
            $errorPrefix = 'Runtime error: ';
        }

        if ($errorPrefix !== null) {
            throw new ConnectionException(
                $errorPrefix . $response->getData()[0] . ', jsonQuery: ' . json_encode($message)
            );
        }
    }
301
302
    /**
     * Switches the database name that is attached to subsequently written queries.
     *
     * @inheritdoc
     *
     * @param string $name Name of the database to use from now on.
     */
    public function use(string $name): void
    {
        $this->dbName = $name;
    }
309
310
    /**
     * Tells whether a stream has been created and reports itself as writable.
     *
     * @inheritdoc
     *
     * @return bool False when no stream exists yet or the stream is not writable.
     */
    public function isStreamOpen(): bool
    {
        if ($this->stream === null) {
            return false;
        }

        return (bool) $this->stream->isWritable();
    }
317
318
    /**
     * Placeholder for changefeed support; not implemented yet.
     *
     * With an empty body the declared array return type made every call fail
     * with a TypeError. An explicit exception states the real reason instead.
     *
     * @param MessageInterface $query The query to open a changefeed for (currently unused).
     * @return array Never returns normally at present.
     * @throws ConnectionException Always, until the method is implemented.
     */
    public function changes(MessageInterface $query): array
    {
        // TODO: Implement changes() method.
        throw new ConnectionException('The changes() method is not implemented yet.');
    }
326
327
    /**
     * Sends a SERVER_INFO query and returns the server's response.
     *
     * @return ResponseInterface The response, whose type is guaranteed to be 5.
     * @throws ConnectionException When not connected, when the exchange fails,
     *                             or when the response type is not 5.
     * @throws \Exception
     */
    public function server(): ResponseInterface
    {
        if (!$this->isStreamOpen()) {
            throw new ConnectionException('Not connected.');
        }

        try {
            $infoToken = $this->generateToken();
            $infoMessage = new Message(QueryType::SERVER_INFO);

            $this->writeQuery($infoToken, $infoMessage);
            $info = $this->receiveResponse($infoToken, $infoMessage);

            // NOTE(review): 5 is presumably SERVER_INFO in the RethinkDB wire
            // protocol - confirm against ResponseType.
            if ($info->getType() === 5) {
                return $info;
            }

            throw new ConnectionException('Unexpected response type for server query.');
        } catch (\Exception $e) {
            // Note that this also re-wraps the ConnectionException thrown just above.
            throw new ConnectionException($e->getMessage(), $e->getCode(), $e);
        }
    }
355
356
    /**
     * Wraps a string in an Expr query, sends it as a START message and returns
     * the result of run().
     *
     * @param string $string The value handed to the Expr query.
     * @return ResponseInterface The response produced by run().
     * @throws ConnectionException When run() fails.
     */
    public function expr(string $string): ResponseInterface
    {
        $expression = new Expr($string);

        $startMessage = new Message();
        $startMessage
            ->setQueryType(QueryType::START)
            ->setQuery($expression);

        return $this->run($startMessage);
    }
369
}
370