AsyncSelector::popSocketsByResources()   B
last analyzed

Complexity

Conditions 6
Paths 6

Size

Total Lines 20
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 20
ccs 13
cts 13
cp 1
rs 8.8571
c 0
b 0
f 0
cc 6
eloc 11
nc 6
nop 3
crap 6
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2017, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
11
namespace AsyncSockets\Socket;
12
13
use AsyncSockets\Exception\SocketException;
14
use AsyncSockets\Exception\TimeoutException;
15
use AsyncSockets\Operation\OperationInterface;
16
17
/**
18
 * Class AsyncSelector
19
 */
20
class AsyncSelector
21
{
22
    /**
23
     * Delay in microseconds between select attempts, if previous stream_select returned incorrect result
24
     * @link https://bugs.php.net/bug.php?id=65137
25
     */
26
    const ATTEMPT_DELAY = 250000;
27
28
    /**
29
     * Attempt count to use when time out is not set
30
     */
31
    const ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT = 10;
32
33
    /**
34
     * Array of resources indexed by operation
35
     *
36
     * @var StreamResourceInterface[][]
37
     */
38
    private $streamResources = [];
39
40
    /**
41
     * Wait socket resources for network operation
42
     *
43
     * @param int $seconds Number of seconds to wait
44
     * @param int $usec Number of microseconds to add
45
     *
46
     * @return SelectContext
47
     * @throws TimeoutException If operation was interrupted during timeout
48
     * @throws SocketException If network operation failed
49
     * @throws \InvalidArgumentException If there is no socket in the list
50
     */
51 85
    public function select($seconds, $usec = null)
52
    {
53 85
        if (!$this->streamResources) {
54 5
            throw new \InvalidArgumentException('Can not perform select on empty data');
55
        }
56
57 80
        $read     = $this->getSocketsForOperation(OperationInterface::OPERATION_READ);
58 80
        $write    = $this->getSocketsForOperation(OperationInterface::OPERATION_WRITE);
59 80
        $attempts = $this->calculateAttemptsCount($seconds, $usec);
60
61
        do {
62 80
            $this->doStreamSelect($seconds, $usec, $read, $write, $oob);
63
64 68
            $context = $this->extractContext((array) $read, (array) $write, (array) $oob);
65 68
            if ($context) {
66 64
                $this->streamResources = [];
67 64
                return $context;
68
            }
69
70 4
            $attempts -= 1;
71 4
            if ($attempts) {
72 4
                usleep(self::ATTEMPT_DELAY);
73 4
            }
74 4
        } while ($attempts);
75
76 4
        throw new TimeoutException('Select operation was interrupted during timeout');
77
    }
78
79
    /**
80
     * Return socket objects for operations
81
     *
82
     * @param string $operation One of OperationInterface::OPERATION_* consts
83
     *
84
     * @return resource[]|null List of socket resource
85
     */
86 80
    private function getSocketsForOperation($operation)
87
    {
88 80
        if (!isset($this->streamResources[$operation])) {
89 80
            return null;
90
        }
91
92 80
        $result = [];
93 80
        foreach ($this->streamResources[$operation] as $socket) {
94
            /** @var StreamResourceInterface $socket */
95 80
            $result[] = $socket->getStreamResource();
96 80
        }
97
98 80
        return $result ?: null;
99
    }
100
101
    /**
102
     * Calculate amount of attempts for select operation
103
     *
104
     * @param int|null $seconds Amount of seconds
105
     * @param int|null $usec Amount of microseconds
106
     *
107
     * @return int
108
     */
109 80
    private function calculateAttemptsCount($seconds, $usec)
110
    {
111 80
        $result = $seconds !== null ? ceil(($seconds * 1E6 + $usec) / self::ATTEMPT_DELAY) :
112 80
            self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT;
113 80
        if ($result < self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT) {
114 34
            $result = self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT;
115 34
        }
116
117 80
        return $result;
118
    }
119
120
    /**
121
     * Make stream_select call
122
     *
123
     * @param int        $seconds Amount of seconds to wait
124
     * @param int        $usec Amount of microseconds to add to $seconds
125
     * @param resource[] &$read List of sockets to check for read. After function return it will be filled with
126
     *      sockets, which are ready to read
127
     * @param resource[] &$write List of sockets to check for write. After function return it will be filled with
128
     *      sockets, which are ready to write
129
     * @param resource[] &$oob After call it will be filled with sockets having OOB data, input value is ignored
130
     *
131
     * @return int Amount of sockets ready for I/O
132
     */
133 80
    private function doStreamSelect(
134
        $seconds,
135
        $usec = null,
136
        array &$read = null,
137
        array &$write = null,
138
        array &$oob = null
139
    ) {
140 80
        $oob    = array_merge((array) $read, (array) $write);
141 80
        $result = stream_select($read, $write, $oob, $seconds, $usec);
142 80
        if ($result === false) {
143 2
            throw new SocketException('Failed to select sockets');
144
        }
145
146 78
        $result = count($read) + count($write) + count($oob);
147 78
        if ($result === 0) {
148 12
            throw new TimeoutException('Select operation was interrupted during timeout');
149
        }
150
151 68
        return $result;
152
    }
153
154
    /**
155
     * Extract context from given lists of resources
156
     *
157
     * @param resource[] $read Read-ready resources
158
     * @param resource[] $write Write-ready resources
159
     * @param resource[] $oob Oob-ready resources
160
     *
161
     * @return SelectContext|null
162
     */
163 68
    private function extractContext(array $read, array $write, array $oob)
164
    {
165 68
        $readyRead  = $this->popSocketsByResources($read, OperationInterface::OPERATION_READ, false);
166 68
        $readyWrite = $this->popSocketsByResources($write, OperationInterface::OPERATION_WRITE, false);
167 68
        $readyOob   = array_merge(
168 68
            $this->popSocketsByResources($oob, OperationInterface::OPERATION_READ, true),
169 68
            $this->popSocketsByResources($oob, OperationInterface::OPERATION_WRITE, true)
170 68
        );
171
172 68
        if ($readyRead || $readyWrite || $readyOob) {
173 64
            return new SelectContext($readyRead, $readyWrite, $readyOob);
174
        }
175
176 4
        return null;
177
    }
178
179
    /**
180
     * Get socket objects by resources and remove them from work list
181
     *
182
     * @param resource[] $resources Stream resources
183
     * @param string     $operation One of OperationInterface::OPERATION_* consts
184
     * @param bool       $isOutOfBand Is it OOB operation
185
     *
186
     * @return StreamResourceInterface[]
187
     */
188 68
    private function popSocketsByResources(array $resources, $operation, $isOutOfBand)
189
    {
190 68
        if (!$resources || !isset($this->streamResources[$operation])) {
191 68
            return [];
192
        }
193
194 68
        $result = [];
195 68
        foreach ($this->streamResources[$operation] as $socket) {
196
            /** @var StreamResourceInterface $socket */
197 68
            $socketResource = $socket->getStreamResource();
198 68
            $isReadySocket  = in_array($socketResource, $resources, true) &&
199 68
                                 $this->isActuallyReadyForIo($socketResource, $operation, $isOutOfBand);
200
201 68
            if ($isReadySocket) {
202 64
                $result[] = $socket;
203 64
            }
204 68
        }
205
206 68
        return $result;
207
    }
208
209
    /**
210
     * Checks whether given socket can process I/O operation after stream_select return
211
     *
212
     * @param resource $stream Socket resource
213
     * @param string   $operation One of OperationInterface::OPERATION_* consts
214
     * @param bool     $isOutOfBand Is it OOB operation
215
     *
216
     * @return bool
217
     */
218 68
    private function isActuallyReadyForIo($stream, $operation, $isOutOfBand)
219
    {
220
        /** map[isServer][operation][isOutOfBand] = result */
221 68
        $hasOobData = stream_socket_recvfrom($stream, 1, STREAM_PEEK | STREAM_OOB) !== false;
222
        $map = [
223
            0 => [
224
                // https://bugs.php.net/bug.php?id=65137
225 68
                OperationInterface::OPERATION_READ => [
226 68
                    0 => stream_socket_recvfrom($stream, 1, STREAM_PEEK) !== false,
227
                    1 => $hasOobData
228 68
                ],
229 68
                OperationInterface::OPERATION_WRITE => [
230 68
                    0 => true,
231
                    1 => $hasOobData
232 68
                ]
233 68
            ],
234
            1 => [
235 68
                OperationInterface::OPERATION_READ => [
236 68
                    0 => true,
237
                    1 => $hasOobData
238 68
                ],
239 68
                OperationInterface::OPERATION_WRITE => [
240 68
                    0 => true,
241
                    1 => $hasOobData
242 68
                ]
243 68
            ]
244 68
        ];
245
246 68
        $serverIdx = (int) (bool) $this->isSocketServer($stream);
247 68
        $oobIdx    = (int) (bool) $isOutOfBand;
248
249 68
        return $map[$serverIdx][$operation][$oobIdx];
250
    }
251
252
    /**
253
     * Check whether given resource is server socket
254
     *
255
     * @param resource $resource Resource to test
256
     *
257
     * @return bool
258
     */
259 68
    private function isSocketServer($resource)
260
    {
261 68
        return stream_socket_get_name($resource, false) &&
262 68
               !stream_socket_get_name($resource, true);
263
    }
264
265
    /**
266
     * Add array of socket with specified operation
267
     *
268
     * @param StreamResourceInterface[] $streamResources List of resources. Value depends on second argument.
269
     *                                     If string is provided, then it must be array of StreamResourceInterface.
270
     *                                     If $operation parameter is omitted then this argument must contain
271
     *                                     pairs [StreamResourceInterface, operation] for each element
272
     * @param string                    $operation Operation, one of OperationInterface::OPERATION_* consts
273
     *
274
     * @return void
275
     * @throws \InvalidArgumentException
276
     */
277 8
    public function addSocketOperationArray(array $streamResources, $operation = null)
278
    {
279 8
        foreach ($streamResources as $streamResource) {
280 8
            if ($operation !== null) {
281 2
                $this->addSocketOperation($streamResource, $operation);
282 2
            } else {
283 6
                if (!is_array($streamResource) || count($streamResource) !== 2) {
284 2
                    throw new \InvalidArgumentException(
285
                        'First parameter must contain pair (SocketInterface, operation)'
286 2
                    );
287
                }
288
289 4
                $this->addSocketOperation(reset($streamResource), end($streamResource));
290
            }
291 6
        }
292 6
    }
293
294
    /**
295
     * Add socket into selector list
296
     *
297
     * @param StreamResourceInterface $streamResource Resource object
298
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
299
     *
300
     * @return void
301
     */
302 84
    public function addSocketOperation(StreamResourceInterface $streamResource, $operation)
303
    {
304 84
        $this->streamResources[$operation][spl_object_hash($streamResource)] = $streamResource;
305 84
    }
306
307
    /**
308
     * Remove all previously defined operations on this socket and adds socket into list of given operation
309
     *
310
     * @param StreamResourceInterface $streamResource Stream resource object
311
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
312
     *
313
     * @return void
314
     */
315 2
    public function changeSocketOperation(StreamResourceInterface $streamResource, $operation)
316
    {
317 2
        $this->removeAllSocketOperations($streamResource);
318
319 2
        $this->addSocketOperation($streamResource, $operation);
320 2
    }
321
322
    /**
323
     * Remove given socket from all operations
324
     *
325
     * @param StreamResourceInterface $streamResource Resource object
326
     *
327
     * @return void
328
     */
329 38
    public function removeAllSocketOperations(StreamResourceInterface $streamResource)
330
    {
331 38
        $opList = [ OperationInterface::OPERATION_READ,
332 38
                    OperationInterface::OPERATION_WRITE  ];
333
334 38
        foreach ($opList as $op) {
335 38
            $this->removeSocketOperation($streamResource, $op);
336 38
        }
337 38
    }
338
339
    /**
340
     * Remove given socket from select list
341
     *
342
     * @param StreamResourceInterface $streamResource Stream resource object
343
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
344
     *
345
     * @return void
346
     */
347 40
    public function removeSocketOperation(StreamResourceInterface $streamResource, $operation)
348
    {
349 40
        $hash = spl_object_hash($streamResource);
350 40
        if (isset($this->streamResources[$operation], $this->streamResources[$operation][$hash])) {
351 12
            unset($this->streamResources[$operation][$hash]);
352 12
            if (!$this->streamResources[$operation]) {
353 12
                unset($this->streamResources[$operation]);
354 12
            }
355 12
        }
356 40
    }
357
}
358