Completed
Push — master ( 0b32dd...f0bf18 )
by Arthur
01:30
created

WebSocketServer::looping()   C

Complexity

Conditions 12
Paths 81

Size

Total Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 44
rs 6.9666
c 0
b 0
f 0
cc 12
nc 81
nop 1

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

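Commonly applied refactorings include: Extract Method; and, if many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.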

1
<?php
2
3
namespace WSSC;
4
5
use WSSC\Components\Connection;
6
use WSSC\Components\OriginComponent;
7
use WSSC\Components\ServerConfig;
8
use WSSC\Components\WssMain;
9
use WSSC\Contracts\CommonsContract;
10
use WSSC\Contracts\WebSocket;
11
use WSSC\Contracts\WebSocketServerContract;
12
use WSSC\Exceptions\ConnectionException;
13
use WSSC\Exceptions\WebSocketException;
14
15
/**
16
 * Created by Arthur Kushman
17
 *
18
 * @property ServerConfig config
19
 * @property WebSocket handler
20
 */
21
class WebSocketServer extends WssMain implements WebSocketServerContract
22
{
23
    protected $config;
24
25
    private $clients = [];
26
    // set any template you need, e.g.: GET /subscription/messenger/token
27
    private $pathParams = [];
28
    private $handshakes = [];
29
    private $headersUpgrade = [];
30
    private $totalClients = 0;
31
    private $maxClients = 1;
32
    private $handler;
33
    private $cureentConn;
34
35
    // for the very 1st time must be true
36
    private $stepRecursion = true;
37
38
    private const MAX_BYTES_READ = 8192;
39
    private const HEADER_BYTES_READ = 1024;
40
41
    // stream non-blocking
42
    public const NON_BLOCK = 0;
43
44
    /**
     * Wires the server to its event handler and its configuration.
     *
     * @param WebSocket $handler object whose onOpen/onClose/onMessage-style callbacks are invoked
     * @param ServerConfig $config source of host, port, forking and origin-check settings
     */
    public function __construct(WebSocket $handler, ServerConfig $config)
    {
        // this should be >= 5 sec, otherwise there will be broken pipe - tested
        ini_set('default_socket_timeout', 5);

        $pcntlAvailable = extension_loaded('pcntl');

        $this->handler = $handler;
        $this->config = $config;

        // remember whether forking is possible at all on this installation
        $this->setIsPcntlLoaded($pcntlAvailable);
    }
61
62
    /**
     * Runs the main (ancestor) process: binds the TCP server socket and
     * hands it over to the event loop.
     *
     * @throws WebSocketException when the server socket can not be bound
     * @throws ConnectionException
     */
    public function run()
    {
        $errorCode = null;
        $errorText = '';
        $address = "tcp://{$this->config->getHost()}:{$this->config->getPort()}";

        $server = stream_socket_server($address, $errorCode, $errorText);
        if ($server === false) {
            throw new WebSocketException(
                'Could not bind to socket: ' . $errorCode . ' - ' . $errorText . PHP_EOL,
                CommonsContract::SERVER_COULD_NOT_BIND_TO_SOCKET
            );
        }

        // NOTE(review): static analysis flags this call because the error is suppressed
        // and the result is not checked. Renaming the process is cosmetic, so a failure
        // is deliberately ignored instead of stopping the server.
        @cli_set_process_title($this->config->getProcessName());

        $this->eventLoop($server);
    }
84
85
    /**
     * Dispatches either into the blocking select loop or into a process fork.
     *
     * Without the fork flag (or without the pcntl extension) this simply enters
     * looping(). With the flag set the process is forked; the branch that sees a
     * non-zero pid renames itself and re-enters this method without the flag,
     * which is what keeps the recursion from repeating endlessly. The branch that
     * sees pid 0 returns to its caller.
     *
     * @param resource $server server connection
     * @param bool $fork flag to fork or run event loop
     * @throws WebSocketException
     * @throws ConnectionException
     */
    private function eventLoop($server, bool $fork = false)
    {
        $mustFork = $fork === true && $this->isPcntlLoaded();
        if (!$mustFork) {
            $this->looping($server);

            return;
        }

        $pid = pcntl_fork();
        if (!$pid) {
            // pid 0: nothing to do here, control goes back to the caller
            return;
        }

        // Non-zero pid: run eventLoop in parent.
        // NOTE(review): static analysis flags the suppressed, unchecked call below;
        // the process title is cosmetic, so a failure is deliberately ignored.
        @cli_set_process_title($this->config->getProcessName());
        $this->eventLoop($server);
    }
107
108
    /**
     * Main select loop: tracks client counts, forks when due, waits for readable
     * sockets, accepts new clients and dispatches incoming messages.
     *
     * A select timeout (no socket became readable) is not an error: the loop
     * simply starts its next iteration. Only a failed select aborts the server.
     *
     * @param resource $server
     * @throws WebSocketException when stream_select() itself fails
     * @throws ConnectionException
     */
    private function looping($server)
    {
        while (true) {
            $this->totalClients = count($this->clients) + 1;

            // maxClients prevents process fork on count down
            if ($this->totalClients > $this->maxClients) {
                $this->maxClients = $this->totalClients;
            }

            if ($this->shouldFork()) {
                $this->stepRecursion = false;
                $this->eventLoop($server, true);
            }
            $this->lessConnThanProc($this->totalClients, $this->maxClients);

            // prepare readable sockets: every known client plus the listening socket
            $readSocks = $this->clients;
            $readSocks[] = $server;
            $this->cleanSocketResources($readSocks);

            // stream_select() takes these by reference; only readability is of interest
            $write = null;
            $except = null;

            // start reading and use a large timeout
            $ready = stream_select($readSocks, $write, $except, $this->config->getStreamSelectTimeout());
            if ($ready === false) {
                throw new WebSocketException('something went wrong while selecting',
                    CommonsContract::SERVER_SELECT_ERROR);
            }
            if ($ready === 0) {
                // timeout expired with nothing to read - keep serving
                continue;
            }

            // new client
            if (in_array($server, $readSocks, true)) {
                $this->acceptNewClient($server, $readSocks);
                if ($this->config->isCheckOrigin() && $this->config->isOriginHeader() === false) {
                    // the connection was rejected by the origin check - skip message handling this round
                    continue;
                }
            }

            // message from existing client
            $this->messagesWorker($readSocks);
        }
    }

    /**
     * Tells whether the current iteration has to fork a new process.
     *
     * @return bool true only when forking is enabled, a client was accepted since the
     *              last fork, the client count is at its peak and is a multiple of
     *              the configured clients-per-fork value
     */
    private function shouldFork(): bool
    {
        if ($this->config->isForking() !== true) {
            return false;
        }

        $clientsPerFork = $this->config->getClientsPerFork();

        return $clientsPerFork > 0 // a zero divisor would make the modulo below fail
            && $this->totalClients !== 0 // avoid 0 process creation
            && $this->stepRecursion === true // only once
            && $this->maxClients === $this->totalClients // only if stack grows
            && $this->totalClients % $clientsPerFork === 0; // only when N is there
    }
157
158
    /**
     * Accepts a pending connection on the server socket, optionally validates its
     * origin, registers it as a client, fires the OPEN event and sends the
     * upgrade handshake.
     *
     * The server socket is always removed from $readSocks, so the caller never
     * tries to read messages from it - including when no client could be accepted
     * or the origin check rejected the connection.
     *
     * @param resource $server
     * @param array $readSocks sockets reported readable by stream_select(); modified in place
     * @throws ConnectionException
     */
    private function acceptNewClient($server, array &$readSocks)
    {
        // delete the server socket from the read sockets; array_search() returns false
        // when it is absent, and using false as a key would unset element 0 instead
        $serverKey = array_search($server, $readSocks, true);
        if ($serverKey !== false) {
            unset($readSocks[$serverKey]);
        }

        $newClient = stream_socket_accept($server, 0); // must be 0 to non-block
        if (!$newClient) {
            return;
        }

        // important to read from headers here coz later client will change and there will be only msgs on pipe;
        // fread() may return false, which is normalized to an empty string
        $headers = (string)fread($newClient, self::HEADER_BYTES_READ);

        if ($this->config->isCheckOrigin()) {
            $hasOrigin = (new OriginComponent($this->config, $newClient))->checkOrigin($headers);
            $this->config->setOriginHeader($hasOrigin);
            if ($hasOrigin === false) {
                return;
            }
        }

        if (empty($this->handler->pathParams[0]) === false) {
            $this->setPathParams($headers);
        }

        $this->clients[] = $newClient;
        $this->stepRecursion = true; // set on new client - remainder % is always 0

        // trigger OPEN event
        // NOTE(review): OPEN fires before the upgrade response is written - confirm handlers do not send from onOpen
        $this->handler->onOpen(new Connection($newClient, $this->clients));
        $this->handshake($newClient, $headers);
    }
192
193
    /**
     * Reads one frame from every readable client socket and dispatches it to the
     * handler: CLOSE frames (or empty decoded data) trigger onClose and drop the
     * client, every other known event type is routed to its mapped handler method.
     *
     * Frames for which decode() yields null are skipped, as are event types that
     * have no entry in MAP_EVENT_TYPE_TO_METHODS.
     *
     * @param array $readSocks client sockets that have data to read
     * @uses onPing
     * @uses onPong
     * @uses onMessage
     */
    private function messagesWorker(array $readSocks)
    {
        foreach ($readSocks as $sock) {
            $data = $this->decode(fread($sock, self::MAX_BYTES_READ));
            if ($data === null) {
                continue;
            }

            // read type/payload defensively: the decoded value may be empty or lack these keys
            $isFrame = is_array($data);
            $dataType = $isFrame ? ($data['type'] ?? null) : null;
            $dataPayload = $isFrame ? ($data['payload'] ?? null) : null;

            // to manipulate connection through send/close methods via handler, specified in IConnection
            $this->cureentConn = new Connection($sock, $this->clients);

            // close event triggered from client - browser tab or close socket event
            if (empty($data) || $dataType === self::EVENT_TYPE_CLOSE) {
                // trigger CLOSE event
                try {
                    $this->handler->onClose($this->cureentConn);
                } catch (WebSocketException $e) {
                    $e->printStack();
                }

                // to avoid event leaks; a false search result must not be used as a key,
                // it would be cast to 0 and drop an unrelated client
                $clientKey = array_search($sock, $this->clients, true);
                if ($clientKey !== false) {
                    unset($this->clients[$clientKey]);
                }
                continue;
            }

            $method = $dataType === null ? null : (self::MAP_EVENT_TYPE_TO_METHODS[$dataType] ?? null);
            if ($method === null || !method_exists($this->handler, $method)) {
                continue;
            }

            try {
                // dynamic call: onMessage, onPing, onPong
                $this->handler->{$method}($this->cureentConn, $dataPayload);
            } catch (WebSocketException $e) {
                $e->printStack();
            }
        }
    }
233
234
    /**
     * Handshakes/upgrade and key parse.
     *
     * Extracts the Sec-WebSocket-Key from the client's request headers, stores it
     * per client and answers with the upgrade response headers.
     *
     * @param resource $client Source client socket to write
     * @param string $headers Headers that client has been sent
     * @return string socket handshake key (Sec-WebSocket-Key), or an empty string when
     *                no key could be parsed (nothing is written to the client then)
     * @throws ConnectionException
     */
    private function handshake($client, string $headers): string
    {
        $match = [];
        $found = preg_match(self::SEC_WEBSOCKET_KEY_PTRN, $headers, $match);
        if ($found !== 1 || empty($match[1])) {
            // the declared return type is string: returning false here was silently
            // coerced to '' and would be a TypeError under strict types
            return '';
        }

        $key = $match[1];
        $this->handshakes[(int)$client] = $key;

        // sending header according to WebSocket Protocol
        $secWebSocketAccept = base64_encode(sha1(trim($key) . self::HEADER_WEBSOCKET_ACCEPT_HASH, true));
        $this->setHeadersUpgrade($secWebSocketAccept);
        $upgradeHeaders = $this->getHeadersUpgrade();

        fwrite($client, $upgradeHeaders);

        return $key;
    }
262
263
    /**
     * Stores the header set that is sent back to upgrade the server/client connection.
     *
     * @param string $secWebSocketAccept base64 encoded Sec-WebSocket-Accept header
     */
    private function setHeadersUpgrade($secWebSocketAccept)
    {
        // the space before key is really important
        $acceptValue = ' ' . $secWebSocketAccept;

        $upgrade = [];
        $upgrade[self::HEADERS_UPGRADE_KEY] = self::HEADERS_UPGRADE_VALUE;
        $upgrade[self::HEADERS_CONNECTION_KEY] = self::HEADERS_CONNECTION_VALUE;
        $upgrade[self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY] = $acceptValue;

        $this->headersUpgrade = $upgrade;
    }
277
278
    /**
     * Renders the stored upgrade headers as the raw response text: the status
     * line first, then one "key:value" line per header. The Sec-WebSocket-Accept
     * header is followed by an additional line break.
     *
     * @return string   Headers to Upgrade communication connection
     * @throws ConnectionException when no upgrade headers have been set
     */
    private function getHeadersUpgrade(): string
    {
        if (empty($this->headersUpgrade)) {
            throw new ConnectionException('Headers for upgrade handshake are not set' . PHP_EOL,
                CommonsContract::SERVER_HEADERS_NOT_SET);
        }

        $lines = [self::HEADER_HTTP1_1];
        foreach ($this->headersUpgrade as $name => $value) {
            $lines[] = $name . ':' . $value;
            if ($name === self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY) {
                // an empty entry yields the additional EOL after Sec-WebSocket-Accept
                $lines[] = '';
            }
        }

        // every entry, including the last one, is terminated by an EOL
        return implode(self::HEADERS_EOL, $lines) . self::HEADERS_EOL;
    }
301
302
    /**
     * Parses parameters from GET on web-socket client connection before handshake.
     *
     * Each template entry declared in the handler's pathParams is replaced by an
     * entry keyed with the template name and holding the matching path segment.
     * When the headers contain no parsable GET path, the declared templates are
     * left untouched.
     *
     * @param string $headers
     */
    private function setPathParams(string $headers)
    {
        if (empty($this->handler->pathParams)) {
            return;
        }

        $matches = [];
        $found = preg_match('/GET\s(.*?)\s/', $headers, $matches);
        if ($found !== 1 || $matches[1] === '') {
            // nothing to parse: going on would read an undefined match and
            // call strpos() with an offset beyond the end of the string
            return;
        }
        $left = $matches[1];

        foreach ($this->handler->pathParams as $k => $param) {
            if (empty($this->handler->pathParams[$k + 1]) && strpos($left, '/', 1) === false) {
                // do not eat last char if there is no / at the end
                $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1);
            } else {
                // eat both slashes
                $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1,
                    strpos($left, '/', 1) - 1);
            }

            // clear the declaration of parsed param; the current key is known, so no search is needed
            unset($this->handler->pathParams[$k]);
            $left = substr($left, strpos($left, '/', 1));
        }
    }
330
}
331