Completed
Push — master ( 2acff7...79c6d0 )
by John
20:11 queued 12s
created

StreamIO::disableHeartbeat()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 6
ccs 0
cts 0
cp 0
rs 9.4285
cc 1
eloc 3
nc 1
nop 0
crap 2
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Exception\AMQPTimeoutException;
7
use PhpAmqpLib\Helper\MiscHelper;
8
use PhpAmqpLib\Wire\AMQPWriter;
9
10
class StreamIO extends AbstractIO
11
{
12
    /** @var string */
13
    protected $protocol;
14
15
    /** @var string */
16
    protected $host;
17
18
    /** @var int */
19
    protected $port;
20
21
    /** @var int */
22
    protected $connection_timeout;
23
24
    /** @var int */
25
    protected $read_write_timeout;
26
27
    /** @var resource */
28
    protected $context;
29
30
    /** @var bool */
31
    protected $keepalive;
32
33
    /** @var int */
34
    protected $heartbeat;
35
36
    /** @var float */
37
    protected $last_read;
38
39
    /** @var float */
40
    protected $last_write;
41
42
    /** @var array */
43
    protected $last_error;
44
45
    /** @var resource */
46
    private $sock;
47
48
    /** @var bool */
49
    private $canSelectNull;
50
51
    /** @var bool */
52
    private $canDispatchPcntlSignal;
53
54
    /**
55
     * @param string $host
56
     * @param int $port
57
     * @param int $connection_timeout
58
     * @param int $read_write_timeout
59
     * @param null $context
60
     * @param bool $keepalive
61
     * @param int $heartbeat
62
     */
63 48
    public function __construct(
64
        $host,
65
        $port,
66
        $connection_timeout,
67
        $read_write_timeout,
68
        $context = null,
69
        $keepalive = false,
70
        $heartbeat = 0
71
    ) {
72 48
        $this->protocol = 'tcp';
73 48
        $this->host = $host;
74 48
        $this->port = $port;
75 48
        $this->connection_timeout = $connection_timeout;
76 48
        $this->read_write_timeout = $read_write_timeout;
77 48
        $this->context = $context;
78 48
        $this->keepalive = $keepalive;
79 48
        $this->heartbeat = $heartbeat;
80 48
        $this->canSelectNull = true;
81 48
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
82
83 48
        if (is_null($this->context)) {
84 48
            $this->context = stream_context_create();
85 32
        } else {
86
            $this->protocol = 'ssl';
87
            // php bugs 41631 & 65137 prevent select null from working on ssl streams
88
            if (PHP_VERSION_ID < 50436) {
89
                $this->canSelectNull = false;
90
            }
91
        }
92 48
    }
93
94
    /**
95
     * @return bool
96
     */
97 48
    private function isPcntlSignalEnabled()
98
    {
99 48
        return extension_loaded('pcntl')
100 48
            && function_exists('pcntl_signal_dispatch')
101 48
            && (defined('AMQP_WITHOUT_SIGNALS') && !AMQP_WITHOUT_SIGNALS);
102
    }
103
104
    /**
105
     * Sets up the stream connection
106
     *
107
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
108
     * @throws \Exception
109
     */
110 48
    public function connect()
111
    {
112 48
        $errstr = $errno = null;
113
114 48
        $remote = sprintf(
115 48
            '%s://%s:%s',
116 48
            $this->protocol,
117 48
            $this->host,
118 48
            $this->port
119 32
        );
120
121 48
        set_error_handler(array($this, 'error_handler'));
122
123 48
        $this->sock = stream_socket_client(
124 32
            $remote,
125 32
            $errno,
126 32
            $errstr,
127 48
            $this->connection_timeout,
128 48
            STREAM_CLIENT_CONNECT,
129 48
            $this->context
130 32
        );
131
132 48
        restore_error_handler();
133
134 48
        if (false === $this->sock) {
135
            throw new AMQPRuntimeException(
136
                sprintf(
137
                    'Error Connecting to server(%s): %s ',
138
                    $errno,
139
                    $errstr
140
                ),
141
                $errno
142
            );
143
        }
144
145 48
        if (false === stream_socket_get_name($this->sock, true)) {
146
            throw new AMQPRuntimeException(
147
                sprintf(
148
                    'Connection refused: %s ',
149
                    $remote
150
                )
151
            );
152
        }
153
154 48
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_write_timeout);
155 48
        if (!stream_set_timeout($this->sock, $sec, $uSec)) {
156
            throw new AMQPIOException('Timeout could not be set');
157
        }
158
159
        // php cannot capture signals while streams are blocking
160 48
        if ($this->canDispatchPcntlSignal) {
161
            stream_set_blocking($this->sock, 0);
162
            stream_set_write_buffer($this->sock, 0);
163
            if (function_exists('stream_set_read_buffer')) {
164
                stream_set_read_buffer($this->sock, 0);
165
            }
166
        } else {
167 48
            stream_set_blocking($this->sock, 1);
168
        }
169
170 48
        if ($this->keepalive) {
171
            $this->enable_keepalive();
172
        }
173 48
    }
