Completed
Branch 0.4-dev (d01bc8)
by Evgenij
05:19 queued 02:49
created

AsyncSelector::isActuallyReadyForIo()   B

Complexity

Conditions 7
Paths 8

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 7

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 14
ccs 4
cts 4
cp 1
rs 8.2222
cc 7
eloc 7
nc 8
nop 3
crap 7
1
<?php
2
/**
3
 * Async sockets
4
 *
5
 * @copyright Copyright (c) 2015-2016, Efimov Evgenij <[email protected]>
6
 *
7
 * This source file is subject to the MIT license that is bundled
8
 * with this source code in the file LICENSE.
9
 */
10
11
namespace AsyncSockets\Socket;
12
13
use AsyncSockets\Exception\SocketException;
14
use AsyncSockets\Exception\TimeoutException;
15
use AsyncSockets\Operation\OperationInterface;
16
17
/**
18
 * Class AsyncSelector
19
 */
20
class AsyncSelector
21
{
22
    /**
23
     * Delay in microseconds between select attempts, if previous stream_select returned incorrect result
24
     * @link https://bugs.php.net/bug.php?id=65137
25
     */
26
    const ATTEMPT_DELAY = 250000;
27
28
    /**
29
     * Attempt count to use when time out is not set
30
     */
31
    const ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT = 10;
32
33
    /**
34
     * Array of resources indexed by operation
35
     *
36
     * @var StreamResourceInterface[][]
37
     */
38
    private $streamResources = [];
39
40
    /**
41
     * Wait socket resources for network operation
42
     *
43
     * @param int $seconds Number of seconds to wait
44
     * @param int $usec Number of microseconds to add
45
     *
46
     * @return SelectContext
47
     * @throws TimeoutException If operation was interrupted during timeout
48
     * @throws SocketException If network operation failed
49
     * @throws \InvalidArgumentException If there is no socket in the list
50
     */
51 67
    public function select($seconds, $usec = null)
52
    {
53 67
        if (!$this->streamResources) {
54 5
            throw new \InvalidArgumentException('Can not perform select on empty data');
55
        }
56
57 62
        $read     = $this->getSocketsForOperation(OperationInterface::OPERATION_READ);
58 62
        $write    = $this->getSocketsForOperation(OperationInterface::OPERATION_WRITE);
59 62
        $attempts = $this->calculateAttemptsCount($seconds, $usec);
60
61
        do {
62 62
            $this->doStreamSelect($seconds, $usec, $read, $write, $oob);
63
64 52
            $readyRead  = $this->popSocketsByResources((array) $read, OperationInterface::OPERATION_READ, false);
65 52
            $readyWrite = $this->popSocketsByResources((array) $write, OperationInterface::OPERATION_WRITE, false);
66
            $readyOob   = array_merge(
67 52
                $this->popSocketsByResources((array) $oob, OperationInterface::OPERATION_READ, true),
68 51
                $this->popSocketsByResources((array) $oob, OperationInterface::OPERATION_WRITE, true)
69 51
            );
70
71
            if ($readyRead || $readyWrite || $readyOob) {
1 ignored issue
show
Bug Best Practice introduced by
The expression $readyOob of type AsyncSockets\Socket\StreamResourceInterface[] is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
72 1
                $this->streamResources = [];
73 1
                return new SelectContext($readyRead, $readyWrite, $readyOob);
74 1
            }
75 1
76 1
            $attempts -= 1;
77
            if ($attempts) {
78 1
                usleep(self::ATTEMPT_DELAY);
79
            }
80
        } while ($attempts);
81
82
        throw new TimeoutException('Select operation was interrupted during timeout');
83
    }
84
85
    /**
86
     * Add socket into selector list
87
     *
88
     * @param StreamResourceInterface $streamResource Resource object
89 66
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
90
     *
91 66
     * @return void
92 66
     */
93
    public function addSocketOperation(StreamResourceInterface $streamResource, $operation)
94
    {
95
        $this->streamResources[$operation][spl_object_hash($streamResource)] = $streamResource;
96
    }
97
98
    /**
99
     * Add array of socket with specified operation
100
     *
101
     * @param StreamResourceInterface[] $streamResources List of resources. Value depends on second argument.
102
     *                                     If string is provided, then it must be array of StreamResourceInterface.
103
     *                                     If $operation parameter is omitted then this argument must contain
104
     *                                     pairs [StreamResourceInterface, operation] for each element
105
     * @param string                    $operation Operation, one of OperationInterface::OPERATION_* consts
106 8
     *
107
     * @return void
108 8
     * @throws \InvalidArgumentException
109 8
     */
110 2
    public function addSocketOperationArray(array $streamResources, $operation = null)
111 2
    {
112 6
        foreach ($streamResources as $streamResource) {
113 2
            if ($operation !== null) {
114
                $this->addSocketOperation($streamResource, $operation);
115 2
            } else {
116
                if (!is_array($streamResource) || count($streamResource) !== 2) {
117
                    throw new \InvalidArgumentException(
118 4
                        'First parameter must contain pair (SocketInterface, operation)'
119
                    );
120 6
                }
121 6
122
                $this->addSocketOperation(reset($streamResource), end($streamResource));
123
            }
124
        }
125
    }
126
127
    /**
128
     * Remove given socket from select list
129
     *
130
     * @param StreamResourceInterface $streamResource Stream resource object
131 59
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
132
     *
133 59
     * @return void
134 59
     */
135 59
    public function removeSocketOperation(StreamResourceInterface $streamResource, $operation)
136 59
    {
137 59
        $hash = spl_object_hash($streamResource);
138 59
        if (isset($this->streamResources[$operation], $this->streamResources[$operation][$hash])) {
139 59
            unset($this->streamResources[$operation][$hash]);
140 59
            if (!$this->streamResources[$operation]) {
141
                unset($this->streamResources[$operation]);
142
            }
143
        }
144
    }
145
146
    /**
147
     * Remove all previously defined operations on this socket and adds socket into list of given operation
148
     *
149
     * @param StreamResourceInterface $streamResource Stream resource object
150 2
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
151
     *
152 2
     * @return void
153
     */
154 2
    public function changeSocketOperation(StreamResourceInterface $streamResource, $operation)
155 2
    {
156
        $this->removeAllSocketOperations($streamResource);
157
158
        $this->addSocketOperation($streamResource, $operation);
159
    }
160
161
    /**
162
     * Return socket objects for operations
163
     *
164 62
     * @param string $operation One of OperationInterface::OPERATION_* consts
165
     *
166 62
     * @return resource[]|null List of socket resource
167 62
     */
168
    private function getSocketsForOperation($operation)
169
    {
170 62
        if (!isset($this->streamResources[$operation])) {
171 62
            return null;
172
        }
173 62
174 62
        $result = [];
175
        foreach ($this->streamResources[$operation] as $socket) {
176 62
            /** @var StreamResourceInterface $socket */
177
            $result[] = $socket->getStreamResource();
178
        }
179
180
        return $result ?: null;
181
    }
182
183
    /**
184
     * Get socket objects by resources and remove them from work list
185
     *
186
     * @param resource[] $resources Stream resources
187 52
     * @param string     $operation One of OperationInterface::OPERATION_* consts
188
     * @param bool       $isOutOfBand Is it OOB operation
189 52
     *
190 52
     * @return StreamResourceInterface[]
191
     */
192
    private function popSocketsByResources(array $resources, $operation, $isOutOfBand)
193 52
    {
194 52
        if (!$resources || !isset($this->streamResources[$operation])) {
195
            return [];
196 52
        }
197 52
198 52
        $result = [];
199
        foreach ($this->streamResources[$operation] as $socket) {
200 52
            /** @var StreamResourceInterface $socket */
201 51
            $socketResource = $socket->getStreamResource();
202 51
            $isReadySocket  = in_array($socketResource, $resources, true) &&
203 51
                                 $this->isActuallyReadyForIo($socketResource, $operation, $isOutOfBand);
204 52
205
            if ($isReadySocket) {
206 52
                //$this->removeSocketOperation($socket, $operation);
0 ignored issues
show
Unused Code Comprehensibility introduced by
80% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
207
                $result[] = $socket;
208
            }
209
        }
210
211
        return $result;
212
    }
213
214
    /**
215
     * Checks whether given socket can process I/O operation after stream_select return
216
     *
217 52
     * @param resource $stream Socket resource
218
     * @param string   $operation One of OperationInterface::OPERATION_* consts
219 52
     * @param bool     $isOutOfBand Is it OOB operation
220 51
     *
221
     * @return bool
222
     */
223 27
    private function isActuallyReadyForIo($stream, $operation, $isOutOfBand)
224 51
    {
225
        return $this->isSocketServer($stream) || (
226 52
            $operation === OperationInterface::OPERATION_READ &&
227
228
            // https://bugs.php.net/bug.php?id=65137
229
            (
230
                (!$isOutOfBand && stream_socket_recvfrom($stream, 1, STREAM_PEEK) !== false) ||
231
                ($isOutOfBand && stream_socket_recvfrom($stream, 1, STREAM_PEEK | STREAM_OOB) !== false)
232
            )
233
        ) || (
234
            $operation === OperationInterface::OPERATION_WRITE
235
        );
236 35
    }
237
238 35
    /**
239 35
     * Remove given socket from all operations
240
     *
241 35
     * @param StreamResourceInterface $streamResource Resource object
242 35
     *
243 35
     * @return void
244 35
     */
245
    public function removeAllSocketOperations(StreamResourceInterface $streamResource)
246
    {
247
        $opList = [ OperationInterface::OPERATION_READ,
248
                    OperationInterface::OPERATION_WRITE  ];
249
250
        foreach ($opList as $op) {
251
            $this->removeSocketOperation($streamResource, $op);
252
        }
253
    }
254
255
    /**
256
     * Make stream_select call
257
     *
258
     * @param int        $seconds Amount of seconds to wait
259 62
     * @param int        $usec Amount of microseconds to add to $seconds
260
     * @param resource[] &$read List of sockets to check for read. After function return it will be filled with
261 62
     *      sockets, which are ready to read
262 62
     * @param resource[] &$write List of sockets to check for write. After function return it will be filled with
263 62
     *      sockets, which are ready to write
264 2
     * @param resource[] &$oob After call it will be filled with sockets having OOB data, input value is ignored
265
     *
266
     * @return int Amount of sockets ready for I/O
267 60
     */
268 60
    private function doStreamSelect(
269 10
        $seconds,
270
        $usec = null,
271
        array &$read = null,
272 52
        array &$write = null,
273
        array &$oob = null
274
    ) {
275
        $oob    = array_merge((array) $read, (array) $write);
276
        $result = stream_select($read, $write, $oob, $seconds, $usec);
277
        if ($result === false) {
278
            throw new SocketException('Failed to select sockets');
279
        }
280
281
        $result = count($read) + count($write) + count($oob);
282
        if ($result === 0) {
283 62
            throw new TimeoutException('Select operation was interrupted during timeout');
284
        }
285 62
286 62
        return $result;
287 62
    }
288 21
289 21
    /**
290
     * Calculate amount of attempts for select operation
291 62
     *
292
     * @param int|null $seconds Amount of seconds
293
     * @param int|null $usec Amount of microseconds
294
     *
295
     * @return int
296
     */
297
    private function calculateAttemptsCount($seconds, $usec)
298
    {
299
        $result = $seconds !== null ? ceil(($seconds * 1E6 + $usec) / self::ATTEMPT_DELAY) :
300
            self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT;
301 52
        if ($result < self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT) {
302
            $result = self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT;
303 52
        }
304 52
        
305
        return $result;
306
    }
307
308
    /**
309
     * Check whether given resource is server socket
310
     *
311
     * @param resource $resource Resource to test
312
     *
313
     * @return bool
314
     */
315
    private function isSocketServer($resource)
316
    {
317
        return stream_socket_get_name($resource, false) &&
318
               !stream_socket_get_name($resource, true);
319
    }
320
}
321