1
|
|
|
<?php |
2
|
|
|
namespace PhpAmqpLib\Wire\IO; |
3
|
|
|
|
4
|
|
|
use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
5
|
|
|
use PhpAmqpLib\Exception\AMQPDataReadException; |
6
|
|
|
use PhpAmqpLib\Exception\AMQPIOException; |
7
|
|
|
use PhpAmqpLib\Exception\AMQPRuntimeException; |
8
|
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException; |
9
|
|
|
use PhpAmqpLib\Helper\MiscHelper; |
10
|
|
|
|
11
|
|
|
class StreamIO extends AbstractIO |
12
|
|
|
{ |
13
|
|
|
/** @var string */ |
14
|
|
|
protected $protocol; |
15
|
|
|
|
16
|
|
|
/** @var resource */ |
17
|
|
|
protected $context; |
18
|
|
|
|
19
|
|
|
/** @var resource */ |
20
|
|
|
private $sock; |
21
|
|
|
|
22
|
|
|
/** @var string */ |
23
|
|
|
private static $SOCKET_STRERROR_EAGAIN; |
24
|
|
|
|
25
|
|
|
/** @var string */ |
26
|
|
|
private static $SOCKET_STRERROR_EWOULDBLOCK; |
27
|
|
|
|
28
|
|
|
/** @var string */ |
29
|
|
|
private static $SOCKET_STRERROR_EINTR; |
30
|
|
|
|
31
|
|
|
/**
 * Validates the timeout/heartbeat combination, stores the connection settings
 * and prepares the stream context that connect() will use.
 *
 * @param string $host
 * @param int $port
 * @param float $connection_timeout
 * @param float $read_write_timeout stored as both the read and the write timeout
 * @param resource|null $context anything that is not a "stream-context" resource is replaced by a fresh context
 * @param bool $keepalive
 * @param int $heartbeat the timeout check is skipped when this is exactly 0
 * @param string|null $ssl_protocol protocol used instead of "ssl" when the context carries ssl options
 * @throws \InvalidArgumentException when heartbeat is not 0 and read_write_timeout is not greater than twice its value
 */
public function __construct(
    $host,
    $port,
    $connection_timeout,
    $read_write_timeout = 130.0,
    $context = null,
    $keepalive = false,
    $heartbeat = 60,
    $ssl_protocol = null
) {
    if ($heartbeat !== 0 && $read_write_timeout <= ($heartbeat * 2)) {
        throw new \InvalidArgumentException('read_write_timeout must be greater than 2x the heartbeat');
    }

    // SOCKET_EAGAIN is not defined in Windows, fall back to SOCKET_EWOULDBLOCK there
    $eagain = defined('SOCKET_EAGAIN') ? SOCKET_EAGAIN : SOCKET_EWOULDBLOCK;
    self::$SOCKET_STRERROR_EAGAIN = socket_strerror($eagain);
    self::$SOCKET_STRERROR_EWOULDBLOCK = socket_strerror(SOCKET_EWOULDBLOCK);
    self::$SOCKET_STRERROR_EINTR = socket_strerror(SOCKET_EINTR);

    $this->protocol = 'tcp';
    $this->host = $host;
    $this->port = $port;
    $this->connection_timeout = $connection_timeout;
    $this->read_timeout = $read_write_timeout;
    $this->write_timeout = $read_write_timeout;
    $this->context = $context;
    $this->keepalive = $keepalive;
    $this->heartbeat = $heartbeat;
    $this->initial_heartbeat = $heartbeat;
    $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();

    // only a genuine stream context is kept; everything else gets a default one
    $isStreamContext = is_resource($this->context)
        && get_resource_type($this->context) === 'stream-context';
    if (!$isStreamContext) {
        $this->context = stream_context_create();
    }

    // tcp_nodelay was added in 7.1.0
    if (PHP_VERSION_ID >= 70100) {
        stream_context_set_option($this->context, 'socket', 'tcp_nodelay', true);
    }

    // ssl options on the context switch the transport away from plain tcp
    $contextOptions = stream_context_get_options($this->context);
    if (!empty($contextOptions['ssl'])) {
        $this->protocol = isset($ssl_protocol) ? $ssl_protocol : 'ssl';
    }
}
89
|
|
|
|
90
|
|
|
/**
 * Opens the stream socket to "<protocol>://<host>:<port>" and configures it:
 * stream timeout, blocking mode, buffers and (optionally) keepalive. The
 * heartbeat is reset to the value passed to the constructor.
 *
 * @inheritdoc
 * @throws AMQPIOException when the socket cannot be opened, has no peer name,
 *                         or the stream timeout cannot be set
 */
public function connect()
{
    $errstr = $errno = null;

    $remote = sprintf('%s://%s:%s', $this->protocol, $this->host, $this->port);

    $this->set_error_handler();

    try {
        $this->sock = stream_socket_client(
            $remote,
            $errno,
            $errstr,
            $this->connection_timeout,
            STREAM_CLIENT_CONNECT,
            $this->context
        );
        $this->cleanup_error_handler();
    } catch (\ErrorException $e) {
        // chain the original error (as read() does) so the root cause stays visible
        throw new AMQPIOException($e->getMessage(), $e->getCode(), $e);
    }

    if (false === $this->sock) {
        throw new AMQPIOException(
            sprintf(
                'Error Connecting to server(%s): %s ',
                $errno,
                $errstr
            ),
            $errno
        );
    }

    // a socket without a remote name is treated as a refused connection
    if (false === stream_socket_get_name($this->sock, true)) {
        throw new AMQPIOException(
            sprintf(
                'Connection refused: %s ',
                $remote
            )
        );
    }

    // the stream timeout is the larger of the read and write timeouts
    list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds(max($this->read_timeout, $this->write_timeout));
    if (!stream_set_timeout($this->sock, $sec, $uSec)) {
        throw new AMQPIOException('Timeout could not be set');
    }

    // php cannot capture signals while streams are blocking
    if ($this->canDispatchPcntlSignal) {
        stream_set_blocking($this->sock, 0);
        stream_set_write_buffer($this->sock, 0);
        if (function_exists('stream_set_read_buffer')) {
            stream_set_read_buffer($this->sock, 0);
        }
    } else {
        stream_set_blocking($this->sock, true);
    }

    if ($this->keepalive) {
        $this->enable_keepalive();
    }
    $this->heartbeat = $this->initial_heartbeat;
}
161
|
|
|
|
162
|
|
|
/**
 * Reads exactly $len bytes from the socket, waiting via select() whenever
 * fread() yields an empty string.
 *
 * @inheritdoc
 * @throws AMQPConnectionClosedException when the socket is not a resource or is at EOF
 * @throws AMQPDataReadException when fread() fails or the collected length differs from $len
 * @throws AMQPTimeoutException when no data arrived for longer than the read timeout
 */
public function read($len)
{
    $this->check_heartbeat();

    list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);

    // moment of the last successful chunk (or of the call itself)
    $idle_since = microtime(true);
    $received = 0;
    $data = '';

    while ($received < $len) {
        if (!is_resource($this->sock) || feof($this->sock)) {
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        $this->set_error_handler();
        try {
            $chunk = fread($this->sock, ($len - $received));
            $this->cleanup_error_handler();
        } catch (\ErrorException $e) {
            throw new AMQPDataReadException($e->getMessage(), $e->getCode(), $e);
        }

        if ($chunk === false) {
            throw new AMQPDataReadException('Error receiving data');
        }

        if ($chunk !== '') {
            // progress was made: restart the idle clock and collect the bytes
            $this->last_read = microtime(true);
            $idle_since = $this->last_read;
            $received += mb_strlen($chunk, 'ASCII');
            $data .= $chunk;
            continue;
        }

        // nothing available yet: give up once idle for longer than the read timeout
        $idle_for = microtime(true) - $idle_since;
        if ($idle_for > $this->read_timeout) {
            throw new AMQPTimeoutException('Too many read attempts detected in StreamIO');
        }
        $this->select($timeout_sec, $timeout_uSec);
    }

    // 'ASCII' makes mb_strlen count bytes
    $total = mb_strlen($data, 'ASCII');
    if ($total !== $len) {
        throw new AMQPDataReadException(
            sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                $total,
                $len
            )
        );
    }

    $this->last_read = microtime(true);

    return $data;
}
223
|
|
|
|
224
|
|
|
/**
 * Writes the whole of $data to the socket in chunks of at most
 * self::BUFFER_SIZE bytes, looping until every byte has been accepted.
 *
 * @inheritdoc
 * @throws AMQPConnectionClosedException when the socket is gone, at EOF, or the write error maps to a closed/timed-out connection
 * @throws AMQPRuntimeException on any other write error or when fwrite() returns false
 * @throws AMQPTimeoutException when the stream reports a timeout or no byte could be written within the write timeout
 */
