Completed
Push — master ( 83b09e...64d68f )
by
unknown
18s queued 10s
created

StreamIO::extract_error_code()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 0
cts 5
cp 0
rs 9.7666
c 0
b 0
f 0
cc 3
nc 4
nop 1
crap 12
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
5
use PhpAmqpLib\Exception\AMQPDataReadException;
6
use PhpAmqpLib\Exception\AMQPIOException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Helper\MiscHelper;
10
11
class StreamIO extends AbstractIO
12
{
13
    /** @var string */
14
    protected $protocol;
15
16
    /** @var resource */
17
    protected $context;
18
19
    /** @var resource */
20
    private $sock;
21
22
    /**
     * Stores the connection settings and prepares the stream context.
     *
     * The protocol starts out as "tcp". When the stream context carries
     * non-empty "ssl" options it becomes $ssl_protocol if one was given,
     * otherwise "ssl".
     *
     * @param string $host
     * @param int $port
     * @param float $connection_timeout
     * @param float $read_write_timeout applied to both the read and the write timeout
     * @param resource|null $context stream context; any other value is replaced by a new context
     * @param bool $keepalive
     * @param int $heartbeat
     * @param string|null $ssl_protocol protocol to use instead of "ssl" when ssl options are present
     * @throws \InvalidArgumentException when heartbeat is non-zero and
     *                                   read_write_timeout is not greater than twice the heartbeat
     */
    public function __construct(
        $host,
        $port,
        $connection_timeout,
        $read_write_timeout = 130.0,
        $context = null,
        $keepalive = false,
        $heartbeat = 60,
        $ssl_protocol = null
    ) {
        $heartbeatEnabled = $heartbeat !== 0;
        if ($heartbeatEnabled && $read_write_timeout <= $heartbeat * 2) {
            throw new \InvalidArgumentException('read_write_timeout must be greater than 2x the heartbeat');
        }

        $this->protocol = 'tcp';
        $this->host = $host;
        $this->port = $port;
        $this->connection_timeout = $connection_timeout;
        $this->read_timeout = $read_write_timeout;
        $this->write_timeout = $read_write_timeout;
        $this->context = $context;
        $this->keepalive = $keepalive;
        $this->heartbeat = $heartbeat;
        $this->initial_heartbeat = $heartbeat;
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();

        // Fall back to a fresh context unless a genuine stream context was supplied.
        $isStreamContext = is_resource($this->context)
            && get_resource_type($this->context) === 'stream-context';
        if (!$isStreamContext) {
            $this->context = stream_context_create();
        }

        // tcp_nodelay was added in 7.1.0
        if (PHP_VERSION_ID >= 70100) {
            stream_context_set_option($this->context, 'socket', 'tcp_nodelay', true);
        }

        $contextOptions = stream_context_get_options($this->context);
        if (empty($contextOptions['ssl'])) {
            // No ssl options: keep the plain "tcp" protocol.
            return;
        }

        $this->protocol = isset($ssl_protocol) ? $ssl_protocol : 'ssl';
    }
75
76 96
    /**
     * Opens the stream socket to "<protocol>://<host>:<port>" and configures it.
     *
     * After a successful connect the stream timeout is set to the larger of the
     * read and write timeouts, the blocking mode is chosen depending on whether
     * pcntl signals can be dispatched, keepalive is enabled when requested and
     * the heartbeat is reset to its initial value.
     *
     * @throws AMQPIOException when the socket cannot be opened, the remote peer
     *                         name cannot be resolved, or the timeout cannot be set
     */
    public function connect()
    {
        $errstr = $errno = null;

        $remote = sprintf('%s://%s:%s', $this->protocol, $this->host, $this->port);

        $this->set_error_handler();

        try {
            $this->sock = stream_socket_client(
                $remote,
                $errno,
                $errstr,
                $this->connection_timeout,
                STREAM_CLIENT_CONNECT,
                $this->context
            );
            $this->cleanup_error_handler();
        } catch (\ErrorException $e) {
            // Keep the code and chain the original exception, as read() and write() do,
            // so the root cause is not lost when the error is re-thrown.
            throw new AMQPIOException($e->getMessage(), $e->getCode(), $e);
        }

        if (false === $this->sock) {
            throw new AMQPIOException(
                sprintf(
                    'Error Connecting to server(%s): %s ',
                    $errno,
                    $errstr
                ),
                $errno
            );
        }

        // A stream without a remote peer name is treated as a refused connection.
        if (false === stream_socket_get_name($this->sock, true)) {
            throw new AMQPIOException(
                sprintf(
                    'Connection refused: %s ',
                    $remote
                )
            );
        }

        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds(max($this->read_timeout, $this->write_timeout));
        if (!stream_set_timeout($this->sock, $sec, $uSec)) {
            throw new AMQPIOException('Timeout could not be set');
        }

        // php cannot capture signals while streams are blocking
        if ($this->canDispatchPcntlSignal) {
            stream_set_blocking($this->sock, false);
            stream_set_write_buffer($this->sock, 0);
            if (function_exists('stream_set_read_buffer')) {
                stream_set_read_buffer($this->sock, 0);
            }
        } else {
            stream_set_blocking($this->sock, true);
        }

        if ($this->keepalive) {
            $this->enable_keepalive();
        }
        $this->heartbeat = $this->initial_heartbeat;
    }
147 84
148 84
    /**
     * Reads exactly $len bytes from the socket.
     *
     * Keeps reading until the requested amount has been collected. Whenever a
     * read yields no data the method waits on the socket via select(); if no
     * data has arrived for longer than the read timeout it gives up.
     *
     * @param int $len number of bytes to read
     * @return string the bytes read
     * @throws AMQPConnectionClosedException when the socket is gone or at EOF
     * @throws AMQPDataReadException when fread() fails or the byte count does not match
     * @throws AMQPTimeoutException when no data arrived within the read timeout
     */
    public function read($len)
    {
        $this->check_heartbeat();

        list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);

        $idleSince = microtime(true);
        $received = 0;
        $payload = '';

        while ($received < $len) {
            $socketUsable = is_resource($this->sock) && !feof($this->sock);
            if (!$socketUsable) {
                $this->close();
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
            }

            $this->set_error_handler();
            try {
                $chunk = fread($this->sock, $len - $received);
                $this->cleanup_error_handler();
            } catch (\ErrorException $e) {
                throw new AMQPDataReadException($e->getMessage(), $e->getCode(), $e);
            }

            if ($chunk === false) {
                throw new AMQPDataReadException('Error receiving data');
            }

            if ($chunk !== '') {
                // Progress was made: restart the idle clock and collect the bytes.
                $this->last_read = microtime(true);
                $idleSince = $this->last_read;
                $received += mb_strlen($chunk, 'ASCII');
                $payload .= $chunk;
                continue;
            }

            // Nothing available yet: give up once the idle time exceeds the read timeout,
            // otherwise wait on the socket before trying again.
            $idleFor = microtime(true) - $idleSince;
            if ($idleFor > $this->read_timeout) {
                throw new AMQPTimeoutException('Too many read attempts detected in StreamIO');
            }
            $this->select($timeout_sec, $timeout_uSec);
        }

        $payloadLength = mb_strlen($payload, 'ASCII');
        if ($payloadLength !== $len) {
            throw new AMQPDataReadException(
                sprintf(
                    'Error reading data. Received %s instead of expected %s bytes',
                    $payloadLength,
                    $len
                )
            );
        }

        $this->last_read = microtime(true);

        return $payload;
    }
