1
|
|
|
<?php |
2
|
|
|
namespace PhpAmqpLib\Wire\IO; |
3
|
|
|
|
4
|
|
|
use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
5
|
|
|
use PhpAmqpLib\Exception\AMQPDataReadException; |
6
|
|
|
use PhpAmqpLib\Exception\AMQPIOException; |
7
|
|
|
use PhpAmqpLib\Exception\AMQPRuntimeException; |
8
|
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException; |
9
|
|
|
use PhpAmqpLib\Helper\MiscHelper; |
10
|
|
|
|
11
|
|
|
class StreamIO extends AbstractIO |
12
|
|
|
{ |
13
|
|
|
/** @var string Transport scheme used to build the remote address: 'tcp', 'ssl', or the explicit ssl protocol given to the constructor */
protected $protocol;

/** @var resource Stream context handed to stream_socket_client() */
protected $context;

/** @var resource|null Stream returned by stream_socket_client(); reset to null by close() */
private $sock;
21
|
|
|
|
22
|
|
|
/**
 * Stores the connection settings and prepares the stream context.
 * No socket is opened here; that happens in connect().
 *
 * @param string $host
 * @param int $port
 * @param float $connection_timeout handed to stream_socket_client() as the connect timeout
 * @param float $read_write_timeout applied to both the read timeout and the write timeout
 * @param resource|null $context stream context; any value that is not a 'stream-context'
 *                               resource is replaced by a freshly created context
 * @param bool $keepalive when true, connect() calls enable_keepalive()
 * @param int $heartbeat
 * @param string|null $ssl_protocol transport scheme used instead of 'ssl' when the context
 *                                  carries ssl options
 *
 * @throws \InvalidArgumentException when the heartbeat is not 0 and the read/write timeout
 *                                   is not greater than twice the heartbeat
 */
public function __construct(
    $host,
    $port,
    $connection_timeout,
    $read_write_timeout = 130.0,
    $context = null,
    $keepalive = false,
    $heartbeat = 60,
    $ssl_protocol = null
) {
    $heartbeatEnabled = $heartbeat !== 0;
    if ($heartbeatEnabled && $read_write_timeout <= $heartbeat * 2) {
        throw new \InvalidArgumentException('read_write_timeout must be greater than 2x the heartbeat');
    }

    // Plain TCP unless the context turns out to carry ssl options (decided at the end).
    $this->protocol = 'tcp';
    $this->host = $host;
    $this->port = $port;
    $this->connection_timeout = $connection_timeout;
    $this->read_timeout = $read_write_timeout;
    $this->write_timeout = $read_write_timeout;
    $this->context = $context;
    $this->keepalive = $keepalive;
    $this->heartbeat = $heartbeat;
    $this->initial_heartbeat = $heartbeat;
    $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();

    // Fall back to a default context when the caller gave none or gave something unusable.
    $isStreamContext = is_resource($this->context)
        && get_resource_type($this->context) === 'stream-context';
    if (!$isStreamContext) {
        $this->context = stream_context_create();
    }

    // tcp_nodelay was added in 7.1.0
    if (PHP_VERSION_ID >= 70100) {
        stream_context_set_option($this->context, 'socket', 'tcp_nodelay', true);
    }

    // Any ssl option on the context switches the transport; an explicit protocol wins over 'ssl'.
    $contextOptions = stream_context_get_options($this->context);
    if (!empty($contextOptions['ssl'])) {
        $this->protocol = isset($ssl_protocol) ? $ssl_protocol : 'ssl';
    }
}
75
|
|
|
|
76
|
96 |
|
/**
 * Opens the stream socket to "<protocol>://<host>:<port>" and applies the
 * timeout, blocking mode and (optionally) keepalive settings.
 *
 * @inheritdoc
 *
 * @throws AMQPIOException when the socket cannot be opened, the peer name cannot be
 *                         resolved, the stream timeout cannot be set, or keepalive
 *                         was requested but cannot be enabled
 */
public function connect()
{
    $errstr = $errno = null;

    $remote = sprintf(
        '%s://%s:%s',
        $this->protocol,
        $this->host,
        $this->port
    );

    $this->set_error_handler();

    try {
        $this->sock = stream_socket_client(
            $remote,
            $errno,
            $errstr,
            $this->connection_timeout,
            STREAM_CLIENT_CONNECT,
            $this->context
        );
        $this->cleanup_error_handler();
    } catch (\ErrorException $e) {
        // Keep the original error chained (code + previous) so the root cause is not
        // lost; this mirrors how read() and write() wrap their ErrorException.
        throw new AMQPIOException($e->getMessage(), $e->getCode(), $e);
    }

    if (false === $this->sock) {
        throw new AMQPIOException(
            sprintf(
                'Error Connecting to server(%s): %s ',
                $errno,
                $errstr
            ),
            $errno
        );
    }

    // A stream without a remote peer name is treated as a refused connection.
    if (false === stream_socket_get_name($this->sock, true)) {
        throw new AMQPIOException(
            sprintf(
                'Connection refused: %s ',
                $remote
            )
        );
    }

    // The stream-level timeout uses the larger of the read and write timeouts.
    list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds(max($this->read_timeout, $this->write_timeout));
    if (!stream_set_timeout($this->sock, $sec, $uSec)) {
        throw new AMQPIOException('Timeout could not be set');
    }

    // php cannot capture signals while streams are blocking
    if ($this->canDispatchPcntlSignal) {
        stream_set_blocking($this->sock, false);
        stream_set_write_buffer($this->sock, 0);
        if (function_exists('stream_set_read_buffer')) {
            stream_set_read_buffer($this->sock, 0);
        }
    } else {
        stream_set_blocking($this->sock, true);
    }

    if ($this->keepalive) {
        $this->enable_keepalive();
    }

    // Restore the heartbeat value that was given to the constructor.
    $this->heartbeat = $this->initial_heartbeat;
}
147
|
84 |
|
|
148
|
84 |
|
/**
 * Reads exactly $len bytes from the socket, looping until they have all arrived.
 *
 * @inheritdoc
 *
 * @throws AMQPConnectionClosedException when the socket is not a resource or is at EOF
 * @throws AMQPDataReadException when fread() fails or the final byte count does not match
 * @throws AMQPTimeoutException when no data arrived for longer than the read timeout
 */
public function read($len)
{
    $this->check_heartbeat();

    list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);

    // Start of the current stretch without data; moved forward on every successful read.
    $idleSince = microtime(true);
    $received = 0;
    $payload = '';

    while ($received < $len) {
        $socketUsable = is_resource($this->sock) && !feof($this->sock);
        if (!$socketUsable) {
            $this->close();
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        $this->set_error_handler();
        try {
            $chunk = fread($this->sock, ($len - $received));
            $this->cleanup_error_handler();
        } catch (\ErrorException $e) {
            throw new AMQPDataReadException($e->getMessage(), $e->getCode(), $e);
        }

        if ($chunk === false) {
            throw new AMQPDataReadException('Error receiving data');
        }

        if ($chunk === '') {
            // Nothing available yet: give up once the idle time exceeds the read
            // timeout, otherwise wait on the socket and try again.
            if ((microtime(true) - $idleSince) > $this->read_timeout) {
                throw new AMQPTimeoutException('Too many read attempts detected in StreamIO');
            }
            $this->select($timeout_sec, $timeout_uSec);

            continue;
        }

        $this->last_read = microtime(true);
        $idleSince = $this->last_read;
        // mb_strlen with 'ASCII' yields the byte count of the chunk.
        $received += mb_strlen($chunk, 'ASCII');
        $payload .= $chunk;
    }

    $actual = mb_strlen($payload, 'ASCII');
    if ($actual !== $len) {
        throw new AMQPDataReadException(
            sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                $actual,
                $len
            )
        );
    }

    $this->last_read = microtime(true);

    return $payload;
}
210
|
|
|
|
211
|
|
|
/**
 * Writes all of $data to the socket in chunks of at most self::BUFFER_SIZE bytes.
 *
 * @inheritdoc
 *
 * @throws AMQPConnectionClosedException when the socket is gone, at EOF, or the write
 *                                       failed with a connection-level error code
 * @throws AMQPRuntimeException when fwrite() fails for any other reason
 * @throws AMQPTimeoutException when the stream reports a timeout or nothing could be
 *                              written for longer than the write timeout
 */