public function write($data)
{
    $len = mb_strlen($data, 'ASCII');
    $written = 0;
    $write_start = microtime(true);

    while ($written < $len) {
        if (!is_resource($this->sock)) {
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        $result = false;
        $this->set_error_handler();
        // OpenSSL's C library function SSL_write() can balk on buffers > 8192
        // bytes in length, so the write size is limited here. On both TLS
        // and plaintext connections, the write loop will continue until the
        // buffer has been fully written.
        // This behavior has been observed in OpenSSL dating back to at least
        // September 2002:
        // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
        try {
            $chunk = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
            $result = fwrite($this->sock, $chunk);
            $this->cleanup_error_handler();
        } catch (\ErrorException $e) {
            $code = $this->last_error['errno'];
            // loose comparisons on purpose: they mirror switch/case semantics
            if ($code == 8) { // constant is missing for this error type
                $this->close();
                throw new AMQPConnectionClosedException('Broken pipe or closed connection', $code, $e);
            }
            if ($code == SOCKET_ETIMEDOUT) {
                $this->close();
                throw new AMQPConnectionClosedException('Connection timed out', $code, $e);
            }
            throw new AMQPRuntimeException($e->getMessage(), $code, $e);
        }

        if ($result === false) {
            throw new AMQPRuntimeException('Error sending data');
        }

        if ($this->timed_out()) {
            throw AMQPTimeoutException::writeTimeout($this->write_timeout);
        }

        $now = microtime(true);
        if ($result > 0) {
            // progress: restart the stall clock and advance the offset
            $this->last_write = $write_start = $now;
            $written += $result;
            continue;
        }

        // zero bytes accepted: closed peer, stalled for too long, or just not ready yet
        if (feof($this->sock)) {
            $this->close();
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }
        if (($now - $write_start) > $this->write_timeout) {
            throw AMQPTimeoutException::writeTimeout($this->write_timeout);
        }
        // check stream and prevent from high CPU usage
        $this->select_write();
    }
}
290
|
|
|
|
291
|
|
|
/**
 * Ignores errors whose message contains the EAGAIN, EWOULDBLOCK or EINTR
 * error text and forwards everything else to the parent handler.
 *
 * @inheritdoc
 */
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
    $tolerated = array(
        // fwrite notice that the stream isn't ready - it's allowed to retry
        self::$SOCKET_STRERROR_EAGAIN,
        self::$SOCKET_STRERROR_EWOULDBLOCK,
        // stream_select warning that it has been interrupted by a signal -
        // it's allowed while processing signals
        self::$SOCKET_STRERROR_EINTR,
    );

    foreach ($tolerated as $needle) {
        if (strpos($errstr, $needle) !== false) {
            return;
        }
    }

    parent::error_handler($errno, $errstr, $errfile, $errline, $errcontext);
}
311
|
|
|
|
312
|
66 |
View Code Duplication |
/**
 * Disables the heartbeat, closes the socket when it is still an open
 * resource, and clears the socket handle and the last read/write timestamps.
 */
