1
|
|
|
<?php |
2
|
|
|
namespace PhpAmqpLib\Wire\IO; |
3
|
|
|
|
4
|
|
|
use PhpAmqpLib\Exception\AMQPIOException; |
5
|
|
|
use PhpAmqpLib\Exception\AMQPRuntimeException; |
6
|
|
|
use PhpAmqpLib\Helper\MiscHelper; |
7
|
|
|
use PhpAmqpLib\Wire\AMQPWriter; |
8
|
|
|
|
9
|
|
|
/**
 * AMQP transport implemented on top of PHP's ext-sockets functions.
 *
 * The socket is put into blocking mode after connecting; send/receive
 * timeouts are applied through the SO_SNDTIMEO / SO_RCVTIMEO socket options.
 */
class SocketIO extends AbstractIO
{
    /** @var string */
    protected $host;

    /** @var int */
    protected $port;

    /** @var float */
    protected $send_timeout;

    /** @var float */
    protected $read_timeout;

    /** @var int */
    protected $heartbeat;

    /** @var float|null timestamp (microtime) of the last successful read, null while closed */
    protected $last_read;

    /** @var float|null timestamp (microtime) of the last successful write, null while closed */
    protected $last_write;

    /** @var resource|\Socket|null resource on PHP < 8, \Socket object on PHP >= 8, null while closed */
    private $sock;

    /** @var bool */
    private $keepalive;

    /**
     * @param string $host
     * @param int $port
     * @param float $read_timeout
     * @param bool $keepalive
     * @param float|null $write_timeout if null (or any other falsy value, e.g. 0) defaults to read timeout
     * @param int $heartbeat how often to send heartbeat. 0 means off
     */
    public function __construct($host, $port, $read_timeout, $keepalive = false, $write_timeout = null, $heartbeat = 0)
    {
        $this->host = $host;
        $this->port = $port;
        $this->read_timeout = $read_timeout;
        $this->send_timeout = $write_timeout ?: $read_timeout;
        $this->heartbeat = $heartbeat;
        $this->keepalive = $keepalive;
    }

