Completed
Push — master ( d75eab...48de7d )
by
unknown
24s
created

StreamIO::enable_keepalive()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
dl 0
loc 21
ccs 0
cts 12
cp 0
rs 8.9617
c 0
b 0
f 0
cc 6
nc 5
nop 0
crap 42
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
5
use PhpAmqpLib\Exception\AMQPDataReadException;
6
use PhpAmqpLib\Exception\AMQPIOException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Helper\MiscHelper;
10
11
class StreamIO extends AbstractIO
12
{
13
    /** @var string */
14
    protected $protocol;
15
16
    /** @var resource */
17
    protected $context;
18
19
    /** @var resource */
20
    private $sock;
21
22
    /** @var string */
23
    private static $SOCKET_STRERROR_EAGAIN;
24
25
    /** @var string */
26
    private static $SOCKET_STRERROR_EWOULDBLOCK;
27
28
    /** @var string */
29
    private static $SOCKET_STRERROR_EINTR;
30
31
    /**
32
     * @param string $host
33
     * @param int $port
34
     * @param float $connection_timeout
35
     * @param float $read_write_timeout
36
     * @param null $context
37
     * @param bool $keepalive
38
     * @param int $heartbeat
39
     */
40 102
    public function __construct(
41
        $host,
42
        $port,
43
        $connection_timeout,
44
        $read_write_timeout = 130.0,
45
        $context = null,
46
        $keepalive = false,
47
        $heartbeat = 60,
48
        $ssl_protocol = null
49
    ) {
50 102
        if ($heartbeat !== 0 && ($read_write_timeout <= ($heartbeat * 2))) {
51 6
            throw new \InvalidArgumentException('read_write_timeout must be greater than 2x the heartbeat');
52
        }
53
54
        // SOCKET_EAGAIN is not defined in Windows
55 96
        self::$SOCKET_STRERROR_EAGAIN = socket_strerror(defined('SOCKET_EAGAIN') ? SOCKET_EAGAIN : SOCKET_EWOULDBLOCK);
56 96
        self::$SOCKET_STRERROR_EWOULDBLOCK = socket_strerror(SOCKET_EWOULDBLOCK);
57 96
        self::$SOCKET_STRERROR_EINTR = socket_strerror(SOCKET_EINTR);
58
59 96
        $this->protocol = 'tcp';
60 96
        $this->host = $host;
61 96
        $this->port = $port;
62 96
        $this->connection_timeout = $connection_timeout;
63 96
        $this->read_timeout = $read_write_timeout;
64 96
        $this->write_timeout = $read_write_timeout;
65 96
        $this->context = $context;
66 96
        $this->keepalive = $keepalive;
67 96
        $this->heartbeat = $heartbeat;
68 96
        $this->initial_heartbeat = $heartbeat;
69 96
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
70
71 96
        if (!is_resource($this->context) || get_resource_type($this->context) !== 'stream-context') {
72 96
            $this->context = stream_context_create();
73 48
        }
74
75
        // tcp_nodelay was added in 7.1.0
76 96
        if (PHP_VERSION_ID >= 70100) {
77 32
            stream_context_set_option($this->context, 'socket', 'tcp_nodelay', true);
78
        }
79
80 96
        $options = stream_context_get_options($this->context);
81 96
        if (!empty($options['ssl'])) {
82
            if (isset($ssl_protocol)) {
83
                $this->protocol = $ssl_protocol;
84
            } else {
85
                $this->protocol = 'ssl';
86
            }
87
        }
88 96
    }
89
90
    /**
91
     * @inheritdoc
92
     */
93 90
    public function connect()
94
    {
95 90
        $errstr = $errno = null;
96
97 90
        $remote = sprintf(
98 90
            '%s://%s:%s',
99 90
            $this->protocol,
100 90
            $this->host,
101 90
            $this->port
102 45
        );
103
104 90
        $this->set_error_handler();
105
106
        try {
107 90
            $this->sock = stream_socket_client(
108 90
                $remote,
109 75
                $errno,
110 75
                $errstr,
111 90
                $this->connection_timeout,
112 90
                STREAM_CLIENT_CONNECT,
113 90
                $this->context
114 45
            );
115 90
            $this->cleanup_error_handler();
116 48
        } catch (\ErrorException $e) {
117 6
            throw new AMQPIOException($e->getMessage());
118
        }
119
120 84
        if (false === $this->sock) {
121
            throw new AMQPIOException(
122
                sprintf(
123
                    'Error Connecting to server(%s): %s ',
124
                    $errno,
125
                    $errstr
126
                ),
127
                $errno
128
            );
129
        }
130
131 84
        if (false === stream_socket_get_name($this->sock, true)) {
132
            throw new AMQPIOException(
133
                sprintf(
134
                    'Connection refused: %s ',
135
                    $remote
136
                )
137
            );
138
        }
139
140 84
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds(max($this->read_timeout, $this->write_timeout));
141 84
        if (!stream_set_timeout($this->sock, $sec, $uSec)) {
142
            throw new AMQPIOException('Timeout could not be set');
143
        }
144
145
        // php cannot capture signals while streams are blocking
146 84
        if ($this->canDispatchPcntlSignal) {
147 84
            stream_set_blocking($this->sock, 0);
148 84
            stream_set_write_buffer($this->sock, 0);
149 84
            if (function_exists('stream_set_read_buffer')) {
150 84
                stream_set_read_buffer($this->sock, 0);
151 42
            }
152 42
        } else {
153
            stream_set_blocking($this->sock, true);
154
        }
155
156 84
        if ($this->keepalive) {
157
            $this->enable_keepalive();
158
        }
159 84
        $this->heartbeat = $this->initial_heartbeat;
160 84
    }
161
162
    /**
163
     * @inheritdoc
164
     */
165 84
    public function read($len)
166
    {
167 84
        $this->check_heartbeat();
168
169 84
        list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
170
171 84
        $read_start = microtime(true);
172 84
        $read = 0;
173 84
        $data = '';
174
175 84
        while ($read < $len) {
176 84
            if (!is_resource($this->sock) || feof($this->sock)) {
177
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
178
            }
179
180 84
            $this->set_error_handler();
181
            try {
182 84
                $buffer = fread($this->sock, ($len - $read));
183 84
                $this->cleanup_error_handler();
184 42
            } catch (\ErrorException $e) {
185
                throw new AMQPDataReadException($e->getMessage(), $e->getCode(), $e);
186
            }
187
188 84
            if ($buffer === false) {
189
                throw new AMQPDataReadException('Error receiving data');
190
            }
191
192 84 View Code Duplication
            if ($buffer === '') {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
193
                $read_now = microtime(true);
194
                $t_read = $read_now - $read_start;
195
                if ($t_read > $this->read_timeout) {
196
                    throw new AMQPTimeoutException('Too many read attempts detected in StreamIO');
197
                }
198
                $this->select($timeout_sec, $timeout_uSec);
199
200
                continue;
201
            }
202
203 84
            $this->last_read = microtime(true);
204 84
            $read_start = $this->last_read;
205 84
            $read += mb_strlen($buffer, 'ASCII');
206 84
            $data .= $buffer;
207 42
        }
208
209 84
        if (mb_strlen($data, 'ASCII') !== $len) {
210
            throw new AMQPDataReadException(
211
                sprintf(
212
                    'Error reading data. Received %s instead of expected %s bytes',
213
                    mb_strlen($data, 'ASCII'),
214
                    $len
215
                )
216
            );
217
        }
218
219 84
        $this->last_read = microtime(true);
220
221 84
        return $data;
222
    }
223
224
    /**
225
     * @inheritdoc
226
     */
227 90
    public function write($data)
228
    {
229 90
        $written = 0;
230 90
        $len = mb_strlen($data, 'ASCII');
231 90
        $write_start = microtime(true);
232
233 90
        while ($written < $len) {
234 90
            if (!is_resource($this->sock)) {
235
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
236
            }
237
238 90
            $result = false;
239 90
            $this->set_error_handler();
240
            // OpenSSL's C library function SSL_write() can balk on buffers > 8192
241
            // bytes in length, so we're limiting the write size here. On both TLS
242
            // and plaintext connections, the write loop will continue until the
243
            // buffer has been fully written.
244
            // This behavior has been observed in OpenSSL dating back to at least
245
            // September 2002:
246
            // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
247
            try {
248 90
                $buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
249 90
                $result = fwrite($this->sock, $buffer);
250 90
                $this->cleanup_error_handler();
251 45
            } catch (\ErrorException $e) {
252
                $code = $this->last_error['errno'];
253
                switch ($code) {
254
                    case 8: // constant is missing for this error type
255
                        $this->close();
256
                        throw new AMQPConnectionClosedException('Broken pipe or closed connection', $code, $e);
257
                    case SOCKET_ETIMEDOUT:
258
                        $this->close();
259
                        throw new AMQPConnectionClosedException('Connection timed out', $code, $e);
260
                    default:
261
                        throw new AMQPRuntimeException($e->getMessage(), $code, $e);
262
                }
263
            }
264
265 90
            if ($result === false) {
266
                throw new AMQPRuntimeException('Error sending data');
267
            }
268
269 90
            if ($this->timed_out()) {
270
                throw AMQPTimeoutException::writeTimeout($this->write_timeout);
271
            }
272
273 90
            $now = microtime(true);
274 90
            if ($result > 0) {
275 90
                $this->last_write = $write_start = $now;
276 90
                $written += $result;
277 45
            } else {
278 6
                if (feof($this->sock)) {
279
                    $this->close();
280
                    throw new AMQPConnectionClosedException('Broken pipe or closed connection');
281
                }
282 6
                if (($now - $write_start) > $this->write_timeout) {
283 6
                    throw AMQPTimeoutException::writeTimeout($this->write_timeout);
284
                }
285
                // check stream and prevent from high CPU usage
286 6
                $this->select_write();
287
            }
288 45
        }
289 90
    }
290
291
    /**
292
     * @inheritdoc
293
     */
294 30
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
295
    {
296
        // fwrite notice that the stream isn't ready - EAGAIN or EWOULDBLOCK
297 30
        if (strpos($errstr, self::$SOCKET_STRERROR_EAGAIN) !== false
298 30
            || strpos($errstr, self::$SOCKET_STRERROR_EWOULDBLOCK) !== false) {
299
             // it's allowed to retry
300
            return;
301
        }
302
303
        // stream_select warning that it has been interrupted by a signal - EINTR
304 30
        if (strpos($errstr, self::$SOCKET_STRERROR_EINTR) !== false) {
305
             // it's allowed while processing signals
306 18
            return;
307
        }
308
309 12
        parent::error_handler($errno, $errstr, $errfile, $errline, $errcontext);
310 12
    }
311
312 66 View Code Duplication
    public function close()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
313
    {
314 66
        $this->disableHeartbeat();
315 66
        if (is_resource($this->sock)) {
316 60
            fclose($this->sock);
317 30
        }
318 66
        $this->sock = null;
319 66
        $this->last_read = null;
320 66
        $this->last_write = null;
321 66
    }
322
323
    /**
324
     * @inheritdoc
325
     */
326
    public function getSocket()
327
    {
328
        return $this->sock;
329
    }
330
331
    /**
332
     * @inheritdoc
333
     */
334 90
    protected function do_select($sec, $usec)
335
    {
336 90
        $read = array($this->sock);
337 90
        $write = null;
338 90
        $except = null;
339
340 90
        return stream_select($read, $write, $except, $sec, $usec);
341
    }
342
343
    /**
344
     * @return int|bool
345
     */
346
    protected function select_write()
347
    {
348
        $read = $except = null;
349
        $write = array($this->sock);
350
351
        return stream_select($read, $write, $except, 0, 100000);
352
    }
353
354
    /**
355
     * @return mixed
356
     */
357 84
    protected function timed_out()
358
    {
359
        // get status of socket to determine whether or not it has timed out
360 84
        $info = stream_get_meta_data($this->sock);
361
362 84
        return $info['timed_out'];
363
    }
364
365
    /**
366
     * @throws \PhpAmqpLib\Exception\AMQPIOException
367
     */
368
    protected function enable_keepalive()
369
    {
370
        if ($this->protocol === 'ssl') {
371
            throw new AMQPIOException('Can not enable keepalive: ssl connection does not support keepalive (#70939)');
372
        }
373
374
        if ($this->protocol === 'tls') {
375
            throw new AMQPIOException('Can not enable keepalive: tls connection does not support keepalive (#70939)');
376
        }
377
378
        if (!function_exists('socket_import_stream')) {
379
            throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
380
        }
381
382
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
383
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
384
        }
385
386
        $socket = socket_import_stream($this->sock);
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $socket is correct as socket_import_stream($this->sock) (which targets socket_import_stream()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
387
        socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
388
    }
389
}
390