Completed
Branch master (1d1574)
by Evgenij
18:38
created

AsyncSelector   B

Complexity

Total Complexity 40

Size/Duplication

Total Lines 287
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 100%

Importance

Changes 14
Bugs 0 Features 1
Metric Value
wmc 40
c 14
b 0
f 1
lcom 1
cbo 4
dl 0
loc 287
ccs 99
cts 99
cp 1
rs 8.2608

12 Methods

Rating   Name                          Duplication   Size   Complexity
B        select()                      0             29     6
A        addSocketOperation()          0             4      1
B        addSocketOperationArray()     0             16     5
A        removeSocketOperation()       0             10     3
A        changeSocketOperation()       0             6      1
A        getSocketsForOperation()      0             14     4
B        popSocketsByResources()       0             21     6
A        isActuallyReadyForIo()        0             11     4
A        removeAllSocketOperations()   0             9      2
A        doStreamSelect()              0             15     3
A        calculateAttemptsCount()      0             10     3
A        isSocketServer()              0             5      2

How to fix   Complexity   

Complex Class

Complex classes like AsyncSelector often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
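
As a rough illustration of Extract Class: the add/remove methods that share the SocketOperation suffix all manipulate the same operation-indexed list, so that bookkeeping could move into a class of its own. The sketch below is hypothetical. OperationRegistry and its method names do not exist in the library, and plain objects stand in for StreamResourceInterface.

```php
<?php
// Hypothetical Extract Class sketch: OperationRegistry and its method names
// are assumptions for illustration, not part of the AsyncSockets library.

final class OperationRegistry
{
    /** @var object[][] Registered objects, indexed by operation, then by object hash */
    private $items = [];

    /** Register $item for $operation; adding the same pair twice keeps one entry. */
    public function add($item, $operation)
    {
        $this->items[$operation][spl_object_hash($item)] = $item;
    }

    /** Drop $item from $operation and forget the operation once it is empty. */
    public function remove($item, $operation)
    {
        unset($this->items[$operation][spl_object_hash($item)]);
        if (empty($this->items[$operation])) {
            unset($this->items[$operation]);
        }
    }

    /** @return object[] Items registered for $operation, possibly empty */
    public function forOperation($operation)
    {
        return isset($this->items[$operation]) ? array_values($this->items[$operation]) : [];
    }

    /** @return bool True when nothing is registered at all */
    public function isEmpty()
    {
        return !$this->items;
    }
}
```

With such a class in place, the selector would keep only the waiting logic and delegate list handling to the registry.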

While breaking up the class, it is a good idea to analyze how other classes use AsyncSelector, and based on these observations, apply Extract Interface, too.
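
A minimal sketch of what Extract Interface could look like here, assuming callers only need to register a resource, clear it, and wait. The names SelectorInterface, RecordingSelector and waitForRead() are hypothetical, the string 'read' is a placeholder for the real operation constant, and a real interface would type-hint StreamResourceInterface and return the library's SelectContext.

```php
<?php
// Hypothetical Extract Interface sketch; none of these names exist in the library.

interface SelectorInterface
{
    /** Register $resource for one operation. */
    public function addSocketOperation($resource, $operation);

    /** Remove $resource from every operation list. */
    public function removeAllSocketOperations($resource);

    /** Block until a registered resource is ready or the timeout expires. */
    public function select($seconds, $usec = null);
}

/** Test double that records calls instead of touching real sockets. */
final class RecordingSelector implements SelectorInterface
{
    /** @var array[] Recorded calls as [method, argument] pairs */
    public $log = [];

    public function addSocketOperation($resource, $operation)
    {
        $this->log[] = ['add', $operation];
    }

    public function removeAllSocketOperations($resource)
    {
        $this->log[] = ['removeAll', null];
    }

    public function select($seconds, $usec = null)
    {
        $this->log[] = ['select', $seconds];

        return [];
    }
}

/** Caller code that depends on the interface only. */
function waitForRead(SelectorInterface $selector, $resource, $seconds)
{
    $selector->addSocketOperation($resource, 'read');

    return $selector->select($seconds);
}
```

Because waitForRead() accepts any SelectorInterface, it can be exercised with RecordingSelector in tests and with the real selector in production.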

<?php
/**
 * Async sockets
 *
 * @copyright Copyright (c) 2015-2016, Efimov Evgenij <[email protected]>
 *
 * This source file is subject to the MIT license that is bundled
 * with this source code in the file LICENSE.
 */

namespace AsyncSockets\Socket;

use AsyncSockets\Exception\SocketException;
use AsyncSockets\Exception\TimeoutException;
use AsyncSockets\Operation\OperationInterface;

/**
 * Class AsyncSelector
 */
class AsyncSelector
{
    /**
     * Delay in microseconds between select attempts, if previous stream_select returned incorrect result
     * @link https://bugs.php.net/bug.php?id=65137
     */
    const ATTEMPT_DELAY = 250000;

    /**
     * Attempt count to use when time out is not set
     */
    const ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT = 10;

    /**
     * Array of resources indexed by operation
     *
     * @var StreamResourceInterface[][]
     */
    private $streamResources = [];

    /**
     * Wait socket resources for network operation
     *
     * @param int $seconds Number of seconds to wait
     * @param int $usec Number of microseconds to add
     *
     * @return SelectContext
     * @throws TimeoutException If operation was interrupted during timeout
     * @throws SocketException If network operation failed
     * @throws \InvalidArgumentException If there is no socket in the list
     */
    public function select($seconds, $usec = null)
    {
        if (!$this->streamResources) {
            throw new \InvalidArgumentException('Can not perform select on empty data');
        }

        $read     = $this->getSocketsForOperation(OperationInterface::OPERATION_READ);
        $write    = $this->getSocketsForOperation(OperationInterface::OPERATION_WRITE);
        $attempts = $this->calculateAttemptsCount($seconds, $usec);

        do {
            $this->doStreamSelect($seconds, $usec, $read, $write);
Bug: It seems that $read, defined by $this->getSocketsForOper...erface::OPERATION_READ) on line 57 of the source file, can also be of type array; however, AsyncSockets\Socket\Asyn...ector::doStreamSelect() seems to accept only null|array<integer,resource>. Maybe add an additional type check?

If a method or function can return values of several different types, and you are not sure that only one of them can occur in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Bug: It seems that $write, defined by $this->getSocketsForOper...rface::OPERATION_WRITE) on line 58 of the source file, can also be of type array; however, AsyncSockets\Socket\Asyn...ector::doStreamSelect() seems to accept only null|array<integer,resource>. Maybe add an additional type check?

            $readyRead  = $this->popSocketsByResources((array) $read, OperationInterface::OPERATION_READ);
            $readyWrite = $this->popSocketsByResources((array) $write, OperationInterface::OPERATION_WRITE);

            if ($readyRead || $readyWrite) {
                $this->streamResources = [];
                return new SelectContext($readyRead, $readyWrite);
            }

            $attempts -= 1;
            if ($attempts) {
                usleep(self::ATTEMPT_DELAY);
            }
        } while ($attempts);

        throw new TimeoutException('Select operation was interrupted during timeout');
    }

    /**
     * Add socket into selector list
     *
     * @param StreamResourceInterface $streamResource Resource object
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function addSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $this->streamResources[$operation][spl_object_hash($streamResource)] = $streamResource;
    }

    /**
     * Add array of socket with specified operation
     *
     * @param StreamResourceInterface[] $streamResources List of resources. Value depends on second argument.
     *                                     If string is provided, then it must be array of StreamResourceInterface.
     *                                     If $operation parameter is omitted then this argument must contain
     *                                     pairs [StreamResourceInterface, operation] for each element
     * @param string                    $operation Operation, one of OperationInterface::OPERATION_* consts
     *
     * @return void
     * @throws \InvalidArgumentException
     */
    public function addSocketOperationArray(array $streamResources, $operation = null)
    {
        foreach ($streamResources as $streamResource) {
            if ($operation !== null) {
                $this->addSocketOperation($streamResource, $operation);
            } else {
                if (!is_array($streamResource) || count($streamResource) !== 2) {
                    throw new \InvalidArgumentException(
                        'First parameter must contain pair (SocketInterface, operation)'
                    );
                }

                $this->addSocketOperation(reset($streamResource), end($streamResource));
            }
        }
    }

    /**
     * Remove given socket from select list
     *
     * @param StreamResourceInterface $streamResource Stream resource object
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function removeSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $hash = spl_object_hash($streamResource);
        if (isset($this->streamResources[$operation], $this->streamResources[$operation][$hash])) {
            unset($this->streamResources[$operation][$hash]);
            if (!$this->streamResources[$operation]) {
                unset($this->streamResources[$operation]);
            }
        }
    }

    /**
     * Remove all previously defined operations on this socket and adds socket into list of given operation
     *
     * @param StreamResourceInterface $streamResource Stream resource object
     * @param string                  $operation One of OperationInterface::OPERATION_* consts
     *
     * @return void
     */
    public function changeSocketOperation(StreamResourceInterface $streamResource, $operation)
    {
        $this->removeAllSocketOperations($streamResource);

        $this->addSocketOperation($streamResource, $operation);
    }

    /**
     * Return socket objects for operations
     *
     * @param string $operation One of OperationInterface::OPERATION_* consts
     *
     * @return resource[]|null List of socket resource
     */
    private function getSocketsForOperation($operation)
    {
        if (!isset($this->streamResources[$operation])) {
            return null;
        }

        $result = [];
        foreach ($this->streamResources[$operation] as $socket) {
            /** @var StreamResourceInterface $socket */
            $result[] = $socket->getStreamResource();
        }

        return $result ?: null;
    }

    /**
     * Get socket objects by resources and remove them from work list
     *
     * @param resource[] $resources Stream resources
     * @param string     $operation One of OperationInterface::OPERATION_* consts
     *
     * @return StreamResourceInterface[]
     */
    private function popSocketsByResources(array $resources, $operation)
    {
        if (!$resources || !isset($this->streamResources[$operation])) {
            return [];
        }

        $result = [];
        foreach ($this->streamResources[$operation] as $socket) {
            /** @var StreamResourceInterface $socket */
            $socketResource = $socket->getStreamResource();
            $isReadySocket  = in_array($socketResource, $resources, true) &&
                                 $this->isActuallyReadyForIo($socketResource, $operation);

            if ($isReadySocket) {
                $this->removeSocketOperation($socket, $operation);
                $result[] = $socket;
            }
        }

        return $result;
    }

    /**
     * Checks whether given socket can process I/O operation after stream_select return
     *
     * @param resource $stream Socket resource
     * @param string   $operation One of OperationInterface::OPERATION_* consts
     *
     * @return bool
     */
    private function isActuallyReadyForIo($stream, $operation)
    {
        return $this->isSocketServer($stream) || (
            $operation === OperationInterface::OPERATION_READ &&

            // https://bugs.php.net/bug.php?id=65137
            stream_socket_recvfrom($stream, 1, STREAM_PEEK) !== false
        ) || (
            $operation === OperationInterface::OPERATION_WRITE
        );
    }

    /**
     * Remove given socket from all operations
     *
     * @param StreamResourceInterface $streamResource Resource object
     *
     * @return void
     */
    public function removeAllSocketOperations(StreamResourceInterface $streamResource)
    {
        $opList = [ OperationInterface::OPERATION_READ,
                    OperationInterface::OPERATION_WRITE  ];

        foreach ($opList as $op) {
            $this->removeSocketOperation($streamResource, $op);
        }
    }

    /**
     * Make stream_select call
     *
     * @param int $seconds Amount of seconds to wait
     * @param int $usec Amount of microseconds to add to $seconds
     * @param resource[] $read List of sockets to check for read. After function return it will be filled with
     *      sockets, which are ready to read
     * @param resource[] $write List of sockets to check for write. After function return it will be filled with
     *      sockets, which are ready to write
     *
     * @return int Amount of sockets ready for I/O
     * @throws SocketException
     */
    private function doStreamSelect($seconds, $usec = null, array &$read = null, array &$write = null)
    {
        $except = null;
        $result = stream_select($read, $write, $except, $seconds, $usec);
        if ($result === false) {
            throw new SocketException('Failed to select sockets');
        }

        $result = count($read) + count($write);
        if ($result === 0) {
            throw new TimeoutException('Select operation was interrupted during timeout');
        }

        return $result;
    }

    /**
     * Calculate amount of attempts for select operation
     *
     * @param int|null $seconds Amount of seconds
     * @param int|null $usec Amount of microseconds
     *
     * @return int
     */
    private function calculateAttemptsCount($seconds, $usec)
    {
        $result = $seconds !== null ? ceil(($seconds * 1E6 + $usec) / self::ATTEMPT_DELAY) :
            self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT;
        if ($result < self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT) {
            $result = self::ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT;
        }

        return $result;
    }

    /**
     * Check whether given resource is server socket
     *
     * @param resource $resource Resource to test
     *
     * @return bool
     */
    private function isSocketServer($resource)
    {
        return stream_socket_get_name($resource, false) &&
               !stream_socket_get_name($resource, true);
    }
}
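
The analyzer's remarks about $read and $write touch on a related detail: getSocketsForOperation() may return null, and doStreamSelect() calls count() on both lists. count() emits a warning for null on PHP 7.2 and later and throws a TypeError on PHP 8. One possible guard is sketched below, assuming a hypothetical helper named countReadyStreams() that is not part of the library.

```php
<?php
// Sketch only: countReadyStreams() is a hypothetical helper, not library code.
// It treats null ("no streams requested") as an empty list before counting.

/**
 * @param resource[]|null $read  Streams reported readable, or null
 * @param resource[]|null $write Streams reported writable, or null
 *
 * @return int Total number of ready streams
 */
function countReadyStreams($read, $write)
{
    $total = 0;
    foreach ([$read, $write] as $list) {
        // The explicit type check is what the analyzer's advice asks for.
        if (is_array($list)) {
            $total += count($list);
        }
    }

    return $total;
}
```

Calling countReadyStreams($read, $write) in place of count($read) + count($write) would keep the zero-result timeout check working when only one direction is being watched.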
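
To make the attempt arithmetic in calculateAttemptsCount() concrete, here is a standalone restatement with a few worked values. The function name attemptsFor() and the two constant names are assumptions made for this sketch. Unlike the original, it casts the result to int, whereas ceil() in the listing returns a float.

```php
<?php
// Standalone restatement of the attempt arithmetic, for illustration only.
// attemptsFor(), ATTEMPT_DELAY_USEC and MIN_ATTEMPTS are hypothetical names.

const ATTEMPT_DELAY_USEC = 250000; // same value as AsyncSelector::ATTEMPT_DELAY
const MIN_ATTEMPTS       = 10;     // same value as ATTEMPT_COUNT_FOR_INFINITE_TIMEOUT

/**
 * @param int|null $seconds Timeout seconds, or null when no timeout is set
 * @param int|null $usec    Extra microseconds
 *
 * @return int Number of select attempts
 */
function attemptsFor($seconds, $usec = null)
{
    if ($seconds === null) {
        return MIN_ATTEMPTS;
    }

    // One attempt per ATTEMPT_DELAY_USEC of timeout, rounded up.
    $attempts = (int) ceil(($seconds * 1000000 + (int) $usec) / ATTEMPT_DELAY_USEC);

    // Never go below the floor used for the no-timeout case.
    return max($attempts, MIN_ATTEMPTS);
}

var_dump(attemptsFor(null));      // int(10)
var_dump(attemptsFor(1));         // int(10): ceil(4) is below the floor of 10
var_dump(attemptsFor(5));         // int(20)
var_dump(attemptsFor(2, 600000)); // int(11): ceil(10.4)
```

Any timeout of 2.5 seconds or less therefore yields the same 10 attempts as no timeout at all.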