Completed
Push — master ( 239007...7bbda7 )
by Andrew
02:04
created

StreamSocketConnectionPool::selectConnections()   B

Complexity

Conditions 5
Paths 8

Size

Total Lines 26
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 26
rs 8.439
cc 5
eloc 15
nc 8
nop 2
1
<?php
2
3
namespace PHPFastCGI\FastCGIDaemon\Driver\Userland\Connection;
4
5
/**
 * The default implementation of the ConnectionPoolInterface using stream
 * sockets.
 *
 * The pool holds one server socket plus the client sockets accepted from it.
 * Client sockets and their connection objects are kept in two arrays that
 * share the same keys (the spl_object_hash of the connection object).
 */
class StreamSocketConnectionPool implements ConnectionPoolInterface
{
    /**
     * @var resource The server socket new connections are accepted from
     */
    private $serverSocket;

    /**
     * @var resource[] Accepted client sockets, keyed by connection id
     */
    private $clientSockets;

    /**
     * @var StreamSocketConnection[] Connection objects, keyed by connection id
     */
    private $connections;

    /**
     * @var bool When true the server socket is no longer polled, so no new
     *           connections are accepted
     */
    private $shutdown;

    /**
     * Constructor.
     *
     * @param resource $socket The stream socket to accept connections from
     */
    public function __construct($socket)
    {
        // Readiness is decided by stream_select(); the socket itself is put
        // into non-blocking mode.
        stream_set_blocking($socket, false);

        $this->serverSocket  = $socket;
        $this->clientSockets = [];
        $this->connections   = [];

        $this->shutdown = false;
    }

    /**
     * {@inheritdoc}
     */
    public function getReadableConnections($timeout)
    {
        $this->removeClosedConnections();

        $readSockets = $this->clientSockets;

        // The server socket takes part in the select under the reserved key
        // 'pool' unless a shutdown has been requested.
        if (!$this->shutdown) {
            $readSockets['pool'] = $this->serverSocket;
        }

        $this->selectConnections($readSockets, $timeout);

        // A readable server socket means a client is waiting to be accepted.
        // The newly accepted connection is not part of this call's result.
        if (isset($readSockets['pool'])) {
            $this->acceptConnection();
            unset($readSockets['pool']);
        }

        $readableConnections = [];

        foreach (array_keys($readSockets) as $id) {
            $readableConnections[$id] = $this->connections[$id];
        }

        return $readableConnections;
    }

    /**
     * {@inheritdoc}
     */
    public function count()
    {
        // Closed connections are dropped first so they are not counted
        $this->removeClosedConnections();

        return count($this->connections);
    }

    /**
     * {@inheritdoc}
     */
    public function shutdown()
    {
        $this->shutdown = true;
    }

    /**
     * {@inheritdoc}
     */
    public function close()
    {
        $this->removeClosedConnections();

        foreach ($this->connections as $id => $connection) {
            $connection->close();

            unset($this->clientSockets[$id], $this->connections[$id]);
        }

        // The server socket is about to become invalid, so make sure it is
        // never handed to stream_select() again.
        $this->shutdown = true;

        // Guarded so that calling close() more than once is harmless
        if (is_resource($this->serverSocket)) {
            fclose($this->serverSocket);
        }
    }

    /**
     * Uses the stream select function to eliminate all non-readable sockets
     * from the read sockets parameter.
     *
     * An empty socket set is returned unchanged without calling
     * stream_select (and therefore without waiting for the timeout). A call
     * interrupted by a signal leaves the parameter empty.
     *
     * @param resource[] $readSockets The sockets to test for readability (output parameter)
     * @param int        $timeout     The stream select call timeout
     *
     * @throws \RuntimeException If stream_select fails for any reason other
     *                           than an interrupted system call
     */
    private function selectConnections(&$readSockets, $timeout)
    {
        // stream_select() rejects a call without any streams (warning and
        // false on PHP 5/7, ValueError on PHP 8), so there is nothing to do.
        if (empty($readSockets)) {
            $readSockets = [];

            return;
        }

        // stream_select will not always preserve array keys, so call it with
        // a re-indexed copy and remember which key every socket belongs to.
        // Casting a resource to int yields its unique resource id.
        $read        = [];
        $idsBySocket = [];

        foreach ($readSockets as $id => $socket) {
            $read[]                     = $socket;
            $idsBySocket[(int) $socket] = $id;
        }

        $writeSockets = $exceptSockets = [];

        if (false === @stream_select($read, $writeSockets, $exceptSockets, $timeout)) {
            // error_get_last() returns null when no error was recorded
            $error   = error_get_last();
            $message = isset($error['message']) ? $error['message'] : 'unknown error';

            // Being interrupted by a signal is not treated as a failure
            if (false === stripos($message, 'interrupted system call')) {
                throw new \RuntimeException('stream_select failed: '.$message);
            }

            $readSockets = [];

            return;
        }

        // $read now only holds the readable sockets; restore their keys
        $readable = [];

        foreach ($read as $socket) {
            $readable[$idsBySocket[(int) $socket]] = $socket;
        }

        $readSockets = $readable;
    }

    /**
     * Accept an incoming connection from the server stream socket.
     *
     * A failed accept is ignored silently; on success the client socket and
     * its connection object are registered under the same id.
     */
    private function acceptConnection()
    {
        $clientSocket = @stream_socket_accept($this->serverSocket);

        if (false !== $clientSocket) {
            stream_set_blocking($clientSocket, false);

            $connection = new StreamSocketConnection($clientSocket);

            $id = spl_object_hash($connection);

            $this->clientSockets[$id] = $clientSocket;
            $this->connections[$id]   = $connection;
        }
    }

    /**
     * Remove every connection that reports itself as closed, together with
     * its client socket.
     */
    private function removeClosedConnections()
    {
        foreach ($this->connections as $id => $connection) {
            if ($connection->isClosed()) {
                unset($this->clientSockets[$id], $this->connections[$id]);
            }
        }
    }
}
177