Completed
Pull Request — master (#348)
by
unknown
22:06
created

StreamIO::__construct()   B

Complexity

Conditions 3
Paths 3

Size

Total Lines 30
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 3.0839

Importance

Changes 6
Bugs 2 Features 0
Metric Value
c 6
b 2
f 0
dl 0
loc 30
ccs 15
cts 19
cp 0.7895
rs 8.8571
cc 3
eloc 24
nc 3
nop 7
crap 3.0839
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Exception\AMQPTimeoutException;
7
use PhpAmqpLib\Helper\MiscHelper;
8
use PhpAmqpLib\Wire\AMQPWriter;
9
10
class StreamIO extends AbstractIO
11
{
12
    /** @var string URI scheme used when connecting: 'tcp', or 'ssl' when a context was supplied */
    protected $protocol;

    /** @var string Host part of the remote address */
    protected $host;

    /** @var int Port part of the remote address */
    protected $port;

    /** @var int Timeout passed to stream_socket_client() */
    protected $connection_timeout;

    /** @var int Timeout applied to the stream with stream_set_timeout() and used as the write stall limit */
    protected $read_write_timeout;

    /** @var resource Stream context passed to stream_socket_client() */
    protected $context;

    /** @var bool Whether connect() should turn on SO_KEEPALIVE */
    protected $keepalive;

    /** @var int Heartbeat interval; 0 disables the heartbeat check */
    protected $heartbeat;

    /** @var float microtime(true) of the last completed read() */
    protected $last_read;

    /** @var float microtime(true) of the last completed write() */
    protected $last_write;

    /** @var array Details of the last error seen by error_handler() */
    protected $last_error;

    /** @var resource|null Stream socket; null after close() */
    private $sock;

    /** @var bool Whether read() may wait in select() instead of sleeping */
    private $canSelectNull;

    /** @var bool Whether pcntl signals are dispatched while waiting for data */
    private $canDispatchPcntlSignal;
53
54
    /**
     * Stores the connection settings. No socket is opened here; see connect().
     *
     * Supplying a stream context switches the protocol from 'tcp' to 'ssl'.
     *
     * @param string $host
     * @param int $port
     * @param int $connection_timeout
     * @param int $read_write_timeout
     * @param resource|null $context stream context; a default one is created when null
     * @param bool $keepalive
     * @param int $heartbeat
     */
    public function __construct(
        $host,
        $port,
        $connection_timeout,
        $read_write_timeout,
        $context = null,
        $keepalive = false,
        $heartbeat = 0
    ) {
        $hasContext = !is_null($context);

        $this->host = $host;
        $this->port = $port;
        $this->connection_timeout = $connection_timeout;
        $this->read_write_timeout = $read_write_timeout;
        $this->keepalive = $keepalive;
        $this->heartbeat = $heartbeat;
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();

        $this->protocol = $hasContext ? 'ssl' : 'tcp';
        $this->context = $hasContext ? $context : stream_context_create();

        // php bugs 41631 & 65137 prevent select null from working on ssl streams
        $this->canSelectNull = !($hasContext && PHP_VERSION_ID < 50436);
    }
93
94
    /**
     * Tells whether pcntl signals can be dispatched while waiting on the socket.
     *
     * All of the following must hold: the pcntl extension is loaded,
     * pcntl_signal_dispatch() exists, and the constant AMQP_WITHOUT_SIGNALS
     * is defined with a falsy value. An undefined constant yields false.
     *
     * @return bool
     */
    private function isPcntlSignalEnabled()
    {
        if (!extension_loaded('pcntl')) {
            return false;
        }

        if (!function_exists('pcntl_signal_dispatch')) {
            return false;
        }

        if (!defined('AMQP_WITHOUT_SIGNALS')) {
            return false;
        }

        return !AMQP_WITHOUT_SIGNALS;
    }
103
104
    /**
     * Opens the stream socket and configures timeout, blocking mode and keepalive.
     *
     * Warnings raised by stream_socket_client() go through error_handler(),
     * which turns anything it does not ignore into an \ErrorException.
     *
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the socket cannot be opened or has no peer
     * @throws \PhpAmqpLib\Exception\AMQPIOException when the timeout or keepalive cannot be set
     * @throws \Exception
     */
    public function connect()
    {
        $errno = null;
        $errstr = null;

        $remote = sprintf('%s://%s:%s', $this->protocol, $this->host, $this->port);

        set_error_handler(array($this, 'error_handler'));
        $this->sock = stream_socket_client(
            $remote,
            $errno,
            $errstr,
            $this->connection_timeout,
            STREAM_CLIENT_CONNECT,
            $this->context
        );
        restore_error_handler();

        if (false === $this->sock) {
            $message = sprintf('Error Connecting to server(%s): %s ', $errno, $errstr);

            throw new AMQPRuntimeException($message, $errno);
        }

        // a socket without a remote name is treated as a refused connection
        if (false === stream_socket_get_name($this->sock, true)) {
            throw new AMQPRuntimeException(sprintf('Connection refused: %s ', $remote));
        }

        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_write_timeout);
        if (!stream_set_timeout($this->sock, $sec, $uSec)) {
            throw new AMQPIOException('Timeout could not be set');
        }

        // php cannot capture signals while streams are blocking
        stream_set_blocking($this->sock, $this->canDispatchPcntlSignal ? 0 : 1);
        if ($this->canDispatchPcntlSignal) {
            stream_set_write_buffer($this->sock, 0);
            if (function_exists('stream_set_read_buffer')) {
                stream_set_read_buffer($this->sock, 0);
            }
        }

        if ($this->keepalive) {
            $this->enable_keepalive();
        }
    }
174
175
    /**
     * Drops the current socket via close() and opens a new one via connect().
     *
     * Anything connect() throws propagates to the caller.
     */
    public function reconnect()
    {
        $this->close();
        $this->connect();
    }
183
184
    /**
     * Reads exactly $len bytes from the socket, looping until they have all arrived.
     *
     * The heartbeat check runs before every read attempt. When a read yields
     * nothing and signal dispatching is enabled, the loop waits (select() or a
     * 100 ms sleep) and dispatches pending signals before trying again.
     *
     * @param int $len number of bytes to read
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the socket is closed or the read fails
     * @throws \PhpAmqpLib\Exception\AMQPIOException
     * @return mixed|string
     */
    public function read($len)
    {
        $data = '';
        $received = 0;

        while ($received < $len) {
            $this->check_heartbeat();

            if (!is_resource($this->sock) || feof($this->sock)) {
                throw new AMQPRuntimeException('Broken pipe or closed connection');
            }

            set_error_handler(array($this, 'error_handler'));
            $chunk = fread($this->sock, ($len - $received));
            restore_error_handler();

            if (false === $chunk) {
                throw new AMQPRuntimeException('Error receiving data');
            }

            if ('' !== $chunk) {
                // byte count, not character count
                $received += mb_strlen($chunk, 'ASCII');
                $data .= $chunk;
                continue;
            }

            // nothing arrived: without signal handling just try again
            if (!$this->canDispatchPcntlSignal) {
                continue;
            }

            // prevent cpu from being consumed while waiting
            if ($this->canSelectNull) {
                $this->select($this->heartbeat ? $this->heartbeat + 1 : null, null);
            } else {
                usleep(100000);
            }
            pcntl_signal_dispatch();
        }

        if (mb_strlen($data, 'ASCII') !== $len) {
            $message = sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                mb_strlen($data, 'ASCII'),
                $len
            );

            throw new AMQPRuntimeException($message);
        }

        $this->last_read = microtime(true);

        return $data;
    }