    /**
     * Sets up the socket connection
     *
     * Creates a TCP socket, applies the send/receive timeouts, connects to
     * host:port, switches to blocking mode, enables TCP_NODELAY and, when
     * requested, SO_KEEPALIVE.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if the socket cannot be created or connected
     */
    public function connect()
    {
        $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

        // socket_create() reports failure by returning false; do not hand that
        // value on to socket_set_option()/socket_connect().
        if ($this->sock === false) {
            $errno = socket_last_error();
            $this->sock = null;
            throw new AMQPIOException(sprintf(
                'Error creating socket (%s): %s',
                $errno,
                socket_strerror($errno)
            ), $errno);
        }

        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->send_timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));

        if (!socket_connect($this->sock, $this->host, $this->port)) {
            $errno = socket_last_error($this->sock);
            $errstr = socket_strerror($errno);
            throw new AMQPIOException(sprintf(
                'Error Connecting to server (%s): %s',
                $errno,
                $errstr
            ), $errno);
        }

        socket_set_block($this->sock);
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);

        if ($this->keepalive) {
            $this->enable_keepalive();
        }
    }

    /**
     * @return resource|\Socket|null the underlying socket, or null when closed
     */
    public function getSocket()
    {
        return $this->sock;
    }

    /**
     * Reconnects the socket
     *
     * Closes the current socket (if any) and connects again.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if the new connection cannot be established
     */
    public function reconnect()
    {
        $this->close();
        $this->connect();
    }

    /**
     * Reads exactly $n bytes from the socket.
     *
     * @param int $n number of bytes to read
     * @return mixed|string
     * @throws \PhpAmqpLib\Exception\AMQPIOException if fewer than $n bytes could be read
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException if the socket is null
     */
    public function read($n)
    {
        if (is_null($this->sock)) {
            throw new AMQPRuntimeException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        $res = '';
        $read = 0;

        // Only request the bytes still outstanding, so socket_read() is never
        // invoked with a zero length once everything has been received.
        while ($read < $n) {
            $buf = socket_read($this->sock, $n - $read);

            // '' and false both mean no more data can be obtained right now;
            // the length check below turns that into an exception.
            if ($buf === '' || $buf === false) {
                break;
            }

            $this->check_heartbeat();

            $read += mb_strlen($buf, 'ASCII');
            $res .= $buf;
        }

        if (mb_strlen($res, 'ASCII') != $n) {
            throw new AMQPIOException(sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                mb_strlen($res, 'ASCII'),
                $n
            ));
        }

        $this->last_read = microtime(true);

        return $res;
    }

    /**
     * Writes all of $data to the socket, retrying after partial writes.
     *
     * @param string $data
     * @return void
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if socket_write() fails
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException if the socket is null
     */
    public function write($data)
    {
        // Null sockets are invalid, throw exception
        if (is_null($this->sock)) {
            throw new AMQPRuntimeException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        // 'ASCII' makes the mb_* functions count and slice bytes, not characters.
        $len = mb_strlen($data, 'ASCII');

        while (true) {
            $sent = socket_write($this->sock, $data, $len);
            if ($sent === false) {
                // Report the error recorded for this socket rather than the
                // process-wide last socket error.
                throw new AMQPIOException(sprintf(
                    'Error sending data. Last SocketError: %s',
                    socket_strerror(socket_last_error($this->sock))
                ));
            }

            // Entire message has been sent
            if ($sent >= $len) {
                break;
            }

            // Partial write: keep only the part that has not been sent yet
            $data = mb_substr($data, $sent, mb_strlen($data, 'ASCII') - $sent, 'ASCII');
            $len -= $sent;
        }

        $this->last_write = microtime(true);
    }

    /**
     * Closes the socket (if open) and resets the read/write timestamps.
     *
     * @return void
     */
    public function close()
    {
        // ext-sockets hands out resources on PHP < 8 and \Socket objects on
        // PHP >= 8; is_resource() alone would skip the close on PHP 8.
        // instanceof against a class that does not exist is simply false.
        if (is_resource($this->sock) || $this->sock instanceof \Socket) {
            socket_close($this->sock);
        }
        $this->sock = null;
        $this->last_read = null;
        $this->last_write = null;
    }

    /**
     * Waits until the socket becomes readable or the timeout expires.
     *
     * @param int $sec
     * @param int $usec
     * @return int|false result of socket_select()
     */
    public function select($sec, $usec)
    {
        $read = array($this->sock);
        $write = null;
        $except = null;

        return socket_select($read, $write, $except, $sec, $usec);
    }

    /**
     * Turns on SO_KEEPALIVE for the socket.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if the required socket constants are not defined
     */
    protected function enable_keepalive()
    {
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
    }

    /**
     * Heartbeat logic: check connection health here
     *
     * Skipped unless a heartbeat interval is set and both a read and a write
     * have already happened on the current connection.
     */
    protected function check_heartbeat()
    {
        // ignore unless heartbeat interval is set
        if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
            $t = microtime(true);
            $t_read = round($t - $this->last_read);
            $t_write = round($t - $this->last_write);

            // server has gone away: nothing read for more than two intervals
            if (($this->heartbeat * 2) < $t_read) {
                $this->reconnect();
            }

            // time for client to send a heartbeat: nothing written for more than half an interval
            if (($this->heartbeat / 2) < $t_write) {
                $this->write_heartbeat();
            }
        }
    }

    /**
     * Sends a heartbeat message
     */
    protected function write_heartbeat()
    {
        // AMQP 0-9-1 heartbeat frame: type 8, channel 0, payload size 0,
        // followed by the frame-end octet 0xCE.
        $pkt = new AMQPWriter();
        $pkt->write_octet(8);
        $pkt->write_short(0);
        $pkt->write_long(0);
        $pkt->write_octet(0xCE);
        $this->write($pkt->getvalue());
    }
}
258
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.