Passed
Pull Request — master (#80)
by Lito
01:37
created

WebSocketServer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 1 Features 0
Metric Value
cc 1
eloc 4
nc 1
nop 2
dl 0
loc 10
rs 10
c 4
b 1
f 0
1
<?php
2
3
namespace WSSC;
4
5
use WSSC\Components\Connection;
6
use WSSC\Components\OriginComponent;
7
use WSSC\Components\ServerConfig;
8
use WSSC\Components\WssMain;
9
use WSSC\Contracts\CommonsContract;
10
use WSSC\Contracts\WebSocket;
11
use WSSC\Contracts\WebSocketServerContract;
12
use WSSC\Exceptions\ConnectionException;
13
use WSSC\Exceptions\WebSocketException;
14
15
/**
16
 * Class WebSocketServer
17
 * @package WSSC
18
 */
19
class WebSocketServer extends WssMain implements WebSocketServerContract
20
{
21
    private const MAX_BYTES_READ = 8192;
22
    private const HEADER_BYTES_READ = 1024;
23
24
    /**
25
     * @var ServerConfig
26
     */
27
    protected ServerConfig $config;
28
29
    /**
30
     * @var array
31
     */
32
    private array $clients = [];
33
34
    /**
35
     * @var array
36
     */
37
    private array $headersUpgrade = [];
38
39
    /**
40
     * @var int
41
     */
42
    private int $maxClients = 1;
43
44
    /**
45
     * @var WebSocket
46
     */
47
    private WebSocket $handler;
48
49
    /**
50
     * @var bool
51
     */
52
    private bool $stepRecursion = true;
53
54
    /**
55
     * @var int
56
     */
57
    private int $loopingDelay = 1000;
58
59
     * @var bool
0 ignored issues
show
Bug introduced by
A parse error occurred: Syntax error, unexpected '*', expecting T_FUNCTION or T_CONST on line 59 at column 5
Loading history...
60
     */
61
    private bool $printException = true;
62
63
    /**
64
     * WebSocketServer constructor.
65
     *
66
     * @param WebSocket $handler
67
     * @param ServerConfig $config
68
     */
69
    public function __construct(
70
        WebSocket $handler,
71
        ServerConfig $config
72
    )
73
    {
74
        ini_set('default_socket_timeout', 5); // this should be >= 5 sec, otherwise there will be broken pipe - tested
75
76
        $this->handler = $handler;
77
        $this->config = $config;
78
        $this->setIsPcntlLoaded(extension_loaded('pcntl'));
79
    }
80
81
    /**
82
     * Set the looping sleep in microseconds
83
     *
84
     * @return self
85
     */
86
    public function loopingDelay(int $loopingDelay): self
87
    {
88
        $this->loopingDelay = $loopingDelay;
89
90
        return $this;
91
    }
92
93
    /**
94
     * Configure if error exceptions should be printed
95
     *
96
     * @return self
97
     */
98
    public function printException(bool $printException): self
99
    {
100
        $this->printException = $printException;
101
102
        return $this;
103
    }
104
105
    /**
106
     * Runs main process - Anscestor with server socket on TCP
107
     *
108
     * @throws WebSocketException
109
     * @throws ConnectionException
110
     */
111
    public function run(): void
112
    {
113
        $context = stream_context_create();
114
        $errno = null;
115
        $errorMessage = '';
116
117
        if ($this->config->isSsl() === true) {
118
            stream_context_set_option($context, 'ssl', 'allow_self_signed', $this->config->getAllowSelfSigned());
119
            stream_context_set_option($context, 'ssl', 'verify_peer', false);
120
121
            if (is_file($this->config->getLocalCert()) === false || is_file($this->config->getLocalPk()) === false) {
122
                throw new WebSocketException('SSL certificates must be valid pem files', CommonsContract::SERVER_INVALID_STREAM_CONTEXT);
123
            }
124
            $isLocalCertSet = stream_context_set_option($context, 'ssl', 'local_cert', $this->config->getLocalCert());
125
            $isLocalPkSet = stream_context_set_option($context, 'ssl', 'local_pk', $this->config->getLocalPk());
126
127
            if ($isLocalCertSet === false || $isLocalPkSet === false) {
128
                throw new WebSocketException('SSL certificates could not be set correctly', CommonsContract::SERVER_INVALID_STREAM_CONTEXT);
129
            }
130
        }
131
132
        $server = stream_socket_server("tcp://{$this->config->getHost()}:{$this->config->getPort()}", $errno,
133
            $errorMessage, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $context);
134
135
        if ($server === false) {
136
            throw new WebSocketException('Could not bind to socket: ' . $errno . ' - ' . $errorMessage . PHP_EOL,
137
                CommonsContract::SERVER_COULD_NOT_BIND_TO_SOCKET);
138
        }
139
140
        @cli_set_process_title($this->config->getProcessName());
141
        $this->eventLoop($server);
142
    }
143
144
    /**
145
     * Recursive event loop that input intu recusion by remainder = 0 - thus when N users,
146
     * and when forks equals true which prevents it from infinite recursive iterations
147
     *
148
     * @param resource $server server connection
149
     * @param bool $fork flag to fork or run event loop
150
     * @throws WebSocketException
151
     * @throws ConnectionException
152
     */
153
    private function eventLoop($server, bool $fork = false): void
154
    {
155
        if ($fork === true && $this->isPcntlLoaded()) {
156
            $pid = pcntl_fork();
157
158
            if ($pid) { // run eventLoop in parent
159
                @cli_set_process_title($this->config->getProcessName());
160
                $this->eventLoop($server);
161
            }
162
        } else {
163
            $this->looping($server);
164
        }
165
    }
166
167
    /**
168
     * @param resource $server
169
     * @throws WebSocketException
170
     * @throws ConnectionException
171
     */
172
    private function looping($server): void
173
    {
174
        while (true) {
175
            usleep($this->loopingDelay);
176
177
            $totalClients = count($this->clients) + 1;
178
179
            // maxClients prevents process fork on count down
180
            if ($totalClients > $this->maxClients) {
181
                $this->maxClients = $totalClients;
182
            }
183
184
            $doFork = $this->config->isForking() === true
185
                && $totalClients !== 0 // avoid 0 process creation
186
                && $this->stepRecursion === true // only once
187
                && $this->maxClients === $totalClients // only if stack grows
188
                && $totalClients % $this->config->getClientsPerFork() === 0; // only when N is there
189
            if ($doFork) {
190
                $this->stepRecursion = false;
191
                $this->eventLoop($server, true);
192
            }
193
            $this->lessConnThanProc($totalClients, $this->maxClients);
194
195
            //prepare readable sockets
196
            $readSocks = $this->clients;
197
            $readSocks[] = $server;
198
            $this->cleanSocketResources($readSocks);
199
200
            //start reading and use a large timeout
201
            if (!stream_select($readSocks, $write, $except, $this->config->getStreamSelectTimeout())) {
202
                throw new WebSocketException('something went wrong while selecting',
203
                    CommonsContract::SERVER_SELECT_ERROR);
204
            }
205
206
            //new client
207
            if (in_array($server, $readSocks, false)) {
208
                $this->acceptNewClient($server, $readSocks);
209
                if ($this->config->isCheckOrigin() && $this->config->isOriginHeader() === false) {
210
                    continue;
211
                }
212
            }
213
214
            //message from existing client
215
            $this->messagesWorker($readSocks);
216
        }
217
    }
218
219
    /**
220
     * @param resource $server
221
     * @param array $readSocks
222
     * @throws ConnectionException
223
     */
224
    private function acceptNewClient($server, array &$readSocks): void
225
    {
226
        $newClient = stream_socket_accept($server, -1); // must be 0 to non-block
227
        if ($newClient) {
228
            if ($this->config->isSsl() === true) {
229
                $isEnabled = stream_socket_enable_crypto($newClient, true, $this->config->getCryptoType());
230
                if ($isEnabled === false) { // couldn't enable crypto - let's try one more time
231
                    return;
232
                }
233
            }
234
235
            // important to read from headers here coz later client will change and there will be only msgs on pipe
236
            $headers = fread($newClient, self::HEADER_BYTES_READ);
237
            if ($this->config->isCheckOrigin()) {
238
                $hasOrigin = (new OriginComponent($this->config, $newClient))->checkOrigin($headers);
239
                $this->config->setOriginHeader($hasOrigin);
240
                if ($hasOrigin === false) {
241
                    return;
242
                }
243
            }
244
245
            if (empty($this->handler->pathParams[0]) === false) {
246
                $this->setPathParams($headers);
247
            }
248
249
            $this->clients[] = $newClient;
250
            $this->stepRecursion = true; // set on new client - remainder % is always 0
251
252
            // trigger OPEN event
253
            $this->handler->onOpen(new Connection($newClient, $this->clients));
254
            $this->handshake($newClient, $headers);
255
        }
256
257
        //delete the server socket from the read sockets
258
        unset($readSocks[array_search($server, $readSocks, false)]);
259
    }
260
261
    /**
262
     * @param array $readSocks
263
     * @uses onPing
264
     * @uses onPong
265
     * @uses onMessage
266
     */
267
    private function messagesWorker(array $readSocks): void
268
    {
269
        foreach ($readSocks as $kSock => $sock) {
270
            $data = $this->decode(fread($sock, self::MAX_BYTES_READ));
271
            if ($data !== null) {
272
                $dataType = null;
273
                $dataPayload = null;
274
                if ($data !== false) { // payload is too large - waiting for remained data
275
                    $dataType = $data['type'];
276
                    $dataPayload = $data['payload'];
277
                }
278
279
                // to manipulate connection through send/close methods via handler, specified in IConnection
280
                $cureentConn = new Connection($sock, $this->clients);
281
                if (empty($data) || $dataType === self::EVENT_TYPE_CLOSE) { // close event triggered from client - browser tab or close socket event
282
                    // trigger CLOSE event
283
                    try {
284
                        $this->handler->onClose($cureentConn);
285
                    } catch (WebSocketException $e) {
286
                        $this->handleMessagesWorkerException($cureentConn, $e);
287
                    }
288
289
                    // to avoid event leaks
290
                    unset($this->clients[array_search($sock, $this->clients)], $readSocks[$kSock]);
291
                    continue;
292
                }
293
294
                $isSupportedMethod = empty(self::MAP_EVENT_TYPE_TO_METHODS[$dataType]) === false
295
                    && method_exists($this->handler, self::MAP_EVENT_TYPE_TO_METHODS[$dataType]);
296
                if ($isSupportedMethod) {
297
                    try {
298
                        // dynamic call: onMessage, onPing, onPong
299
                        $this->handler->{self::MAP_EVENT_TYPE_TO_METHODS[$dataType]}($cureentConn, $dataPayload);
300
                    } catch (WebSocketException $e) {
301
                        $this->handleMessagesWorkerException($cureentConn, $e);
302
                    }
303
                }
304
            }
305
        }
306
    }
307
308
    /**
309
     * Handshakes/upgrade and key parse
310
     *
311
     * @param resource $client Source client socket to write
312
     * @param string $headers Headers that client has been sent
313
     * @return string   socket handshake key (Sec-WebSocket-Key)| false on parse error
314
     * @throws ConnectionException
315
     */
316
    private function handshake($client, string $headers): string
317
    {
318
        $match = [];
319
        preg_match(self::SEC_WEBSOCKET_KEY_PTRN, $headers, $match);
320
        if (empty($match[1])) {
321
            return '';
322
        }
323
324
        $key = $match[1];
325
        // sending header according to WebSocket Protocol
326
        $secWebSocketAccept = base64_encode(sha1(trim($key) . self::HEADER_WEBSOCKET_ACCEPT_HASH, true));
327
        $this->setHeadersUpgrade($secWebSocketAccept);
328
        $upgradeHeaders = $this->getHeadersUpgrade();
329
330
        fwrite($client, $upgradeHeaders);
331
332
        return $key;
333
    }
334
335
    /**
336
     * Sets an array of headers needed to upgrade server/client connection
337
     *
338
     * @param string $secWebSocketAccept base64 encoded Sec-WebSocket-Accept header
339
     */
340
    private function setHeadersUpgrade(string $secWebSocketAccept): void
341
    {
342
        $this->headersUpgrade = [
343
            self::HEADERS_UPGRADE_KEY => self::HEADERS_UPGRADE_VALUE,
344
            self::HEADERS_CONNECTION_KEY => self::HEADERS_CONNECTION_VALUE,
345
            self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY => ' ' . $secWebSocketAccept
346
            // the space before key is really important
347
        ];
348
    }
349
350
    /**
351
     * Retreives headers from an array of headers to upgrade server/client connection
352
     *
353
     * @return string   Headers to Upgrade communication connection
354
     * @throws ConnectionException
355
     */
356
    private function getHeadersUpgrade(): string
357
    {
358
        $handShakeHeaders = self::HEADER_HTTP1_1 . self::HEADERS_EOL;
359
        if (empty($this->headersUpgrade)) {
360
            throw new ConnectionException('Headers for upgrade handshake are not set' . PHP_EOL,
361
                CommonsContract::SERVER_HEADERS_NOT_SET);
362
        }
363
364
        foreach ($this->headersUpgrade as $key => $header) {
365
            $handShakeHeaders .= $key . ':' . $header . self::HEADERS_EOL;
366
            if ($key === self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY) { // add additional EOL fo Sec-WebSocket-Accept
367
                $handShakeHeaders .= self::HEADERS_EOL;
368
            }
369
        }
370
371
        return $handShakeHeaders;
372
    }
373
374
    /**
375
     * Parses parameters from GET on web-socket client connection before handshake
376
     *
377
     * @param string $headers
378
     */
379
    private function setPathParams(string $headers): void
380
    {
381
        if (empty($this->handler->pathParams) === false) {
382
            $matches = [];
383
            preg_match('/GET\s(.*?)\s/', $headers, $matches);
384
            $left = $matches[1];
385
386
            foreach ($this->handler->pathParams as $k => $param) {
387
                if (empty($this->handler->pathParams[$k + 1]) && strpos($left, '/', 1) === false) {
388
                    // do not eat last char if there is no / at the end
389
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1);
390
                } else {
391
                    // eat both slashes
392
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1,
393
                        strpos($left, '/', 1) - 1);
394
                }
395
396
                // clear the declaration of parsed param
397
                unset($this->handler->pathParams[array_search($param, $this->handler->pathParams, false)]);
398
                $left = substr($left, strpos($left, '/', 1));
399
            }
400
        }
401
    }
402
403
    /**
404
     * Manage messagesWorker Exceptions
405
     *
406
     * @param Connection $connection
407
     * @param WebSocketException $e
408
     */
409
    private function handleMessagesWorkerException(Connection $connection, WebSocketException $e): void
410
    {
411
        $this->handler->onError($connection, $e);
412
413
        if ($this->printException) {
414
            $e->printStack();
415
        }
416
    }
417
}
418