Completed
Branch 0.4-dev (54e67b)
by Evgenij
02:52
created

AsyncSelector::select()   B

Complexity

Conditions 5
Paths 4

Size

Total Lines 27
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5

Importance

Changes 9
Bugs 0 Features 0
Metric Value
c 9
b 0
f 0
dl 0
loc 27
ccs 17
cts 17
cp 1
rs 8.439
cc 5
eloc 17
nc 4
nop 2
crap 5
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2016, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
11
namespace AsyncSockets\Socket;
12
13
use AsyncSockets\Exception\SocketException;
14
use AsyncSockets\Exception\TimeoutException;
15
use AsyncSockets\Operation\OperationInterface;
16
17
/**
 * Class AsyncSelector
 *
 * Keeps a list of stream resources grouped by I/O operation and waits for them
 * to become ready using stream_select().
 */
class AsyncSelector
{
    /**
     * Delay in microseconds between select attempts, if previous stream_select returned incorrect result
     * @link https://bugs.php.net/bug.php?id=65137
     */
    const ATTEMPT_DELAY = 250000;

    /**
     * Attempt count to use when time out is not set. It is also the lower bound
     * for the attempt count calculated from an explicit timeout.
     */
    const ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT = 10;

    /**
     * Array of resources indexed by operation, then by spl_object_hash() of the resource object
     *
     * @var StreamResourceInterface[][]
     */
    private $streamResources = [];

    /**
     * Wait socket resources for network operation
     *
     * On success the internal list of registered sockets is cleared.
     *
     * @param int $seconds Number of seconds to wait
     * @param int $usec Number of microseconds to add
     *
     * @return SelectContext
     * @throws TimeoutException If operation was interrupted during timeout
     * @throws SocketException If network operation failed
     * @throws \InvalidArgumentException If there is no socket in the list
     */
    public function select($seconds, $usec = null)
    {
        if (!$this->streamResources) {
            throw new \InvalidArgumentException('Can not perform select on empty data');
        }

        $watchedRead  = $this->getSocketsForOperation(OperationInterface::OPERATION_READ);
        $watchedWrite = $this->getSocketsForOperation(OperationInterface::OPERATION_WRITE);
        $attempts     = $this->calculateAttemptsCount($seconds, $usec);

        while (true) {
            // stream_select() overwrites the arrays it receives with the ready subset,
            // so every attempt must start from the complete watched lists; otherwise
            // a retry would silently stop watching sockets that were not ready before.
            $read  = $watchedRead;
            $write = $watchedWrite;
            $oob   = null;

            $this->doStreamSelect($seconds, $usec, $read, $write, $oob);

            $context = $this->extractContext((array) $read, (array) $write, (array) $oob);
            if ($context) {
                $this->streamResources = [];
                return $context;
            }

            // stream_select() reported activity, but no socket passed the readiness
            // check (see ATTEMPT_DELAY): wait a little and try again.
            $attempts -= 1;
            if ($attempts <= 0) {
                break;
            }

            usleep(self::ATTEMPT_DELAY);
        }

        throw new TimeoutException('Select operation was interrupted during timeout');
    }

    /**
     * Add socket into selector list
     *
     * Adding the same resource object twice for the same operation keeps a single entry.
     *
     * @param StreamResourceInterface $streamResource Resource object
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function addSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $this->streamResources[$operation][spl_object_hash($streamResource)] = $streamResource;
    }

    /**
     * Add array of socket with specified operation
     *
     * @param StreamResourceInterface[] $streamResources List of resources. Value depends on second argument.
     *                                     If string is provided, then it must be array of StreamResourceInterface.
     *                                     If $operation parameter is omitted then this argument must contain
     *                                     pairs [StreamResourceInterface, operation] for each element
     * @param string                    $operation Operation, one of OperationInterface::OPERATION_* consts
     *
     * @return void
     * @throws \InvalidArgumentException If $operation is omitted and an element is not a two-element array
     */
    public function addSocketOperationArray(array $streamResources, $operation = null)
    {
        foreach ($streamResources as $streamResource) {
            if ($operation !== null) {
                $this->addSocketOperation($streamResource, $operation);
                continue;
            }

            if (!is_array($streamResource) || count($streamResource) !== 2) {
                throw new \InvalidArgumentException(
                    'First parameter must contain pair (SocketInterface, operation)'
                );
            }

            // First element of the pair is the resource object, last one is the operation.
            $this->addSocketOperation(reset($streamResource), end($streamResource));
        }
    }

