Completed
Push — master ( 0f7687...6db947 )
by Arthur
01:19
created

WebSocketServer::run()   B

Complexity

Conditions 7
Paths 6

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 32
rs 8.4746
c 0
b 0
f 0
cc 7
nc 6
nop 0
1
<?php
2
3
namespace WSSC;
4
5
use WSSC\Components\Connection;
6
use WSSC\Components\OriginComponent;
7
use WSSC\Components\ServerConfig;
8
use WSSC\Components\WssMain;
9
use WSSC\Contracts\CommonsContract;
10
use WSSC\Contracts\WebSocket;
11
use WSSC\Contracts\WebSocketServerContract;
12
use WSSC\Exceptions\ConnectionException;
13
use WSSC\Exceptions\WebSocketException;
14
15
/**
16
 * Created by Arthur Kushman
17
 *
18
 * @property ServerConfig config
19
 * @property WebSocket handler
20
 */
21
class WebSocketServer extends WssMain implements WebSocketServerContract
22
{
23
    /** Maximum number of bytes read in one go from a connected client socket. */
    private const MAX_BYTES_READ = 8192;

    /** Maximum number of bytes read from a freshly accepted socket to obtain the request headers. */
    private const HEADER_BYTES_READ = 1024;

    /** Server configuration injected through the constructor. */
    protected $config;

    /** Event handler injected through the constructor. */
    private $handler;

    /** Sockets of the clients accepted so far. */
    private $clients = [];

    /** Connection wrapper built for the socket currently being served by the message worker. */
    private $cureentConn;

    /** Sec-WebSocket-Key values, indexed by the integer id of the client socket. */
    private $handshakes = [];

    /** Header name => value pairs used to build the upgrade response. */
    private $headersUpgrade = [];

    // set any template You need ex.: GET /subscription/messenger/token
    // NOTE(review): the methods of this class read the templates from $this->handler->pathParams - confirm
    // whether this property is still used anywhere.
    private $pathParams = [];

    /** Number of connected clients plus one, refreshed on every loop iteration. */
    private $totalClients = 0;

    /** Highest value $totalClients has reached so far. */
    private $maxClients = 1;

    // for the very 1st time must be true
    private $stepRecursion = true;
40
41
    /**
     * Stores the collaborators and records whether the pcntl extension is available.
     *
     * @param WebSocket $handler   handler whose on* callbacks are invoked by this server
     * @param ServerConfig $config settings read by the server while it runs
     */
    public function __construct(WebSocket $handler, ServerConfig $config)
    {
        $this->config = $config;
        $this->handler = $handler;

        // this should be >= 5 sec, otherwise there will be broken pipe - tested
        ini_set('default_socket_timeout', 5);

        $hasPcntl = extension_loaded('pcntl');
        $this->setIsPcntlLoaded($hasPcntl);
    }
58
59
    /**
     * Creates the listening TCP socket (with SSL context options when enabled) and enters the event loop.
     *
     * @throws WebSocketException when the certificate/key files are missing, the SSL options cannot be set,
     *                            or the socket cannot be bound
     * @throws ConnectionException
     */
    public function run()
    {
        $errno = null;
        $errorMessage = '';
        $context = stream_context_create();

        if ($this->config->isSsl() === true) {
            stream_context_set_option($context, 'ssl', 'allow_self_signed', $this->config->getAllowSelfSigned());
            stream_context_set_option($context, 'ssl', 'verify_peer', false);

            // both the certificate and the private key have to exist on disk before they are handed to the context
            $filesExist = is_file($this->config->getLocalCert()) && is_file($this->config->getLocalPk());
            if ($filesExist === false) {
                throw new WebSocketException(
                    'SSL certificates must be valid pem files',
                    CommonsContract::SERVER_INVALID_STREAM_CONTEXT
                );
            }

            $isLocalCertSet = stream_context_set_option($context, 'ssl', 'local_cert', $this->config->getLocalCert());
            $isLocalPkSet = stream_context_set_option($context, 'ssl', 'local_pk', $this->config->getLocalPk());
            if (($isLocalCertSet && $isLocalPkSet) === false) {
                throw new WebSocketException(
                    'SSL certificates could not be set correctly',
                    CommonsContract::SERVER_INVALID_STREAM_CONTEXT
                );
            }
        }

        $host = $this->config->getHost();
        $port = $this->config->getPort();
        $server = stream_socket_server(
            "tcp://{$host}:{$port}",
            $errno,
            $errorMessage,
            STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,
            $context
        );

        if ($server === false) {
            $reason = 'Could not bind to socket: ' . $errno . ' - ' . $errorMessage . PHP_EOL;

            throw new WebSocketException($reason, CommonsContract::SERVER_COULD_NOT_BIND_TO_SOCKET);
        }

        // best effort only: a failure to rename the process is deliberately ignored
        @cli_set_process_title($this->config->getProcessName());
        $this->eventLoop($server);
    }
97
98
    /**
     * Either runs the select loop directly or forks first.
     *
     * Without a fork request (or without the pcntl extension) the select loop is started right away.
     * Otherwise the process is forked: a non-zero fork result re-enters this method without the fork flag,
     * which starts a new select loop, while a zero result (the child) simply returns to the caller.
     *
     * @param resource $server server connection
     * @param bool $fork flag to fork or run event loop
     * @throws WebSocketException
     * @throws ConnectionException
     */
    private function eventLoop($server, bool $fork = false)
    {
        $shouldFork = $fork === true && $this->isPcntlLoaded();
        if ($shouldFork === false) {
            $this->looping($server);

            return;
        }

        $pid = pcntl_fork();
        if (!$pid) {
            // pid 0: nothing more to do here, control goes back to the caller
            return;
        }

        // best effort only: a failure to rename the process is deliberately ignored
        @cli_set_process_title($this->config->getProcessName());
        $this->eventLoop($server);
    }
120
121
    /**
     * Endless select loop: decides whether to fork, waits for readable sockets, accepts new clients
     * and hands readable client sockets to the message worker.
     *
     * A select timeout (no socket became readable) is not an error; the loop just starts its next
     * iteration. Only a failed select (false) raises an exception.
     *
     * @param resource $server listening server socket
     * @throws WebSocketException when stream_select() fails
     * @throws ConnectionException
     */
    private function looping($server): void
    {
        while (true) {
            $this->totalClients = count($this->clients) + 1;

            // maxClients prevents process fork on count down
            if ($this->totalClients > $this->maxClients) {
                $this->maxClients = $this->totalClients;
            }

            $doFork = $this->config->isForking() === true
                && $this->totalClients !== 0 // avoid 0 process creation
                && $this->stepRecursion === true // only once
                && $this->maxClients === $this->totalClients // only if stack grows
                && $this->totalClients % $this->config->getClientsPerFork() === 0; // only when N is there
            if ($doFork) {
                $this->stepRecursion = false;
                $this->eventLoop($server, true);
            }
            $this->lessConnThanProc($this->totalClients, $this->maxClients);

            // prepare readable sockets
            $readSocks = $this->clients;
            $readSocks[] = $server;
            $this->cleanSocketResources($readSocks);

            // stream_select() takes its arrays by reference, so the unused ones are set explicitly
            $write = null;
            $except = null;

            // start reading and use a large timeout
            $ready = stream_select($readSocks, $write, $except, $this->config->getStreamSelectTimeout());
            if ($ready === false) {
                throw new WebSocketException('something went wrong while selecting',
                    CommonsContract::SERVER_SELECT_ERROR);
            }
            if ($ready === 0) {
                // timeout expired with nothing to read - keep serving instead of failing
                continue;
            }

            // new client
            if (in_array($server, $readSocks, true)) {
                $this->acceptNewClient($server, $readSocks);
                if ($this->config->isCheckOrigin() && $this->config->isOriginHeader() === false) {
                    continue;
                }
            }

            // message from existing client
            $this->messagesWorker($readSocks);
        }
    }
170
171
    /**
     * Accepts a pending connection on the server socket, reads its request headers, registers the
     * client, fires the OPEN event and answers the upgrade handshake.
     *
     * The server socket is removed from $readSocks on every path, including the early returns, so the
     * caller never treats the listening socket as a client that has a message to read.
     *
     * @param resource $server listening server socket
     * @param array $readSocks readable sockets reported by stream_select(); modified in place
     * @throws ConnectionException
     */
    private function acceptNewClient($server, array &$readSocks): void
    {
        // delete the server socket from the read sockets - done first so that no early return can skip it
        $serverKey = array_search($server, $readSocks, true);
        if ($serverKey !== false) {
            unset($readSocks[$serverKey]);
        }

        $newClient = stream_socket_accept($server, 0); // must be 0 to non-block
        if (!$newClient) {
            return;
        }

        if ($this->config->isSsl() === true) {
            $isEnabled = stream_socket_enable_crypto($newClient, true, $this->config->getCryptoType());
            if ($isEnabled === false) { // couldn't enable crypto - let's try one more time
                return;
            }
        }

        // important to read from headers here coz later client will change and there will be only msgs on pipe
        $headers = fread($newClient, self::HEADER_BYTES_READ);
        if ($this->config->isCheckOrigin()) {
            $hasOrigin = (new OriginComponent($this->config, $newClient))->checkOrigin($headers);
            $this->config->setOriginHeader($hasOrigin);
            if ($hasOrigin === false) {
                return;
            }
        }

        if (empty($this->handler->pathParams[0]) === false) {
            $this->setPathParams($headers);
        }

        $this->clients[] = $newClient;
        $this->stepRecursion = true; // set on new client - remainder % is always 0

        // trigger OPEN event
        // NOTE(review): onOpen runs before the upgrade response is written by handshake() - confirm
        // that handlers do not send frames from onOpen.
        $this->handler->onOpen(new Connection($newClient, $this->clients));
        $this->handshake($newClient, $headers);
    }
212
213
    /**
     * Reads one chunk from every readable client socket, decodes it and dispatches the matching
     * handler callback (close, or the method mapped to the decoded event type).
     *
     * @param array $readSocks readable client sockets
     * @uses onPing
     * @uses onPong
     * @uses onMessage
     */
    private function messagesWorker(array $readSocks): void
    {
        foreach ($readSocks as $kSock => $sock) {
            $data = $this->decode(fread($sock, self::MAX_BYTES_READ));
            if ($data === null) {
                continue;
            }

            // the decoded value may be empty, so its keys are read defensively
            $dataType = $data['type'] ?? null;
            $dataPayload = $data['payload'] ?? null;

            // to manipulate connection through send/close methods via handler, specified in IConnection
            $this->cureentConn = new Connection($sock, $this->clients);

            // close event triggered from client - browser tab or close socket event
            if (empty($data) || $dataType === self::EVENT_TYPE_CLOSE) {
                // trigger CLOSE event
                try {
                    $this->handler->onClose($this->cureentConn);
                } catch (WebSocketException $e) {
                    $e->printStack();
                }

                // to avoid event leaks; a socket that is not registered must not evict the client at key 0,
                // which is what unset($clients[false]) would do
                $clientKey = array_search($sock, $this->clients, true);
                if ($clientKey !== false) {
                    unset($this->clients[$clientKey]);
                }
                unset($readSocks[$kSock]);
                continue;
            }

            $isSupportedMethod = empty(self::MAP_EVENT_TYPE_TO_METHODS[$dataType]) === false
                && method_exists($this->handler, self::MAP_EVENT_TYPE_TO_METHODS[$dataType]);
            if ($isSupportedMethod) {
                try {
                    // dynamic call: onMessage, onPing, onPong
                    $this->handler->{self::MAP_EVENT_TYPE_TO_METHODS[$dataType]}($this->cureentConn, $dataPayload);
                } catch (WebSocketException $e) {
                    $e->printStack();
                }
            }
        }
    }
255
256
    /**
     * Handshakes/upgrade and key parse.
     *
     * Extracts the Sec-WebSocket-Key from the request headers, remembers it for the client and writes
     * the upgrade response to the client socket.
     *
     * @param resource $client Source client socket to write
     * @param string $headers Headers that client has been sent
     * @return string socket handshake key (Sec-WebSocket-Key), or an empty string when no key was found
     * @throws ConnectionException
     */
    private function handshake($client, string $headers): string
    {
        $match = [];
        preg_match(self::SEC_WEBSOCKET_KEY_PTRN, $headers, $match);
        if (empty($match[1])) {
            // the declared return type is string: returning false here only worked through implicit
            // coercion to '' and would be a TypeError under strict types
            return '';
        }

        $key = $match[1];
        $this->handshakes[(int)$client] = $key;

        // sending header according to WebSocket Protocol
        $secWebSocketAccept = base64_encode(sha1(trim($key) . self::HEADER_WEBSOCKET_ACCEPT_HASH, true));
        $this->setHeadersUpgrade($secWebSocketAccept);
        $upgradeHeaders = $this->getHeadersUpgrade();

        fwrite($client, $upgradeHeaders);

        return $key;
    }
284
285
    /**
     * Stores the header name => value pairs used for the upgrade response.
     *
     * @param string $secWebSocketAccept base64 encoded Sec-WebSocket-Accept header
     */
    private function setHeadersUpgrade($secWebSocketAccept)
    {
        // the space before key is really important
        $acceptValue = ' ' . $secWebSocketAccept;

        $headers = [];
        $headers[self::HEADERS_UPGRADE_KEY] = self::HEADERS_UPGRADE_VALUE;
        $headers[self::HEADERS_CONNECTION_KEY] = self::HEADERS_CONNECTION_VALUE;
        $headers[self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY] = $acceptValue;

        $this->headersUpgrade = $headers;
    }
299
300
    /**
     * Renders the stored upgrade headers as one response string.
     *
     * The status line comes first, then one "name:value" line per stored header; the
     * Sec-WebSocket-Accept header is followed by an additional line terminator.
     *
     * @return string Headers to Upgrade communication connection
     * @throws ConnectionException when no upgrade headers have been set
     */
    private function getHeadersUpgrade(): string
    {
        if (empty($this->headersUpgrade)) {
            throw new ConnectionException('Headers for upgrade handshake are not set' . PHP_EOL,
                CommonsContract::SERVER_HEADERS_NOT_SET);
        }

        $lines = [self::HEADER_HTTP1_1];
        foreach ($this->headersUpgrade as $name => $value) {
            $lines[] = $name . ':' . $value;
            if ($name === self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY) {
                // an empty entry yields the additional EOL after Sec-WebSocket-Accept
                $lines[] = '';
            }
        }

        return implode(self::HEADERS_EOL, $lines) . self::HEADERS_EOL;
    }
323
324
    /**
     * Parses parameters from GET on web-socket client connection before handshake.
     *
     * Every template declared in the handler's pathParams is replaced by an entry keyed with the
     * template name that holds the corresponding path segment of the request line. When the headers
     * contain no usable "GET <path> " request line, the declared templates are left untouched.
     *
     * @param string $headers raw request headers sent by the client
     */
    private function setPathParams(string $headers): void
    {
        if (empty($this->handler->pathParams)) {
            return;
        }

        $matches = [];
        $hasPath = preg_match('/GET\s(.*?)\s/', $headers, $matches) === 1;
        if ($hasPath === false || $matches[1] === '') {
            // nothing to parse: avoids reading an undefined match and calling strpos() past the end of ''
            return;
        }
        $left = $matches[1];

        foreach ($this->handler->pathParams as $k => $param) {
            if (empty($this->handler->pathParams[$k + 1]) && strpos($left, '/', 1) === false) {
                // do not eat last char if there is no / at the end
                $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1);
            } else {
                // eat both slashes
                $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1,
                    strpos($left, '/', 1) - 1);
            }

            // clear the declaration of parsed param
            unset($this->handler->pathParams[array_search($param, $this->handler->pathParams, false)]);
            $left = substr($left, strpos($left, '/', 1));
        }
    }
352
}
353