StreamIO   F
last analyzed

Complexity

Total Complexity 73

Size/Duplication

Total Lines 447
Duplicated Lines 0 %

Test Coverage

Coverage 69.68%

Importance

Changes 35
Bugs 5 Features 2
Metric Value
wmc 73
eloc 221
c 35
b 5
f 2
dl 0
loc 447
ccs 131
cts 188
cp 0.6968
rs 2.56

14 Methods

Rating   Name   Duplication   Size   Complexity  
B read() 0 60 9
A do_select() 0 16 5
A getSocket() 0 3 1
A timed_out() 0 6 1
A select_write() 0 6 1
A __construct() 0 35 2
B connect() 0 75 9
F write() 0 72 19
A close() 0 9 2
A setupContext() 0 17 6
A extract_error_code() 0 14 3
A enable_keepalive() 0 12 4
A error_handler() 0 14 5
A enableCrypto() 0 22 6

How to fix   Complexity   

Complex Class

Complex classes like StreamIO often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use StreamIO, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace PhpAmqpLib\Wire\IO;
4
5
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
6
use PhpAmqpLib\Exception\AMQPDataReadException;
7
use PhpAmqpLib\Exception\AMQPIOException;
8
use PhpAmqpLib\Exception\AMQPRuntimeException;
9
use PhpAmqpLib\Exception\AMQPTimeoutException;
10
use PhpAmqpLib\Helper\MiscHelper;
11
use PhpAmqpLib\Helper\SocketConstants;
12
13
class StreamIO extends AbstractIO
14
{
15
    /** @var null|resource */
16
    protected $context;
17
18
    /** @var null|resource */
19
    private $sock;
20
21
    /**
     * Records the connection settings on the instance; the socket itself is
     * only opened later by connect().
     *
     * @param string $host
     * @param int $port
     * @param float $connection_timeout
     * @param float $read_write_timeout used for both the read and the write timeout
     * @param resource|null $context
     * @param bool $keepalive
     * @param int $heartbeat
     * @param string|null $ssl_protocol @deprecated
     */
    public function __construct(
        $host,
        $port,
        $connection_timeout,
        $read_write_timeout,
        $context = null,
        $keepalive = false,
        $heartbeat = 0,
        $ssl_protocol = null
    ) {
        // $ssl_protocol is never read; passing it only produces a deprecation notice.
        if (8 === func_num_args()) {
            trigger_error(
                '$ssl_protocol parameter is deprecated, use stream_context_set_option($context, \'ssl\', \'crypto_method\', $ssl_protocol) instead (see https://www.php.net/manual/en/function.stream-socket-enable-crypto.php for possible values)',
                E_USER_DEPRECATED
            );
        }

        // TODO FUTURE enable the check below (and later change its comparison to <=),
        // see php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
        //
        //     if ($heartbeat !== 0 && ($read_write_timeout < ($heartbeat * 2))) {
        //         throw new \InvalidArgumentException('read_write_timeout must be at least 2x the heartbeat');
        //     }

        // Endpoint and stream context.
        $this->host = $host;
        $this->port = $port;
        $this->context = $context;

        // Timeouts: a single read/write value feeds both directions.
        $io_timeout = (float)$read_write_timeout;
        $this->connection_timeout = $connection_timeout;
        $this->read_timeout = $io_timeout;
        $this->write_timeout = $io_timeout;

        // Liveness settings; the initial heartbeat is kept separately so that
        // connect() can restore it.
        $this->keepalive = $keepalive;
        $this->heartbeat = $heartbeat;
        $this->initial_heartbeat = $heartbeat;

        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
    }
67 30
68
    /**
     * @inheritdoc
     *
     * Opens the TCP stream to the configured host and port, applies the
     * read/write timeout and blocking mode, optionally enables keepalive and
     * TLS, and resets the heartbeat to its initial value.
     *
     * @throws AMQPIOException when the stream cannot be opened or configured
     */
    public function connect()
    {
        $errstr = $errno = null;

        $remote = sprintf(
            'tcp://%s:%s',
            $this->host,
            $this->port
        );

        $context = $this->setupContext();
        $this->setErrorHandler();

        try {
            $this->sock = stream_socket_client(
                $remote,
                $errno,
                $errstr,
                $this->connection_timeout,
                STREAM_CLIENT_CONNECT,
                $context
            );
            $this->throwOnError();
        } catch (\ErrorException $e) {
            // Preserve the error code and the original exception so the cause
            // stays visible, as read() and enableCrypto() already do.
            throw new AMQPIOException($e->getMessage(), $e->getCode(), $e);
        } finally {
            $this->restoreErrorHandler();
        }

        if (false === $this->sock) {
            throw new AMQPIOException(
                sprintf(
                    'Error Connecting to server(%s): %s ',
                    $errno,
                    $errstr
                ),
                $errno
            );
        }

        // A stream without a remote peer name is treated as a refused connection.
        if (!stream_socket_get_name($this->sock, true)) {
            throw new AMQPIOException(
                sprintf(
                    'Connection refused: %s ',
                    $remote
                )
            );
        }

        // The stream timeout is the larger of the read and write timeouts.
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds(max($this->read_timeout, $this->write_timeout));
        if (!stream_set_timeout($this->sock, $sec, $uSec)) {
            throw new AMQPIOException('Timeout could not be set');
        }

        // php cannot capture signals while streams are blocking
        if ($this->canDispatchPcntlSignal) {
            stream_set_blocking($this->sock, false);
            stream_set_write_buffer($this->sock, 0);
            if (function_exists('stream_set_read_buffer')) {
                stream_set_read_buffer($this->sock, 0);
            }
        } else {
            stream_set_blocking($this->sock, true);
        }

        if ($this->keepalive) {
            $this->enable_keepalive();
        }

        // TLS is negotiated only when the context carries an ssl crypto_method.
        $options = stream_context_get_options($context);
        if (isset($options['ssl']['crypto_method'])) {
            $this->enableCrypto();
        }

        $this->heartbeat = $this->initial_heartbeat;
    }
147
148
    /**
     * Builds the stream context used for connecting.
     *
     * Uses the configured context when it is a stream-context resource,
     * otherwise creates a new one. Always sets socket.tcp_nodelay, and sets
     * ssl.crypto_method when ssl options exist without one.
     *
     * @return resource
     * @throws AMQPIOException when ssl.crypto_method cannot be set
     */
    private function setupContext()
    {
        $usable = is_resource($this->context)
            && get_resource_type($this->context) === 'stream-context';
        $context = $usable ? $this->context : stream_context_create();

        stream_context_set_option($context, 'socket', 'tcp_nodelay', true);

        $options = stream_context_get_options($context);
        $cryptoMethodMissing = !empty($options['ssl']) && !isset($options['ssl']['crypto_method']);

        if ($cryptoMethodMissing
            && !stream_context_set_option($context, 'ssl', 'crypto_method', STREAM_CRYPTO_METHOD_ANY_CLIENT)
        ) {
            throw new AMQPIOException("Can not set ssl.crypto_method stream context option");
        }

        return $context;
    }
170
171
    /**
     * @inheritdoc
     *
     * Reads exactly $len bytes from the stream, waiting via select() whenever
     * a read returns no data.
     *
     * @throws AMQPConnectionClosedException when the stream is gone or at EOF
     * @throws AMQPDataReadException when reading fails or the byte count is wrong
     * @throws AMQPTimeoutException when no data arrives within the read timeout
     */
    public function read($len)
    {
        $this->check_heartbeat();

        list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);

        // Start of the current stretch without data; reset on every non-empty read.
        $idle_since = microtime(true);
        $received = 0;
        $data = '';

        while ($received < $len) {
            if (!is_resource($this->sock) || feof($this->sock)) {
                $this->close();
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
            }

            $this->setErrorHandler();
            try {
                $chunk = fread($this->sock, ($len - $received));
                $this->throwOnError();
            } catch (\ErrorException $e) {
                throw new AMQPDataReadException($e->getMessage(), $e->getCode(), $e);
            } finally {
                $this->restoreErrorHandler();
            }

            if ($chunk === false) {
                throw new AMQPDataReadException('Error receiving data');
            }

            if ($chunk !== '') {
                $this->last_read = microtime(true);
                $idle_since = $this->last_read;
                $received += mb_strlen($chunk, 'ASCII');
                $data .= $chunk;

                continue;
            }

            // Empty read: give up once the idle stretch exceeds the read
            // timeout, otherwise wait for the stream and try again.
            $idle_for = microtime(true) - $idle_since;
            if ($idle_for > $this->read_timeout) {
                throw new AMQPTimeoutException('Too many read attempts detected in StreamIO');
            }
            $this->select($timeout_sec, $timeout_uSec);
        }

        $total = mb_strlen($data, 'ASCII');
        if ($total !== $len) {
            throw new AMQPDataReadException(
                sprintf(
                    'Error reading data. Received %s instead of expected %s bytes',
                    $total,
                    $len
                )
            );
        }

        $this->last_read = microtime(true);

        return $data;
    }
