1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace PhpAmqpLib\Wire\IO; |
4
|
|
|
|
5
|
|
|
use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
6
|
|
|
use PhpAmqpLib\Exception\AMQPDataReadException; |
7
|
|
|
use PhpAmqpLib\Exception\AMQPIOException; |
8
|
|
|
use PhpAmqpLib\Exception\AMQPRuntimeException; |
9
|
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException; |
10
|
|
|
use PhpAmqpLib\Helper\MiscHelper; |
11
|
|
|
use PhpAmqpLib\Helper\SocketConstants; |
12
|
|
|
|
13
|
|
|
class StreamIO extends AbstractIO |
14
|
|
|
{ |
15
|
|
|
/** @var null|resource */ |
16
|
|
|
protected $context; |
17
|
|
|
|
18
|
|
|
/** @var null|resource */ |
19
|
|
|
private $sock; |
20
|
|
|
|
21
|
|
|
/**
 * Stores the connection settings on the instance. No socket is opened here;
 * that happens in connect().
 *
 * @param string $host
 * @param int $port
 * @param float $connection_timeout
 * @param float $read_write_timeout used for both the read and the write timeout
 * @param resource|null $context
 * @param bool $keepalive
 * @param int $heartbeat
 * @param string|null $ssl_protocol @deprecated
 */
public function __construct(
    $host,
    $port,
    $connection_timeout,
    $read_write_timeout,
    $context = null,
    $keepalive = false,
    $heartbeat = 0,
    $ssl_protocol = null
) {
    // The eighth argument is accepted for backward compatibility only: passing it
    // raises a deprecation notice and the value itself is never read.
    if (8 === func_num_args()) {
        trigger_error(
            '$ssl_protocol parameter is deprecated, use stream_context_set_option($context, \'ssl\', \'crypto_method\', $ssl_protocol) instead (see https://www.php.net/manual/en/function.stream-socket-enable-crypto.php for possible values)',
            E_USER_DEPRECATED
        );
    }

    // TODO FUTURE change comparison to <=
    // php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
    /*
        TODO FUTURE enable this check
    if ($heartbeat !== 0 && ($read_write_timeout < ($heartbeat * 2))) {
        throw new \InvalidArgumentException('read_write_timeout must be at least 2x the heartbeat');
    }
    */

    // Endpoint.
    $this->host = $host;
    $this->port = $port;

    // Timeouts: one value drives both directions.
    $this->connection_timeout = $connection_timeout;
    $timeout = (float)$read_write_timeout;
    $this->read_timeout = $timeout;
    $this->write_timeout = $timeout;

    // Stream options.
    $this->context = $context;
    $this->keepalive = $keepalive;

    // The initial heartbeat is remembered so connect() can restore it.
    $this->heartbeat = $heartbeat;
    $this->initial_heartbeat = $heartbeat;

    $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
}
67
|
30 |
|
|
68
|
|
|
/**
 * Opens the TCP stream to the configured host/port and prepares it for use.
 *
 * After a successful connect the method applies the stream timeout (the larger
 * of the read and write timeouts), selects blocking or non-blocking mode,
 * optionally enables keepalive, negotiates TLS when the context carries an
 * ssl.crypto_method option, and resets the heartbeat to its initial value.
 *
 * @inheritdoc
 * @throws AMQPIOException when the connection, timeout setup or TLS handshake fails
 */
public function connect()
{
    $errstr = $errno = null;

    $remote = sprintf(
        'tcp://%s:%s',
        $this->host,
        $this->port
    );

    $context = $this->setupContext();
    $this->setErrorHandler();

    try {
        $this->sock = stream_socket_client(
            $remote,
            $errno,
            $errstr,
            $this->connection_timeout,
            STREAM_CLIENT_CONNECT,
            $context
        );
        $this->throwOnError();
    } catch (\ErrorException $e) {
        // Keep the code and chain the original error, the same way read() and
        // enableCrypto() do, so the root cause is not lost to callers.
        throw new AMQPIOException($e->getMessage(), $e->getCode(), $e);
    } finally {
        $this->restoreErrorHandler();
    }

    // Fallback for a failed connect that did not surface through the error handler.
    if (false === $this->sock) {
        throw new AMQPIOException(
            sprintf(
                'Error Connecting to server(%s): %s ',
                $errno,
                $errstr
            ),
            $errno
        );
    }

    // No remote name means there is no connected peer.
    if (!stream_socket_get_name($this->sock, true)) {
        throw new AMQPIOException(
            sprintf(
                'Connection refused: %s ',
                $remote
            )
        );
    }

    list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds(max($this->read_timeout, $this->write_timeout));
    if (!stream_set_timeout($this->sock, $sec, $uSec)) {
        throw new AMQPIOException('Timeout could not be set');
    }

    // php cannot capture signals while streams are blocking
    if ($this->canDispatchPcntlSignal) {
        stream_set_blocking($this->sock, false);
        stream_set_write_buffer($this->sock, 0);
        if (function_exists('stream_set_read_buffer')) {
            stream_set_read_buffer($this->sock, 0);
        }
    } else {
        stream_set_blocking($this->sock, true);
    }

    if ($this->keepalive) {
        $this->enable_keepalive();
    }

    // setupContext() sets ssl.crypto_method whenever ssl options are present,
    // so its presence is the signal that TLS must be negotiated.
    $options = stream_context_get_options($context);
    if (isset($options['ssl']['crypto_method'])) {
        $this->enableCrypto();
    }

    $this->heartbeat = $this->initial_heartbeat;
}
147
|
|
|
|
148
|
|
|
/**
 * Returns the stream context to connect with.
 *
 * The configured context is used when it is a valid stream-context resource;
 * otherwise a fresh one is created. tcp_nodelay is always switched on, and a
 * default ssl.crypto_method is added when ssl options exist without one.
 *
 * @return resource
 * @throws AMQPIOException
 */
private function setupContext()
{
    $ctx = $this->context;

    $isStreamContext = is_resource($ctx) && get_resource_type($ctx) === 'stream-context';
    if (!$isStreamContext) {
        $ctx = stream_context_create();
    }

    stream_context_set_option($ctx, 'socket', 'tcp_nodelay', true);

    $current = stream_context_get_options($ctx);
    $needsCryptoMethod = !empty($current['ssl']) && !isset($current['ssl']['crypto_method']);

    if ($needsCryptoMethod) {
        $applied = stream_context_set_option($ctx, 'ssl', 'crypto_method', STREAM_CRYPTO_METHOD_ANY_CLIENT);
        if (!$applied) {
            throw new AMQPIOException("Can not set ssl.crypto_method stream context option");
        }
    }

    return $ctx;
}
170
|
|
|
|
171
|
|
|
/**
 * Reads exactly $len bytes from the stream, looping until they have all arrived.
 *
 * An empty read waits in select() and retries; the wait clock restarts every
 * time some data is received.
 *
 * @inheritdoc
 */
public function read($len)
{
    $this->check_heartbeat();

    [$wait_sec, $wait_usec] = MiscHelper::splitSecondsMicroseconds($this->read_timeout);

    $payload = '';
    $received = 0;
    $idle_since = microtime(true);

    while ($received < $len) {
        if (!is_resource($this->sock) || feof($this->sock)) {
            $this->close();
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        $this->setErrorHandler();
        try {
            $chunk = fread($this->sock, ($len - $received));
            $this->throwOnError();
        } catch (\ErrorException $e) {
            throw new AMQPDataReadException($e->getMessage(), $e->getCode(), $e);
        } finally {
            $this->restoreErrorHandler();
        }

        if (false === $chunk) {
            throw new AMQPDataReadException('Error receiving data');
        }

        if ('' === $chunk) {
            // Nothing arrived: give up once the idle time exceeds the read
            // timeout, otherwise wait for the stream and try again.
            if ((microtime(true) - $idle_since) > $this->read_timeout) {
                throw new AMQPTimeoutException('Too many read attempts detected in StreamIO');
            }
            $this->select($wait_sec, $wait_usec);

            continue;
        }

        // Progress was made: remember when, and restart the idle clock.
        $this->last_read = microtime(true);
        $idle_since = $this->last_read;
        $received += mb_strlen($chunk, 'ASCII');
        $payload .= $chunk;
    }

    $actual = mb_strlen($payload, 'ASCII');
    if ($actual !== $len) {
        throw new AMQPDataReadException(
            sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                $actual,
                $len
            )
        );
    }

    $this->last_read = microtime(true);

    return $payload;
}
235
|
|
|
|
236
|
|
|
/**
 * Writes $data to the stream in chunks of at most self::BUFFER_SIZE bytes,
 * looping until every byte has been sent.
 *
 * @inheritdoc
 */
public function write($data)
{
    $this->checkBrokerHeartbeat();

    $total = mb_strlen($data, 'ASCII');
    $offset = 0;
    $idle_since = microtime(true);

    while ($offset < $total) {
        if (!is_resource($this->sock) || feof($this->sock)) {
            $this->close();
            $constants = SocketConstants::getInstance();
            throw new AMQPConnectionClosedException('Broken pipe or closed connection', $constants->SOCKET_EPIPE);
        }

        $this->setErrorHandler();
        // OpenSSL's C library function SSL_write() can balk on buffers > 8192
        // bytes in length, so we're limiting the write size here. On both TLS
        // and plaintext connections, the write loop will continue until the
        // buffer has been fully written.
        // This behavior has been observed in OpenSSL dating back to at least
        // September 2002:
        // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
        try {
            // check stream and prevent from high CPU usage
            $sent = 0;
            if ($this->select_write()) {
                $chunk = mb_substr($data, $offset, self::BUFFER_SIZE, 'ASCII');
                $sent = fwrite($this->sock, $chunk);
            }
            $this->throwOnError();
        } catch (\ErrorException $e) {
            $errorCode = $this->last_error['errno'];
            $constants = SocketConstants::getInstance();
            // Connection-level errors close the socket; anything else is
            // reported as a runtime error.
            switch ($errorCode) {
                case $constants->SOCKET_EPIPE:
                case $constants->SOCKET_ENETDOWN:
                case $constants->SOCKET_ENETUNREACH:
                case $constants->SOCKET_ENETRESET:
                case $constants->SOCKET_ECONNABORTED:
                case $constants->SOCKET_ECONNRESET:
                case $constants->SOCKET_ECONNREFUSED:
                case $constants->SOCKET_ETIMEDOUT:
                    $this->close();
                    throw new AMQPConnectionClosedException(socket_strerror($errorCode), $errorCode, $e);
                default:
                    throw new AMQPRuntimeException($e->getMessage(), $errorCode, $e);
            }
        } finally {
            $this->restoreErrorHandler();
        }

        if (false === $sent) {
            throw new AMQPRuntimeException('Error sending data');
        }

        if ($this->timed_out()) {
            throw AMQPTimeoutException::writeTimeout($this->write_timeout);
        }

        $now = microtime(true);
        if ($sent > 0) {
            // Progress was made: advance and restart the idle clock.
            $this->last_write = $idle_since = $now;
            $offset += $sent;

            continue;
        }

        // Nothing was written this round.
        if (feof($this->sock)) {
            $this->close();
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }
        if (($now - $idle_since) > $this->write_timeout) {
            throw AMQPTimeoutException::writeTimeout($this->write_timeout);
        }
    }
}
315
|
|
|
|
316
|
|
|
/**
 * Filters out errors that only mean "try again" and forwards everything else
 * to the parent handler.
 *
 * @inheritdoc
 */
public function error_handler($errno, $errstr, $errfile, $errline): void
{
    $code = $this->extract_error_code($errstr);
    $constants = SocketConstants::getInstance();

    // Loose comparisons on purpose, evaluated in the same order a switch would:
    // - EAGAIN / EWOULDBLOCK: fwrite notice that the stream isn't ready
    // - EINTR: stream_select warning that it has been interrupted by a signal
    $ignorable = $code == $constants->SOCKET_EAGAIN
        || $code == $constants->SOCKET_EWOULDBLOCK
        || $code == $constants->SOCKET_EINTR;

    if ($ignorable) {
        return;
    }

    // Prefer the code parsed from the message; fall back to the PHP error number.
    $reported = $code > 0 ? $code : $errno;
    parent::error_handler($reported, $errstr, $errfile, $errline);
}
334
|
|
|
|
335
|
|
|
/**
 * Disables the heartbeat, closes the stream if it is still an open resource,
 * and resets the socket handle and the read/write timestamps.
 */
public function close()
{
    $this->disableHeartbeat();

    $stream = $this->sock;
    if (is_resource($stream)) {
        fclose($stream);
    }

    $this->sock = null;
    $this->last_read = 0;
    $this->last_write = 0;
}
345
|
|
|
|
346
|
28 |
|
/**
 * Exposes the underlying stream handle as currently stored on the instance.
 *
 * @deprecated
 * @return null|resource|\Socket
 */
public function getSocket()
{
    $handle = $this->sock;

    return $handle;
}
354
|
28 |
|
|
355
|
|
|
/**
 * Waits with stream_select() until the socket becomes readable or the timeout
 * elapses.
 *
 * @inheritdoc
 */
protected function do_select(?int $sec, int $usec)
{
    // is_resource() is already false for null, so this single check covers
    // both "never connected" and "closed".
    if (!is_resource($this->sock)) {
        $this->sock = null;
        throw new AMQPConnectionClosedException('Broken pipe or closed connection', 0);
    }

    $readable = [$this->sock];
    $writable = null;
    $exceptional = null;

    // On PHP >= 8.1 a null seconds value is paired with zero microseconds.
    if (null === $sec && PHP_VERSION_ID >= 80100) {
        $usec = 0;
    }

    return stream_select($readable, $writable, $exceptional, $sec, $usec);
}
375
|
|
|
|
376
|
28 |
|
/**
 * Waits up to 100000 microseconds for the socket to become writable.
 *
 * @return int|bool result of stream_select()
 */
protected function select_write()
{
    $writable = [$this->sock];
    $readable = null;
    $exceptional = null;

    return stream_select($readable, $writable, $exceptional, 0, 100000);
}
386
|
|
|
|
387
|
|
|
/**
 * Reports whether the stream is flagged as timed out.
 *
 * @return mixed the 'timed_out' entry of the socket's stream meta data
 */
protected function timed_out()
{
    // get status of socket to determine whether or not it has timed out
    return stream_get_meta_data($this->sock)['timed_out'];
}
397
|
|
|
|
398
|
|
|
/**
 * Turns on SO_KEEPALIVE for the connected stream.
 *
 * The stream is imported as a socket handle so that socket_set_option() can be
 * applied to it. Every way this can fail is reported as an AMQPIOException.
 *
 * @throws \PhpAmqpLib\Exception\AMQPIOException
 */
protected function enable_keepalive(): void
{
    if (!function_exists('socket_import_stream')) {
        throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
    }

    if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
        throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
    }

    // A failed import must not be handed on to socket_set_option().
    $socket = socket_import_stream($this->sock);
    if (!$socket) {
        throw new AMQPIOException('Can not enable keepalive: socket_import_stream failed');
    }

    // Report a rejected option instead of silently continuing without keepalive.
    if (!socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1)) {
        throw new AMQPIOException('Can not enable keepalive: SO_KEEPALIVE could not be set');
    }
}
414
|
|
|
|
415
|
4 |
|
/**
 * Pulls the numeric error code out of a PHP stream warning message.
 *
 * Messages starting with "stream_select():" carry the code as " [N]: ";
 * all other messages are searched for " errno=N ".
 *
 * @param string $message
 * @return int the parsed code, or 0 when the message contains none
 */