public function close()
{
    $this->disableHeartbeat();

    $sock = $this->sock;
    if (is_resource($sock)) {
        fclose($sock);
    }

    $this->sock = null;
    $this->last_read = null;
    $this->last_write = null;
}
322
|
|
|
|
323
|
|
|
/**
 * Returns the current value of the socket property as-is
 * (close() resets it to null).
 *
 * @inheritdoc
 */
public function getSocket()
{
    return $this->sock;
}
330
|
|
|
|
331
|
|
|
/**
 * Runs stream_select() on the socket for readability only, waiting at most
 * $sec seconds plus $usec microseconds.
 *
 * @inheritdoc
 */
protected function do_select($sec, $usec)
{
    // stream_select() takes its arrays by reference, so they must be variables
    $readable = array($this->sock);
    $writable = $exceptional = null;

    return stream_select($readable, $writable, $exceptional, $sec, $usec);
}
342
|
|
|
|
343
|
|
|
/**
 * Runs stream_select() on the socket for writability only, waiting at most
 * 100000 microseconds.
 *
 * @return int|bool the stream_select() result
 */
protected function select_write()
{
    // stream_select() takes its arrays by reference, so they must be variables
    $writable = array($this->sock);
    $readable = null;
    $exceptional = null;

    return stream_select($readable, $writable, $exceptional, 0, 100000);
}
353
|
|
|
|
354
|
|
|
/**
 * Reports the "timed_out" flag from the socket's stream metadata.
 *
 * @return bool the 'timed_out' entry of stream_get_meta_data()
 */
protected function timed_out()
{
    // get status of socket to determine whether or not it has timed out
    $meta = stream_get_meta_data($this->sock);

    return $meta['timed_out'];
}
364
|
|
|
|
365
|
|
|
/**
 * Turns on SO_KEEPALIVE for the connected socket.
 *
 * The stream is imported into the sockets extension so that the option can
 * be set. Every reason keepalive cannot be enabled is reported as an
 * AMQPIOException, including a failed import or a rejected option.
 *
 * @throws \PhpAmqpLib\Exception\AMQPIOException when the protocol is ssl/tls,
 *         the sockets functions or constants are missing, the stream cannot
 *         be imported, or the option cannot be set
 */
protected function enable_keepalive()
{
    if ($this->protocol === 'ssl') {
        throw new AMQPIOException('Can not enable keepalive: ssl connection does not support keepalive (#70939)');
    }

    if ($this->protocol === 'tls') {
        throw new AMQPIOException('Can not enable keepalive: tls connection does not support keepalive (#70939)');
    }

    if (!function_exists('socket_import_stream')) {
        throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
    }

    if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
        throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
    }

    // the import can fail (e.g. for encrypted streams under a custom protocol
    // name that the guards above do not recognise); do not hand a non-socket on
    $socket = socket_import_stream($this->sock);
    if ($socket === false || $socket === null) {
        throw new AMQPIOException('Can not enable keepalive: socket_import_stream failed');
    }

    if (socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1) === false) {
        throw new AMQPIOException('Can not enable keepalive: SO_KEEPALIVE could not be set');
    }
}
389
|
|
|
} |
390
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.