Passed
Pull Request — development (#3708)
by Martyn
15:25
created

StreamConnection   F

Complexity

Total Complexity 70

Size/Duplication

Total Lines 332
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 138
c 1
b 0
f 0
dl 0
loc 332
rs 2.8
wmc 70

12 Methods

Rating   Name   Duplication   Size   Complexity  
B tcpStreamInitializer() 0 25 7
A disconnect() 0 5 2
A createResource() 0 16 6
B tlsStreamInitializer() 0 29 7
A createStreamSocket() 0 18 5
A connect() 0 8 5
A writeRequest() 0 16 2
A assertParameters() 0 15 6
A __destruct() 0 7 3
A write() 0 16 6
C read() 0 66 16
A unixStreamInitializer() 0 21 5

How to fix: Complexity

Complex Class

Complex classes like StreamConnection often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find one is to look for fields and methods that share a prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use StreamConnection, and based on these observations, apply Extract Interface, too.
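
As a rough illustration of Extract Class applied to this file, the sketch below pulls the buffer-building logic of writeRequest() into its own small class. The class name RequestSerializer and its serialize() method are hypothetical and not part of Predis; under this assumption the connection would delegate to it and keep only the socket write.

```php
<?php

/**
 * Hypothetical class extracted from StreamConnection::writeRequest().
 * It only builds the Redis wire-protocol buffer and performs no I/O.
 */
final class RequestSerializer
{
    /**
     * @param string $commandId Command name, e.g. "SET".
     * @param array  $arguments Command arguments (scalars).
     */
    public function serialize(string $commandId, array $arguments): string
    {
        $reqlen = count($arguments) + 1;
        $cmdlen = strlen($commandId);

        // Header: number of parts, then the command name as a bulk string.
        $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandId}\r\n";

        foreach ($arguments as $argument) {
            // Cast once so the length and the payload always agree.
            $argument = (string) $argument;
            $arglen = strlen($argument);
            $buffer .= "\${$arglen}\r\n{$argument}\r\n";
        }

        return $buffer;
    }
}
```

With such a class in place, writeRequest() could shrink to a single call that passes the serialized buffer to write(), and the serializer could be unit-tested without opening a socket.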

<?php

/*
 * This file is part of the Predis package.
 *
 * (c) 2009-2020 Daniele Alessandri
 * (c) 2021-2023 Till Krüss
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Predis\Connection;

use InvalidArgumentException;
use Predis\Command\CommandInterface;
use Predis\Response\Error as ErrorResponse;
use Predis\Response\ErrorInterface as ErrorResponseInterface;
use Predis\Response\Status as StatusResponse;

/**
 * Standard connection to Redis servers implemented on top of PHP's streams.
 * The connection parameters supported by this class are:
 *
 *  - scheme: it can be either 'redis', 'tcp', 'rediss', 'tls' or 'unix'.
 *  - host: hostname or IP address of the server.
 *  - port: TCP port of the server.
 *  - path: path of a UNIX domain socket when scheme is 'unix'.
 *  - timeout: timeout to perform the connection (default is 5 seconds).
 *  - read_write_timeout: timeout of read / write operations.
 *  - async_connect: performs the connection asynchronously.
 *  - tcp_nodelay: enables or disables Nagle's algorithm for coalescing.
 *  - persistent: the connection is left intact after a GC collection.
 *  - ssl: context options array (see http://php.net/manual/en/context.ssl.php)
 */
class StreamConnection extends AbstractConnection
{
    /**
     * Disconnects from the server and destroys the underlying resource when the
     * garbage collector kicks in only if the connection has not been marked as
     * persistent.
     */
    public function __destruct()
    {
        if (isset($this->parameters->persistent) && $this->parameters->persistent) {
            return;
        }

        $this->disconnect();
    }

    /**
     * {@inheritdoc}
     */
    protected function assertParameters(ParametersInterface $parameters)
    {
        switch ($parameters->scheme) {
            case 'tcp':
            case 'redis':
            case 'unix':
            case 'tls':
            case 'rediss':
                break;

            default:
                throw new InvalidArgumentException("Invalid scheme: '$parameters->scheme'.");
        }

        return $parameters;
    }

    /**
     * {@inheritdoc}
     */
    protected function createResource()
    {
        switch ($this->parameters->scheme) {
            case 'tcp':
            case 'redis':
                return $this->tcpStreamInitializer($this->parameters);

            case 'unix':
                return $this->unixStreamInitializer($this->parameters);

            case 'tls':
            case 'rediss':
                return $this->tlsStreamInitializer($this->parameters);

            default:
                throw new InvalidArgumentException("Invalid scheme: '{$this->parameters->scheme}'.");
        }
    }

    /**
     * Creates a connected stream socket resource.
     *
     * @param ParametersInterface $parameters Connection parameters.
     * @param string              $address    Address for stream_socket_client().
     * @param int                 $flags      Flags for stream_socket_client().
     *
     * @return resource
     */
    protected function createStreamSocket(ParametersInterface $parameters, $address, $flags)
    {
        $timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
        $context = stream_context_create(['socket' => ['tcp_nodelay' => (bool) $parameters->tcp_nodelay]]);

        if (!$resource = @stream_socket_client($address, $errno, $errstr, $timeout, $flags, $context)) {
            $this->onConnectionError(trim($errstr), $errno);
        }

        if (isset($parameters->read_write_timeout)) {
            $rwtimeout = (float) $parameters->read_write_timeout;
            $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
            $timeoutSeconds = floor($rwtimeout);
            $timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
            stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
Bug: $timeoutUSeconds of type double is incompatible with the type integer expected by parameter $microseconds of stream_set_timeout(). (Ignorable by annotation.)

If this is a false positive, you can ignore this issue in your code via the ignore-type annotation:

            stream_set_timeout($resource, $timeoutSeconds, /** @scrutinizer ignore-type */ $timeoutUSeconds);

Bug: $timeoutSeconds of type double is incompatible with the type integer expected by parameter $seconds of stream_set_timeout(). (Ignorable by annotation.)

If this is a false positive, you can ignore this issue in your code via the ignore-type annotation:

            stream_set_timeout($resource, /** @scrutinizer ignore-type */ $timeoutSeconds, $timeoutUSeconds);
        }

        return $resource;
    }

    /**
     * Initializes a TCP stream resource.
     *
     * @param ParametersInterface $parameters Initialization parameters for the connection.
     *
     * @return resource
     */
    protected function tcpStreamInitializer(ParametersInterface $parameters)
    {
        if (!filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
            $address = "tcp://$parameters->host:$parameters->port";
        } else {
            $address = "tcp://[$parameters->host]:$parameters->port";
        }

        $flags = STREAM_CLIENT_CONNECT;

        if (isset($parameters->async_connect) && $parameters->async_connect) {
            $flags |= STREAM_CLIENT_ASYNC_CONNECT;
        }

        if (isset($parameters->persistent)) {
            if (false !== $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE)) {
                $flags |= STREAM_CLIENT_PERSISTENT;

                if ($persistent === null) {
                    $address = "{$address}/{$parameters->persistent}";
                }
            }
        }

        return $this->createStreamSocket($parameters, $address, $flags);
    }

    /**
     * Initializes a UNIX stream resource.
     *
     * @param ParametersInterface $parameters Initialization parameters for the connection.
     *
     * @return resource
     */
    protected function unixStreamInitializer(ParametersInterface $parameters)
    {
        if (!isset($parameters->path)) {
            throw new InvalidArgumentException('Missing UNIX domain socket path.');
        }

        $flags = STREAM_CLIENT_CONNECT;

        if (isset($parameters->persistent)) {
            if (false !== $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE)) {
                $flags |= STREAM_CLIENT_PERSISTENT;

                if ($persistent === null) {
                    throw new InvalidArgumentException(
                        'Persistent connection IDs are not supported when using UNIX domain sockets.'
                    );
                }
            }
        }

        return $this->createStreamSocket($parameters, "unix://{$parameters->path}", $flags);
    }

    /**
     * Initializes an SSL-encrypted TCP stream resource.
     *
     * @param ParametersInterface $parameters Initialization parameters for the connection.
     *
     * @return resource
     */
    protected function tlsStreamInitializer(ParametersInterface $parameters)
    {
        $resource = $this->tcpStreamInitializer($parameters);
        $metadata = stream_get_meta_data($resource);

        // Detect if crypto mode is already enabled for this stream (PHP >= 7.0.0).
        if (isset($metadata['crypto'])) {
            return $resource;
        }

        if (isset($parameters->ssl) && is_array($parameters->ssl)) {
Bug (Best Practice): The property ssl does not exist on Predis\Connection\ParametersInterface. Since you implemented __get, consider adding a @property annotation.
            $options = $parameters->ssl;
        } else {
            $options = [];
        }

        if (!isset($options['crypto_type'])) {
            $options['crypto_type'] = STREAM_CRYPTO_METHOD_TLS_CLIENT;
        }

        if (!stream_context_set_option($resource, ['ssl' => $options])) {
            $this->onConnectionError('Error while setting SSL context options');
        }

        if (!stream_socket_enable_crypto($resource, true, $options['crypto_type'])) {
            $this->onConnectionError('Error while switching to encrypted communication');
        }

        return $resource;
    }

    /**
     * {@inheritdoc}
     */
    public function connect()
    {
        if (parent::connect() && $this->initCommands) {
Bug (Best Practice): The expression $this->initCommands of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using !empty($expr) instead to make it clear that you intend to check for a non-empty array.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(...) or !empty(...) instead.
            foreach ($this->initCommands as $command) {
                $response = $this->executeCommand($command);

                if ($response instanceof ErrorResponseInterface) {
                    $this->onConnectionError("`{$command->getId()}` failed: {$response->getMessage()}", 0);
                }
            }
        }
    }

    /**
     * {@inheritdoc}
     */
    public function disconnect()
    {
        if ($this->isConnected()) {
            fclose($this->getResource());
            parent::disconnect();
        }
    }

    /**
     * Performs a write operation over the stream of the buffer containing a
     * command serialized with the Redis wire protocol.
     *
     * @param string $buffer Representation of a command in the Redis wire protocol.
     */
    protected function write($buffer)
    {
        $socket = $this->getResource();

        while (($length = strlen($buffer)) > 0) {
            $written = is_resource($socket) ? @fwrite($socket, $buffer) : false;

            if ($length === $written) {
                return;
            }

            if ($written === false || $written === 0) {
                $this->onConnectionError('Error while writing bytes to the server.');
            }

            $buffer = substr($buffer, $written);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function read()
    {
        $socket = $this->getResource();
        $chunk = fgets($socket);

        if ($chunk === false || $chunk === '') {
            $this->onConnectionError('Error while reading line from the server.');
        }

        $prefix = $chunk[0];
        $payload = substr($chunk, 1, -2);

        switch ($prefix) {
            case '+':
                return StatusResponse::get($payload);

            case '$':
                $size = (int) $payload;

                if ($size === -1) {
                    return;
                }

                $bulkData = '';
                $bytesLeft = ($size += 2);

                do {
                    $chunk = is_resource($socket) ? fread($socket, min($bytesLeft, 4096)) : false;

                    if ($chunk === false || $chunk === '') {
                        $this->onConnectionError('Error while reading bytes from the server.');
                    }

                    $bulkData .= $chunk;
                    $bytesLeft = $size - strlen($bulkData);
                } while ($bytesLeft > 0);

                return substr($bulkData, 0, -2);

            case '*':
                $count = (int) $payload;

                if ($count === -1) {
                    return;
                }

                $multibulk = [];

                for ($i = 0; $i < $count; ++$i) {
                    $multibulk[$i] = $this->read();
                }

                return $multibulk;

            case ':':
                $integer = (int) $payload;

                return $integer == $payload ? $integer : $payload;

            case '-':
                return new ErrorResponse($payload);

            default:
                $this->onProtocolError("Unknown response prefix: '$prefix'.");

                return;
        }
    }

    /**
     * {@inheritdoc}
     */
    public function writeRequest(CommandInterface $command)
    {
        $commandID = $command->getId();
        $arguments = $command->getArguments();

        $cmdlen = strlen($commandID);
        $reqlen = count($arguments) + 1;

        $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandID}\r\n";

        foreach ($arguments as $argument) {
            $arglen = strlen(strval($argument));
            $buffer .= "\${$arglen}\r\n{$argument}\r\n";
        }

        $this->write($buffer);
    }
}
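
The two type issues reported on stream_set_timeout() come from passing floats where integers are expected. One possible way to address them, instead of suppressing them with annotations, is to split the timeout into integer parts before the call. The sketch below assumes a standalone helper named splitTimeout(), which is hypothetical and not part of Predis; the fallback to -1 for non-positive values mirrors createStreamSocket() in the listing.

```php
<?php

/**
 * Hypothetical helper: splits a timeout in seconds into the integer
 * seconds and microseconds that stream_set_timeout() expects.
 *
 * @return int[] Pair of [seconds, microseconds].
 */
function splitTimeout(float $timeout): array
{
    // Mirror the listing: non-positive values fall back to -1 seconds.
    if ($timeout <= 0) {
        return [-1, 0];
    }

    $seconds = (int) floor($timeout);

    // round() avoids losing a microsecond to floating-point error.
    $microseconds = (int) round(($timeout - $seconds) * 1000000);

    // Rounding can carry over into a full second, e.g. for 1.9999999.
    if ($microseconds >= 1000000) {
        ++$seconds;
        $microseconds = 0;
    }

    return [$seconds, $microseconds];
}
```

Inside createStreamSocket() the result could then be unpacked with `[$seconds, $microseconds] = splitTimeout((float) $parameters->read_write_timeout);` and passed to stream_set_timeout() as two integers.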