1
|
|
|
<?php |
2
|
|
|
/** |
3
|
|
|
* Async sockets |
4
|
|
|
* |
5
|
|
|
* @copyright Copyright (c) 2015-2016, Efimov Evgenij <[email protected]> |
6
|
|
|
* |
7
|
|
|
* This source file is subject to the MIT license that is bundled |
8
|
|
|
* with this source code in the file LICENSE. |
9
|
|
|
*/ |
10
|
|
|
|
11
|
|
|
namespace AsyncSockets\Socket; |
12
|
|
|
|
13
|
|
|
use AsyncSockets\Exception\SocketException; |
14
|
|
|
use AsyncSockets\Exception\TimeoutException; |
15
|
|
|
use AsyncSockets\Operation\OperationInterface; |
16
|
|
|
|
17
|
|
|
/**
 * Class AsyncSelector
 *
 * Keeps a registry of stream resources grouped by the operation they are waiting for and
 * wraps stream_select() to find out which of them are ready for I/O.
 */
class AsyncSelector
{
    /**
     * Delay in microseconds between select attempts, if previous stream_select returned incorrect result
     *
     * @link https://bugs.php.net/bug.php?id=65137
     */
    const ATTEMPT_DELAY = 250000;

    /**
     * Attempt count to use when time out is not set. It is also the lower bound
     * for the attempt count calculated from an explicit timeout.
     */
    const ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT = 10;

    /**
     * Array of resources indexed by operation, then by spl_object_hash() of the resource object
     *
     * @var StreamResourceInterface[][]
     */
    private $streamResources = [];

    /**
     * Wait socket resources for network operation
     *
     * When stream_select() reports sockets which then fail the readiness re-check, the call
     * is repeated after ATTEMPT_DELAY microseconds, up to the calculated number of attempts.
     * On success the whole registry is cleared before the result is returned.
     *
     * @param int|null $seconds Number of seconds to wait, passed to stream_select()
     * @param int|null $usec    Number of microseconds to add
     *
     * @return SelectContext
     * @throws TimeoutException If operation was interrupted during timeout
     * @throws SocketException If network operation failed
     * @throws \InvalidArgumentException If there is no socket in the list
     */
    public function select($seconds, $usec = null)
    {
        if (empty($this->streamResources)) {
            throw new \InvalidArgumentException('Can not perform select on empty data');
        }

        $watchedRead  = $this->getSocketsForOperation(OperationInterface::OPERATION_READ);
        $watchedWrite = $this->getSocketsForOperation(OperationInterface::OPERATION_WRITE);
        $attempts     = $this->calculateAttemptsCount($seconds, $usec);

        do {
            // stream_select() overwrites the arrays it receives with the ready subset,
            // so each attempt must start again from the complete watch lists.
            $read  = $watchedRead;
            $write = $watchedWrite;
            $oob   = null;
            $this->doStreamSelect($seconds, $usec, $read, $write, $oob);

            $readyRead  = $this->popSocketsByResources((array) $read, OperationInterface::OPERATION_READ, false);
            $readyWrite = $this->popSocketsByResources((array) $write, OperationInterface::OPERATION_WRITE, false);
            $readyOob   = array_merge(
                $this->popSocketsByResources((array) $oob, OperationInterface::OPERATION_READ, true),
                $this->popSocketsByResources((array) $oob, OperationInterface::OPERATION_WRITE, true)
            );

            if (!empty($readyRead) || !empty($readyWrite) || !empty($readyOob)) {
                $this->streamResources = [];

                return new SelectContext($readyRead, $readyWrite, $readyOob);
            }

            --$attempts;
            if ($attempts > 0) {
                usleep(self::ATTEMPT_DELAY);
            }
        } while ($attempts > 0);

        throw new TimeoutException('Select operation was interrupted during timeout');
    }