235
236
    /**
     * @inheritdoc
     *
     * Writes all of $data to the stream in chunks of at most
     * self::BUFFER_SIZE bytes, looping until every byte has been written.
     *
     * @throws AMQPConnectionClosedException when the stream is gone, at EOF, or
     *                                       the write fails with a connection-level errno
     * @throws AMQPRuntimeException on any other write error
     * @throws AMQPTimeoutException when the stream reports a timeout or no
     *                              progress is made within the write timeout
     */
    public function write($data)
    {
        $this->checkBrokerHeartbeat();

        $written = 0;
        $len = mb_strlen($data, 'ASCII');
        // Time of the last successful write; used to detect a stalled stream.
        $write_start = microtime(true);

        while ($written < $len) {
            if (!is_resource($this->sock) || feof($this->sock)) {
                $this->close();
                $constants = SocketConstants::getInstance();
                throw new AMQPConnectionClosedException('Broken pipe or closed connection', $constants->SOCKET_EPIPE);
            }

            $this->setErrorHandler();
            // OpenSSL's C library function SSL_write() can balk on buffers > 8192
            // bytes in length, so we're limiting the write size here. On both TLS
            // and plaintext connections, the write loop will continue until the
            // buffer has been fully written.
            // This behavior has been observed in OpenSSL dating back to at least
            // September 2002:
            // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
            try {
                // check stream and prevent from high CPU usage
                $result = 0;
                if ($this->select_write()) {
                    $buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
                    $result = fwrite($this->sock, $buffer);
                }
                $this->throwOnError();
            } catch (\ErrorException $e) {
                $code = $this->last_error['errno'];
                $constants = SocketConstants::getInstance();
                switch ($code) {
                    // These errno values are treated as a lost connection.
                    case $constants->SOCKET_EPIPE:
                    case $constants->SOCKET_ENETDOWN:
                    case $constants->SOCKET_ENETUNREACH:
                    case $constants->SOCKET_ENETRESET:
                    case $constants->SOCKET_ECONNABORTED:
                    case $constants->SOCKET_ECONNRESET:
                    case $constants->SOCKET_ECONNREFUSED:
                    case $constants->SOCKET_ETIMEDOUT:
                        $this->close();
                        throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
                    default:
                        throw new AMQPRuntimeException($e->getMessage(), $code, $e);
                }
            } finally {
                $this->restoreErrorHandler();
            }

            if ($result === false) {
                throw new AMQPRuntimeException('Error sending data');
            }

            if ($this->timed_out()) {
                throw AMQPTimeoutException::writeTimeout($this->write_timeout);
            }

            $now = microtime(true);
            if ($result > 0) {
                $this->last_write = $write_start = $now;
                $written += $result;
            } else {
                // Nothing was written: either the peer went away or the stream is stalled.
                if (feof($this->sock)) {
                    $this->close();
                    throw new AMQPConnectionClosedException('Broken pipe or closed connection');
                }
                if (($now - $write_start) > $this->write_timeout) {
                    throw AMQPTimeoutException::writeTimeout($this->write_timeout);
                }
            }
        }
    }
