Completed
Push — master ( d0ea51...fa824b )
by Arthur
01:26
created

WebSocketServer   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 303
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 0
Metric Value
wmc 44
lcom 1
cbo 6
dl 0
loc 303
rs 8.8798
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 10 1
A run() 0 14 2
A eventLoop() 0 13 4
A acceptNewClient() 0 22 3
B messagesWorker() 0 32 7
C looping() 0 50 15
A handshake() 0 20 2
A setHeadersUpgrade() 0 9 1
A getHeadersUpgrade() 0 16 4
A setPathParams() 0 23 5

How to fix   Complexity   

Complex Class

Complex classes like WebSocketServer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use WebSocketServer, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace WSSC;
4
5
use WSSC\Components\Connection;
6
use WSSC\Components\ServerConfig;
7
use WSSC\Components\WssMain;
8
use WSSC\Contracts\CommonsContract;
9
use WSSC\Contracts\WebSocket;
10
use WSSC\Contracts\WebSocketServerContract;
11
use WSSC\Exceptions\ConnectionException;
12
use WSSC\Exceptions\WebSocketException;
13
14
/**
15
 * Create by Arthur Kushman
16
 *
17
 * @property ServerConfig config
18
 * @property WebSocket handler
19
 */
20
class WebSocketServer extends WssMain implements WebSocketServerContract
21
{
22
23
    private $clients = [];
24
    // set any template You need ex.: GET /subscription/messenger/token
25
    private $pathParams = [];
26
    private $config;
27
    private $handshakes = [];
28
    private $headersUpgrade = [];
29
    private $totalClients = 0;
30
    private $maxClients = 1;
31
    private $handler;
32
    private $cureentConn;
33
34
    // for the very 1st time must be true
35
    private $stepRecursion = true;
36
37
    private const MAX_BYTES_READ    = 8192;
38
    private const HEADER_BYTES_READ = 1024;
39
40
    // stream non-blocking
41
    public const NON_BLOCK  = 0;
42
43
    /**
44
     * WebSocketServer constructor.
45
     *
46
     * @param WebSocket $handler
47
     * @param ServerConfig $config
48
     */
49
    public function __construct(
50
        WebSocket $handler,
51
        ServerConfig $config
52
    ) {
53
        ini_set('default_socket_timeout', 5); // this should be >= 5 sec, otherwise there will be broken pipe - tested
54
55
        $this->handler = $handler;
56
        $this->config = $config;
57
        $this->setIsPcntlLoaded(extension_loaded('pcntl'));
58
    }
59
60
    /**
61
     * Runs main process - Anscestor with server socket on TCP
62
     *
63
     * @throws WebSocketException
64
     * @throws ConnectionException
65
     */
66
    public function run()
67
    {
68
        $errno = NULL;
69
        $errorMessage = '';
70
71
        $server = stream_socket_server("tcp://{$this->config->getHost()}:{$this->config->getPort()}", $errno, $errorMessage);
72
73
        if ($server === false) {
74
           throw new WebSocketException('Could not bind to socket: ' . $errno . ' - ' . $errorMessage . PHP_EOL, CommonsContract::SERVER_COULD_NOT_BIND_TO_SOCKET);
75
        }
76
77
        @cli_set_process_title($this->config->getProcessName());
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
78
        $this->eventLoop($server);
79
    }
80
81
    /**
82
     * Recursive event loop that input intu recusion by remainder = 0 - thus when N users,
83
     * and when forks equals true which prevents it from infinite recursive iterations
84
     *
85
     * @param resource $server server connection
86
     * @param bool $fork       flag to fork or run event loop
87
     * @throws WebSocketException
88
     * @throws ConnectionException
89
     */
90
    private function eventLoop($server, bool $fork = false)
91
    {
92
        if ($fork === true && $this->isPcntlLoaded()) {
93
            $pid = pcntl_fork();
94
95
            if ($pid) { // run eventLoop in parent        
96
                @cli_set_process_title($this->config->getProcessName());
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
97
                $this->eventLoop($server);
98
            }
99
        } else {
100
            $this->looping($server);
101
        }
102
    }
103
104
    /**
105
     * @param resource $server
106
     * @throws WebSocketException
107
     * @throws ConnectionException
108
     */
109
    private function looping($server)
110
    {
111
        while (true) {
112
            $this->totalClients = count($this->clients) + 1;
113
114
            // maxClients prevents process fork on count down
115
            if ($this->totalClients > $this->maxClients) {
116
                $this->maxClients = $this->totalClients;
117
            }
118
119
            $doFork = $this->config->isForking() === true
120
                && $this->totalClients !== 0 // avoid 0 process creation
121
                && $this->stepRecursion === true // only once
122
                && $this->maxClients === $this->totalClients // only if stack grows
123
                && $this->totalClients % $this->config->getClientsPerFork() === 0; // only when N is there
124
            if ($doFork) {
125
                $this->stepRecursion = false;
126
                $this->eventLoop($server, true);
127
            }
128
129
            if ($this->totalClients !== 0 && $this->maxClients > $this->totalClients
130
                && $this->totalClients % $this->config->getClientsPerFork() === 0) { // there is less connection for amount of processes at this moment
131
                exit(1);
132
            }
133
134
            //prepare readable sockets
135
            $readSocks = $this->clients;
136
            $readSocks[] = $server;
137
138
            // clear socket resources that were closed, thus avoiding (stream_select(): supplied resource is not a valid stream resource)
139
            foreach ($readSocks as $k => $sock) {
140
                if (!is_resource($sock)) {
141
                    unset($readSocks[$k]);
142
                }
143
            }
144
145
            //start reading and use a large timeout
146
            if (!stream_select($readSocks, $write, $except, $this->config->getStreamSelectTimeout())) {
147
                throw new WebSocketException('something went wrong while selecting', CommonsContract::SERVER_SELECT_ERROR);
148
            }
149
150
            //new client
151
            if (in_array($server, $readSocks, false)) {
152
                $this->acceptNewClient($server, $readSocks);
153
            }
154
155
            //message from existing client
156
            $this->messagesWorker($readSocks);
157
        }
158
    }
159
160
    /**
161
     * @param resource $server
162
     * @param array $readSocks
163
     * @throws ConnectionException
164
     */
165
    private function acceptNewClient($server, array &$readSocks)
166
    {
167
        $newClient = stream_socket_accept($server, 0); // must be 0 to non-block
168
        if ($newClient) {
169
170
            // important to read from headers here coz later client will change and there will be only msgs on pipe
171
            $headers = fread($newClient, self::HEADER_BYTES_READ);
172
            if (empty($this->handler->pathParams[0]) === false) {
173
                $this->setPathParams($headers);
174
            }
175
176
            $this->clients[] = $newClient;
177
            $this->stepRecursion = true; // set on new client - remainder % is always 0
178
179
            // trigger OPEN event
180
            $this->handler->onOpen(new Connection($newClient, $this->clients));
181
            $this->handshake($newClient, $headers);
182
        }
183
184
        //delete the server socket from the read sockets
185
        unset($readSocks[array_search($server, $readSocks, false)]);
186
    }
187
188
    /**
189
     * @uses onMessage
190
     * @uses onPing
191
     * @uses onPong
192
     * @param array $readSocks
193
     */
194
    private function messagesWorker(array $readSocks)
195
    {
196
        foreach ($readSocks as $kSock => $sock) {
197
            $data = $this->decode(fread($sock, self::MAX_BYTES_READ));
198
            $dataType = $data['type'];
199
            $dataPayload = $data['payload'];
200
201
            // to manipulate connection through send/close methods via handler, specified in IConnection
202
            $this->cureentConn = new Connection($sock, $this->clients);
203
            if (empty($data) || $dataType === self::EVENT_TYPE_CLOSE) { // close event triggered from client - browser tab or close socket event
204
                // trigger CLOSE event
205
                try {
206
                    $this->handler->onClose($this->cureentConn);
207
                } catch (WebSocketException $e) {
208
                    $e->printStack();
209
                }
210
211
                // to avoid event leaks
212
                unset($this->clients[array_search($sock, $this->clients)], $readSocks[$kSock]);
213
                continue;
214
            }
215
216
            if (method_exists($this->handler, self::MAP_EVENT_TYPE_TO_METHODS[$dataType])) {
217
                try {
218
                    // dynamic call: onMessage, onPing, onPong
219
                    $this->handler->{self::MAP_EVENT_TYPE_TO_METHODS[$dataType]}($this->cureentConn, $dataPayload);
220
                } catch (WebSocketException $e) {
221
                    $e->printStack();
222
                }
223
            }
224
        }
225
    }
226
227
    /**
228
     * Handshakes/upgrade and key parse
229
     *
230
     * @param resource $client Source client socket to write
231
     * @param string $headers  Headers that client has been sent
232
     * @return string   socket handshake key (Sec-WebSocket-Key)| false on parse error
233
     * @throws ConnectionException
234
     */
235
    private function handshake($client, string $headers): string
236
    {
237
        $match = [];
238
        preg_match(self::SEC_WEBSOCKET_KEY_PTRN, $headers, $match);
239
        if (empty($match[1])) {
240
            return false;
241
        }
242
243
        $key = $match[1];
244
        $this->handshakes[(int)$client] = $key;
245
246
        // sending header according to WebSocket Protocol
247
        $secWebSocketAccept = base64_encode(sha1(trim($key) . self::HEADER_WEBSOCKET_ACCEPT_HASH, true));
248
        $this->setHeadersUpgrade($secWebSocketAccept);
249
        $upgradeHeaders = $this->getHeadersUpgrade();
250
251
        fwrite($client, $upgradeHeaders);
252
253
        return $key;
254
    }
255
256
    /**
257
     * Sets an array of headers needed to upgrade server/client connection
258
     *
259
     * @param string $secWebSocketAccept base64 encoded Sec-WebSocket-Accept header
260
     */
261
    private function setHeadersUpgrade($secWebSocketAccept)
262
    {
263
        $this->headersUpgrade = [
264
            self::HEADERS_UPGRADE_KEY              => self::HEADERS_UPGRADE_VALUE,
265
            self::HEADERS_CONNECTION_KEY           => self::HEADERS_CONNECTION_VALUE,
266
            self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY => ' ' . $secWebSocketAccept
267
            // the space before key is really important
268
        ];
269
    }
270
271
    /**
272
     * Retreives headers from an array of headers to upgrade server/client connection
273
     *
274
     * @return string   Headers to Upgrade communication connection
275
     * @throws ConnectionException
276
     */
277
    private function getHeadersUpgrade(): string
278
    {
279
        $handShakeHeaders = self::HEADER_HTTP1_1 . self::HEADERS_EOL;
280
        if (empty($this->headersUpgrade)) {
281
            throw new ConnectionException('Headers for upgrade handshake are not set' . PHP_EOL, CommonsContract::SERVER_HEADERS_NOT_SET);
282
        }
283
284
        foreach ($this->headersUpgrade as $key => $header) {
285
            $handShakeHeaders .= $key . ':' . $header . self::HEADERS_EOL;
286
            if ($key === self::HEADERS_SEC_WEBSOCKET_ACCEPT_KEY) { // add additional EOL fo Sec-WebSocket-Accept
287
                $handShakeHeaders .= self::HEADERS_EOL;
288
            }
289
        }
290
291
        return $handShakeHeaders;
292
    }
293
294
    /**
295
     * Parses parameters from GET on web-socket client connection before handshake
296
     *
297
     * @param string $headers
298
     */
299
    private function setPathParams(string $headers)
300
    {
301
        if (empty($this->handler->pathParams) === false) {
302
            $matches = [];
303
            preg_match('/GET\s(.*?)\s/', $headers, $matches);
304
            $left = $matches[1];
305
306
            foreach ($this->handler->pathParams as $k => $param) {
307
                if (empty($this->handler->pathParams[$k + 1]) && strpos($left, '/', 1) === false) {
308
                    // do not eat last char if there is no / at the end
309
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1);
310
                } else {
311
                    // eat both slashes
312
                    $this->handler->pathParams[$param] = substr($left, strpos($left, '/') + 1,
313
                        strpos($left, '/', 1) - 1);
314
                }
315
316
                // clear the declaration of parsed param
317
                unset($this->handler->pathParams[array_search($param, $this->handler->pathParams, false)]);
318
                $left = substr($left, strpos($left, '/', 1));
319
            }
320
        }
321
    }
322
}
323