Completed
Push — master ( 6d7292...93f7a5 )
by y
01:26
created

Reactor::react()   B

Complexity

Conditions 6
Paths 11

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 22
rs 8.9457
c 0
b 0
f 0
cc 6
nc 11
nop 1
1
<?php
2
3
namespace Helix\Socket;
4
5
use Countable;
6
use Helix\Socket\WebSocket\WebSocketClient;
7
use Helix\Socket\WebSocket\WebSocketError;
8
use Throwable;
9
10
/**
11
 * Selects and calls reactive sockets when they are readable.
12
 */
13
class Reactor implements Countable {
14
15
    /**
16
     * All sockets in the reactor, keyed by ID.
17
     *
18
     * @var ReactiveInterface[]
19
     */
20
    protected $sockets = [];
21
22
    /**
23
     * Selects instances. Can be used to select non-reactive sockets.
24
     *
25
     * @see https://php.net/socket_select
26
     *
27
     * @param SocketInterface[] $read
28
     * @param SocketInterface[] $write
29
     * @param SocketInterface[] $except
30
     * @param float|null $timeout Maximum seconds to block. `NULL` blocks forever.
31
     * @return int
32
     * @throws SocketError
33
     */
34
    public static function select (array &$read, array &$write, array &$except, ?float $timeout = null): int {
35
        $rwe = [$read, $write, $except];
36
        array_walk_recursive($rwe, function(SocketInterface &$each) {
37
            $each = $each->getResource();
38
        });
39
        $uSec = (int)(fmod($timeout, 1) * 1000000); // ignored if timeout is null
40
        $count = @socket_select($rwe[0], $rwe[1], $rwe[2], $timeout, $uSec); // keys are preserved
41
        if ($count === false) {
42
            $read = $write = $except = [];
43
            throw new SocketError;
44
        }
45
        $read = array_intersect_key($read, $rwe[0]);
46
        $write = array_intersect_key($write, $rwe[1]);
47
        $except = array_intersect_key($except, $rwe[2]);
48
        return $count;
49
    }
50
51
    /**
52
     * Adds a reactive socket for selection.
53
     *
54
     * @param ReactiveInterface $socket
55
     * @return $this
56
     */
57
    public function add (ReactiveInterface $socket) {
58
        $this->sockets[$socket->getId()] = $socket;
59
        return $this;
60
    }
61
62
    /**
63
     * The number of reactive sockets in the reactor.
64
     *
65
     * @return int
66
     */
67
    public function count (): int {
68
        return count($this->sockets);
69
    }
70
71
    /**
72
     * @return ReactiveInterface[]
73
     */
74
    public function getSockets () {
75
        return $this->sockets;
76
    }
77
78
    /**
79
     * Whether a socket is in the reactor.
80
     *
81
     * @param ReactiveInterface $socket
82
     * @return bool
83
     */
84
    public function has (ReactiveInterface $socket): bool {
85
        return isset($sockets[$socket->getId()]);
0 ignored issues
show
Bug introduced by
The variable $sockets does not exist. Did you mean $socket?

This check looks for variables that are accessed but have not been defined. It raises an issue if it finds another variable that has a similar name.

The variable may have been renamed without also renaming all references.

Loading history...
86
    }
87
88
    /**
89
     * @param int $channel
90
     * @param ReactiveInterface $socket
91
     * @param Throwable $error
92
     */
93
    public function onError (int $channel, $socket, Throwable $error): void {
94
        unset($channel);
95
        echo "{$error}\n\n";
96
        if ($socket->isOpen()) {
97
            if ($socket instanceof WebSocketClient and $error instanceof WebSocketError) {
98
                $socket->close($error->getCode(), $error->getMessage());
99
            }
100
            else {
101
                $socket->close();
102
            }
103
        }
104
    }
105
106
    /**
107
     * Selects the reactor's sockets and calls their reactive methods.
108
     *
109
     * Invoke this in a loop that checks {@link Reactor::count()} a condition.
110
     *
111
     * Closed sockets are automatically removed from the reactor.
112
     *
113
     * @param float|null $timeout Maximum seconds to block. `NULL` blocks forever.
114
     * @return int Number of sockets selected.
115
     */
116
    public function react (?float $timeout = null): int {
117
        /** @var ReactiveInterface[][] $rwe */
118
        $rwe = [$this->sockets, [], $this->sockets];
119
        $count = static::select($rwe[0], $rwe[1], $rwe[2], $timeout);
120
        foreach ([2 => 'onOutOfBand', 0 => 'onReadable'] as $channel => $method) {
121
            foreach ($rwe[$channel] as $id => $socket) {
122
                try {
123
                    $socket->{$method}();
124
                }
125
                catch (Throwable $error) {
126
                    unset($rwe[0][$id]); // prevent onReadable() if this is an OOB error.
127
                    $this->onError($channel, $socket, $error);
128
                }
129
                finally {
130
                    if (!$socket->isOpen() and $this->has($socket)) {
131
                        $this->remove($socket);
132
                    }
133
                }
134
            }
135
        }
136
        return $count;
137
    }
138
139
    /**
140
     * Removes a socket from the reactor.
141
     *
142
     * @param ReactiveInterface $socket
143
     * @return $this
144
     */
145
    public function remove (ReactiveInterface $socket) {
146
        unset($this->sockets[$socket->getId()]);
147
        return $this;
148
    }
149
}
150