174
175
    /**
176
     * Reconnects the socket
177
     */
178 12
    public function reconnect()
179
    {
180 12
        $this->close();
181 12
        $this->connect();
182 12
    }
183
184
    /**
185
     * @param $len
186
     * @throws \PhpAmqpLib\Exception\AMQPIOException
187
     * @return mixed|string
188
     */
189 48
    public function read($len)
190
    {
191 48
        $read = 0;
192 48
        $data = '';
193
194 48
        while ($read < $len) {
195 48
            $this->check_heartbeat();
196
197 48
            if (!is_resource($this->sock) || feof($this->sock)) {
198
                throw new AMQPRuntimeException('Broken pipe or closed connection');
199
            }
200
201 48
            set_error_handler(array($this, 'error_handler'));
202 48
            $buffer = fread($this->sock, ($len - $read));
203 48
            restore_error_handler();
204
205 48
            if ($buffer === false) {
206
                throw new AMQPRuntimeException('Error receiving data');
207
            }
208
209 48
            if ($buffer === '') {
210
                if ($this->canDispatchPcntlSignal) {
211
                    // prevent cpu from being consumed while waiting
212
                    if ($this->canSelectNull) {
213
                        $this->select(null, null);
214
                        pcntl_signal_dispatch();
215
                    } else {
216
                        usleep(100000);
217
                        pcntl_signal_dispatch();
218
                    }
219
                }
220
                continue;
221
            }
222
223 48
            $read += mb_strlen($buffer, 'ASCII');
224 48
            $data .= $buffer;
225 32
        }
226
227 48
        if (mb_strlen($data, 'ASCII') !== $len) {
228
            throw new AMQPRuntimeException(
229
                sprintf(
230
                    'Error reading data. Received %s instead of expected %s bytes',
231
                    mb_strlen($data, 'ASCII'),
232
                    $len
233
                )
234
            );
235
        }
236
237 48
        $this->last_read = microtime(true);
238 48
        return $data;
239
    }
240
241
    /**
242
     * @param $data
243
     * @return mixed|void
244
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
245
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
246
     */
247 48
    public function write($data)
248
    {
249 48
        $written = 0;
250 48
        $len = mb_strlen($data, 'ASCII');
251
252 48
        while ($written < $len) {
253
254 48
            if (!is_resource($this->sock)) {
255
                throw new AMQPRuntimeException('Broken pipe or closed connection');
256
            }
257
258 48
            set_error_handler(array($this, 'error_handler'));
259
            // OpenSSL's C library function SSL_write() can balk on buffers > 8192
260
            // bytes in length, so we're limiting the write size here. On both TLS
261
            // and plaintext connections, the write loop will continue until the
262
            // buffer has been fully written.
263
            // This behavior has been observed in OpenSSL dating back to at least
264
            // September 2002:
265
            // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
266 48
            $buffer = fwrite($this->sock, $data, 8192);
267 48
            restore_error_handler();
268
269 48
            if ($buffer === false) {
270
                throw new AMQPRuntimeException('Error sending data');
271
            }
272
273 48
            if ($buffer === 0 && feof($this->sock)) {
274
                throw new AMQPRuntimeException('Broken pipe or closed connection');
275
            }
276
277 48
            if ($this->timed_out()) {
278
                throw new AMQPTimeoutException('Error sending data. Socket connection timed out');
279
            }
280
281 48
            $written += $buffer;
282
283 48
            if ($buffer > 0) {
284 48
                $data = mb_substr($data, $buffer, mb_strlen($data, 'ASCII') - $buffer, 'ASCII');
285 32
            }
286 32
        }
287
288 48
        $this->last_write = microtime(true);
289 48
    }