    /**
     * Remove given socket from select list
     *
     * Does nothing if the socket is not registered for the operation.
     *
     * @param StreamResourceInterface $streamResource Stream resource object
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function removeSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $hash = spl_object_hash($streamResource);
        if (!isset($this->streamResources[$operation][$hash])) {
            return;
        }

        unset($this->streamResources[$operation][$hash]);

        // Drop the operation key as well, so an empty selector is detected by select().
        if (!$this->streamResources[$operation]) {
            unset($this->streamResources[$operation]);
        }
    }

    /**
     * Remove all previously defined operations on this socket and adds socket into list of given operation
     *
     * @param StreamResourceInterface $streamResource Stream resource object
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function changeSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $this->removeAllSocketOperations($streamResource);

        $this->addSocketOperation($streamResource, $operation);
    }

    /**
     * Return raw stream resources registered for an operation
     *
     * @param string $operation One of OperationInterface::OPERATION_* consts
     *
     * @return resource[]|null List of socket resource, null if nothing is registered for the operation
     */
    private function getSocketsForOperation($operation)
    {
        if (!isset($this->streamResources[$operation])) {
            return null;
        }

        $result = [];
        foreach ($this->streamResources[$operation] as $socket) {
            /** @var StreamResourceInterface $socket */
            $result[] = $socket->getStreamResource();
        }

        return $result ?: null;
    }

    /**
     * Get socket objects, registered for the operation, whose resources are in the given list
     * and which pass the readiness check. The internal list is not modified here.
     *
     * @param resource[] $resources Stream resources
     * @param string     $operation One of OperationInterface::OPERATION_* consts
     * @param bool       $isOutOfBand Is it OOB operation
     *
     * @return StreamResourceInterface[]
     */
    private function popSocketsByResources(array $resources, $operation, $isOutOfBand)
    {
        if (!$resources || !isset($this->streamResources[$operation])) {
            return [];
        }

        $result = [];
        foreach ($this->streamResources[$operation] as $socket) {
            /** @var StreamResourceInterface $socket */
            $socketResource = $socket->getStreamResource();
            $isReadySocket  = in_array($socketResource, $resources, true) &&
                              $this->isActuallyReadyForIo($socketResource, $operation, $isOutOfBand);

            if ($isReadySocket) {
                $result[] = $socket;
            }
        }

        return $result;
    }

    /**
     * Extract context from given lists of resources
     *
     * @param resource[] $read Read-ready resources
     * @param resource[] $write Write-ready resources
     * @param resource[] $oob Oob-ready resources
     *
     * @return SelectContext|null Context with ready sockets or null if none of them is ready
     */
    private function extractContext(array $read, array $write, array $oob)
    {
        $readyRead  = $this->popSocketsByResources($read, OperationInterface::OPERATION_READ, false);
        $readyWrite = $this->popSocketsByResources($write, OperationInterface::OPERATION_WRITE, false);
        $readyOob   = array_merge(
            $this->popSocketsByResources($oob, OperationInterface::OPERATION_READ, true),
            $this->popSocketsByResources($oob, OperationInterface::OPERATION_WRITE, true)
        );

        if ($readyRead || $readyWrite || $readyOob) {
            return new SelectContext($readyRead, $readyWrite, $readyOob);
        }

        return null;
    }