210
211
    /**
     * Writes the whole of $data to the socket, in chunks of at most BUFFER_SIZE bytes.
     *
     * @param string $data bytes to send
     * @throws AMQPConnectionClosedException when the socket is gone, at EOF, or the
     *                                       write failed with a connection-level errno
     * @throws AMQPRuntimeException on any other write error
     * @throws AMQPTimeoutException when the stream timed out or no bytes could be
     *                              written within the write timeout
     */
    public function write($data)
    {
        $total = mb_strlen($data, 'ASCII');
        $sent = 0;
        $lastProgress = microtime(true);

        while ($sent < $total) {
            if (!is_resource($this->sock)) {
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
            }

            $this->set_error_handler();
            // OpenSSL's C library function SSL_write() can balk on buffers > 8192
            // bytes in length, so we're limiting the write size here. On both TLS
            // and plaintext connections, the write loop will continue until the
            // buffer has been fully written.
            // This behavior has been observed in OpenSSL dating back to at least
            // September 2002:
            // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
            try {
                $chunk = mb_substr($data, $sent, self::BUFFER_SIZE, 'ASCII');
                $result = fwrite($this->sock, $chunk);
                $this->cleanup_error_handler();
            } catch (\ErrorException $e) {
                $code = $this->last_error['errno'];

                // errno values that mean the connection itself is lost; the list is
                // built here so the SOCKET_* constants are only touched on the error
                // path. The comparison is deliberately loose, like a switch statement.
                $connectionLost = in_array(
                    $code,
                    array(
                        SOCKET_EPIPE,
                        SOCKET_ENETDOWN,
                        SOCKET_ENETUNREACH,
                        SOCKET_ENETRESET,
                        SOCKET_ECONNABORTED,
                        SOCKET_ECONNRESET,
                        SOCKET_ECONNREFUSED,
                        SOCKET_ETIMEDOUT,
                    )
                );

                if ($connectionLost) {
                    $this->close();
                    throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
                }

                throw new AMQPRuntimeException($e->getMessage(), $code, $e);
            }

            if ($result === false) {
                throw new AMQPRuntimeException('Error sending data');
            }

            if ($this->timed_out()) {
                throw AMQPTimeoutException::writeTimeout($this->write_timeout);
            }

            $now = microtime(true);
            if ($result > 0) {
                // Bytes went out: remember when, and advance the offset.
                $this->last_write = $lastProgress = $now;
                $sent += $result;
                continue;
            }

            // Zero bytes written: the peer may be gone, or the socket is not writable yet.
            if (feof($this->sock)) {
                $this->close();
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
            }
            if (($now - $lastProgress) > $this->write_timeout) {
                throw AMQPTimeoutException::writeTimeout($this->write_timeout);
            }
            // check stream and prevent from high CPU usage
            $this->select_write();
        }
    }
