Completed
Push — master ( 88d0e2...efa721 )
by Arthur
01:47
created

WebSocketServer::acceptNewClient()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 22
rs 9.568
c 0
b 0
f 0
cc 3
nc 3
nop 2
1
<?php
2
3
namespace WSSC;
4
5
use WSSC\Components\Connection;
6
use WSSC\Components\ServerConfig;
7
use WSSC\Contracts\CommonsContract;
8
use WSSC\Contracts\WebSocket;
9
use WSSC\Contracts\WebSocketServerContract;
10
use WSSC\Exceptions\WebSocketException;
11
12
/**
13
 * Created by Arthur Kushman
14
 *
15
 * @property ServerConfig config
16
 * @property WebSocket handler
17
 */
18
class WebSocketServer implements WebSocketServerContract, CommonsContract
19
{
20
21
    /** Maximum number of bytes read from a client socket per message read. */
    const MAX_BYTES_READ    = 8192;
    /** Number of bytes read from a freshly accepted client to obtain its HTTP request headers. */
    const HEADER_BYTES_READ = 1024;

    // stream non-blocking
    const NON_BLOCK  = 0;
    /** Process title applied through cli_set_process_title(). */
    const PROC_TITLE = 'php-wss';

    /** @var WebSocket event handler that receives onOpen/onClose and the message callbacks */
    private $handler;
    /** @var ServerConfig supplies host, port, forking and stream_select timeout settings */
    private $config;

    /** @var resource[] accepted client sockets */
    private $clients = [];
    /** @var array Sec-WebSocket-Key of each client, keyed by the integer id of its socket resource */
    private $handshakes = [];
    /** @var array header name => value pairs of the most recently prepared upgrade response */
    private $headersUpgrade = [];
    // set any template You need ex.: GET /subscription/messenger/token
    private $pathParams = [];

    /** @var int number of connected clients plus one, recomputed on every loop iteration */
    private $totalClients = 0;
    /** @var int highest value $totalClients has reached so far */
    private $maxClients = 1;
    // for the very 1st time must be true
    private $stepRecursion = true;
    /** @var Connection wrapper around the socket whose message is currently being dispatched */
    private $cureentConn;
41
42
    /**
     * Stores the collaborators the server works with and raises the default socket timeout.
     *
     * @param WebSocket $handler   receives the open, close and message callbacks
     * @param ServerConfig $config provides host, port, forking and select-timeout settings
     */
    public function __construct(WebSocket $handler, ServerConfig $config)
    {
        $this->config = $config;
        $this->handler = $handler;

        // this should be >= 5 sec, otherwise there will be broken pipe - tested
        ini_set('default_socket_timeout', 5);
    }
57
58
    /**
     * Binds the TCP server socket on the configured host/port and enters the event loop.
     *
     * @throws WebSocketException when the server socket cannot be created
     */
    public function run()
    {
        $errno = null;
        $errorMessage = '';
        $address = "tcp://{$this->config->getHost()}:{$this->config->getPort()}";

        $server = stream_socket_server($address, $errno, $errorMessage);
        if ($server === false) {
            throw new WebSocketException('Could not bind to socket: ' . $errno . ' - ' . $errorMessage . PHP_EOL);
        }

        // The process title is purely cosmetic, so a failure to set it is deliberately ignored.
        // The existence check matters because "@" cannot silence the fatal error raised when
        // the function is not available in the running SAPI.
        if (function_exists('cli_set_process_title')) {
            @cli_set_process_title(self::PROC_TITLE);
        }

        $this->eventLoop($server);
    }
77
78
    /**
     * Entry point of the event loop, optionally forking the process first.
     *
     * Without $fork the call goes straight into looping(). With $fork (requested by looping()
     * once the number of clients reaches a multiple of the configured clients-per-fork) the
     * process is forked: the parent re-enters the event loop, while the child simply returns
     * to its caller and carries on from there.
     *
     * @param resource $server server connection
     * @param bool $fork       flag to fork or run event loop
     */
    private function eventLoop($server, bool $fork = false)
    {
        if ($fork !== true) {
            $this->looping($server);

            return;
        }

        $pid = pcntl_fork();
        if (!$pid) {
            // child process (pid 0): nothing to start here, control goes back to the caller
            return;
        }

        // Parent process. A failed fork (-1) is truthy as well and therefore also lands here,
        // which keeps the current process serving.
        // The title is cosmetic; the existence check avoids a fatal error that "@" cannot hide.
        if (function_exists('cli_set_process_title')) {
            @cli_set_process_title(self::PROC_TITLE);
        }

        $this->eventLoop($server);
    }
98
99
    /**
     * Main loop: tracks the client count, decides about forking/exiting, waits for readable
     * sockets and dispatches them to acceptNewClient() and messagesWorker().
     *
     * The loop never returns normally; it ends through exit()/die() only.
     *
     * @param resource $server listening server socket
     */
    private function looping($server)
    {
        while (true) {
            $this->totalClients = count($this->clients) + 1;

            // maxClients prevents process fork on count down
            if ($this->totalClients > $this->maxClients) {
                $this->maxClients = $this->totalClients;
            }

            if ($this->config->isForking() === true
                && $this->totalClients !== 0 // avoid 0 process creation
                && true === $this->stepRecursion // only once
                && $this->maxClients === $this->totalClients // only if stack grows
                && $this->totalClients % $this->config->getClientsPerFork() === 0 // only when N is there
            ) {
                $this->stepRecursion = false;
                $this->eventLoop($server, true);
            }

            // there is less connection for amount of processes at this moment
            if ($this->totalClients !== 0 && $this->maxClients > $this->totalClients
                && $this->totalClients % $this->config->getClientsPerFork() === 0) {
                exit(1);
            }

            // prepare readable sockets
            $readSocks = $this->clients;
            $readSocks[] = $server;

            // clear socket resources that were closed, thus avoiding
            // (stream_select(): supplied resource is not a valid stream resource)
            foreach ($readSocks as $k => $sock) {
                if (!is_resource($sock)) {
                    unset($readSocks[$k]);
                }
            }

            // stream_select() takes these by reference; only readability is of interest
            $write = null;
            $except = null;
            $ready = stream_select($readSocks, $write, $except, $this->config->getStreamSelectTimeout());

            if ($ready === false) {
                die('something went wrong while selecting');
            }

            if ($ready === 0) {
                // the timeout expired without any activity - that is not an error, keep waiting
                continue;
            }

            // new client
            if (in_array($server, $readSocks, false)) {
                $this->acceptNewClient($server, $readSocks);
            }

            // message from existing client
            $this->messagesWorker($readSocks);
        }
    }
152
153
    /**
     * Accepts a pending connection on the server socket, registers it as a client, fires the
     * handler's onOpen callback and answers the WebSocket handshake. In every case the server
     * socket is removed from $readSocks afterwards so that only client sockets remain.
     *
     * @param resource $server  listening server socket
     * @param array $readSocks  readable sockets reported by stream_select(), modified in place
     */
    private function acceptNewClient($server, array &$readSocks)
    {
        $newClient = stream_socket_accept($server, 0); // must be 0 to non-block

        if ($newClient) {
            // important to read from headers here coz later client will change and there will be only msgs on pipe
            $headers = fread($newClient, self::HEADER_BYTES_READ);

            $hasPathTemplate = !empty($this->handler->pathParams[0]);
            if ($hasPathTemplate) {
                $this->setPathParams($headers);
            }

            $this->stepRecursion = true; // set on new client - remainder % is always 0
            $this->clients[] = $newClient;

            // trigger OPEN event
            $connection = new Connection($newClient, $this->clients);
            $this->handler->onOpen($connection);
            $this->handshake($newClient, $headers);
        }

        // delete the server socket from the read sockets
        $serverKey = array_search($server, $readSocks, false);
        unset($readSocks[$serverKey]);
    }
179
180
    /**
     * Reads one chunk from every readable client socket, decodes it and dispatches the
     * resulting event to the handler.
     *
     * An empty decode result (null for no data, false for an incomplete frame) and an explicit
     * close frame both trigger onClose and remove the client. Frames whose type has no entry in
     * the event-to-method map (for example decode errors, which carry an empty type) are skipped.
     *
     * @uses onMessage
     * @uses onPing
     * @uses onPong
     * @param array $readSocks readable client sockets
     */
    private function messagesWorker(array $readSocks)
    {
        foreach ($readSocks as $kSock => $sock) {
            $raw = fread($sock, self::MAX_BYTES_READ);
            // fread() reports failure with false, decode() expects a string
            $data = $this->decode($raw === false ? '' : $raw);

            // to manipulate connection through send/close methods via handler, specified in IConnection
            $this->cureentConn = new Connection($sock, $this->clients);

            // close event triggered from client - browser tab or close socket event
            // (the emptiness check has to come before any array access on $data)
            if (empty($data) || $data['type'] === self::EVENT_TYPE_CLOSE) {
                // trigger CLOSE event
                try {
                    $this->handler->onClose($this->cureentConn);
                } catch (WebSocketException $e) {
                    $e->printStack();
                }

                // to avoid event leaks; a failed lookup must not be used as a key,
                // otherwise false would be cast to index 0 and drop an unrelated client
                $clientKey = array_search($sock, $this->clients, true);
                if ($clientKey !== false) {
                    unset($this->clients[$clientKey]);
                }
                unset($readSocks[$kSock]);
                continue;
            }

            $method = self::MAP_EVENT_TYPE_TO_METHODS[$data['type']] ?? null;
            if ($method === null || !method_exists($this->handler, $method)) {
                continue;
            }

            try {
                // dynamic call: onMessage, onPing, onPong
                $this->handler->{$method}($this->cureentConn, $data['payload']);
            } catch (WebSocketException $e) {
                $e->printStack();
            }
        }
    }
218
219
    /**
     * Message frames decoder
     *
     * Reads the opcode, mask bit and payload length from the frame header, then unmasks the
     * payload with the 4-byte masking key.
     *
     * @param string $data raw bytes read from the client socket
     * @return mixed null on empty data | false when fewer bytes were read than the frame announces |
     *               array with 'type' and 'payload' on success | array with empty 'type'/'payload'
     *               and an 'error' code for unmasked frames and unknown opcodes
     */
    private function decode(string $data)
    {
        if (empty($data)) {
            return null; // close has been sent
        }

        // The mask bit lives in the second byte. A lone byte cannot carry it, so it is reported
        // exactly like an unmasked frame instead of reading past the end of the string.
        if (strlen($data) < 2) {
            return ['type' => '', 'payload' => '', 'error' => self::ERR_PROTOCOL];
        }

        $decodedData = [];

        // estimate frame type:
        $secondByte = ord($data[1]);
        $opcode = ord($data[0]) & 0x0F;         // low four bits of the first byte
        $isMasked = ($secondByte & 0x80) !== 0; // highest bit of the second byte
        $payloadLength = $secondByte & self::MASK_127;

        // unmasked frame is received:
        if (!$isMasked) {
            return ['type' => '', 'payload' => '', 'error' => self::ERR_PROTOCOL];
        }

        switch ($opcode) {
            // text frame:
            case self::DECODE_TEXT:
                $decodedData['type'] = self::EVENT_TYPE_TEXT;
                break;
            case self::DECODE_BINARY:
                $decodedData['type'] = self::EVENT_TYPE_BINARY;
                break;
            // connection close frame:
            case self::DECODE_CLOSE:
                $decodedData['type'] = self::EVENT_TYPE_CLOSE;
                break;
            // ping frame:
            case self::DECODE_PING:
                $decodedData['type'] = self::EVENT_TYPE_PING;
                break;
            // pong frame:
            case self::DECODE_PONG:
                $decodedData['type'] = self::EVENT_TYPE_PONG;
                break;
            default:
                return ['type' => '', 'payload' => '', 'error' => self::ERR_UNKNOWN_OPCODE];
        }

        // Locate the masking key and the payload: the length marker decides how many extra
        // length bytes follow the two fixed header bytes.
        if ($payloadLength === self::MASK_126) {
            $extendedLengthBytes = 2;
            $payloadOffset = self::PAYLOAD_OFFSET_8;
        } elseif ($payloadLength === self::MASK_127) {
            $extendedLengthBytes = 8;
            $payloadOffset = self::PAYLOAD_OFFSET_14;
        } else {
            $extendedLengthBytes = 0;
            $payloadOffset = self::PAYLOAD_OFFSET_6;
        }

        // Header not fully received yet. The announced frame length can never be smaller than
        // the payload offset, so this is the same "incomplete" outcome as the check below,
        // reached before any header byte is read.
        if (strlen($data) < $payloadOffset) {
            return false;
        }

        $mask = substr($data, 2 + $extendedLengthBytes, 4);

        if ($extendedLengthBytes === 0) {
            $dataLength = $payloadLength + $payloadOffset;
        } else {
            // big-endian extended length, assembled bit by bit
            $lengthBits = '';
            for ($i = 0; $i < $extendedLengthBytes; $i++) {
                $lengthBits .= sprintf('%08b', ord($data[$i + 2]));
            }
            $dataLength = bindec($lengthBits) + $payloadOffset;
        }

        /**
         * We have to check for large frames here. A single read may deliver fewer bytes than
         * the frame announces; in that case the frame is reported as incomplete.
         */
        if (strlen($data) < $dataLength) {
            return false;
        }

        // Unmasked frames were rejected above, so the payload always has to be unmasked here.
        $unmaskedPayload = '';
        for ($i = $payloadOffset; $i < $dataLength; $i++) {
            $unmaskedPayload .= $data[$i] ^ $mask[($i - $payloadOffset) % 4];
        }
        $decodedData['payload'] = $unmaskedPayload;

        return $decodedData;
    }
313
314
    /**
     * Handshakes/upgrade and key parse
     *
     * Extracts the Sec-WebSocket-Key from the client's request headers, remembers it for the
     * client and writes the upgrade response (including the computed Sec-WebSocket-Accept value)
     * to the client socket. Nothing is written when the key cannot be found.
     *
     * @param resource $client Source client socket to write
     * @param string $headers  Headers that client has been sent
     * @return string   socket handshake key (Sec-WebSocket-Key) | empty string on parse error
     */
    private function handshake($client, string $headers): string
    {
        $match = [];
        preg_match(self::SEC_WEBSOCKET_KEY_PTRN, $headers, $match);
        if (empty($match[1])) {
            // the declared return type is string, so "no key" is signalled with an empty string
            return '';
        }

        $key = $match[1];
        $this->handshakes[(int)$client] = $key;

        // sending header according to WebSocket Protocol
        $secWebSocketAccept = base64_encode(sha1(trim($key) . self::HEADER_WEBSOCKET_ACCEPT_HASH, true));
        $this->setHeadersUpgrade($secWebSocketAccept);
        $upgradeHeaders = $this->getHeadersUpgrade();

        fwrite($client, $upgradeHeaders);

        return $key;
    }
341
342
    /**
     * Prepares the header set of the upgrade response and stores it in $headersUpgrade.
     *
     * @param string $secWebSocketAccept base64 encoded Sec-WebSocket-Accept header
     */
    private function setHeadersUpgrade($secWebSocketAccept)
    {
        $headers = [];
        $headers[self::HEADERS_UPGRADE_KEY] = self::HEADERS_UPGRADE_VALUE;
        $headers[self::HEADERS_CONNECTION_KEY] = self::HEADERS_CONNECTION_VALUE;
        // the space before key is really important
        $headers[self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY] = ' ' . $secWebSocketAccept;

        $this->headersUpgrade = $headers;
    }
356
357
    /**
     * Renders the stored upgrade headers as the raw response text sent to the client.
     *
     * The status line comes first, followed by one "name:value" line per header. The
     * Sec-WebSocket-Accept line is followed by an additional end-of-line sequence.
     * Terminates the process when no headers have been prepared.
     *
     * @return string   Headers to Upgrade communication connection
     */
    private function getHeadersUpgrade(): string
    {
        if (empty($this->headersUpgrade)) {
            die('Headers array is not set' . PHP_EOL);
        }

        $lines = [self::HEADER_HTTP1_1 . self::HEADERS_EOL];

        foreach ($this->headersUpgrade as $name => $value) {
            $line = $name . ':' . $value . self::HEADERS_EOL;
            if ($name === self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY) {
                // add additional EOL for Sec-WebSocket-Accept
                $line .= self::HEADERS_EOL;
            }
            $lines[] = $line;
        }

        return implode('', $lines);
    }
378
379
    /**
     * Parses parameters from GET on web-socket client connection before handshake
     *
     * The request path is taken from the "GET <path> " request line and cut into segments at
     * the slashes; each name declared in the handler's pathParams receives the next segment,
     * and the declaration entry itself is removed from the list. Nothing is changed when the
     * headers contain no usable request path.
     *
     * @param string $headers request headers read from the client
     */
    private function setPathParams(string $headers)
    {
        if (empty($this->handler->pathParams)) {
            return;
        }

        $matches = [];
        // Headers come from the network: without a request line (or with an empty path) there is
        // nothing to parse, and strpos() with offset 1 on an empty string throws on PHP 8.
        if (preg_match('/GET\s(.*?)\s/', $headers, $matches) !== 1 || $matches[1] === '') {
            return;
        }
        $left = $matches[1];

        foreach ($this->handler->pathParams as $k => $param) {
            if (empty($this->handler->pathParams[$k + 1]) && strpos($left, '/', 1) === false) {
                // do not eat last char if there is no / at the end
                $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1);
            } else {
                // eat both slashes
                $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1,
                    strpos($left, '/', 1) - 1);
            }

            // clear the declaration of parsed param
            unset($this->handler->pathParams[array_search($param, $this->handler->pathParams, false)]);
            $left = substr($left, strpos($left, '/', 1));
        }
    }
407
}
408