240
241
    /**
     * Writes all of $data to the socket, in chunks of at most 8192 bytes.
     *
     * Lengths and offsets are always computed byte-wise through the mb_*
     * functions with the 'ASCII' encoding, so the result does not depend on
     * mbstring function overloading.
     *
     * @param string $data bytes to send
     * @return void
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the socket is closed or the write fails
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException when no progress is made within read_write_timeout
     *                                                    or the stream reports a timeout
     */
    public function write($data)
    {
        $written = 0;
        $buffer = false;
        // time() of the last attempt that was not a zero-byte write; null while no timer runs
        $stalledSince = null;
        $len = mb_strlen($data, 'ASCII');

        while ($written < $len) {
            // restart the stall timer unless the previous attempt wrote nothing
            if ($buffer !== 0 && $this->read_write_timeout) {
                $stalledSince = time();
            }

            if (!is_resource($this->sock)) {
                throw new AMQPRuntimeException('Broken pipe or closed connection');
            }

            set_error_handler(array($this, 'error_handler'));
            // OpenSSL's C library function SSL_write() can balk on buffers > 8192
            // bytes in length, so we're limiting the write size here. On both TLS
            // and plaintext connections, the write loop will continue until the
            // buffer has been fully written.
            // This behavior has been observed in OpenSSL dating back to at least
            // September 2002:
            // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
            $buffer = fwrite($this->sock, $data, 8192);
            restore_error_handler();

            if ($buffer === false) {
                throw new AMQPRuntimeException('Error sending data');
            }

            if ($buffer === 0 && feof($this->sock)) {
                throw new AMQPRuntimeException('Broken pipe or closed connection');
            }

            // strict null check: the timer value itself is never truth-tested
            if ($buffer === 0
                && $stalledSince !== null
                && (time() - $stalledSince > $this->read_write_timeout)
            ) {
                throw new AMQPTimeoutException('Error sending data. write timed out');
            }

            if ($this->timed_out()) {
                throw new AMQPTimeoutException('Error sending data. Socket connection timed out');
            }

            $written += $buffer;

            if ($buffer > 0) {
                // drop the bytes already sent, byte-wise like the length computation above
                $data = mb_substr($data, $buffer, mb_strlen($data, 'ASCII') - $buffer, 'ASCII');
            }
        }

        $this->last_write = microtime(true);
    }
