StreamSocketConnectionPool   A
last analyzed

Complexity

Total Complexity 19

Size/Duplication

Total Lines 165
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Importance

Changes 0
Metric Value
wmc 19
lcom 1
cbo 2
dl 0
loc 165
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 10 1
A getReadableConnections() 0 25 4
A count() 0 6 1
A shutdown() 0 4 1
A close() 0 13 2
A selectConnections() 0 26 5
A acceptConnection() 0 15 2
A removeClosedConnections() 0 9 3
1
<?php
2
3
declare(strict_types=1);
4
5
namespace PHPFastCGI\FastCGIDaemon\Driver\Userland\Connection;
6
7
/**
8
 * The default implementation of the ConnectionPoolInterface using stream
9
 * sockets.
10
 */
11
final class StreamSocketConnectionPool implements ConnectionPoolInterface
12
{
13
    /**
14
     * @var resource
15
     */
16
    private $serverSocket;
17
18
    /**
19
     * @var resource[]
20
     */
21
    private $clientSockets;
22
23
    /**
24
     * @var ConnectionInterface[]
25
     */
26
    private $connections;
27
28
    /**
29
     * @var bool
30
     */
31
    private $shutdown;
32
33
    /**
34
     * Constructor.
35
     *
36
     * @param resource $socket The stream socket to accept connections from
37
     */
38
    public function __construct($socket)
39
    {
40
        stream_set_blocking($socket, false);
41
42
        $this->serverSocket  = $socket;
43
        $this->clientSockets = [];
44
        $this->connections   = [];
45
46
        $this->shutdown = false;
47
    }
48
49
    /**
50
     * {@inheritdoc}
51
     */
52
    public function getReadableConnections(int $timeout): array
53
    {
54
        $this->removeClosedConnections();
55
56
        $readSockets = $this->clientSockets;
57
58
        if (!$this->shutdown) {
59
            $readSockets['pool'] = $this->serverSocket;
60
        }
61
62
        $this->selectConnections($readSockets, $timeout);
63
64
        if (isset($readSockets['pool'])) {
65
            $this->acceptConnection();
66
            unset($readSockets['pool']);
67
        }
68
69
        $readableConnections = [];
70
71
        foreach (array_keys($readSockets) as $id) {
72
            $readableConnections[$id] = $this->connections[$id];
73
        }
74
75
        return $readableConnections;
76
    }
77
78
    /**
79
     * {@inheritdoc}
80
     */
81
    public function count(): int
82
    {
83
        $this->removeClosedConnections();
84
85
        return count($this->connections);
86
    }
87
88
    /**
89
     * {@inheritdoc}
90
     */
91
    public function shutdown(): void
92
    {
93
        $this->shutdown = true;
94
    }
95
96
    /**
97
     * {@inheritdoc}
98
     */
99
    public function close(): void
100
    {
101
        $this->removeClosedConnections();
102
103
        foreach ($this->connections as $id => $connection) {
104
            $connection->close();
105
106
            unset($this->clientSockets[$id]);
107
            unset($this->connections[$id]);
108
        }
109
110
        fclose($this->serverSocket);
111
    }
112
113
    /**
114
     * Uses the stream select function to eliminate all non-readable sockets
115
     * from the read sockets parameter.
116
     *
117
     * @param resource[] $readSockets The sockets to test for readability (output parameter)
118
     * @param int        $timeout     The stream select call timeout
119
     */
120
    private function selectConnections(&$readSockets, int $timeout): void
121
    {
122
        // stream_select will not always preserve array keys
123
        // call it with a (deep) copy so the original is preserved
124
        $read = [];
125
        foreach ($readSockets as $id => $socket) {
126
            $read[] = $socket;
127
        }
128
        $writeSockets = $exceptSockets = [];
129
130
        if (false === @stream_select($read, $writeSockets, $exceptSockets, $timeout)) {
131
            $error = error_get_last();
132
133
            if (false === stripos($error['message'], 'interrupted system call')) {
134
                throw new \RuntimeException('stream_select failed: '.$error['message']);
135
            }
136
137
            $readSockets = [];
138
        } else {
139
            $res = [];
140
            foreach($read as $socket) {
141
                $res[array_search($socket, $readSockets)] = $socket;
142
            }
143
            $readSockets = $res;
144
        }
145
    }
146
147
    /**
148
     * Accept incoming connections from the server stream socket.
149
     */
150
    private function acceptConnection(): void
151
    {
152
        $clientSocket = @stream_socket_accept($this->serverSocket);
153
154
        if (false !== $clientSocket) {
155
            stream_set_blocking($clientSocket, false);
156
157
            $connection = new StreamSocketConnection($clientSocket);
158
159
            $id = spl_object_hash($connection);
160
161
            $this->clientSockets[$id] = $clientSocket;
162
            $this->connections[$id]   = $connection;
163
        }
164
    }
165
166
    private function removeClosedConnections(): void
167
    {
168
        foreach ($this->connections as $id => $connection) {
169
            if ($connection->isClosed()) {
170
                unset($this->clientSockets[$id]);
171
                unset($this->connections[$id]);
172
            }
173
        }
174
    }
175
}
176