1
|
|
|
<?php |
2
|
|
|
namespace PhpAmqpLib\Wire\IO; |
3
|
|
|
|
4
|
|
|
use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
5
|
|
|
use PhpAmqpLib\Exception\AMQPIOException; |
6
|
|
|
use PhpAmqpLib\Exception\AMQPSocketException; |
7
|
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException; |
8
|
|
|
use PhpAmqpLib\Helper\MiscHelper; |
9
|
|
|
|
10
|
|
|
/**
 * AMQP I/O transport implemented with the PHP sockets extension (socket_* functions).
 *
 * The instance owns a single TCP socket: connect() creates it, read()/write()
 * move raw bytes over it, and close() releases it.
 *
 * NOTE(review): the properties host, port, read_timeout, write_timeout, heartbeat,
 * initial_heartbeat, keepalive, canDispatchPcntlSignal, last_read and last_write are
 * not declared in this class; presumably they are declared by AbstractIO - confirm.
 */
class SocketIO extends AbstractIO
{
    /**
     * The underlying socket, or null when not connected.
     *
     * The sockets extension returns a resource before PHP 8.0 and a \Socket object from 8.0 on.
     *
     * @var resource|\Socket|null
     */
    private $sock;

    /**
     * Stores the connection settings; no socket is opened until connect() is called.
     *
     * @param string $host
     * @param int $port
     * @param int|float $read_timeout
     * @param bool $keepalive
     * @param int|float|null $write_timeout if null defaults to read timeout
     *                                      (any other falsy value, e.g. 0, falls back to it as well)
     * @param int $heartbeat how often to send heartbeat. 0 means off
     */
    public function __construct($host, $port, $read_timeout = 130, $keepalive = false, $write_timeout = null, $heartbeat = 60)
    {
        $this->host = $host;
        $this->port = $port;
        $this->read_timeout = $read_timeout;
        $this->write_timeout = $write_timeout ?: $read_timeout;
        $this->heartbeat = $heartbeat;
        // Kept separately so connect() can restore the configured value on every (re)connect.
        $this->initial_heartbeat = $heartbeat;
        $this->keepalive = $keepalive;
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();

        /*
            TODO FUTURE enable this check
            php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
        if ($this->heartbeat !== 0 && ($this->read_timeout <= ($this->heartbeat * 2))) {
            throw new \InvalidArgumentException('read_timeout must be greater than 2x the heartbeat');
        }
        if ($this->heartbeat !== 0 && ($this->write_timeout <= ($this->heartbeat * 2))) {
            throw new \InvalidArgumentException('send_timeout must be greater than 2x the heartbeat');
        }
        */
    }

