Completed
Push — master ( 2bc16b...27ac94 )
by John
10s
created

StreamIO   C

Complexity

Total Complexity 57

Size/Duplication

Total Lines 449
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 6

Test Coverage

Coverage 62.83%

Importance

Changes 20
Bugs 10 Features 1
Metric Value
wmc 57
c 20
b 10
f 1
lcom 2
cbo 6
dl 0
loc 449
ccs 120
cts 191
cp 0.6283
rs 6.433

16 Methods

Rating   Name   Duplication   Size   Complexity  
A isPcntlSignalEnabled() 0 6 4
B __construct() 0 30 3
C connect() 0 69 8
A reconnect() 0 5 1
C read() 0 56 10
C write() 0 48 9
A error_handler() 0 19 3
B check_heartbeat() 0 19 6
A write_heartbeat() 0 9 1
A close() 0 7 2
A get_socket() 0 4 1
A getSocket() 0 4 1
A select() 0 18 2
A timed_out() 0 7 1
A enable_keepalive() 0 13 4
A disableHeartbeat() 0 6 1

How to fix   Complexity   

Complex Class

Complex classes like StreamIO often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use StreamIO, and based on these observations, apply Extract Interface, too.

1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Exception\AMQPTimeoutException;
7
use PhpAmqpLib\Helper\MiscHelper;
8
use PhpAmqpLib\Wire\AMQPWriter;
9
10
class StreamIO extends AbstractIO
11
{
12
    /** @var string */
13
    protected $protocol;
14
15
    /** @var string */
16
    protected $host;
17
18
    /** @var int */
19
    protected $port;
20
21
    /** @var int */
22
    protected $connection_timeout;
23
24
    /** @var int */
25
    protected $read_write_timeout;
26
27
    /** @var resource */
28
    protected $context;
29
30
    /** @var bool */
31
    protected $keepalive;
32
33
    /** @var int */
34
    protected $heartbeat;
35
36
    /** @var float */
37
    protected $last_read;
38
39
    /** @var float */
40
    protected $last_write;
41
42
    /** @var array */
43
    protected $last_error;
44
45
    /** @var resource */
46
    private $sock;
47
48
    /** @var bool */
49
    private $canSelectNull;
50
51
    /** @var bool */
52
    private $canDispatchPcntlSignal;
53
54
    /**
55
     * @param string $host
56
     * @param int $port
57
     * @param int $connection_timeout
58
     * @param int $read_write_timeout
59
     * @param null $context
60
     * @param bool $keepalive
61
     * @param int $heartbeat
62
     */
63 36
    public function __construct(
64
        $host,
65
        $port,
66
        $connection_timeout,
67
        $read_write_timeout,
68
        $context = null,
69
        $keepalive = false,
70
        $heartbeat = 0
71
    ) {
72 36
        $this->protocol = 'tcp';
73 36
        $this->host = $host;
74 36
        $this->port = $port;
75 36
        $this->connection_timeout = $connection_timeout;
76 36
        $this->read_write_timeout = $read_write_timeout;
77 36
        $this->context = $context;
78 36
        $this->keepalive = $keepalive;
79 36
        $this->heartbeat = $heartbeat;
80 36
        $this->canSelectNull = true;
81 36
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
82
83 36
        if (is_null($this->context)) {
84 36
            $this->context = stream_context_create();
85 24
        } else {
86
            $this->protocol = 'ssl';
87
            // php bugs 41631 & 65137 prevent select null from working on ssl streams
88
            if (PHP_VERSION_ID < 50436) {
89
                $this->canSelectNull = false;
90
            }
91
        }
92 36
    }
93
94
    /**
95
     * @return bool
96
     */
97 36
    private function isPcntlSignalEnabled()
98
    {
99 36
        return extension_loaded('pcntl')
100 36
            && function_exists('pcntl_signal_dispatch')
101 36
            && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true);
102
    }
103
104
    /**
105
     * Sets up the stream connection
106
     *
107
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
108
     * @throws \Exception
109
     */
110 36
    public function connect()