315
316
    /**
     * @inheritdoc
     *
     * Ignores errors whose extracted code is EAGAIN, EWOULDBLOCK or EINTR and
     * forwards everything else to the parent handler.
     */
    public function error_handler($errno, $errstr, $errfile, $errline): void
    {
        $code = $this->extract_error_code($errstr);
        $constants = SocketConstants::getInstance();

        // Loose comparison on purpose: it mirrors switch/case matching.
        // EAGAIN / EWOULDBLOCK: fwrite notice that the stream isn't ready.
        // EINTR: stream_select warning that it has been interrupted by a signal.
        $ignorable = $code == $constants->SOCKET_EAGAIN
            || $code == $constants->SOCKET_EWOULDBLOCK
            || $code == $constants->SOCKET_EINTR;

        if ($ignorable) {
            return;
        }

        // Prefer the code parsed from the message; fall back to PHP's own errno.
        $reported = $code > 0 ? $code : $errno;
        parent::error_handler($reported, $errstr, $errfile, $errline);
    }
334
335
    /**
     * Disables the heartbeat, closes the stream if it is still open and
     * resets the socket handle and the last read/write timestamps.
     */
    public function close()
    {
        $this->disableHeartbeat();

        $stream = $this->sock;
        if (is_resource($stream)) {
            fclose($stream);
        }

        $this->sock = null;
        $this->last_read = 0;
        $this->last_write = 0;
    }