public function write($data)
{
    $sent = 0;
    $total = mb_strlen($data, 'ASCII');
    // Start of the current stretch without progress; moved forward on every successful write.
    $stalledSince = microtime(true);

    while ($sent < $total) {
        if (!is_resource($this->sock)) {
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        $bytes = false;
        $this->set_error_handler();
        // OpenSSL's C library function SSL_write() can balk on buffers > 8192
        // bytes in length, so we're limiting the write size here. On both TLS
        // and plaintext connections, the write loop will continue until the
        // buffer has been fully written.
        // This behavior has been observed in OpenSSL dating back to at least
        // September 2002:
        // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
        try {
            $chunk = mb_substr($data, $sent, self::BUFFER_SIZE, 'ASCII');
            $bytes = fwrite($this->sock, $chunk);
            $this->cleanup_error_handler();
        } catch (\ErrorException $e) {
            $code = $this->last_error['errno'];
            switch ($code) {
                // Connection-level failures: drop the socket and report it as closed.
                case SOCKET_EPIPE:
                case SOCKET_ENETDOWN:
                case SOCKET_ENETUNREACH:
                case SOCKET_ENETRESET:
                case SOCKET_ECONNABORTED:
                case SOCKET_ECONNRESET:
                case SOCKET_ECONNREFUSED:
                case SOCKET_ETIMEDOUT:
                    $this->close();
                    throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
                default:
                    throw new AMQPRuntimeException($e->getMessage(), $code, $e);
            }
        }

        if ($bytes === false) {
            throw new AMQPRuntimeException('Error sending data');
        }

        if ($this->timed_out()) {
            throw AMQPTimeoutException::writeTimeout($this->write_timeout);
        }

        $now = microtime(true);
        if ($bytes > 0) {
            $this->last_write = $stalledSince = $now;
            $sent += $bytes;

            continue;
        }

        // Zero bytes written: either the peer is gone or the socket is not ready yet.
        if (feof($this->sock)) {
            $this->close();
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }
        if (($now - $stalledSince) > $this->write_timeout) {
            throw AMQPTimeoutException::writeTimeout($this->write_timeout);
        }
        // check stream and prevent from high CPU usage
        $this->select_write();
    }
}
281
|
|
|
|
282
|
6 |
|
/**
 * Ignores "try again" style socket errors and forwards everything else to the
 * parent handler, preferring the error code parsed out of the message.
 *
 * @inheritdoc
 */
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
    $code = $this->extract_error_code($errstr);

    $ignorable = array(
        // fwrite notice that the stream isn't ready - EAGAIN or EWOULDBLOCK
        SOCKET_EAGAIN,
        SOCKET_EWOULDBLOCK,
        // stream_select warning that it has been interrupted by a signal - EINTR
        SOCKET_EINTR,
    );
    if (in_array($code, $ignorable, true)) {
        return;
    }

    // Fall back to PHP's own error number when no code could be parsed from the message.
    $effectiveCode = $code > 0 ? $code : $errno;
    parent::error_handler($effectiveCode, $errstr, $errfile, $errline, $errcontext);
}
299
|
|
|
|
300
|
|
View Code Duplication |
/**
 * Disables the heartbeat, closes the stream if it is still open and resets
 * the socket handle and the read/write timestamps to null.
 */