290
291
    /**
292
     * Internal error handler to deal with stream and socket errors that need to be ignored
293
     *
294
     * @param  int $errno
295
     * @param  string $errstr
296
     * @param  string $errfile
297
     * @param  int $errline
298
     * @param  array $errcontext
299
     * @return null
300
     * @throws \ErrorException
301
     */
302
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
303
    {
304
        $this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');
305
306
        // fwrite notice that the stream isn't ready
307
        if (strstr($errstr, 'Resource temporarily unavailable')) {
308
             // it's allowed to retry
309
            return null;
310
        }
311
312
        // stream_select warning that it has been interrupted by a signal
313
        if (strstr($errstr, 'Interrupted system call')) {
314
             // it's allowed while processing signals
315
            return null;
316
        }
317
318
        restore_error_handler();
319
320
        // raise all other issues to exceptions
321
        throw new \ErrorException($errstr, 0, $errno, $errfile, $errline);
322
    }
323
324
    /**
325
     * Heartbeat logic: check connection health here
326
     */
327 48
    protected function check_heartbeat()
328
    {
329
        // ignore unless heartbeat interval is set
330 48
        if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
331
            $t = microtime(true);
332
            $t_read = round($t - $this->last_read);
333
            $t_write = round($t - $this->last_write);
334
335
            // server has gone away
336
            if (($this->heartbeat * 2) < $t_read) {
337
                $this->reconnect();
338
            }
339
340
            // time for client to send a heartbeat
341
            if (($this->heartbeat / 2) < $t_write) {
342
                $this->write_heartbeat();
343
            }
344
        }
345 48
    }
346
347
    /**
348
     * Sends a heartbeat message
349
     */
350
    protected function write_heartbeat()
351
    {
352
        $pkt = new AMQPWriter();
353
        $pkt->write_octet(8);
354
        $pkt->write_short(0);
355
        $pkt->write_long(0);
356
        $pkt->write_octet(0xCE);
357
        $this->write($pkt->getvalue());
358
    }
359
360 48
    public function close()
361
    {
362 48
        if (is_resource($this->sock)) {
363 48
            fclose($this->sock);
364 32
        }
365 48
        $this->sock = null;
366 48
    }
367
368
    /**
369
     * @return resource
370
     */
371
    public function get_socket()
372
    {
373
        return $this->sock;
374
    }
375
376
    /**
377
     * @return resource
378
     */
379
    public function getSocket()
380
    {
381
        return $this->get_socket();
382
    }
383
384
    /**
385
     * @param $sec
386
     * @param $usec
387
     * @return int|mixed
388
     */
389
    public function select($sec, $usec)
390
    {
391
        $read = array($this->sock);
392
        $write = null;
393
        $except = null;
394
        $result = false;
0 ignored issues
show
Unused Code introduced by
$result is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
395
396
        set_error_handler(array($this, 'error_handler'));
397
        $result = stream_select($read, $write, $except, $sec, $usec);
398
        restore_error_handler();
399
400
        return $result;
401
    }
402
403
    /**
404
     * @return mixed
405
     */
406 48
    protected function timed_out()
407
    {
408
        // get status of socket to determine whether or not it has timed out
409 48
        $info = stream_get_meta_data($this->sock);
410
411 48
        return $info['timed_out'];
412
    }
413
414
    /**
415
     * @throws \PhpAmqpLib\Exception\AMQPIOException
416
     */
417
    protected function enable_keepalive()
418
    {
419
        if (!function_exists('socket_import_stream')) {
420
            throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
421
        }
422
423
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
424
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
425
        }
426
427
        $socket = socket_import_stream($this->sock);
428
        socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
429
    }
430
431 4
    /**
432
     * @return $this
433
     */
434
    public function disableHeartbeat()
435
    {
436
        $this->heartbeat = 0;
437
438
        return $this;
439
    }
440
}
441