281
282 6
    /**
     * Error handler used while stream operations are in flight.
     *
     * Errors whose message carries an EAGAIN, EWOULDBLOCK or EINTR code are
     * dropped; everything else is forwarded to the parent handler, using the
     * code extracted from the message when one was found and $errno otherwise.
     *
     * @param int $errno
     * @param string $errstr
     * @param string $errfile
     * @param int $errline
     * @param array|null $errcontext
     */
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
    {
        $socketErrno = $this->extract_error_code($errstr);

        $ignorable = array(
            // fwrite notice that the stream isn't ready - EAGAIN or EWOULDBLOCK
            SOCKET_EAGAIN,
            SOCKET_EWOULDBLOCK,
            // stream_select warning that it has been interrupted by a signal - EINTR
            SOCKET_EINTR,
        );

        // Loose comparison on purpose: it matches what a switch statement does.
        if (in_array($socketErrno, $ignorable)) {
            return;
        }

        $reportedErrno = $socketErrno > 0 ? $socketErrno : $errno;
        parent::error_handler($reportedErrno, $errstr, $errfile, $errline, $errcontext);
    }
299
300 View Code Duplication
    /**
     * Disables the heartbeat, closes the socket if it is still an open
     * resource, and clears the socket and the last read/write timestamps.
     */
    public function close()
    {
        $this->disableHeartbeat();

        $stream = $this->sock;
        if (is_resource($stream)) {
            fclose($stream);
        }

        $this->sock = null;
        $this->last_read = null;
        $this->last_write = null;
    }
310 12
311
    /**
     * Gives access to the underlying stream.
     *
     * @return resource|null the value currently held in the socket property;
     *                       close() resets it to null
     */
    public function getSocket()
    {
        return $this->sock;
    }
318 66
319 66
    /**
     * Waits, via stream_select(), for the socket to become readable.
     *
     * @param int $sec seconds part of the wait
     * @param int $usec microseconds part of the wait
     * @return int|bool whatever stream_select() returns
     */
    protected function do_select($sec, $usec)
    {
        // stream_select() takes its arrays by reference, so they need to be variables.
        $readable = array($this->sock);
        $writable = null;
        $exceptional = null;

        return stream_select($readable, $writable, $exceptional, $sec, $usec);
    }
330
331
    /**
     * Waits up to 100000 microseconds, via stream_select(), for the socket to
     * become writable.
     *
     * @return int|bool whatever stream_select() returns
     */
    protected function select_write()
    {
        // stream_select() takes its arrays by reference, so they need to be variables.
        $writable = array($this->sock);
        $readable = null;
        $exceptional = null;

        return stream_select($readable, $writable, $exceptional, 0, 100000);
    }
341
342
    /**
     * Reports the "timed_out" flag from the socket's stream meta data.
     *
     * @return mixed the 'timed_out' entry of stream_get_meta_data()
     */
    protected function timed_out()
    {
        $metaData = stream_get_meta_data($this->sock);

        return $metaData['timed_out'];
    }
352
353
    /**
     * Turns on SO_KEEPALIVE for the connected socket.
     *
     * Keepalive is refused for "ssl" and "tls" connections and when the
     * required socket function or constants are not available.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException when keepalive cannot be enabled
     */
    protected function enable_keepalive()
    {
        // Same message for both encrypted protocols, only the protocol name differs.
        if ($this->protocol === 'ssl' || $this->protocol === 'tls') {
            throw new AMQPIOException(
                sprintf(
                    'Can not enable keepalive: %s connection does not support keepalive (#70939)',
                    $this->protocol
                )
            );
        }

        if (!function_exists('socket_import_stream')) {
            throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
        }

        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        $socket = socket_import_stream($this->sock);
        // socket_import_stream() yields a falsy value on failure; passing that on to
        // socket_set_option() would fail with a warning or a type error instead of
        // the exception type this method documents.
        if (!$socket) {
            throw new AMQPIOException('Can not enable keepalive: unable to import stream as socket');
        }

        socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
    }
377
378
    /**
     * Pulls the numeric error code out of a PHP stream error message.
     *
     * Messages starting with "stream_select():" are searched for a " [N]: "
     * token, all other messages for an " errno=N " token.
     *
     * @param string $message
     * @return int the code found in the message, or 0 when there is none
     */
    protected function extract_error_code($message)
    {
        $isSelectMessage = strpos($message, 'stream_select():') === 0;
        $pattern = $isSelectMessage ? '/\s+\[(\d+)\]:\s+/' : '/\s+errno=(\d+)\s+/';

        $captured = array();
        $found = preg_match($pattern, $message, $captured) > 0;

        return $found ? (int)$captured[1] : 0;
    }
397
}
398