Completed
Push — master ( 6ae2ec...32167a )
by Arthur
01:15
created

WebSocketServer::acceptNewClient()   B

Complexity

Conditions 7
Paths 12

Size

Total Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 36
rs 8.4106
c 0
b 0
f 0
cc 7
nc 12
nop 2
1
<?php
2
3
namespace WSSC;
4
5
use WSSC\Components\Connection;
6
use WSSC\Components\OriginComponent;
7
use WSSC\Components\ServerConfig;
8
use WSSC\Components\WssMain;
9
use WSSC\Contracts\CommonsContract;
10
use WSSC\Contracts\WebSocket;
11
use WSSC\Contracts\WebSocketServerContract;
12
use WSSC\Exceptions\ConnectionException;
13
use WSSC\Exceptions\WebSocketException;
14
15
/**
16
 * Class WebSocketServer
17
 * @package WSSC
18
 */
19
class WebSocketServer extends WssMain implements WebSocketServerContract
20
{
21
    private const MAX_BYTES_READ = 8192;
22
    private const HEADER_BYTES_READ = 1024;
23
24
    /**
25
     * @var ServerConfig
26
     */
27
    protected ServerConfig $config;
0 ignored issues
show
Bug introduced by
This code did not parse for me. Apparently, there is an error somewhere around this line:

Syntax error, unexpected T_STRING, expecting T_FUNCTION or T_CONST
Loading history...
28
29
    /**
30
     * @var array
31
     */
32
    private array $clients = [];
33
34
    /**
35
     * set any template You need ex.: GET /subscription/messenger/token
36
     * @var array
37
     */
38
    private array $pathParams = [];
39
40
    /**
41
     * @var array
42
     */
43
    private array $handshakes = [];
44
45
    /**
46
     * @var array
47
     */
48
    private array $headersUpgrade = [];
49
50
    /**
51
     * @var int
52
     */
53
    private int $maxClients = 1;
54
55
    /**
56
     * @var WebSocket
57
     */
58
    private WebSocket $handler;
59
60
    /**
61
     * @var bool
62
     */
63
    private bool $stepRecursion = true;
64
65
    /**
66
     * WebSocketServer constructor.
67
     *
68
     * @param WebSocket $handler
69
     * @param ServerConfig $config
70
     */
71
    public function __construct(
72
        WebSocket $handler,
73
        ServerConfig $config
74
    )
75
    {
76
        ini_set('default_socket_timeout', 5); // this should be >= 5 sec, otherwise there will be broken pipe - tested
77
78
        $this->handler = $handler;
79
        $this->config = $config;
80
        $this->setIsPcntlLoaded(extension_loaded('pcntl'));
81
    }
82
83
    /**
84
     * Runs main process - Anscestor with server socket on TCP
85
     *
86
     * @throws WebSocketException
87
     * @throws ConnectionException
88
     */
89
    public function run(): void
90
    {
91
        $context = stream_context_create();
92
        $errno = null;
93
        $errorMessage = '';
94
95
        if ($this->config->isSsl() === true) {
96
            stream_context_set_option($context, 'ssl', 'allow_self_signed', $this->config->getAllowSelfSigned());
97
            stream_context_set_option($context, 'ssl', 'verify_peer', false);
98
99
            if (is_file($this->config->getLocalCert()) === false || is_file($this->config->getLocalPk()) === false) {
100
                throw new WebSocketException('SSL certificates must be valid pem files', CommonsContract::SERVER_INVALID_STREAM_CONTEXT);
101
            }
102
            $isLocalCertSet = stream_context_set_option($context, 'ssl', 'local_cert', $this->config->getLocalCert());
103
            $isLocalPkSet = stream_context_set_option($context, 'ssl', 'local_pk', $this->config->getLocalPk());
104
105
            if ($isLocalCertSet === false || $isLocalPkSet === false) {
106
                throw new WebSocketException('SSL certificates could not be set correctly', CommonsContract::SERVER_INVALID_STREAM_CONTEXT);
107
            }
108
        }
109
110
        $server = stream_socket_server("tcp://{$this->config->getHost()}:{$this->config->getPort()}", $errno,
111
            $errorMessage, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $context);
112
113
        if ($server === false) {
114
            throw new WebSocketException('Could not bind to socket: ' . $errno . ' - ' . $errorMessage . PHP_EOL,
115
                CommonsContract::SERVER_COULD_NOT_BIND_TO_SOCKET);
116
        }
117
118
        @cli_set_process_title($this->config->getProcessName());
119
        $this->eventLoop($server);
120
    }
121
122
    /**
123
     * Recursive event loop that input intu recusion by remainder = 0 - thus when N users,
124
     * and when forks equals true which prevents it from infinite recursive iterations
125
     *
126
     * @param resource $server server connection
127
     * @param bool $fork flag to fork or run event loop
128
     * @throws WebSocketException
129
     * @throws ConnectionException
130
     */
131
    private function eventLoop($server, bool $fork = false): void
132
    {
133
        if ($fork === true && $this->isPcntlLoaded()) {
134
            $pid = pcntl_fork();
135
136
            if ($pid) { // run eventLoop in parent        
137
                @cli_set_process_title($this->config->getProcessName());
138
                $this->eventLoop($server);
139
            }
140
        } else {
141
            $this->looping($server);
142
        }
143
    }
144
145
    /**
146
     * @param resource $server
147
     * @throws WebSocketException
148
     * @throws ConnectionException
149
     */
150
    private function looping($server): void
151
    {
152
        while (true) {
153
            $totalClients = count($this->clients) + 1;
154
155
            // maxClients prevents process fork on count down
156
            if ($totalClients > $this->maxClients) {
157
                $this->maxClients = $totalClients;
158
            }
159
160
            $doFork = $this->config->isForking() === true
161
                && $totalClients !== 0 // avoid 0 process creation
162
                && $this->stepRecursion === true // only once
163
                && $this->maxClients === $totalClients // only if stack grows
164
                && $totalClients % $this->config->getClientsPerFork() === 0; // only when N is there
165
            if ($doFork) {
166
                $this->stepRecursion = false;
167
                $this->eventLoop($server, true);
168
            }
169
            $this->lessConnThanProc($totalClients, $this->maxClients);
170
171
            //prepare readable sockets
172
            $readSocks = $this->clients;
173
            $readSocks[] = $server;
174
            $this->cleanSocketResources($readSocks);
175
176
            //start reading and use a large timeout
177
            if (!stream_select($readSocks, $write, $except, $this->config->getStreamSelectTimeout())) {
178
                throw new WebSocketException('something went wrong while selecting',
179
                    CommonsContract::SERVER_SELECT_ERROR);
180
            }
181
182
            //new client
183
            if (in_array($server, $readSocks, false)) {
184
                $this->acceptNewClient($server, $readSocks);
185
                if ($this->config->isCheckOrigin() && $this->config->isOriginHeader() === false) {
186
                    continue;
187
                }
188
            }
189
190
            //message from existing client
191
            $this->messagesWorker($readSocks);
192
        }
193
    }
194
195
    /**
196
     * @param resource $server
197
     * @param array $readSocks
198
     * @throws ConnectionException
199
     */
200
    private function acceptNewClient($server, array &$readSocks): void
201
    {
202
        $newClient = stream_socket_accept($server, 0); // must be 0 to non-block
203
        if ($newClient) {
204
            if ($this->config->isSsl() === true) {
205
                $isEnabled = stream_socket_enable_crypto($newClient, true, $this->config->getCryptoType());
206
                if ($isEnabled === false) { // couldn't enable crypto - let's try one more time
207
                    return;
208
                }
209
            }
210
211
            // important to read from headers here coz later client will change and there will be only msgs on pipe
212
            $headers = fread($newClient, self::HEADER_BYTES_READ);
213
            if ($this->config->isCheckOrigin()) {
214
                $hasOrigin = (new OriginComponent($this->config, $newClient))->checkOrigin($headers);
215
                $this->config->setOriginHeader($hasOrigin);
216
                if ($hasOrigin === false) {
217
                    return;
218
                }
219
            }
220
221
            if (empty($this->handler->pathParams[0]) === false) {
222
                $this->setPathParams($headers);
223
            }
224
225
            $this->clients[] = $newClient;
226
            $this->stepRecursion = true; // set on new client - remainder % is always 0
227
228
            // trigger OPEN event
229
            $this->handler->onOpen(new Connection($newClient, $this->clients));
230
            $this->handshake($newClient, $headers);
231
        }
232
233
        //delete the server socket from the read sockets
234
        unset($readSocks[array_search($server, $readSocks, false)]);
235
    }
236
237
    /**
238
     * @param array $readSocks
239
     * @uses onPing
240
     * @uses onPong
241
     * @uses onMessage
242
     */
243
    private function messagesWorker(array $readSocks): void
244
    {
245
        foreach ($readSocks as $kSock => $sock) {
246
            $data = $this->decode(fread($sock, self::MAX_BYTES_READ));
247
            if ($data !== null) {
248
                $dataType = $data['type'];
249
                $dataPayload = $data['payload'];
250
251
                // to manipulate connection through send/close methods via handler, specified in IConnection
252
                $cureentConn = new Connection($sock, $this->clients);
253
                if (empty($data) || $dataType === self::EVENT_TYPE_CLOSE) { // close event triggered from client - browser tab or close socket event
254
                    // trigger CLOSE event
255
                    try {
256
                        $this->handler->onClose($cureentConn);
257
                    } catch (WebSocketException $e) {
258
                        $e->printStack();
259
                    }
260
261
                    // to avoid event leaks
262
                    unset($this->clients[array_search($sock, $this->clients)], $readSocks[$kSock]);
263
                    continue;
264
                }
265
266
                $isSupportedMethod = empty(self::MAP_EVENT_TYPE_TO_METHODS[$dataType]) === false
267
                    && method_exists($this->handler, self::MAP_EVENT_TYPE_TO_METHODS[$dataType]);
268
                if ($isSupportedMethod) {
269
                    try {
270
                        // dynamic call: onMessage, onPing, onPong
271
                        $this->handler->{self::MAP_EVENT_TYPE_TO_METHODS[$dataType]}($cureentConn, $dataPayload);
272
                    } catch (WebSocketException $e) {
273
                        $e->printStack();
274
                    }
275
                }
276
            }
277
        }
278
    }
279
280
    /**
281
     * Handshakes/upgrade and key parse
282
     *
283
     * @param resource $client Source client socket to write
284
     * @param string $headers Headers that client has been sent
285
     * @return string   socket handshake key (Sec-WebSocket-Key)| false on parse error
286
     * @throws ConnectionException
287
     */
288
    private function handshake($client, string $headers): string
289
    {
290
        $match = [];
291
        preg_match(self::SEC_WEBSOCKET_KEY_PTRN, $headers, $match);
292
        if (empty($match[1])) {
293
            return false;
294
        }
295
296
        $key = $match[1];
297
        $this->handshakes[(int)$client] = $key;
298
299
        // sending header according to WebSocket Protocol
300
        $secWebSocketAccept = base64_encode(sha1(trim($key) . self::HEADER_WEBSOCKET_ACCEPT_HASH, true));
301
        $this->setHeadersUpgrade($secWebSocketAccept);
302
        $upgradeHeaders = $this->getHeadersUpgrade();
303
304
        fwrite($client, $upgradeHeaders);
305
306
        return $key;
307
    }
308
309
    /**
310
     * Sets an array of headers needed to upgrade server/client connection
311
     *
312
     * @param string $secWebSocketAccept base64 encoded Sec-WebSocket-Accept header
313
     */
314
    private function setHeadersUpgrade(string $secWebSocketAccept): void
315
    {
316
        $this->headersUpgrade = [
317
            self::HEADERS_UPGRADE_KEY => self::HEADERS_UPGRADE_VALUE,
318
            self::HEADERS_CONNECTION_KEY => self::HEADERS_CONNECTION_VALUE,
319
            self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY => ' ' . $secWebSocketAccept
320
            // the space before key is really important
321
        ];
322
    }
323
324
    /**
325
     * Retreives headers from an array of headers to upgrade server/client connection
326
     *
327
     * @return string   Headers to Upgrade communication connection
328
     * @throws ConnectionException
329
     */
330
    private function getHeadersUpgrade(): string
331
    {
332
        $handShakeHeaders = self::HEADER_HTTP1_1 . self::HEADERS_EOL;
333
        if (empty($this->headersUpgrade)) {
334
            throw new ConnectionException('Headers for upgrade handshake are not set' . PHP_EOL,
335
                CommonsContract::SERVER_HEADERS_NOT_SET);
336
        }
337
338
        foreach ($this->headersUpgrade as $key => $header) {
339
            $handShakeHeaders .= $key . ':' . $header . self::HEADERS_EOL;
340
            if ($key === self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY) { // add additional EOL fo Sec-WebSocket-Accept
341
                $handShakeHeaders .= self::HEADERS_EOL;
342
            }
343
        }
344
345
        return $handShakeHeaders;
346
    }
347
348
    /**
349
     * Parses parameters from GET on web-socket client connection before handshake
350
     *
351
     * @param string $headers
352
     */
353
    private function setPathParams(string $headers): void
354
    {
355
        if (empty($this->handler->pathParams) === false) {
356
            $matches = [];
357
            preg_match('/GET\s(.*?)\s/', $headers, $matches);
358
            $left = $matches[1];
359
360
            foreach ($this->handler->pathParams as $k => $param) {
361
                if (empty($this->handler->pathParams[$k + 1]) && strpos($left, '/', 1) === false) {
362
                    // do not eat last char if there is no / at the end
363
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1);
364
                } else {
365
                    // eat both slashes
366
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1,
367
                        strpos($left, '/', 1) - 1);
368
                }
369
370
                // clear the declaration of parsed param
371
                unset($this->handler->pathParams[array_search($param, $this->handler->pathParams, false)]);
372
                $left = substr($left, strpos($left, '/', 1));
373
            }
374
        }
375
    }
376
}
377