    /**
     * Add socket into selector list
     *
     * Registering the same resource object twice for one operation keeps a single entry.
     *
     * @param StreamResourceInterface $streamResource Resource object
     * @param string                  $operation      One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function addSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $this->streamResources[$operation][spl_object_hash($streamResource)] = $streamResource;
    }

    /**
     * Add array of socket with specified operation
     *
     * @param StreamResourceInterface[] $streamResources List of resources. Value depends on second argument.
     *                                  If string is provided, then it must be array of StreamResourceInterface.
     *                                  If $operation parameter is omitted then this argument must contain
     *                                  pairs [StreamResourceInterface, operation] for each element
     * @param string                    $operation Operation, one of OperationInterface::OPERATION_* consts
     *
     * @return void
     * @throws \InvalidArgumentException If $operation is omitted and an element is not a two-element array
     */
    public function addSocketOperationArray(array $streamResources, $operation = null)
    {
        foreach ($streamResources as $streamResource) {
            if ($operation !== null) {
                $this->addSocketOperation($streamResource, $operation);
                continue;
            }

            if (!is_array($streamResource) || count($streamResource) !== 2) {
                throw new \InvalidArgumentException(
                    'First parameter must contain pair (SocketInterface, operation)'
                );
            }

            // First element of the pair is the resource object, last one is its operation.
            $this->addSocketOperation(reset($streamResource), end($streamResource));
        }
    }

    /**
     * Remove given socket from select list
     *
     * Does nothing if the socket is not registered for the operation.
     *
     * @param StreamResourceInterface $streamResource Stream resource object
     * @param string                  $operation      One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function removeSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $hash = spl_object_hash($streamResource);
        if (!isset($this->streamResources[$operation][$hash])) {
            return;
        }

        unset($this->streamResources[$operation][$hash]);

        // Drop the operation bucket itself once its last socket is gone, so that
        // select() can detect an empty registry.
        if (empty($this->streamResources[$operation])) {
            unset($this->streamResources[$operation]);
        }
    }

    /**
     * Remove all previously defined operations on this socket and adds socket into list of given operation
     *
     * @param StreamResourceInterface $streamResource Stream resource object
     * @param string                  $operation      One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function changeSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $this->removeAllSocketOperations($streamResource);

        $this->addSocketOperation($streamResource, $operation);
    }

    /**
     * Return underlying stream resources registered for given operation
     *
     * @param string $operation One of OperationInterface::OPERATION_* consts
     *
     * @return resource[]|null List of socket resource, null if nothing is registered for the operation
     */
    private function getSocketsForOperation($operation)
    {
        if (!isset($this->streamResources[$operation])) {
            return null;
        }

        $resources = [];
        foreach ($this->streamResources[$operation] as $socket) {
            /** @var StreamResourceInterface $socket */
            $resources[] = $socket->getStreamResource();
        }

        return empty($resources) ? null : $resources;
    }

    /**
     * Get socket objects, registered for given operation, whose resources are listed in
     * $resources and pass the readiness re-check.
     *
     * Despite its name this method does not remove anything from the work list;
     * select() clears the whole list itself once any socket is ready.
     *
     * @param resource[] $resources   Stream resources reported by stream_select()
     * @param string     $operation   One of OperationInterface::OPERATION_* consts
     * @param bool       $isOutOfBand Is it OOB operation
     *
     * @return StreamResourceInterface[]
     */
    private function popSocketsByResources(array $resources, $operation, $isOutOfBand)
    {
        if (empty($resources) || !isset($this->streamResources[$operation])) {
            return [];
        }

        $ready = [];
        foreach ($this->streamResources[$operation] as $socket) {
            /** @var StreamResourceInterface $socket */
            $socketResource = $socket->getStreamResource();
            if (!in_array($socketResource, $resources, true)) {
                continue;
            }

            if ($this->isActuallyReadyForIo($socketResource, $operation, $isOutOfBand)) {
                $ready[] = $socket;
            }
        }

        return $ready;
    }