    /**
     * Checks whether given socket can process I/O operation after stream_select return
     *
     * Only a client socket checked for read is actually probed (by peeking one byte);
     * write operations and server sockets are always reported as ready.
     *
     * @param resource $stream Socket resource
     * @param string   $operation One of OperationInterface::OPERATION_* consts
     * @param bool     $isOutOfBand Is it OOB operation
     *
     * @return bool
     * @link https://bugs.php.net/bug.php?id=65137
     */
    private function isActuallyReadyForIo($stream, $operation, $isOutOfBand)
    {
        if ($operation !== OperationInterface::OPERATION_READ || $this->isSocketServer($stream)) {
            return true;
        }

        // Peek lazily: probing is needed only on this path, so streams checked
        // for write and server sockets are never touched.
        $flags = $isOutOfBand ? (STREAM_PEEK | STREAM_OOB) : STREAM_PEEK;

        return stream_socket_recvfrom($stream, 1, $flags) !== false;
    }

    /**
     * Remove given socket from all operations
     *
     * @param StreamResourceInterface $streamResource Resource object
     *
     * @return void
     */
    public function removeAllSocketOperations(StreamResourceInterface $streamResource)
    {
        $opList = [
            OperationInterface::OPERATION_READ,
            OperationInterface::OPERATION_WRITE,
        ];

        foreach ($opList as $op) {
            $this->removeSocketOperation($streamResource, $op);
        }
    }

    /**
     * Make stream_select call
     *
     * The by-reference parameters intentionally carry no "array" type hint: they must
     * accept null, and an implicitly nullable typed parameter is deprecated in recent PHP versions.
     *
     * @param int             $seconds Amount of seconds to wait
     * @param int             $usec Amount of microseconds to add to $seconds
     * @param resource[]|null &$read List of sockets to check for read. After function return it will be filled with
     *      sockets, which are ready to read
     * @param resource[]|null &$write List of sockets to check for write. After function return it will be filled with
     *      sockets, which are ready to write
     * @param resource[]|null &$oob After call it will be filled with sockets having OOB data, input value is ignored
     *
     * @return int Amount of sockets ready for I/O
     * @throws SocketException If stream_select failed
     * @throws TimeoutException If no socket is reported as ready
     */
    private function doStreamSelect(
        $seconds,
        $usec = null,
        &$read = null,
        &$write = null,
        &$oob = null
    ) {
        $oob    = array_merge((array) $read, (array) $write);
        $result = stream_select($read, $write, $oob, $seconds, $usec);
        if ($result === false) {
            throw new SocketException('Failed to select sockets');
        }

        // A list that was passed as null stays null after stream_select(), and
        // count(null) is an error in PHP 8, so normalize each list before counting.
        $result = count((array) $read) + count((array) $write) + count((array) $oob);
        if ($result === 0) {
            throw new TimeoutException('Select operation was interrupted during timeout');
        }

        return $result;
    }

    /**
     * Calculate amount of attempts for select operation
     *
     * The timeout is divided into ATTEMPT_DELAY slices; the result is never lower than
     * ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT, which is also used when $seconds is null.
     *
     * @param int|null $seconds Amount of seconds
     * @param int|null $usec Amount of microseconds
     *
     * @return int
     */
    private function calculateAttemptsCount($seconds, $usec)
    {
        if ($seconds === null) {
            return self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT;
        }

        // ceil() returns float, cast it to honour the documented int return type.
        $attempts = (int) ceil(($seconds * 1E6 + $usec) / self::ATTEMPT_DELAY);

        return max($attempts, self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT);
    }

    /**
     * Check whether given resource is server socket
     *
     * A socket is treated as a server one when it has a local name but no remote name.
     *
     * @param resource $resource Resource to test
     *
     * @return bool
     */
    private function isSocketServer($resource)
    {
        $hasLocalName  = (bool) stream_socket_get_name($resource, false);
        $hasRemoteName = (bool) stream_socket_get_name($resource, true);

        return $hasLocalName && !$hasRemoteName;
    }
}
357