Completed
Pull Request — master (#348)
by
unknown
21:36
created

StreamIO::connect()   B

Complexity

Conditions 7
Paths 9

Size

Total Lines 64
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 26
CRAP Score 11.0278

Importance

Changes 7
Bugs 4 Features 1
Metric Value
c 7
b 4
f 1
dl 0
loc 64
ccs 26
cts 46
cp 0.5652
rs 7.2058
cc 7
eloc 40
nc 9
nop 0
crap 11.0278

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Exception\AMQPTimeoutException;
7
use PhpAmqpLib\Helper\MiscHelper;
8
use PhpAmqpLib\Wire\AMQPWriter;
9
10
class StreamIO extends AbstractIO
11
{
12
    /** @var string */
13
    protected $protocol;
14
15
    /** @var string */
16
    protected $host;
17
18
    /** @var int */
19
    protected $port;
20
21
    /** @var int */
22
    protected $connection_timeout;
23
24
    /** @var int */
25
    protected $read_write_timeout;
26
27
    /** @var resource */
28
    protected $context;
29
30
    /** @var bool */
31
    protected $keepalive;
32
33
    /** @var int */
34
    protected $heartbeat;
35
36
    /** @var float */
37
    protected $last_read;
38
39
    /** @var float */
40
    protected $last_write;
41
42
    /** @var array */
43
    protected $last_error;
44
45
    /** @var resource */
46
    private $sock;
47
48
    /** @var bool */
49
    private $canSelectNull;
50
51
    /** @var bool */
52
    private $canDispatchPcntlSignal;
53
54
    /**
55
     * @param string $host
56
     * @param int $port
57
     * @param int $connection_timeout
58
     * @param int $read_write_timeout
59
     * @param null $context
60
     * @param bool $keepalive
61
     * @param int $heartbeat
62
     */
63 40
    public function __construct(
64
        $host,
65
        $port,
66
        $connection_timeout,
67
        $read_write_timeout,
68
        $context = null,
69
        $keepalive = false,
70
        $heartbeat = 0
71
    ) {
72 40
        $this->protocol = 'tcp';
73 40
        $this->host = $host;
74 40
        $this->port = $port;
75 40
        $this->connection_timeout = $connection_timeout;
76 40
        $this->read_write_timeout = $read_write_timeout;
77 40
        $this->context = $context;
78 40
        $this->keepalive = $keepalive;
79 40
        $this->heartbeat = $heartbeat;
80 40
        $this->canSelectNull = true;
81 40
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
82
83 40
        if (is_null($this->context)) {
84 40
            $this->context = stream_context_create();
85 32
        } else {
86
            $this->protocol = 'ssl';
87
            // php bugs 41631 & 65137 prevent select null from working on ssl streams
88
            if (PHP_VERSION_ID < 50436) {
89
                $this->canSelectNull = false;
90
            }
91
        }
92 40
    }
93
94
    /**
95
     * @return bool
96
     */
97 40
    private function isPcntlSignalEnabled()
98
    {
99 40
        return extension_loaded('pcntl')
100 40
            && function_exists('pcntl_signal_dispatch')
101 40
            && (defined('AMQP_WITHOUT_SIGNALS') && !AMQP_WITHOUT_SIGNALS);
102
    }
103
104
    /**
105
     * Sets up the stream connection
106
     *
107
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
108
     * @throws \Exception
109
     */
110 40
    public function connect()
111
    {
112 40
        $errstr = $errno = null;
113
114 40
        $remote = sprintf(
115 40
            '%s://%s:%s',
116 40
            $this->protocol,
117 40
            $this->host,
118 40
            $this->port
119 32
        );
120
121 40
        set_error_handler(array($this, 'error_handler'));
122
123 40
        $this->sock = stream_socket_client(
124 32
            $remote,
125 32
            $errno,
126 32
            $errstr,
127 40
            $this->connection_timeout,
128 40
            STREAM_CLIENT_CONNECT,
129 40
            $this->context
130 32
        );
131
132 40
        restore_error_handler();
133
134 40
        if (false === $this->sock) {
135
            throw new AMQPRuntimeException(
136
                sprintf(
137
                    'Error Connecting to server(%s): %s ',
138
                    $errno,
139
                    $errstr
140
                ),
141
                $errno
142
            );
143
        }
144
145 40
        if (false === stream_socket_get_name($this->sock, true)) {
146
            throw new AMQPRuntimeException(
147
                sprintf(
148
                    'Connection refused: %s ',
149
                    $remote
150
                )
151
            );
152
        }
153
154 40
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_write_timeout);
155 40
        if (!stream_set_timeout($this->sock, $sec, $uSec)) {
156
            throw new AMQPIOException('Timeout could not be set');
157
        }
158
159
        // php cannot capture signals while streams are blocking
160 40
        if ($this->canDispatchPcntlSignal) {
161
            stream_set_blocking($this->sock, 0);
162
            stream_set_write_buffer($this->sock, 0);
163
            if (function_exists('stream_set_read_buffer')) {
164
                stream_set_read_buffer($this->sock, 0);
165
            }
166
        } else {
167 40
            stream_set_blocking($this->sock, 1);
168
        }
169
170 40
        if ($this->keepalive) {
171
            $this->enable_keepalive();
172
        }
173 40
    }
174
175
    /**
176
     * Reconnects the socket
177
     */
178 10
    public function reconnect()
179
    {
180 10
        $this->close();
181 10
        $this->connect();
182 10
    }
183
184
    /**
185
     * @param $len
186
     * @throws \PhpAmqpLib\Exception\AMQPIOException
187
     * @return mixed|string
188
     */
189 40
    public function read($len)
190
    {
191 40
        $read = 0;
192 40
        $data = '';
193
194 40
        while ($read < $len) {
195 40
            $this->check_heartbeat();
196
197 40
            if (!is_resource($this->sock) || feof($this->sock)) {
198
                throw new AMQPRuntimeException('Broken pipe or closed connection');
199
            }
200
201 40
            set_error_handler(array($this, 'error_handler'));
202 40
            $buffer = fread($this->sock, ($len - $read));
203 40
            restore_error_handler();
204
205 40
            if ($buffer === false) {
206
                throw new AMQPRuntimeException('Error receiving data');
207
            }
208
209 40
            if ($buffer === '') {
210
                if ($this->canDispatchPcntlSignal) {
211
                    // prevent cpu from being consumed while waiting
212
                    if ($this->canSelectNull) {
213
                        $this->select($this->heartbeat ?  $this->heartbeat + 1 : null, null);
214
                        pcntl_signal_dispatch();
215
                    } else {
216
                        usleep(100000);
217
                        pcntl_signal_dispatch();
218
                    }
219
                }
220
                continue;
221
            }
222
223 40
            $read += mb_strlen($buffer, 'ASCII');
224 40
            $data .= $buffer;
225 32
        }
226
227 40
        if (mb_strlen($data, 'ASCII') !== $len) {
228
            throw new AMQPRuntimeException(
229
                sprintf(
230
                    'Error reading data. Received %s instead of expected %s bytes',
231
                    mb_strlen($data, 'ASCII'),
232
                    $len
233
                )
234
            );
235
        }
236
237 40
        $this->last_read = microtime(true);
238 40
        return $data;
239
    }
240
241
    /**
242
     * @param $data
243
     * @return mixed|void
244
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
245
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
246
     */
247 40
    public function write($data)
248
    {
249 40
        $written = 0;
250 40
        $buffer  = false;
251
        $tw      = false;
252 40
        $len = mb_strlen($data, 'ASCII');
253
254 40
        while ($written < $len) {
255
            if ($buffer !== 0 && $this->read_write_timeout) {
256
                $tw = time();
257
            }
258 40
259
            if (!is_resource($this->sock)) {
260
                throw new AMQPRuntimeException('Broken pipe or closed connection');
261
            }
262
263
            set_error_handler(array($this, 'error_handler'));
264
            // OpenSSL's C library function SSL_write() can balk on buffers > 8192
265
            // bytes in length, so we're limiting the write size here. On both TLS
266 40
            // and plaintext connections, the write loop will continue until the
267 40
            // buffer has been fully written.
268
            // This behavior has been observed in OpenSSL dating back to at least
269 40
            // September 2002:
270
            // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
271
            $buffer = fwrite($this->sock, $data, 8192);
272
            restore_error_handler();
273 40
274
            if ($buffer === false) {
275
                throw new AMQPRuntimeException('Error sending data');
276
            }
277 40
278
            if ($buffer === 0 && feof($this->sock)) {
279
                throw new AMQPRuntimeException('Broken pipe or closed connection');
280
            }
281 40
282
            if ($buffer === 0 && $tw && (time() - $tw > $this->read_write_timeout)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $tw of type integer|false is loosely compared to true; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
283 40
                throw new AMQPTimeoutException('Error sending data. write timed out');
284 40
            }
285 32
286 32
            if ($this->timed_out()) {
287
                throw new AMQPTimeoutException('Error sending data. Socket connection timed out');
288 40
            }
289 40
290
            $written += $buffer;
291
292
            if ($buffer > 0) {
293
                $data = substr($data, $buffer, strlen($data) - $buffer);
294
            }
295
        }
296
297
        $this->last_write = microtime(true);
298
    }
299
300
    /**
301
     * Internal error handler to deal with stream and socket errors that need to be ignored
302
     *
303
     * @param  int $errno
304
     * @param  string $errstr
305
     * @param  string $errfile
306
     * @param  int $errline
307
     * @param  array $errcontext
308
     * @return null
309
     * @throws \ErrorException
310
     */
311
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
312
    {
313
        $this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');
314
315
        // fwrite notice that the stream isn't ready
316
        if (strstr($errstr, 'Resource temporarily unavailable')) {
317
             // it's allowed to retry
318
            return null;
319
        }
320
321
        // stream_select warning that it has been interrupted by a signal
322
        if (strstr($errstr, 'Interrupted system call')) {
323
             // it's allowed while processing signals
324
            return null;
325
        }
326
327 40
        restore_error_handler();
328
329
        // raise all other issues to exceptions
330 40
        throw new \ErrorException($errstr, 0, $errno, $errfile, $errline);
331
    }
332
333
    /**
334
     * Heartbeat logic: check connection health here
335
     */
336
    protected function check_heartbeat()
337
    {
338
        // ignore unless heartbeat interval is set
339
        if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
340
            $t = microtime(true);
341
            $t_read = round($t - $this->last_read);
342
            $t_write = round($t - $this->last_write);
343
344
            // server has gone away
345 40
            if (($this->heartbeat * 2) < $t_read) {
346
                $this->reconnect();
347
            }
348
349
            // time for client to send a heartbeat
350
            if (($this->heartbeat / 2) < $t_write) {
351
                $this->write_heartbeat();
352
            }
353
        }
354
    }