111
    {
112 36
        $errstr = $errno = null;
113
114 36
        $remote = sprintf(
115 36
            '%s://%s:%s',
116 36
            $this->protocol,
117 36
            $this->host,
118 36
            $this->port
119 24
        );
120
121 36
        set_error_handler(array($this, 'error_handler'));
122
123 36
        try {
124 24
            $this->sock = stream_socket_client(
125 24
                $remote,
126 24
                $errno,
127 36
                $errstr,
128 36
                $this->connection_timeout,
129 36
                STREAM_CLIENT_CONNECT,
130 24
                $this->context
131
            );
132 36
        } catch (\ErrorException $e) {
133
            restore_error_handler();
134 36
            throw $e;
135
        }
136
137
        restore_error_handler();
138
139
        if (false === $this->sock) {
140
            throw new AMQPRuntimeException(
141
                sprintf(
142
                    'Error Connecting to server(%s): %s ',
143
                    $errno,
144
                    $errstr
145 36
                ),
146
                $errno
147
            );
148
        }
149
150
        if (false === stream_socket_get_name($this->sock, true)) {
151
            throw new AMQPRuntimeException(
152
                sprintf(
153
                    'Connection refused: %s ',
154 36
                    $remote
155 36
                )
156
            );
157
        }
158
159
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_write_timeout);
160 36
        if (!stream_set_timeout($this->sock, $sec, $uSec)) {
161 36
            throw new AMQPIOException('Timeout could not be set');
162 36
        }
163 36
164 36
        // php cannot capture signals while streams are blocking
165 24
        if ($this->canDispatchPcntlSignal) {
166 24
            stream_set_blocking($this->sock, 0);
167
            stream_set_write_buffer($this->sock, 0);
168
            if (function_exists('stream_set_read_buffer')) {
169
                stream_set_read_buffer($this->sock, 0);
170 36
            }
171
        } else {
172
            stream_set_blocking($this->sock, 1);
173 36
        }
174
175
        if ($this->keepalive) {
176
            $this->enable_keepalive();
177
        }
178 12
    }
179
180 12
    /**
181 12
     * Reconnects the socket
182 12
     */
183
    public function reconnect()
184
    {
185
        $this->close();
186
        $this->connect();
187
    }
188
189 36
    /**
190
     * @param $len
191 36
     * @throws \PhpAmqpLib\Exception\AMQPIOException
192 36
     * @return mixed|string
193
     */
194 36
    public function read($len)
195 36
    {
196
        $read = 0;
197 36
        $data = '';
198
199
        while ($read < $len) {
200
            $this->check_heartbeat();
201 36
202 36
            if (!is_resource($this->sock) || feof($this->sock)) {
203 36
                throw new AMQPRuntimeException('Broken pipe or closed connection');
204
            }
205 36
206
            set_error_handler(array($this, 'error_handler'));
207
            try {
208
                $buffer = fread($this->sock, ($len - $read));
209 36
            } catch (\ErrorException $e) {
210 36
                restore_error_handler();
211
                throw $e;
212 36
            }
213 36
            restore_error_handler();
214 36
215 24
            if ($buffer === false) {
216
                throw new AMQPRuntimeException('Error receiving data');
217
            }
218
219 24
            if ($buffer === '') {
220 36
                if ($this->canDispatchPcntlSignal) {
221
                    // prevent cpu from being consumed while waiting
222
                    if ($this->canSelectNull) {
223 36
                        $this->select(null, null);
224 36
                        pcntl_signal_dispatch();
225 24
                    } else {
226
                        usleep(100000);
227 36
                        pcntl_signal_dispatch();
228
                    }
229
                }
230
                continue;
231
            }
232
233
            $read += mb_strlen($buffer, 'ASCII');
234
            $data .= $buffer;
235
        }
236
237 36
        if (mb_strlen($data, 'ASCII') !== $len) {
238 36
            throw new AMQPRuntimeException(
239
                sprintf(
240
                    'Error reading data. Received %s instead of expected %s bytes',
241
                    mb_strlen($data, 'ASCII'),
242
                    $len
243
                )
244
            );
245
        }
246
247 36
        $this->last_read = microtime(true);
248
        return $data;
249 36
    }
250 36
251
    /**
252 36
     * @param $data
253
     * @return mixed|void
254 36
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
255
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
256
     */
257
    public function write($data)
258 36
    {
259
        $written = 0;
260
        $len = mb_strlen($data, 'ASCII');
261
262
        while ($written < $len) {
263
264
            if (!is_resource($this->sock)) {
265
                throw new AMQPRuntimeException('Broken pipe or closed connection');
266 36
            }
267 36
268
            set_error_handler(array($this, 'error_handler'));
269 36
            // OpenSSL's C library function SSL_write() can balk on buffers > 8192
270
            // bytes in length, so we're limiting the write size here. On both TLS
271
            // and plaintext connections, the write loop will continue until the
272
            // buffer has been fully written.
273 36
            // This behavior has been observed in OpenSSL dating back to at least
274
            // September 2002:
275
            // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
276
            try {
277 36
                $buffer = fwrite($this->sock, $data, 8192);
278
            } catch (\ErrorException $e) {
279
                restore_error_handler();
280
                throw $e;
281 36
            }
282
            restore_error_handler();
283 36
284 36
            if ($buffer === false) {
285 24
                throw new AMQPRuntimeException('Error sending data');
286 24
            }
287
288 36
            if ($buffer === 0 && feof($this->sock)) {
289 36
                throw new AMQPRuntimeException('Broken pipe or closed connection');
290
            }
291
292
            if ($this->timed_out()) {
293
                throw new AMQPTimeoutException('Error sending data. Socket connection timed out');
294
            }
295
296
            $written += $buffer;
297
298
            if ($buffer > 0) {
299
                $data = mb_substr($data, $buffer, mb_strlen($data, 'ASCII') - $buffer, 'ASCII');
300
            }
301
        }
302
303
        $this->last_write = microtime(true);
304
    }