public function close()
{
    $this->disableHeartbeat();

    $stillOpen = is_resource($this->sock);
    if ($stillOpen) {
        fclose($this->sock);
    }

    $this->sock = null;
    $this->last_read = $this->last_write = null;
}
310
|
12 |
|
|
311
|
|
|
/**
 * Returns the underlying stream as currently held by this object
 * (null after close()).
 *
 * @inheritdoc
 */
public function getSocket()
{
    return $this->sock;
}
318
|
66 |
|
|
319
|
66 |
|
/**
 * Waits with stream_select() until the socket becomes readable or the given
 * timeout elapses, and returns stream_select()'s result unchanged.
 *
 * @inheritdoc
 */
protected function do_select($sec, $usec)
{
    // stream_select() takes its arrays by reference, so they must be variables.
    $readable = array($this->sock);
    $writable = $exceptional = null;

    return stream_select($readable, $writable, $exceptional, $sec, $usec);
}
330
|
|
|
|
331
|
|
|
/**
 * Waits up to 100000 microseconds (0.1 s) with stream_select() for the
 * socket to become writable.
 *
 * @return int|bool result of stream_select()
 */
protected function select_write()
{
    // stream_select() takes its arrays by reference, so they must be variables.
    $writable = array($this->sock);
    $readable = null;
    $exceptional = null;

    return stream_select($readable, $writable, $exceptional, 0, 100000);
}
341
|
|
|
|
342
|
|
|
/**
 * Reports the 'timed_out' entry of the socket's stream metadata.
 *
 * @return mixed value of stream_get_meta_data()['timed_out']
 */