protected function extract_error_code($message)
{
    $prefix = 'stream_select():';
    $isSelectMessage = strncmp($message, $prefix, strlen($prefix)) === 0;

    $pattern = $isSelectMessage
        ? '/\s+\[(\d+)\]:\s+/'
        : '/\s+errno=(\d+)\s+/';

    $found = [];
    if (preg_match($pattern, $message, $found) === 1) {
        return (int)$found[1];
    }

    return 0;
}
434
|
|
|
|
435
|
|
|
/**
 * Runs the TLS handshake on the connected stream, retrying while the handshake
 * reports "not finished yet" (0) until the deadline passes.
 *
 * @throws AMQPIOException
 */
private function enableCrypto(): void
{
    // 2 round-trips during handshake
    $deadline = time() + ($this->read_timeout + $this->write_timeout) * 2;

    try {
        $this->setErrorHandler();
        while (true) {
            $status = stream_socket_enable_crypto($this->sock, true);
            if (true === $status) {
                return;
            }
            $this->throwOnError();
            usleep(1e3);

            // Only a pending handshake (0) is worth retrying, and only until the deadline.
            if ($status !== 0 || time() >= $deadline) {
                break;
            }
        }
    } catch (\ErrorException $exception) {
        throw new AMQPIOException($exception->getMessage(), $exception->getCode(), $exception);
    } finally {
        $this->restoreErrorHandler();
    }

    // A successful handshake returns above, so reaching this point is a failure.
    throw new AMQPIOException('Could not enable socket crypto');
}
462
|
|
|
} |
463
|
|
|
|
This check looks for parameters that have been defined for a function or method, but which are not used in the method body.