    /**
     * Creates an IPv4 TCP socket, applies the send/receive timeouts and connects it
     * to the configured host and port.
     *
     * After a successful connect the socket is switched to blocking mode, TCP_NODELAY
     * is set, keepalive is enabled when requested, and the heartbeat interval is reset
     * to the value given to the constructor.
     *
     * @inheritdoc
     * @throws \PhpAmqpLib\Exception\AMQPIOException when the socket cannot be created or connected
     */
    public function connect()
    {
        $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($this->sock === false) {
            // socket_create() signals failure by returning false. Fail here instead of
            // handing false to every following socket_* call, and leave the handle null
            // so that read()/write() report a missing socket.
            $this->sock = null;
            $errno = socket_last_error();
            throw new AMQPIOException(sprintf(
                'Error creating socket (%s): %s',
                $errno,
                socket_strerror($errno)
            ), $errno);
        }

        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->write_timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));

        $this->set_error_handler();
        try {
            $connected = socket_connect($this->sock, $this->host, $this->port);
            $this->cleanup_error_handler();
        } catch (\ErrorException $e) {
            // Treated exactly like socket_connect() returning false.
            $connected = false;
        }
        if (!$connected) {
            $errno = socket_last_error($this->sock);
            $errstr = socket_strerror($errno);
            throw new AMQPIOException(sprintf(
                'Error Connecting to server (%s): %s',
                $errno,
                $errstr
            ), $errno);
        }

        socket_set_block($this->sock);
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);

        if ($this->keepalive) {
            $this->enable_keepalive();
        }

        $this->heartbeat = $this->initial_heartbeat;
    }

    /**
     * Returns the underlying socket handle, or null when no socket is open.
     *
     * @inheritdoc
     */
    public function getSocket()
    {
        return $this->sock;
    }

    /**
     * Reads exactly $len bytes from the socket and returns them.
     *
     * Whenever a receive attempt yields no data the method waits in select() and
     * retries, until more than read_timeout seconds have passed since the call began.
     *
     * @inheritdoc
     * @throws \PhpAmqpLib\Exception\AMQPSocketException when no socket is open
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException when the peer closed the connection
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException when no data arrived within read_timeout
     * @throws \PhpAmqpLib\Exception\AMQPIOException when the number of bytes received differs from $len
     */
    public function read($len)
    {
        if (is_null($this->sock)) {
            throw new AMQPSocketException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        $this->check_heartbeat();

        list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
        $read_start = microtime(true);
        $read = 0;
        $data = '';
        while ($read < $len) {
            $buffer = null;
            $result = socket_recv($this->sock, $buffer, $len - $read, 0);
            if ($result === 0) {
                // From linux recv() manual:
                // When a stream socket peer has performed an orderly shutdown,
                // the return value will be 0 (the traditional "end-of-file" return).
                // http://php.net/manual/en/function.socket-recv.php#47182
                $this->close();
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
            }

            // Compare explicitly rather than with empty(): empty('0') is true, so a chunk
            // consisting of the single byte "0" would be thrown away and the byte stream
            // would lose synchronisation.
            if ($buffer === null || $buffer === '') {
                $read_now = microtime(true);
                $t_read = $read_now - $read_start;
                if ($t_read > $this->read_timeout) {
                    throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
                }
                $this->select($timeout_sec, $timeout_uSec);
                continue;
            }

            // The 'ASCII' encoding makes mb_strlen() count bytes, not multi-byte characters.
            $read += mb_strlen($buffer, 'ASCII');
            $data .= $buffer;
        }

        if (mb_strlen($data, 'ASCII') != $len) {
            throw new AMQPIOException(sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                mb_strlen($data, 'ASCII'),
                $len
            ));
        }

        $this->last_read = microtime(true);

        return $data;
    }

    /**
     * Writes all of $data to the socket, in chunks of at most self::BUFFER_SIZE bytes.
     *
     * When a write attempt transfers no bytes the method waits briefly for the socket
     * to become writable and retries; the timeout clock restarts after every
     * successful chunk.
     *
     * @inheritdoc
     * @throws \PhpAmqpLib\Exception\AMQPSocketException when no socket is open
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException when the socket error indicates a lost connection
     * @throws \PhpAmqpLib\Exception\AMQPIOException on any other write failure
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException when nothing could be written for longer than write_timeout
     */
    public function write($data)
    {
        $written = 0;
        $len = mb_strlen($data, 'ASCII');
        $write_start = microtime(true);

        while ($written < $len) {
            // Null sockets are invalid, throw exception
            if (is_null($this->sock)) {
                throw new AMQPSocketException(sprintf(
                    'Socket was null! Last SocketError was: %s',
                    socket_strerror(socket_last_error())
                ));
            }

            $this->set_error_handler();
            try {
                // Byte-wise slice of the part that has not been sent yet.
                $buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
                $result = socket_write($this->sock, $buffer, self::BUFFER_SIZE);
                $this->cleanup_error_handler();
            } catch (\ErrorException $e) {
                $code = socket_last_error($this->sock);
                switch ($code) {
                    // Error codes that mean the connection itself is gone.
                    case SOCKET_EPIPE:
                    case SOCKET_ENETDOWN:
                    case SOCKET_ENETUNREACH:
                    case SOCKET_ENETRESET:
                    case SOCKET_ECONNABORTED:
                    case SOCKET_ECONNRESET:
                    case SOCKET_ECONNREFUSED:
                    case SOCKET_ETIMEDOUT:
                        $this->close();
                        throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
                    default:
                        throw new AMQPIOException(sprintf(
                            'Error sending data. Last SocketError: %s',
                            socket_strerror($code)
                        ), $code, $e);
                }
            }

            if ($result === false) {
                throw new AMQPIOException(sprintf(
                    'Error sending data. Last SocketError: %s',
                    socket_strerror(socket_last_error($this->sock))
                ));
            }

            $now = microtime(true);
            if ($result > 0) {
                // Progress was made: record it and restart the timeout window.
                $this->last_write = $write_start = $now;
                $written += $result;
            } else {
                if (($now - $write_start) > $this->write_timeout) {
                    throw AMQPTimeoutException::writeTimeout($this->write_timeout);
                }
                $this->select_write();
            }
        }
    }

    /**
     * Disables the heartbeat, closes the socket if one is open and clears the
     * last read/write timestamps.
     *
     * @inheritdoc
     */
    public function close()
    {
        $this->disableHeartbeat();
        // Before PHP 8.0 the handle is a resource; from 8.0 on it is a \Socket object,
        // for which is_resource() returns false. Accept both so the socket is closed
        // explicitly on every PHP version.
        if (is_resource($this->sock) || $this->sock instanceof \Socket) {
            socket_close($this->sock);
        }
        $this->sock = null;
        $this->last_read = null;
        $this->last_write = null;
    }

    /**
     * Waits until the socket becomes readable or the given timeout elapses.
     *
     * @inheritdoc
     * @return int|bool the result of socket_select()
     */
    protected function do_select($sec, $usec)
    {
        $read = array($this->sock);
        $write = null;
        $except = null;

        return socket_select($read, $write, $except, $sec, $usec);
    }

    /**
     * Waits up to 100000 microseconds (0.1 s) for the socket to become writable.
     *
     * @return int|bool
     */
    protected function select_write()
    {
        $read = $except = null;
        $write = array($this->sock);

        return socket_select($read, $write, $except, 0, 100000);
    }

    /**
     * Turns on SO_KEEPALIVE for the socket.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException when the required socket constants are not defined
     */
    protected function enable_keepalive()
    {
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
    }

    /**
     * Error handler that ignores "interrupted system call" (EINTR) messages and
     * forwards every other error to the parent handler.
     *
     * @inheritdoc
     */
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
    {
        // socket_select warning that it has been interrupted by a signal - EINTR
        if (false !== strrpos($errstr, socket_strerror(SOCKET_EINTR))) {
            // it's allowed while processing signals
            return;
        }

        parent::error_handler($errno, $errstr, $errfile, $errline, $errcontext);
    }

    /**
     * Installs the parent's error handler and clears the socket's last error, so that
     * a later socket_last_error() call reflects only the operation that follows.
     *
     * @inheritdoc
     */
    protected function set_error_handler()
    {
        parent::set_error_handler();
        socket_clear_error($this->sock);
    }
}
286
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.