    /**
     * Checks whether given socket can process I/O operation after stream_select return
     *
     * Server sockets and write operations are always accepted. For read operations one byte
     * is peeked from the stream (without consuming it) to confirm the stream_select() result.
     *
     * @param resource $stream      Socket resource
     * @param string   $operation   One of OperationInterface::OPERATION_* consts
     * @param bool     $isOutOfBand Is it OOB operation
     *
     * @return bool
     * @link https://bugs.php.net/bug.php?id=65137
     */
    private function isActuallyReadyForIo($stream, $operation, $isOutOfBand)
    {
        if ($this->isSocketServer($stream)) {
            return true;
        }

        if ($operation === OperationInterface::OPERATION_WRITE) {
            return true;
        }

        if ($operation !== OperationInterface::OPERATION_READ) {
            return false;
        }

        $flags = $isOutOfBand ? STREAM_PEEK | STREAM_OOB : STREAM_PEEK;

        return stream_socket_recvfrom($stream, 1, $flags) !== false;
    }

    /**
     * Remove given socket from all operations
     *
     * @param StreamResourceInterface $streamResource Resource object
     *
     * @return void
     */
    public function removeAllSocketOperations(StreamResourceInterface $streamResource)
    {
        // Iterate over a snapshot of the keys: removeSocketOperation() may unset
        // operation buckets while we are looping.
        foreach (array_keys($this->streamResources) as $operation) {
            $this->removeSocketOperation($streamResource, $operation);
        }
    }

    /**
     * Make stream_select call
     *
     * @param int|null   $seconds Amount of seconds to wait
     * @param int|null   $usec    Amount of microseconds to add to $seconds
     * @param resource[] &$read   List of sockets to check for read. After function return it will be filled with
     *                            sockets, which are ready to read
     * @param resource[] &$write  List of sockets to check for write. After function return it will be filled with
     *                            sockets, which are ready to write
     * @param resource[] &$oob    After call it will be filled with sockets having OOB data, input value is ignored
     *
     * @return int Amount of sockets ready for I/O
     * @throws SocketException If stream_select() fails
     * @throws TimeoutException If no socket was reported by stream_select()
     */
    private function doStreamSelect(
        $seconds,
        $usec = null,
        array &$read = null,
        array &$write = null,
        array &$oob = null
    ) {
        // Every watched socket is also checked for out-of-band data.
        $oob    = array_merge((array) $read, (array) $write);
        $result = stream_select($read, $write, $oob, $seconds, $usec);
        if ($result === false) {
            throw new SocketException('Failed to select sockets');
        }

        // $read or $write stay null when nothing is registered for that operation;
        // count() must not be called on null directly.
        $result = count((array) $read) + count((array) $write) + count((array) $oob);
        if ($result === 0) {
            throw new TimeoutException('Select operation was interrupted during timeout');
        }

        return $result;
    }

    /**
     * Calculate amount of attempts for select operation
     *
     * The timeout is divided into ATTEMPT_DELAY slices (rounded up); the result is never
     * lower than ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT, which is also used when $seconds is null.
     *
     * @param int|null $seconds Amount of seconds
     * @param int|null $usec    Amount of microseconds
     *
     * @return int
     */
    private function calculateAttemptsCount($seconds, $usec)
    {
        if ($seconds === null) {
            return self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT;
        }

        $calculated = (int) ceil(($seconds * 1E6 + (int) $usec) / self::ATTEMPT_DELAY);

        return max($calculated, self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT);
    }

    /**
     * Check whether given resource is server socket
     *
     * A resource is treated as a server socket when it has a local name but no remote name.
     *
     * @param resource $resource Resource to test
     *
     * @return bool
     */
    private function isSocketServer($resource)
    {
        return stream_socket_get_name($resource, false) &&
               !stream_socket_get_name($resource, true);
    }
}
321
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.