345
346 28
    /**
     * Returns the underlying stream handle, or null when none is held.
     *
     * @deprecated
     * @return null|resource|\Socket
     */
    public function getSocket()
    {
        return $this->sock;
    }
354 28
355
    /**
     * @inheritdoc
     *
     * Waits for the stream to become readable via stream_select().
     *
     * @throws AMQPConnectionClosedException when no open stream is held
     */
    protected function do_select(?int $sec, int $usec)
    {
        // A null socket is not a resource either, so one check covers both cases.
        if (!is_resource($this->sock)) {
            $this->sock = null;
            throw new AMQPConnectionClosedException('Broken pipe or closed connection', 0);
        }

        $readable = [$this->sock];
        $writable = null;
        $exceptional = null;

        // On PHP >= 8.1 a null seconds value is paired with zero microseconds.
        if (null === $sec && PHP_VERSION_ID >= 80100) {
            $usec = 0;
        }

        return stream_select($readable, $writable, $exceptional, $sec, $usec);
    }
375
376 28
    /**
     * Waits up to 100000 microseconds for the stream to become writable.
     *
     * @return int|bool the stream_select() result
     */
    protected function select_write()
    {
        $writable = [$this->sock];
        $readable = null;
        $exceptional = null;

        return stream_select($readable, $writable, $exceptional, 0, 100000);
    }
386
387
    /**
     * Reports the 'timed_out' flag from the stream's meta data.
     *
     * @return mixed
     */
    protected function timed_out()
    {
        $meta = stream_get_meta_data($this->sock);

        return $meta['timed_out'];
    }
397
398
    /**
     * Turns on SO_KEEPALIVE for the connected stream.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException when socket_import_stream or the
     *         required socket constants are unavailable, or the stream cannot be
     *         imported as a socket
     */
    protected function enable_keepalive(): void
    {
        if (!function_exists('socket_import_stream')) {
            throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
        }

        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        $socket = socket_import_stream($this->sock);
        // A failed import must not reach socket_set_option(): report it through
        // the exception type this method already documents.
        if ($socket === false || $socket === null) {
            throw new AMQPIOException('Can not enable keepalive: stream could not be imported as a socket');
        }

        socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
    }
414
415 4
    /**
     * Pulls the numeric error code out of a PHP stream warning message.
     *
     * Messages starting with "stream_select():" are searched for " [N]: ",
     * all others for " errno=N ".
     *
     * @param string $message
     * @return int the parsed code, or 0 when the message contains none
     */
    protected function extract_error_code($message)
    {
        $fromSelect = strpos($message, 'stream_select():') === 0;
        $pattern = $fromSelect ? '/\s+\[(\d+)\]:\s+/' : '/\s+errno=(\d+)\s+/';

        $found = array();
        if (preg_match($pattern, $message, $found) === 1) {
            return (int)$found[1];
        }

        return 0;
    }
434
435
    /**
     * Enables crypto on the stream, retrying while
     * stream_socket_enable_crypto() returns 0 and the deadline has not passed.
     *
     * @throws AMQPIOException when an error is raised during the attempts or
     *                         crypto could not be enabled
     */
    private function enableCrypto(): void
    {
        // 2 round-trips during handshake
        $deadline = time() + ($this->read_timeout + $this->write_timeout) * 2;
        $enabled = false;

        try {
            $this->setErrorHandler();
            do {
                $enabled = stream_socket_enable_crypto($this->sock, true);
                if (true === $enabled) {
                    return;
                }
                $this->throwOnError();
                // Pause for 1000 microseconds between attempts.
                usleep(1000);
            } while (0 === $enabled && time() < $deadline);
        } catch (\ErrorException $exception) {
            throw new AMQPIOException($exception->getMessage(), $exception->getCode(), $exception);
        } finally {
            $this->restoreErrorHandler();
        }

        // Success returns from inside the loop, so reaching this point means
        // crypto was not enabled.
        throw new AMQPIOException('Could not enable socket crypto');
    }
462
}
463