1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace PhpAmqpLib\Wire\IO; |
4
|
|
|
|
5
|
|
|
use PhpAmqpLib\Connection\AMQPConnectionConfig; |
6
|
|
|
use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
7
|
|
|
use PhpAmqpLib\Exception\AMQPIOException; |
8
|
|
|
use PhpAmqpLib\Exception\AMQPSocketException; |
9
|
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException; |
10
|
|
|
use PhpAmqpLib\Helper\MiscHelper; |
11
|
|
|
use PhpAmqpLib\Helper\SocketConstants; |
12
|
|
|
|
13
|
|
|
/**
 * AMQP I/O transport built on the PHP sockets extension (socket_* functions).
 *
 * The instance owns a single TCP socket that is created in connect() and
 * released in close(). read() and write() block until the requested amount of
 * data has been transferred or the configured timeout has elapsed.
 */
class SocketIO extends AbstractIO
{
    /** @var null|resource|\Socket Underlying socket; null while not connected. */
    private $sock;

    /**
     * @param string $host Host name or IP address. Square brackets around an IPv6 literal are stripped.
     * @param int $port
     * @param int|float $read_timeout Read timeout in seconds
     * @param bool $keepalive Whether to enable SO_KEEPALIVE after connecting
     * @param int|float|null $write_timeout Write timeout in seconds. null (or any other
     *                                      empty value such as 0) falls back to the read timeout
     * @param int $heartbeat how often to send heartbeat. 0 means off
     * @param null|AMQPConnectionConfig $config
     */
    public function __construct(
        $host,
        $port,
        $read_timeout = 3,
        $keepalive = false,
        $write_timeout = null,
        $heartbeat = 0,
        ?AMQPConnectionConfig $config = null
    ) {
        $this->config = $config;
        // "[::1]" style literals are accepted; the socket API needs the bare address.
        $this->host = str_replace(['[', ']'], '', $host);
        $this->port = $port;
        $this->read_timeout = (float)$read_timeout;
        $this->write_timeout = (float)($write_timeout ?: $read_timeout);
        $this->heartbeat = $heartbeat;
        // Remembered separately so that connect() can restore it on reconnect.
        $this->initial_heartbeat = $heartbeat;
        $this->keepalive = $keepalive;
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();

        /*
            TODO FUTURE enable this check
            php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
        if ($this->heartbeat !== 0 && ($this->read_timeout <= ($this->heartbeat * 2))) {
            throw new \InvalidArgumentException('read_timeout must be greater than 2x the heartbeat');
        }
        if ($this->heartbeat !== 0 && ($this->write_timeout <= ($this->heartbeat * 2))) {
            throw new \InvalidArgumentException('send_timeout must be greater than 2x the heartbeat');
        }
         */
    }

    /**
     * Creates the TCP socket, applies send/receive timeouts and connects to host:port.
     *
     * After a successful connect the socket is switched to blocking mode,
     * TCP_NODELAY is set, the optional send buffer size from the connection
     * config is applied and keepalive is enabled when requested.
     *
     * @inheritdoc
     * @throws AMQPIOException when the socket cannot be created or the connection attempt fails
     */
    public function connect()
    {
        $sock = socket_create($this->isIpv6() ? AF_INET6 : AF_INET, SOCK_STREAM, SOL_TCP);
        if ($sock === false) {
            // Without this guard `false` would be handed to socket_set_option() below.
            $this->sock = null;
            $errno = socket_last_error();
            throw new AMQPIOException(sprintf(
                'Error Connecting to server (%s): %s',
                $errno,
                socket_strerror($errno)
            ), $errno);
        }
        $this->sock = $sock;

        [$sec, $uSec] = MiscHelper::splitSecondsMicroseconds($this->write_timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, ['sec' => $sec, 'usec' => $uSec]);
        [$sec, $uSec] = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, ['sec' => $sec, 'usec' => $uSec]);

        $previous = null;
        $this->setErrorHandler();
        try {
            $connected = socket_connect($this->sock, $this->host, $this->port);
            $this->throwOnError();
        } catch (\ErrorException $e) {
            $connected = false;
            // Kept so the original warning is not lost when the I/O exception is raised.
            $previous = $e;
        } finally {
            $this->restoreErrorHandler();
        }

