Completed
Push — master ( 90e37c...d0ea51 )
by Arthur
01:26
created

WebSocketServer   F

Complexity

Total Complexity 61

Size/Duplication

Total Lines 417
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 0
Metric Value
wmc 61
lcom 1
cbo 5
dl 0
loc 417
rs 3.52
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 10 1
A run() 0 14 2
A eventLoop() 0 13 4
C looping() 0 50 15
A acceptNewClient() 0 22 3
B messagesWorker() 0 32 7
D decode() 0 88 15
A handshake() 0 20 2
A setHeadersUpgrade() 0 9 1
A getHeadersUpgrade() 0 16 4
A setPathParams() 0 23 5
A isPcntlLoaded() 0 4 1
A setIsPcntlLoaded() 0 4 1

How to fix   Complexity   

Complex Class

Complex classes like WebSocketServer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.
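As a rough illustration of the prefix/suffix heuristic, the sketch below groups method names by what is left after stripping accessor prefixes. The function name `groupMethodsByStem` and the choice of `get`/`set`/`is` as prefixes are assumptions made for this example, not part of the reviewed code.

```php
<?php

/**
 * Hypothetical helper: groups method names by the stem that remains after
 * stripping leading accessor prefixes (get/set/is, e.g. "setIsX" -> "X").
 * Only stems shared by more than one method are returned.
 */
function groupMethodsByStem(array $methods): array
{
    $groups = [];
    foreach ($methods as $method) {
        // Strip a lowercase accessor prefix plus any capitalised ones after it.
        $stem = preg_replace('/^(?:get|set|is)(?:Get|Set|Is)*(?=[A-Z])/', '', $method);
        $groups[$stem][] = $method;
    }

    // Keep only the stems that several methods have in common.
    return array_filter($groups, function (array $members): bool {
        return count($members) > 1;
    });
}

$candidates = groupMethodsByStem([
    '__construct', 'run', 'setHeadersUpgrade', 'getHeadersUpgrade',
    'setPathParams', 'isPcntlLoaded', 'setIsPcntlLoaded',
]);
print_r($candidates);
```

For the method list of this class, the shared stems are `HeadersUpgrade` and `PcntlLoaded`, which makes the upgrade-header methods a plausible first candidate for extraction.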

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
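A minimal sketch of Extract Class, assuming the upgrade-header logic is the component being moved out. The class name `UpgradeResponse` is hypothetical, and the literal header strings and GUID are taken from RFC 6455 as stand-ins for the constants the original class reads from its contracts.

```php
<?php

/**
 * Hypothetical class extracted from WebSocketServer: builds the HTTP 101
 * response for the opening handshake. Literal values follow RFC 6455 and
 * replace the contract constants used by the original code.
 */
final class UpgradeResponse
{
    private const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
    private const EOL  = "\r\n";

    /** Computes the Sec-WebSocket-Accept value for a client key. */
    public static function acceptKey(string $secWebSocketKey): string
    {
        return base64_encode(sha1(trim($secWebSocketKey) . self::GUID, true));
    }

    /** Returns the full response, terminated by an empty line. */
    public static function build(string $secWebSocketKey): string
    {
        $headers = [
            'Upgrade'              => 'websocket',
            'Connection'           => 'Upgrade',
            'Sec-WebSocket-Accept' => self::acceptKey($secWebSocketKey),
        ];

        $response = 'HTTP/1.1 101 Switching Protocols' . self::EOL;
        foreach ($headers as $name => $value) {
            $response .= $name . ': ' . $value . self::EOL;
        }

        return $response . self::EOL;
    }
}

// Sample key from RFC 6455, section 1.3.
echo UpgradeResponse::acceptKey('dGhlIHNhbXBsZSBub25jZQ=='), PHP_EOL;
```

With such a class in place, the server would no longer need the `$headersUpgrade` field or the `setHeadersUpgrade()`/`getHeadersUpgrade()` pair; `handshake()` could write the result of `build()` directly.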

While breaking up the class, it is a good idea to analyze how other classes use WebSocketServer, and based on these observations, apply Extract Interface, too.
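Extract Interface could look roughly like the following sketch, here applied to frame decoding. `FrameDecoderContract`, `ShortFrameDecoder` and `readTextPayload` are hypothetical names, and the implementation deliberately covers only short masked frames (payload under 126 bytes) to keep the example small.

```php
<?php

/** Hypothetical contract: all a consumer needs to know about a decoder. */
interface FrameDecoderContract
{
    /** @return array|null ['opcode' => int, 'payload' => string], or null if unusable */
    public function decode(string $data): ?array;
}

/** Illustrative implementation for short (< 126 bytes) masked frames only. */
final class ShortFrameDecoder implements FrameDecoderContract
{
    public function decode(string $data): ?array
    {
        // 2 header bytes + 4 mask bytes is the minimum for a masked frame.
        if (strlen($data) < 6) {
            return null;
        }

        $opcode = ord($data[0]) & 0x0F;
        $masked = (ord($data[1]) & 0x80) !== 0;
        $length = ord($data[1]) & 0x7F;
        if (!$masked || $length > 125 || strlen($data) < 6 + $length) {
            return null;
        }

        // XOR every payload byte with the rotating 4-byte mask.
        $mask = substr($data, 2, 4);
        $payload = '';
        for ($i = 0; $i < $length; $i++) {
            $payload .= $data[6 + $i] ^ $mask[$i % 4];
        }

        return ['opcode' => $opcode, 'payload' => $payload];
    }
}

/** A consumer that depends on the contract, not on a concrete class. */
function readTextPayload(FrameDecoderContract $decoder, string $raw): ?string
{
    $frame = $decoder->decode($raw);

    return ($frame !== null && $frame['opcode'] === 0x1) ? $frame['payload'] : null;
}

// Masked "Hello" text frame from RFC 6455, section 5.7.
$raw = "\x81\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58";
echo readTextPayload(new ShortFrameDecoder(), $raw), PHP_EOL;
```

Because callers only see the interface, a decoder that also handles extended payload lengths could be swapped in later without touching them.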

<?php

namespace WSSC;

use WSSC\Components\Connection;
use WSSC\Components\ServerConfig;
use WSSC\Contracts\CommonsContract;
use WSSC\Contracts\WebSocket;
use WSSC\Contracts\WebSocketServerContract;
use WSSC\Exceptions\ConnectionException;
use WSSC\Exceptions\WebSocketException;

/**
 * Created by Arthur Kushman
 *
 * @property ServerConfig config
 * @property WebSocket handler
 */
class WebSocketServer implements WebSocketServerContract, CommonsContract
{

    private $clients = [];
    // set any template you need, e.g. GET /subscription/messenger/token
    private $pathParams = [];
    private $config;
    private $handshakes = [];
    private $headersUpgrade = [];
    private $totalClients = 0;
    private $maxClients = 1;
    private $handler;
    private $cureentConn;
    private $isPcntlLoaded = false;

    // must be true on the very first pass
    private $stepRecursion = true;

    private const MAX_BYTES_READ    = 8192;
    private const HEADER_BYTES_READ = 1024;

    // stream non-blocking
    public const NON_BLOCK  = 0;

    /**
44
     * WebSocketServer constructor.
45
     *
46
     * @param WebSocket $handler
47
     * @param ServerConfig $config
48
     */
49
    public function __construct(
50
        WebSocket $handler,
51
        ServerConfig $config
52
    ) {
53
        ini_set('default_socket_timeout', 5); // this should be >= 5 sec, otherwise there will be broken pipe - tested
54
55
        $this->handler = $handler;
56
        $this->config = $config;
57
        $this->setIsPcntlLoaded(extension_loaded('pcntl'));
58
    }
59
    /**
     * Runs main process - ancestor with server socket on TCP
     *
     * @throws WebSocketException
     * @throws ConnectionException
     */
    public function run()
    {
        $errno = null;
        $errorMessage = '';

        $server = stream_socket_server("tcp://{$this->config->getHost()}:{$this->config->getPort()}", $errno, $errorMessage);

        if ($server === false) {
            throw new WebSocketException('Could not bind to socket: ' . $errno . ' - ' . $errorMessage . PHP_EOL, CommonsContract::SERVER_COULD_NOT_BIND_TO_SOCKET);
        }

        @cli_set_process_title($this->config->getProcessName());
Security Best Practice
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
        $this->eventLoop($server);
    }

    /**
     * Recursive event loop. Recursion is entered when the remainder is 0, i.e. when
     * the client count reaches a multiple of N, and only when $fork is true, which
     * prevents infinite recursive iterations.
     *
     * @param resource $server server connection
     * @param bool $fork       flag to fork or run event loop
     * @throws WebSocketException
     * @throws ConnectionException
     */
    private function eventLoop($server, bool $fork = false)
    {
        if ($fork === true && $this->isPcntlLoaded()) {
            $pid = pcntl_fork();

            if ($pid) { // run eventLoop in parent
                @cli_set_process_title($this->config->getProcessName());
Security Best Practice
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.
                $this->eventLoop($server);
            }
        } else {
            $this->looping($server);
        }
    }

    /**
     * @param resource $server
     * @throws WebSocketException
     * @throws ConnectionException
     */
    private function looping($server)
    {
        while (true) {
            $this->totalClients = count($this->clients) + 1;

            // maxClients prevents process fork on count down
            if ($this->totalClients > $this->maxClients) {
                $this->maxClients = $this->totalClients;
            }

            if ($this->config->isForking() === true
                && $this->totalClients !== 0 // avoid 0 process creation
                && true === $this->stepRecursion // only once
                && $this->maxClients === $this->totalClients // only if stack grows
                && $this->totalClients % $this->config->getClientsPerFork() === 0 // only when N is there
            ) {
                $this->stepRecursion = false;
                $this->eventLoop($server, true);
            }

            if ($this->totalClients !== 0 && $this->maxClients > $this->totalClients
                && $this->totalClients % $this->config->getClientsPerFork() === 0) { // fewer connections than the current number of processes needs
                exit(1);
            }

            // prepare readable sockets
            $readSocks = $this->clients;
            $readSocks[] = $server;

            // clear socket resources that were closed, thus avoiding (stream_select(): supplied resource is not a valid stream resource)
            foreach ($readSocks as $k => $sock) {
                if (!is_resource($sock)) {
                    unset($readSocks[$k]);
                }
            }

            // stream_select() takes these by reference; only readable sockets are of interest
            $write = null;
            $except = null;

            // start reading and use a large timeout
            if (!stream_select($readSocks, $write, $except, $this->config->getStreamSelectTimeout())) {
                throw new WebSocketException('something went wrong while selecting', CommonsContract::SERVER_SELECT_ERROR);
            }

            // new client
            if (in_array($server, $readSocks, false)) {
                $this->acceptNewClient($server, $readSocks);
            }

            // message from existing client
            $this->messagesWorker($readSocks);
        }
    }

    /**
     * @param resource $server
     * @param array $readSocks
     * @throws ConnectionException
     */
    private function acceptNewClient($server, array &$readSocks)
    {
        $newClient = stream_socket_accept($server, 0); // must be 0 to non-block
        if ($newClient) {

            // the headers must be read here: afterwards the stream carries only messages
            $headers = (string) fread($newClient, self::HEADER_BYTES_READ); // fread() may return false
            if (empty($this->handler->pathParams[0]) === false) {
                $this->setPathParams($headers);
            }

            $this->clients[] = $newClient;
            $this->stepRecursion = true; // set on new client - remainder % is always 0

            // trigger OPEN event
            $this->handler->onOpen(new Connection($newClient, $this->clients));
            $this->handshake($newClient, $headers);
        }

        // delete the server socket from the read sockets
        unset($readSocks[array_search($server, $readSocks, false)]);
    }

    /**
     * @uses onMessage
     * @uses onPing
     * @uses onPong
     * @param array $readSocks
     */
    private function messagesWorker(array $readSocks)
    {
        foreach ($readSocks as $kSock => $sock) {
            $data = $this->decode((string) fread($sock, self::MAX_BYTES_READ));
            // decode() returns null or false for empty/improper data, so guard the offsets
            $dataType = $data['type'] ?? '';
            $dataPayload = $data['payload'] ?? '';

            // to manipulate connection through send/close methods via handler, specified in IConnection
            $this->cureentConn = new Connection($sock, $this->clients);
            if (empty($data) || $dataType === self::EVENT_TYPE_CLOSE) { // close event triggered from client - browser tab or close socket event
                // trigger CLOSE event
                try {
                    $this->handler->onClose($this->cureentConn);
                } catch (WebSocketException $e) {
                    $e->printStack();
                }

                // to avoid event leaks
                unset($this->clients[array_search($sock, $this->clients)], $readSocks[$kSock]);
                continue;
            }

            // frames decoded with an error carry an empty type and map to no handler method
            $method = self::MAP_EVENT_TYPE_TO_METHODS[$dataType] ?? '';
            if ($method !== '' && method_exists($this->handler, $method)) {
                try {
                    // dynamic call: onMessage, onPing, onPong
                    $this->handler->{$method}($this->cureentConn, $dataPayload);
                } catch (WebSocketException $e) {
                    $e->printStack();
                }
            }
        }
    }

    /**
     * Message frames decoder
     *
     * @param string $data
     * @return array|false|null null on empty data, false on improper data, array on success
     */
    private function decode(string $data)
    {
        if (empty($data)) {
            return null; // close has been sent
        }

        // a frame header is at least 2 bytes long
        if (strlen($data) < 2) {
            return false;
        }

        $unmaskedPayload = '';
        $decodedData = [];

        // determine frame type:
        $firstByteBinary = sprintf('%08b', ord($data[0]));
        $secondByteBinary = sprintf('%08b', ord($data[1]));
        $opcode = bindec(substr($firstByteBinary, 4, 4));
        $isMasked = $secondByteBinary[0] === '1';
        $payloadLength = ord($data[1]) & self::MASK_127;

        // unmasked frame is received:
        if (!$isMasked) {
            return ['type' => '', 'payload' => '', 'error' => self::ERR_PROTOCOL];
        }

        switch ($opcode) {
            // text frame:
            case self::DECODE_TEXT:
                $decodedData['type'] = self::EVENT_TYPE_TEXT;
                break;
            // binary frame:
            case self::DECODE_BINARY:
                $decodedData['type'] = self::EVENT_TYPE_BINARY;
                break;
            // connection close frame:
            case self::DECODE_CLOSE:
                $decodedData['type'] = self::EVENT_TYPE_CLOSE;
                break;
            // ping frame:
            case self::DECODE_PING:
                $decodedData['type'] = self::EVENT_TYPE_PING;
                break;
            // pong frame:
            case self::DECODE_PONG:
                $decodedData['type'] = self::EVENT_TYPE_PONG;
                break;
            default:
                return ['type' => '', 'payload' => '', 'error' => self::ERR_UNKNOWN_OPCODE];
        }

        if ($payloadLength === self::MASK_126) {
            $mask = substr($data, 4, 4);
            $payloadOffset = self::PAYLOAD_OFFSET_8;
            $dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
        } elseif ($payloadLength === self::MASK_127) {
            $mask = substr($data, 10, 4);
            $payloadOffset = self::PAYLOAD_OFFSET_14;
            $tmp = '';
            for ($i = 0; $i < 8; $i++) {
                $tmp .= sprintf('%08b', ord($data[$i + 2]));
            }
            $dataLength = bindec($tmp) + $payloadOffset;
            unset($tmp);
        } else {
            $mask = substr($data, 2, 4);
            $payloadOffset = self::PAYLOAD_OFFSET_6;
            $dataLength = $payloadLength + $payloadOffset;
        }

        /**
         * We have to check for large frames here. A single read returns only a
         * limited number of bytes, so if the websocket frame is larger than
         * what has been read we have to wait until the whole data is transferred.
         */
        if (strlen($data) < $dataLength) {
            return false;
        }

        if ($isMasked) {
            for ($i = $payloadOffset; $i < $dataLength; $i++) {
                $j = $i - $payloadOffset;
                if (isset($data[$i])) {
                    $unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
                }
            }
            $decodedData['payload'] = $unmaskedPayload;
        } else {
            $payloadOffset -= 4;
            $decodedData['payload'] = substr($data, $payloadOffset);
        }

        return $decodedData;
    }

    /**
     * Handshakes/upgrade and key parse
     *
     * @param resource $client Source client socket to write
     * @param string $headers  Headers that the client has sent
     * @return string   socket handshake key (Sec-WebSocket-Key), empty string on parse error
     * @throws ConnectionException
     */
    private function handshake($client, string $headers): string
    {
        $match = [];
        preg_match(self::SEC_WEBSOCKET_KEY_PTRN, $headers, $match);
        if (empty($match[1])) {
            return ''; // the declared return type is string, so false cannot be returned as such
        }

        $key = $match[1];
        $this->handshakes[(int)$client] = $key;

        // sending header according to WebSocket Protocol
        $secWebSocketAccept = base64_encode(sha1(trim($key) . self::HEADER_WEBSOCKET_ACCEPT_HASH, true));
        $this->setHeadersUpgrade($secWebSocketAccept);
        $upgradeHeaders = $this->getHeadersUpgrade();

        fwrite($client, $upgradeHeaders);

        return $key;
    }

    /**
     * Sets an array of headers needed to upgrade server/client connection
     *
     * @param string $secWebSocketAccept base64 encoded Sec-WebSocket-Accept header
     */
    private function setHeadersUpgrade($secWebSocketAccept)
    {
        $this->headersUpgrade = [
            self::HEADERS_UPGRADE_KEY              => self::HEADERS_UPGRADE_VALUE,
            self::HEADERS_CONNECTION_KEY           => self::HEADERS_CONNECTION_VALUE,
            self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY => ' ' . $secWebSocketAccept
            // the space before the value is really important
        ];
    }

    /**
     * Retrieves headers from an array of headers to upgrade server/client connection
     *
     * @return string   Headers to Upgrade communication connection
     * @throws ConnectionException
     */
    private function getHeadersUpgrade(): string
    {
        $handShakeHeaders = self::HEADER_HTTP1_1 . self::HEADERS_EOL;
        if (empty($this->headersUpgrade)) {
            throw new ConnectionException('Headers for upgrade handshake are not set' . PHP_EOL, CommonsContract::SERVER_HEADERS_NOT_SET);
        }

        foreach ($this->headersUpgrade as $key => $header) {
            $handShakeHeaders .= $key . ':' . $header . self::HEADERS_EOL;
            if ($key === self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY) { // add additional EOL for Sec-WebSocket-Accept
                $handShakeHeaders .= self::HEADERS_EOL;
            }
        }

        return $handShakeHeaders;
    }

    /**
     * Parses parameters from GET on web-socket client connection before handshake
     *
     * @param string $headers
     */
    private function setPathParams(string $headers)
    {
        if (empty($this->handler->pathParams) === false) {
            $matches = [];
            preg_match('/GET\s(.*?)\s/', $headers, $matches);
            if (empty($matches[1])) {
                return; // no request line found, nothing to parse
            }
            $left = $matches[1];

            foreach ($this->handler->pathParams as $k => $param) {
                if (empty($this->handler->pathParams[$k + 1]) && strpos($left, '/', 1) === false) {
                    // do not eat last char if there is no / at the end
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1);
                } else {
                    // eat both slashes
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1,
                        strpos($left, '/', 1) - 1);
                }

                // clear the declaration of parsed param
                unset($this->handler->pathParams[array_search($param, $this->handler->pathParams, false)]);
                $left = substr($left, strpos($left, '/', 1));
            }
        }
    }

    /**
     * Returns true if pcntl ext loaded and false otherwise
     * @return bool
     */
    private function isPcntlLoaded(): bool
    {
        return $this->isPcntlLoaded;
    }

    /**
     * Sets pre-loaded pcntl state
     * @param bool $isPcntlLoaded
     */
    private function setIsPcntlLoaded(bool $isPcntlLoaded): void
    {
        $this->isPcntlLoaded = $isPcntlLoaded;
    }
}
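
The Security Best Practice note attached to the suppressed `cli_set_process_title()` calls could be addressed along the lines of the sketch below. The helper name `setProcessTitleOrWarn` and its optional `$setter` parameter are assumptions introduced for illustration; whether a failed title change should be logged, ignored, or turned into an exception is a decision for the project.

```php
<?php

/**
 * Hypothetical helper (not part of the reviewed class): sets the process
 * title and reports a failure explicitly instead of silently dropping it.
 * The optional $setter exists only so the behaviour can be exercised
 * without changing the real process title.
 */
function setProcessTitleOrWarn(string $title, ?callable $setter = null): bool
{
    if ($setter === null) {
        // cli_set_process_title() is only available in the CLI SAPI.
        if (!function_exists('cli_set_process_title')) {
            return false;
        }
        $setter = 'cli_set_process_title';
    }

    // The @ still hides the raw warning, but the result is now checked.
    if (@$setter($title) === false) {
        error_log('Could not set process title to "' . $title . '"');

        return false;
    }

    return true;
}

// Simulated failure: the message is logged and false is returned.
var_dump(setProcessTitleOrWarn('demo', function (string $title): bool {
    return false;
}));
```

A process title is cosmetic, so logging and carrying on seems a reasonable default here; throwing, as in the `mkdir()` example of the note, would stop the server for a non-essential feature.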