Passed
Push — master ( a91d54...875978 )
by y
02:08
created

Reactor::select()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 12
nc 2
nop 4
dl 0
loc 15
rs 9.8666
c 0
b 0
f 0
1
<?php
2
3
namespace Helix\Socket;
4
5
use Countable;
6
7
/**
8
 * Selects and calls reactive sockets when they are readable.
9
 */
10
class Reactor implements Countable {
11
12
    /**
13
     * All sockets in the reactor, keyed by ID.
14
     *
15
     * @var ReactiveInterface[]
16
     */
17
    protected $sockets = [];
18
19
    /**
20
     * Selects instances.
21
     *
22
     * @link https://php.net/socket_select
23
     *
24
     * @param SocketInterface[] $read
25
     * @param SocketInterface[] $write
26
     * @param SocketInterface[] $except
27
     * @param float|null $timeout Maximum seconds to block. `NULL` blocks forever.
28
     * @return int
29
     * @throws SocketError
30
     */
31
    public static function select (array &$read, array &$write, array &$except, $timeout = null) {
32
        $rwe = [$read, $write, $except];
33
        array_walk_recursive($rwe, function(SocketInterface &$each) {
34
            $each = $each->getResource();
35
        });
36
        $uSec = (int)(fmod($timeout, 1) * 1000000); // ignored if timeout is null
37
        $count = @socket_select($rwe[0], $rwe[1], $rwe[2], $timeout, $uSec); // keys are preserved
0 ignored issues
show
Bug introduced by
It seems like $timeout can also be of type double; however, parameter $tv_sec of socket_select() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

37
        $count = @socket_select($rwe[0], $rwe[1], $rwe[2], /** @scrutinizer ignore-type */ $timeout, $uSec); // keys are preserved
Loading history...
38
        if ($count === false) {
39
            $read = $write = $except = [];
40
            throw new SocketError;
41
        }
42
        $read = array_intersect_key($read, $rwe[0]);
43
        $write = array_intersect_key($write, $rwe[1]);
44
        $except = array_intersect_key($except, $rwe[2]);
45
        return $count;
46
    }
47
48
    /**
49
     * Adds a socket for selection.
50
     *
51
     * @param ReactiveInterface $socket
52
     * @return $this
53
     */
54
    public function add (ReactiveInterface $socket) {
55
        $this->sockets[$socket->getId()] = $socket;
56
        return $this;
57
    }
58
59
    /**
60
     * The number of sockets in the reactor.
61
     *
62
     * @return int
63
     */
64
    public function count () {
65
        return count($this->sockets);
66
    }
67
68
    /**
69
     * @return SocketInterface[]
70
     */
71
    public function getSockets () {
72
        return $this->sockets;
73
    }
74
75
    /**
76
     * Selects sockets for readability and calls their reactive methods.
77
     *
78
     * Invoke this in a loop that checks the reactor count as a condition.
79
     *
80
     * Closed sockets are automatically removed from the reactor.
81
     *
82
     * @param float|null $timeout Maximum seconds to block. `NULL` blocks forever.
83
     * @return int Number of sockets selected.
84
     */
85
    public function react ($timeout = null) {
86
        /** @var ReactiveInterface[][] $rwe */
87
        $rwe = [$this->sockets, [], $this->sockets];
88
        $count = static::select($rwe[0], $rwe[1], $rwe[2], $timeout);
89
        try {
90
            foreach ($rwe[2] as $id => $socket) {
91
                $socket->onOutOfBand();
92
            }
93
            foreach ($rwe[0] as $id => $socket) {
94
                $socket->onReadable();
95
            }
96
        }
97
        finally {
98
            array_walk_recursive($rwe, function(ReactiveInterface $each) {
99
                if (!$each->isOpen()) {
100
                    $this->remove($each);
101
                }
102
            });
103
        }
104
        return $count;
105
    }
106
107
    /**
108
     * Removes a socket from the reactor by ID.
109
     *
110
     * @param int|ReactiveInterface $id
111
     * @return $this
112
     */
113
    public function remove ($id) {
114
        unset($this->sockets[$id instanceof ReactiveInterface ? $id->getId() : $id]);
115
        return $this;
116
    }
117
118
}
119