305
306
    /**
307
     * Internal error handler to deal with stream and socket errors that need to be ignored
308
     *
309
     * @param  int $errno
310
     * @param  string $errstr
311
     * @param  string $errfile
312
     * @param  int $errline
313
     * @param  array $errcontext
314
     * @return null
315
     * @throws \ErrorException
316
     */
317
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
318
    {
319
        $this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');
320
321
        // fwrite notice that the stream isn't ready
322
        if (strstr($errstr, 'Resource temporarily unavailable')) {
323
             // it's allowed to retry
324
            return null;
325
        }
326
327 36
        // stream_select warning that it has been interrupted by a signal
328
        if (strstr($errstr, 'Interrupted system call')) {
329
             // it's allowed while processing signals
330 36
            return null;
331
        }
332
333
        // raise all other issues to exceptions
334
        throw new \ErrorException($errstr, 0, $errno, $errfile, $errline);
335
    }
336
337
    /**
338
     * Heartbeat logic: check connection health here
339
     */
340
    protected function check_heartbeat()
341
    {
342
        // ignore unless heartbeat interval is set
343
        if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
344
            $t = microtime(true);
345 36
            $t_read = round($t - $this->last_read);
346
            $t_write = round($t - $this->last_write);
347
348
            // server has gone away
349
            if (($this->heartbeat * 2) < $t_read) {
350
                $this->reconnect();
351
            }
352
353
            // time for client to send a heartbeat
354
            if (($this->heartbeat / 2) < $t_write) {
355
                $this->write_heartbeat();
356
            }
357
        }
358
    }
359
360 36
    /**
361
     * Sends a heartbeat message
362 36
     */
363 36
    protected function write_heartbeat()
364 24
    {
365 36
        $pkt = new AMQPWriter();
366 36
        $pkt->write_octet(8);
367
        $pkt->write_short(0);
368
        $pkt->write_long(0);
369
        $pkt->write_octet(0xCE);
370
        $this->write($pkt->getvalue());
371
    }
372
373
    public function close()
374
    {
375
        if (is_resource($this->sock)) {
376
            fclose($this->sock);
377
        }
378
        $this->sock = null;
379
    }
380
381
    /**
382
     * @return resource
383
     */
384
    public function get_socket()
385
    {
386
        return $this->sock;
387
    }
388
389 36
    /**
390
     * @return resource
391 36
     */
392 36
    public function getSocket()
393 36
    {
394 36
        return $this->get_socket();
395
    }
396 36
397 36
    /**
398 36
     * @param $sec
399
     * @param $usec
400 36
     * @return int|mixed
401
     */
402
    public function select($sec, $usec)
403
    {
404
        $read = array($this->sock);
405
        $write = null;
406 36
        $except = null;
407
        $result = false;
408
409 36
        set_error_handler(array($this, 'error_handler'));
410
        try {
411 36
            $result = stream_select($read, $write, $except, $sec, $usec);
412
        } catch (\ErrorException $e) {
413
            restore_error_handler();
414
            throw $e;
415
        }
416
        restore_error_handler();
417
418
        return $result;
419
    }
420
421
    /**
422
     * @return mixed
423
     */
424
    protected function timed_out()
425
    {
426
        // get status of socket to determine whether or not it has timed out
427
        $info = stream_get_meta_data($this->sock);
428
429
        return $info['timed_out'];
430
    }
431
432
    /**
433
     * @throws \PhpAmqpLib\Exception\AMQPIOException
434 36
     */
435
    protected function enable_keepalive()
436 36
    {
437
        if (!function_exists('socket_import_stream')) {
438 36
            throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
439
        }
440
441
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
442
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
443
        }
444
445
        $socket = socket_import_stream($this->sock);
446
        socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
447
    }
448
449
    /**
450
     * @return $this
451
     */
452
    public function disableHeartbeat()
453
    {
454
        $this->heartbeat = 0;
455
456
        return $this;
457
    }
458
}
459