        if (!$connected) {
            $errno = socket_last_error($this->sock);
            $errstr = socket_strerror($errno);
            throw new AMQPIOException(sprintf(
                'Error Connecting to server (%s): %s',
                $errno,
                $errstr
            ), $errno, $previous);
        }

        socket_set_block($this->sock);
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
        if ($this->config && $this->config->getSendBufferSize() > 0) {
            socket_set_option($this->sock, SOL_SOCKET, SO_SNDBUF, $this->config->getSendBufferSize());
        }

        if ($this->keepalive) {
            $this->enable_keepalive();
        }

        // close() calls disableHeartbeat(); bring back the interval given to the constructor.
        $this->heartbeat = $this->initial_heartbeat;
    }

    /**
     * Returns the underlying socket, or null when no socket is open.
     *
     * @deprecated
     * @return null|resource|\Socket
     */
    public function getSocket()
    {
        return $this->sock;
    }

    /**
     * Reads exactly $len bytes from the socket.
     *
     * Keeps calling socket_recv() until $len bytes have been collected. When a
     * call yields no data the method waits in select() and retries, giving up
     * once more than read_timeout seconds have passed since the read started.
     *
     * @inheritdoc
     * @throws AMQPSocketException when no socket is open
     * @throws AMQPConnectionClosedException when the peer has closed the connection
     * @throws AMQPTimeoutException when no data arrives within the read timeout
     * @throws AMQPIOException when the amount of collected data differs from $len
     */
    public function read($len)
    {
        if (is_null($this->sock)) {
            throw new AMQPSocketException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        $this->check_heartbeat();

        [$timeout_sec, $timeout_uSec] = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
        $read_start = microtime(true);
        $read = 0;
        $data = '';
        while ($read < $len) {
            $buffer = null;
            $result = socket_recv($this->sock, $buffer, $len - $read, 0);
            if ($result === 0) {
                // From linux recv() manual:
                // When a stream socket peer has performed an orderly shutdown,
                // the return value will be 0 (the traditional "end-of-file" return).
                // http://php.net/manual/en/function.socket-recv.php#47182
                $this->close();
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
            }

            // Deliberately not empty(): empty('0') is true, so a chunk made of the
            // single byte "0" would be thrown away and the stream would get out of sync.
            if (!is_string($buffer) || $buffer === '') {
                $read_now = microtime(true);
                $t_read = $read_now - $read_start;
                if ($t_read > $this->read_timeout) {
                    throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
                }
                $this->select($timeout_sec, $timeout_uSec);
                continue;
            }

            // The 'ASCII' encoding makes mb_strlen() count bytes rather than characters.
            $read += mb_strlen($buffer, 'ASCII');
            $data .= $buffer;
        }

        if (mb_strlen($data, 'ASCII') !== $len) {
            throw new AMQPIOException(sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                mb_strlen($data, 'ASCII'),
                $len
            ));
        }

        $this->last_read = microtime(true);

        return $data;
    }

    /**
     * Writes all of $data to the socket.
     *
     * Data is sent in chunks of at most BUFFER_SIZE bytes, always starting at
     * the first byte that has not been written yet. The write timeout is
     * measured from the last chunk that made progress.
     *
     * @inheritdoc
     * @throws AMQPSocketException when no socket is open
     * @throws AMQPConnectionClosedException when the socket reports a connection-level error
     * @throws AMQPTimeoutException when no progress is made within the write timeout
     * @throws AMQPIOException on any other socket error
     */
    public function write($data)
    {
        // Null sockets are invalid, throw exception
        if (is_null($this->sock)) {
            throw new AMQPSocketException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        $this->checkBrokerHeartbeat();

        $written = 0;
        $len = mb_strlen($data, 'ASCII');
        $write_start = microtime(true);

        while ($written < $len) {
            $this->setErrorHandler();
            try {
                $result = 0;
                if ($this->select_write()) {
                    if ($written === 0 && $len <= self::BUFFER_SIZE) {
                        // Nothing sent yet and the payload fits into one chunk - no need to cut it.
                        $buffer = $data;
                    } else {
                        // socket_write() may accept only part of the buffer; continue from the
                        // first unsent byte so that already written bytes are never repeated.
                        $buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
                    }
                    $result = socket_write($this->sock, $buffer);
                }
                $this->throwOnError();
            } catch (\ErrorException $e) {
                $code = socket_last_error($this->sock);
                $constants = SocketConstants::getInstance();
                switch ($code) {
                    // Errors that mean the connection is gone: drop the socket as well.
                    case $constants->SOCKET_EPIPE:
                    case $constants->SOCKET_ENETDOWN:
                    case $constants->SOCKET_ENETUNREACH:
                    case $constants->SOCKET_ENETRESET:
                    case $constants->SOCKET_ECONNABORTED:
                    case $constants->SOCKET_ECONNRESET:
                    case $constants->SOCKET_ECONNREFUSED:
                    case $constants->SOCKET_ETIMEDOUT:
                        $this->close();
                        throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
                    default:
                        throw new AMQPIOException(sprintf(
                            'Error sending data. Last SocketError: %s',
                            socket_strerror($code)
                        ), $code, $e);
                }
            } finally {
                $this->restoreErrorHandler();
            }

            if ($result === false) {
                throw new AMQPIOException(sprintf(
                    'Error sending data. Last SocketError: %s',
                    socket_strerror(socket_last_error($this->sock))
                ));
            }

            $now = microtime(true);
            if ($result > 0) {
                // Progress was made: restart the timeout window.
                $this->last_write = $write_start = $now;
                $written += $result;
            } else {
                if (($now - $write_start) > $this->write_timeout) {
                    throw AMQPTimeoutException::writeTimeout($this->write_timeout);
                }
            }
        }
    }

    /**
     * Disables the heartbeat, closes the socket if one is open and resets the
     * socket handle and the last read/write timestamps.
     *
     * @inheritdoc
     */
    public function close()
    {
        $this->disableHeartbeat();
        if (is_resource($this->sock) || $this->sock instanceof \Socket) {
            socket_close($this->sock);
        }
        $this->sock = null;
        $this->last_read = 0;
        $this->last_write = 0;
    }

    /**
     * Waits until the socket becomes readable or the given timeout expires.
     *
     * @inheritdoc
     * @throws AMQPConnectionClosedException when there is no usable socket
     */
    protected function do_select(?int $sec, int $usec)
    {
        if (!is_resource($this->sock) && !($this->sock instanceof \Socket)) {
            $this->sock = null;
            throw new AMQPConnectionClosedException('Broken pipe or closed connection', 0);
        }

        $read = [$this->sock];
        $write = null;
        $except = null;

        return socket_select($read, $write, $except, $sec, $usec);
    }

    /**
     * Waits up to 100000 microseconds (0.1 s) for the socket to become writable.
     *
     * @return int|bool result of socket_select()
     */
    protected function select_write()
    {
        $read = $except = null;
        $write = [$this->sock];

        return socket_select($read, $write, $except, 0, 100000);
    }

    /**
     * Turns on SO_KEEPALIVE for the current socket.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException when the required socket constants are not defined
     */
    protected function enable_keepalive(): void
    {
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
    }

    /**
     * Error handler used around socket calls. Warnings whose message contains
     * the EINTR error text are ignored; everything else goes to the parent handler.
     *
     * @inheritdoc
     */
    public function error_handler($errno, $errstr, $errfile, $errline): void
    {
        $constants = SocketConstants::getInstance();
        // socket_select warning that it has been interrupted by a signal - EINTR
        if (isset($constants->SOCKET_EINTR) && false !== strpos($errstr, socket_strerror($constants->SOCKET_EINTR))) {
            // it's allowed while processing signals
            return;
        }

        parent::error_handler($errno, $errstr, $errfile, $errline);
    }

    /**
     * Installs the error handler and clears the socket's last error, so that
     * socket_last_error() afterwards only reflects the guarded call.
     *
     * @inheritdoc
     */
    protected function setErrorHandler(): void
    {
        parent::setErrorHandler();
        socket_clear_error($this->sock);
    }

    /**
     * Tells whether the connection should use an IPv6 socket: true when the host
     * is an IPv6 literal or when a DNS AAAA record exists for it.
     */
    private function isIpv6(): bool
    {
        $ipv6 = filter_var($this->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6);

        return $ipv6 !== false || checkdnsrr($this->host, 'AAAA');
    }
}
333
|
|
|
|
Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.
For example, imagine you have a variable `$accountId` that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the `id` property of an instance of the `Account` class. This class holds a proper account, so the id value must no longer be false.
Either this assignment is in error or a type check should be added for that assignment.