Completed
Push — master ( cb269f...0b32dd )
by Arthur
02:12
created

WebSocketServer::looping()   F

Complexity

Conditions 17
Paths 261

Size

Total Lines 54

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 54
rs 3.6208
c 0
b 0
f 0
cc 17
nc 261
nop 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace WSSC;
4
5
use WSSC\Components\Connection;
6
use WSSC\Components\OriginComponent;
7
use WSSC\Components\ServerConfig;
8
use WSSC\Components\WssMain;
9
use WSSC\Contracts\CommonsContract;
10
use WSSC\Contracts\WebSocket;
11
use WSSC\Contracts\WebSocketServerContract;
12
use WSSC\Exceptions\ConnectionException;
13
use WSSC\Exceptions\WebSocketException;
14
15
/**
16
 * Create by Arthur Kushman
17
 *
18
 * @property ServerConfig config
19
 * @property WebSocket handler
20
 */
21
class WebSocketServer extends WssMain implements WebSocketServerContract
22
{
23
    private $clients = [];
24
    // set any template You need ex.: GET /subscription/messenger/token
25
    private $pathParams = [];
26
    private $config;
27
    private $handshakes = [];
28
    private $headersUpgrade = [];
29
    private $totalClients = 0;
30
    private $maxClients = 1;
31
    private $handler;
32
    private $cureentConn;
33
34
    // for the very 1st time must be true
35
    private $stepRecursion = true;
36
37
    private const MAX_BYTES_READ = 8192;
38
    private const HEADER_BYTES_READ = 1024;
39
40
    // stream non-blocking
41
    public const NON_BLOCK = 0;
42
43
    /**
44
     * WebSocketServer constructor.
45
     *
46
     * @param WebSocket $handler
47
     * @param ServerConfig $config
48
     */
49
    public function __construct(
50
        WebSocket $handler,
51
        ServerConfig $config
52
    )
53
    {
54
        ini_set('default_socket_timeout', 5); // this should be >= 5 sec, otherwise there will be broken pipe - tested
55
56
        $this->handler = $handler;
57
        $this->config = $config;
58
        $this->setIsPcntlLoaded(extension_loaded('pcntl'));
59
    }
60
61
    /**
62
     * Runs main process - Anscestor with server socket on TCP
63
     *
64
     * @throws WebSocketException
65
     * @throws ConnectionException
66
     */
67
    public function run()
68
    {
69
        $errno = null;
70
        $errorMessage = '';
71
72
        $server = stream_socket_server("tcp://{$this->config->getHost()}:{$this->config->getPort()}", $errno,
73
            $errorMessage);
74
75
        if ($server === false) {
76
            throw new WebSocketException('Could not bind to socket: ' . $errno . ' - ' . $errorMessage . PHP_EOL,
77
                CommonsContract::SERVER_COULD_NOT_BIND_TO_SOCKET);
78
        }
79
80
        @cli_set_process_title($this->config->getProcessName());
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
81
        $this->eventLoop($server);
82
    }
83
84
    /**
85
     * Recursive event loop that input intu recusion by remainder = 0 - thus when N users,
86
     * and when forks equals true which prevents it from infinite recursive iterations
87
     *
88
     * @param resource $server server connection
89
     * @param bool $fork flag to fork or run event loop
90
     * @throws WebSocketException
91
     * @throws ConnectionException
92
     */
93
    private function eventLoop($server, bool $fork = false)
94
    {
95
        if ($fork === true && $this->isPcntlLoaded()) {
96
            $pid = pcntl_fork();
97
98
            if ($pid) { // run eventLoop in parent        
99
                @cli_set_process_title($this->config->getProcessName());
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
100
                $this->eventLoop($server);
101
            }
102
        } else {
103
            $this->looping($server);
104
        }
105
    }
106
107
    /**
108
     * @param resource $server
109
     * @throws WebSocketException
110
     * @throws ConnectionException
111
     */
112
    private function looping($server)
113
    {
114
        while (true) {
115
            $this->totalClients = count($this->clients) + 1;
116
117
            // maxClients prevents process fork on count down
118
            if ($this->totalClients > $this->maxClients) {
119
                $this->maxClients = $this->totalClients;
120
            }
121
122
            $doFork = $this->config->isForking() === true
123
                && $this->totalClients !== 0 // avoid 0 process creation
124
                && $this->stepRecursion === true // only once
125
                && $this->maxClients === $this->totalClients // only if stack grows
126
                && $this->totalClients % $this->config->getClientsPerFork() === 0; // only when N is there
127
            if ($doFork) {
128
                $this->stepRecursion = false;
129
                $this->eventLoop($server, true);
130
            }
131
132
            if ($this->totalClients !== 0 && $this->maxClients > $this->totalClients
133
                && $this->totalClients % $this->config->getClientsPerFork() === 0) { // there is less connection for amount of processes at this moment
134
                exit(1);
135
            }
136
137
            //prepare readable sockets
138
            $readSocks = $this->clients;
139
            $readSocks[] = $server;
140
141
            // clear socket resources that were closed, thus avoiding (stream_select(): supplied resource is not a valid stream resource)
142
            foreach ($readSocks as $k => $sock) {
143
                if (!is_resource($sock)) {
144
                    unset($readSocks[$k]);
145
                }
146
            }
147
148
            //start reading and use a large timeout
149
            if (!stream_select($readSocks, $write, $except, $this->config->getStreamSelectTimeout())) {
150
                throw new WebSocketException('something went wrong while selecting',
151
                    CommonsContract::SERVER_SELECT_ERROR);
152
            }
153
154
            //new client
155
            if (in_array($server, $readSocks, false)) {
156
                $this->acceptNewClient($server, $readSocks);
157
                if ($this->config->isCheckOrigin() && $this->config->isOriginHeader() === false) {
158
                    continue;
159
                }
160
            }
161
162
            //message from existing client
163
            $this->messagesWorker($readSocks);
164
        }
165
    }
166
167
    /**
168
     * @param resource $server
169
     * @param array $readSocks
170
     * @throws ConnectionException
171
     */
172
    private function acceptNewClient($server, array &$readSocks)
173
    {
174
        $newClient = stream_socket_accept($server, 0); // must be 0 to non-block
175
        if ($newClient) {
176
            // important to read from headers here coz later client will change and there will be only msgs on pipe
177
            $headers = fread($newClient, self::HEADER_BYTES_READ);
178
            if ($this->config->isCheckOrigin()) {
179
                $hasOrigin = (new OriginComponent($this->config, $newClient))->checkOrigin($headers);
180
                $this->config->setOriginHeader($hasOrigin);
181
                if ($hasOrigin === false) {
182
                    return;
183
                }
184
            }
185
186
            if (empty($this->handler->pathParams[0]) === false) {
187
                $this->setPathParams($headers);
188
            }
189
190
            $this->clients[] = $newClient;
191
            $this->stepRecursion = true; // set on new client - remainder % is always 0
192
193
            // trigger OPEN event
194
            $this->handler->onOpen(new Connection($newClient, $this->clients));
195
            $this->handshake($newClient, $headers);
196
        }
197
198
        //delete the server socket from the read sockets
199
        unset($readSocks[array_search($server, $readSocks, false)]);
200
    }
201
202
    /**
203
     * @param array $readSocks
204
     * @uses onPing
205
     * @uses onPong
206
     * @uses onMessage
207
     */
208
    private function messagesWorker(array $readSocks)
209
    {
210
        foreach ($readSocks as $kSock => $sock) {
211
            $data = $this->decode(fread($sock, self::MAX_BYTES_READ));
212
            if ($data !== null) {
213
                $dataType = $data['type'];
214
                $dataPayload = $data['payload'];
215
216
                // to manipulate connection through send/close methods via handler, specified in IConnection
217
                $this->cureentConn = new Connection($sock, $this->clients);
218
                if (empty($data) || $dataType === self::EVENT_TYPE_CLOSE) { // close event triggered from client - browser tab or close socket event
219
                    // trigger CLOSE event
220
                    try {
221
                        $this->handler->onClose($this->cureentConn);
222
                    } catch (WebSocketException $e) {
223
                        $e->printStack();
224
                    }
225
226
                    // to avoid event leaks
227
                    unset($this->clients[array_search($sock, $this->clients)], $readSocks[$kSock]);
228
                    continue;
229
                }
230
231
                if (method_exists($this->handler, self::MAP_EVENT_TYPE_TO_METHODS[$dataType])) {
232
                    try {
233
                        // dynamic call: onMessage, onPing, onPong
234
                        $this->handler->{self::MAP_EVENT_TYPE_TO_METHODS[$dataType]}($this->cureentConn, $dataPayload);
235
                    } catch (WebSocketException $e) {
236
                        $e->printStack();
237
                    }
238
                }
239
            }
240
        }
241
    }
242
243
    /**
244
     * Handshakes/upgrade and key parse
245
     *
246
     * @param resource $client Source client socket to write
247
     * @param string $headers Headers that client has been sent
248
     * @return string   socket handshake key (Sec-WebSocket-Key)| false on parse error
249
     * @throws ConnectionException
250
     */
251
    private function handshake($client, string $headers): string
252
    {
253
        $match = [];
254
        preg_match(self::SEC_WEBSOCKET_KEY_PTRN, $headers, $match);
255
        if (empty($match[1])) {
256
            return false;
257
        }
258
259
        $key = $match[1];
260
        $this->handshakes[(int)$client] = $key;
261
262
        // sending header according to WebSocket Protocol
263
        $secWebSocketAccept = base64_encode(sha1(trim($key) . self::HEADER_WEBSOCKET_ACCEPT_HASH, true));
264
        $this->setHeadersUpgrade($secWebSocketAccept);
265
        $upgradeHeaders = $this->getHeadersUpgrade();
266
267
        fwrite($client, $upgradeHeaders);
268
269
        return $key;
270
    }
271
272
    /**
273
     * Sets an array of headers needed to upgrade server/client connection
274
     *
275
     * @param string $secWebSocketAccept base64 encoded Sec-WebSocket-Accept header
276
     */
277
    private function setHeadersUpgrade($secWebSocketAccept)
278
    {
279
        $this->headersUpgrade = [
280
            self::HEADERS_UPGRADE_KEY => self::HEADERS_UPGRADE_VALUE,
281
            self::HEADERS_CONNECTION_KEY => self::HEADERS_CONNECTION_VALUE,
282
            self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY => ' ' . $secWebSocketAccept
283
            // the space before key is really important
284
        ];
285
    }
286
287
    /**
288
     * Retreives headers from an array of headers to upgrade server/client connection
289
     *
290
     * @return string   Headers to Upgrade communication connection
291
     * @throws ConnectionException
292
     */
293
    private function getHeadersUpgrade(): string
294
    {
295
        $handShakeHeaders = self::HEADER_HTTP1_1 . self::HEADERS_EOL;
296
        if (empty($this->headersUpgrade)) {
297
            throw new ConnectionException('Headers for upgrade handshake are not set' . PHP_EOL,
298
                CommonsContract::SERVER_HEADERS_NOT_SET);
299
        }
300
301
        foreach ($this->headersUpgrade as $key => $header) {
302
            $handShakeHeaders .= $key . ':' . $header . self::HEADERS_EOL;
303
            if ($key === self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY) { // add additional EOL fo Sec-WebSocket-Accept
304
                $handShakeHeaders .= self::HEADERS_EOL;
305
            }
306
        }
307
308
        return $handShakeHeaders;
309
    }
310
311
    /**
312
     * Parses parameters from GET on web-socket client connection before handshake
313
     *
314
     * @param string $headers
315
     */
316
    private function setPathParams(string $headers)
317
    {
318
        if (empty($this->handler->pathParams) === false) {
319
            $matches = [];
320
            preg_match('/GET\s(.*?)\s/', $headers, $matches);
321
            $left = $matches[1];
322
323
            foreach ($this->handler->pathParams as $k => $param) {
324
                if (empty($this->handler->pathParams[$k + 1]) && strpos($left, '/', 1) === false) {
325
                    // do not eat last char if there is no / at the end
326
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1);
327
                } else {
328
                    // eat both slashes
329
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1,
330
                        strpos($left, '/', 1) - 1);
331
                }
332
333
                // clear the declaration of parsed param
334
                unset($this->handler->pathParams[array_search($param, $this->handler->pathParams, false)]);
335
                $left = substr($left, strpos($left, '/', 1));
336
            }
337
        }
338
    }
339
}
340