297
298
    /**
     * Error handler installed around stream calls.
     *
     * Every error is recorded in $last_error. Two known, harmless messages are
     * swallowed; anything else restores the previous handler and is rethrown
     * as an \ErrorException.
     *
     * @param  int $errno
     * @param  string $errstr
     * @param  string $errfile
     * @param  int $errline
     * @param  array $errcontext
     * @return null
     * @throws \ErrorException
     */
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
    {
        $this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');

        $ignorable = array(
            // fwrite notice that the stream isn't ready: it's allowed to retry
            'Resource temporarily unavailable',
            // stream_select warning that it has been interrupted by a signal:
            // it's allowed while processing signals
            'Interrupted system call',
        );

        foreach ($ignorable as $needle) {
            if (strpos($errstr, $needle) !== false) {
                return null;
            }
        }

        restore_error_handler();

        // raise all other issues to exceptions
        throw new \ErrorException($errstr, 0, $errno, $errfile, $errline);
    }
330 40
331
    /**
     * Heartbeat logic: checks connection health before a read.
     *
     * Does nothing unless a heartbeat interval is set and both a read and a
     * write have already completed. Otherwise it reconnects when the last read
     * is older than twice the interval, and sends a heartbeat when the last
     * write is older than half the interval.
     */
    protected function check_heartbeat()
    {
        // ignore unless heartbeat interval is set
        if ($this->heartbeat === 0 || !$this->last_read || !$this->last_write) {
            return;
        }

        $now = microtime(true);
        $sinceRead = round($now - $this->last_read);
        $sinceWrite = round($now - $this->last_write);

        // server has gone away
        if ($sinceRead > ($this->heartbeat * 2)) {
            $this->reconnect();
        }

        // time for client to send a heartbeat
        if ($sinceWrite > ($this->heartbeat / 2)) {
            $this->write_heartbeat();
        }
    }
353
354
    /**
     * Builds a heartbeat message with AMQPWriter and sends it through write().
     *
     * The message consists of the octet 8, a zero short, a zero long and the
     * closing octet 0xCE.
     */
    protected function write_heartbeat()
    {
        $frame = new AMQPWriter();

        $frame->write_octet(8);
        $frame->write_short(0);
        $frame->write_long(0);
        $frame->write_octet(0xCE);

        $this->write($frame->getvalue());
    }
366 40
367
    /**
     * Closes the socket if it is still an open resource and clears the handle.
     */
    public function close()
    {
        $handle = $this->sock;
        $this->sock = null;

        if (is_resource($handle)) {
            fclose($handle);
        }
    }
374
375
    /**
     * Returns the underlying stream socket.
     *
     * @return resource|null null once close() has been called
     */
    public function get_socket()
    {
        return $this->sock;
    }
382
383
    /**
     * camelCase alias that delegates to get_socket().
     *
     * @return resource|null null once close() has been called
     */
    public function getSocket()
    {
        return $this->get_socket();
    }
390
391
    /**
     * Waits with stream_select() until the socket becomes readable or the timeout elapses.
     *
     * Warnings raised by stream_select() go through error_handler(), which
     * ignores interruptions by signals and rethrows anything else.
     *
     * @param int|null $sec seconds to wait, passed straight to stream_select()
     * @param int|null $usec microseconds to wait, passed straight to stream_select()
     * @return int|mixed the stream_select() result: number of ready streams, or false on error
     */
    public function select($sec, $usec)
    {
        // stream_select() takes its arrays by reference, so they must be variables
        $read = array($this->sock);
        $write = null;
        $except = null;

        set_error_handler(array($this, 'error_handler'));
        $result = stream_select($read, $write, $except, $sec, $usec);
        restore_error_handler();

        return $result;
    }
409 40
410
    /**
     * Reports the 'timed_out' flag from the socket's stream metadata.
     *
     * @return mixed
     */
    protected function timed_out()
    {
        // get status of socket to determine whether or not it has timed out
        $meta = stream_get_meta_data($this->sock);

        return $meta['timed_out'];
    }
420
421
    /**
     * Turns on SO_KEEPALIVE for the underlying socket.
     *
     * The stream is imported as a socket so the option can be set. Every
     * reason keepalive cannot be enabled is reported as an AMQPIOException,
     * including a failed import or a rejected option, instead of being
     * silently ignored.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException
     */
    protected function enable_keepalive()
    {
        if (!function_exists('socket_import_stream')) {
            throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
        }

        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        $socket = socket_import_stream($this->sock);
        // the import reports failure through its return value; never pass that on
        if (false === $socket || null === $socket) {
            throw new AMQPIOException('Can not enable keepalive: the stream could not be imported as a socket');
        }

        if (!socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1)) {
            throw new AMQPIOException('Can not enable keepalive: setting SO_KEEPALIVE failed');
        }
    }
437
}
438