355
356
    /**
357
     * Sends a heartbeat message
358
     */
359
    protected function write_heartbeat()
360 40
    {
361
        $pkt = new AMQPWriter();
362 40
        $pkt->write_octet(8);
363 40
        $pkt->write_short(0);
364 32
        $pkt->write_long(0);
365 40
        $pkt->write_octet(0xCE);
366 40
        $this->write($pkt->getvalue());
367
    }
368
369
    public function close()
370
    {
371
        if (is_resource($this->sock)) {
372
            fclose($this->sock);
373
        }
374
        $this->sock = null;
375
    }
376
377
    /**
378
     * @return resource
379
     */
380
    public function get_socket()
381
    {
382
        return $this->sock;
383
    }
384
385
    /**
386
     * @return resource
387
     */
388
    public function getSocket()
389
    {
390
        return $this->get_socket();
391
    }
392
393
    /**
394
     * @param $sec
395
     * @param $usec
396
     * @return int|mixed
397
     */
398
    public function select($sec, $usec)
399
    {
400
        $read = array($this->sock);
401
        $write = null;
402
        $except = null;
403
        $result = false;
0 ignored issues
show
Unused Code introduced by
$result is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
404
405
        set_error_handler(array($this, 'error_handler'));
406 40
        $result = stream_select($read, $write, $except, $sec, $usec);
407
        restore_error_handler();
408
409 40
        return $result;
410
    }
411 40
412
    /**
413
     * @return mixed
414
     */
415
    protected function timed_out()
416
    {
417
        // get status of socket to determine whether or not it has timed out
418
        $info = stream_get_meta_data($this->sock);
419
420
        return $info['timed_out'];
421
    }
422
423
    /**
424
     * @throws \PhpAmqpLib\Exception\AMQPIOException
425
     */
426
    protected function enable_keepalive()
427
    {
428
        if (!function_exists('socket_import_stream')) {
429
            throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
430
        }
431 4
432
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
433
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
434
        }
435
436
        $socket = socket_import_stream($this->sock);
437
        socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
438
    }
439
}
440