Completed
Push — master ( f836de...b537f9 )
by y
01:15
created

Reactor   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 124
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 0
Metric Value
wmc 14
lcom 1
cbo 5
dl 0
loc 124
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A select() 0 16 2
A add() 0 4 1
A count() 0 3 1
A getSockets() 0 3 1
A onError() 0 10 3
A react() 0 21 5
A remove() 0 4 1
1
<?php
2
3
namespace Helix\Socket;
4
5
use Countable;
6
use Helix\Socket\WebSocket\WebSocketClient;
7
use Helix\Socket\WebSocket\WebSocketError;
8
use Throwable;
9
10
/**
 * Selects and calls reactive sockets when they are readable.
 *
 * Sockets are registered with {@link Reactor::add()} and polled with {@link Reactor::react()},
 * which calls `onOutOfBand()` and `onReadable()` on the sockets that were selected.
 */
class Reactor implements Countable {

    /**
     * All sockets in the reactor, keyed by ID.
     *
     * @var ReactiveInterface[]
     */
    protected $sockets = [];

    /**
     * Selects instances. Can be used to select non-reactive sockets.
     *
     * The given arrays are reduced in place to the instances that were selected, keys preserved.
     * On failure all three arrays are emptied before the error is thrown.
     *
     * @see https://php.net/socket_select
     *
     * @param SocketInterface[] $read
     * @param SocketInterface[] $write
     * @param SocketInterface[] $except
     * @param float|null $timeout Maximum seconds to block. `NULL` blocks forever.
     * @return int The count returned by `socket_select()`.
     * @throws SocketError
     */
    public static function select (array &$read, array &$write, array &$except, ?float $timeout = null): int {
        // socket_select() works on the underlying resources, so map copies of the instance arrays.
        $rwe = [$read, $write, $except];
        array_walk_recursive($rwe, function(SocketInterface &$each) {
            $each = $each->getResource();
        });
        // Split the timeout explicitly: passing NULL to fmod(), or a fractional float as the
        // integer seconds argument, is deprecated as of PHP 8.1.
        if ($timeout === null) {
            $sec = null; // block forever
            $uSec = 0;
        }
        else {
            $sec = (int)$timeout;
            $uSec = (int)(fmod($timeout, 1) * 1000000);
        }
        $count = @socket_select($rwe[0], $rwe[1], $rwe[2], $sec, $uSec); // keys are preserved
        if ($count === false) {
            $read = $write = $except = [];
            throw new SocketError;
        }
        // Keep only the instances whose resources are still present after selection.
        $read = array_intersect_key($read, $rwe[0]);
        $write = array_intersect_key($write, $rwe[1]);
        $except = array_intersect_key($except, $rwe[2]);
        return $count;
    }

    /**
     * Adds a reactive socket for selection.
     *
     * A socket that is already registered under the same ID is replaced.
     *
     * @param ReactiveInterface $socket
     * @return $this
     */
    public function add (ReactiveInterface $socket) {
        $this->sockets[$socket->getId()] = $socket;
        return $this;
    }

    /**
     * The number of reactive sockets in the reactor.
     *
     * @return int
     */
    public function count (): int {
        return count($this->sockets);
    }

    /**
     * All sockets in the reactor, keyed by ID.
     *
     * @return ReactiveInterface[]
     */
    public function getSockets () {
        return $this->sockets;
    }

    /**
     * Handles an error thrown by a socket's reactive method during {@link Reactor::react()}.
     *
     * The socket is closed. When both the socket and the error are WebSocket types,
     * the error's code and message are passed along to `close()`. The error is then echoed.
     *
     * @param int $channel The select channel being handled (unused here).
     * @param ReactiveInterface $socket
     * @param Throwable $error
     */
    public function onError (int $channel, $socket, Throwable $error) {
        unset($channel);
        if ($socket instanceof WebSocketClient && $error instanceof WebSocketError) {
            $socket->close($error->getCode(), $error->getMessage());
        }
        else {
            $socket->close();
        }
        echo $error;
    }

    /**
     * Selects the reactor's sockets and calls their reactive methods.
     *
     * Invoke this in a loop that checks {@link Reactor::count()} as a condition.
     *
     * Out-of-band handlers are called before readable handlers.
     * Sockets that were closed or removed earlier in the same pass are skipped.
     *
     * Closed sockets are automatically removed from the reactor.
     *
     * @param float|null $timeout Maximum seconds to block. `NULL` blocks forever.
     * @return int Number of sockets selected.
     */
    public function react (?float $timeout = null): int {
        /** @var ReactiveInterface[][] $rwe */
        $rwe = [$this->sockets, [], $this->sockets];
        $count = static::select($rwe[0], $rwe[1], $rwe[2], $timeout);
        foreach ([2 => 'onOutOfBand', 0 => 'onReadable'] as $channel => $method) {
            foreach ($rwe[$channel] as $id => $socket) {
                // The selection is a snapshot: an earlier handler in this pass
                // may have removed this socket from the reactor.
                if (!isset($this->sockets[$id])) {
                    continue;
                }
                // Likewise it may have been closed without being removed; don't react on it.
                if (!$socket->isOpen()) {
                    $this->remove($socket);
                    continue;
                }
                try {
                    $socket->{$method}();
                }
                catch (Throwable $error) {
                    $this->onError($channel, $socket, $error);
                }
                finally {
                    if (!$socket->isOpen()) {
                        $this->remove($socket);
                    }
                }
            }
        }
        return $count;
    }

    /**
     * Removes a socket from the reactor.
     *
     * Does nothing if the socket is not registered.
     *
     * @param ReactiveInterface $socket
     * @return $this
     */
    public function remove (ReactiveInterface $socket) {
        unset($this->sockets[$socket->getId()]);
        return $this;
    }
}
137