protected function timed_out()
{
    // get status of socket to determine whether or not it has timed out
    $meta = stream_get_meta_data($this->sock);
    $timedOut = $meta['timed_out'];

    return $timedOut;
}
352
|
|
|
|
353
|
|
|
/**
 * Turns on SO_KEEPALIVE for the connected stream.
 *
 * The stream is imported as a socket via socket_import_stream() and the
 * option is set with socket_set_option(); every condition that prevents
 * this is reported as an exception.
 *
 * @throws \PhpAmqpLib\Exception\AMQPIOException when the protocol is 'ssl' or 'tls', when the
 *         required function or constants are unavailable, when the stream cannot be imported
 *         as a socket, or when the option cannot be set
 */
protected function enable_keepalive()
{
    // Same message as before for each protocol, built from a single template.
    if ($this->protocol === 'ssl' || $this->protocol === 'tls') {
        throw new AMQPIOException(
            sprintf(
                'Can not enable keepalive: %s connection does not support keepalive (#70939)',
                $this->protocol
            )
        );
    }

    if (!function_exists('socket_import_stream')) {
        throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
    }

    if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
        throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
    }

    // socket_import_stream() signals failure with a falsy return value; passing that on
    // to socket_set_option() would only produce a secondary, less helpful error.
    $socket = socket_import_stream($this->sock);
    if (!$socket) {
        throw new AMQPIOException('Can not enable keepalive: stream could not be imported as a socket');
    }

    // socket_set_option() returns false on failure; report it instead of silently
    // continuing without keepalive.
    if (!socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1)) {
        throw new AMQPIOException('Can not enable keepalive: SO_KEEPALIVE could not be set');
    }
}
377
|
|
|
|
378
|
|
|
/**
 * Pulls the numeric error code out of a PHP stream warning message.
 *
 * Messages starting with "stream_select():" are searched for " [N]: ";
 * all other messages are searched for " errno=N ".
 *
 * @param string $message
 * @return int the parsed code, or 0 when the message contains none
 */
protected function extract_error_code($message)
{
    $fromStreamSelect = (0 === strpos($message, 'stream_select():'));
    $pattern = $fromStreamSelect
        ? '/\s+\[(\d+)\]:\s+/'
        : '/\s+errno=(\d+)\s+/';

    $found = array();
    if (preg_match($pattern, $message, $found) > 0) {
        return (int)$found[1];
    }

    return 0;
}
397
|
|
|
} |
398
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.