Completed
Push — master ( 516ea9...17b012 )
by Arthur
01:34
created

WscMain::setFragmentSize()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
namespace WSSC\Components;
4
5
use WSSC\Contracts\CommonsContract;
6
use WSSC\Contracts\WscCommonsContract;
7
use WSSC\Exceptions\BadOpcodeException;
8
use WSSC\Exceptions\BadUriException;
9
use WSSC\Exceptions\ConnectionException;
10
11
/**
12
 * Class WscMain
13
 *
14
 * @package WSSC\Components
15
 *
16
 * @property ClientConfig config
17
 */
18
class WscMain implements WscCommonsContract
19
{
20
21
    private $socket;
22
    private $isConnected = false;
23
    private $isClosing = false;
24
    private $lastOpcode;
25
    private $closeStatus;
26
    private $hugePayload;
27
28
    private static $opcodes = [
29
        CommonsContract::EVENT_TYPE_CONTINUATION => 0,
30
        CommonsContract::EVENT_TYPE_TEXT         => 1,
31
        CommonsContract::EVENT_TYPE_BINARY       => 2,
32
        CommonsContract::EVENT_TYPE_CLOSE        => 8,
33
        CommonsContract::EVENT_TYPE_PING         => 9,
34
        CommonsContract::EVENT_TYPE_PONG         => 10,
35
    ];
36
37
    protected $socketUrl = '';
38
    protected $config;
39
40
    /**
41
     * @throws \InvalidArgumentException
42
     * @throws BadUriException
43
     * @throws ConnectionException
44
     * @throws \Exception
45
     */
46
    protected function connect()
47
    {
48
        $urlParts = parse_url($this->socketUrl);
49
50
        $scheme = $urlParts['scheme'];
51
        $host = $urlParts['host'];
52
        $user = isset($urlParts['user']) ? $urlParts['user'] : '';
53
        $pass = isset($urlParts['pass']) ? $urlParts['pass'] : '';
54
        $port = isset($urlParts['port']) ? $urlParts['port'] : ($scheme === 'wss' ? 443 : 80);
55
56
        $pathWithQuery = $this->getPathWithQuery($urlParts);
57
        $hostUri = $this->getHostUri($scheme, $host);
58
59
        // Set the stream context options if they're already set in the config
60
        $context = $this->getStreamContext();
61
        $this->socket = @stream_socket_client(
62
            $hostUri . ':' . $port, $errno, $errstr, $this->config->getTimeout(), STREAM_CLIENT_CONNECT, $context
63
        );
64
65
        if ($this->socket === false) {
66
            throw new ConnectionException(
67
                "Could not open socket to \"$host:$port\": $errstr ($errno)."
68
            );
69
        }
70
71
        // Set timeout on the stream as well.
72
        stream_set_timeout($this->socket, $this->config->getTimeout());
73
74
        // Generate the WebSocket key.
75
        $key = $this->generateKey();
76
        $headers = [
77
            'Host'                  => $host . ':' . $port,
78
            'User-Agent'            => 'websocket-client-php',
79
            'Connection'            => 'Upgrade',
80
            'Upgrade'               => 'WebSocket',
81
            'Sec-WebSocket-Key'     => $key,
82
            'Sec-Websocket-Version' => '13',
83
        ];
84
85
        // Handle basic authentication.
86
        if ($user || $pass) {
87
            $headers['authorization'] = 'Basic ' . base64_encode($user . ':' . $pass) . "\r\n";
88
        }
89
90
        // Add and override with headers from options.
91
        if (!empty($this->config->getHeaders())) {
92
            $headers = array_merge($headers, $this->config->getHeaders());
93
        }
94
95
        $header = $this->getHeaders($pathWithQuery, $headers);
96
97
        // Send headers.
98
        $this->write($header);
99
100
        // Get server response header
101
        // @todo Handle version switching
102
        $this->validateResponse($scheme, $host, $pathWithQuery, $key);
103
        $this->isConnected = true;
104
    }
105
106
    /**
107
     * @param string $scheme
108
     * @param string $host
109
     * @return string
110
     * @throws BadUriException
111
     */
112
    private function getHostUri(string $scheme, string $host): string
113
    {
114
        if (in_array($scheme, ['ws', 'wss'], true) === false) {
115
            throw new BadUriException(
116
                "Url should have scheme ws or wss, not '$scheme' from URI '$this->socketUrl' ."
117
            );
118
        }
119
120
        return ($scheme === 'wss' ? 'ssl' : 'tcp') . '://' . $host;
121
    }
122
123
    /**
124
     * @param string $scheme
125
     * @param string $host
126
     * @param string $pathWithQuery
127
     * @param string $key
128
     * @throws ConnectionException
129
     */
130
    private function validateResponse(string $scheme, string $host, string $pathWithQuery, string $key)
131
    {
132
        $response = stream_get_line($this->socket, self::DEFAULT_RESPONSE_HEADER, "\r\n\r\n");
133
        if (!preg_match(self::SEC_WEBSOCKET_ACCEPT_PTTRN, $response, $matches)) {
134
            $address = $scheme . '://' . $host . $pathWithQuery;
135
            throw new ConnectionException(
136
                "Connection to '{$address}' failed: Server sent invalid upgrade response:\n"
137
                . $response
138
            );
139
        }
140
141
        $keyAccept = trim($matches[1]);
142
        $expectedResonse = base64_encode(pack('H*', sha1($key . self::SERVER_KEY_ACCEPT)));
143
        if ($keyAccept !== $expectedResonse) {
144
            throw new ConnectionException('Server sent bad upgrade response.');
145
        }
146
    }
147
148
    /**
149
     * @return mixed|resource
150
     * @throws \InvalidArgumentException
151
     */
152
    private function getStreamContext()
153
    {
154
        if ($this->config->getContext() !== NULL) {
155
            // Suppress the error since we'll catch it below
156
            if (@get_resource_type($this->config->getContext()) === 'stream-context') {
157
                return $this->config->getContext();
158
            }
159
160
            throw new \InvalidArgumentException(
161
                "Stream context in \$options['context'] isn't a valid context"
162
            );
163
        }
164
165
        return stream_context_create();
166
    }
167
168
    /**
169
     * @param mixed $urlParts
170
     * @return string
171
     */
172
    private function getPathWithQuery($urlParts): string
173
    {
174
        $path = isset($urlParts['path']) ? $urlParts['path'] : '/';
175
        $query = isset($urlParts['query']) ? $urlParts['query'] : '';
176
        $fragment = isset($urlParts['fragment']) ? $urlParts['fragment'] : '';
177
        $pathWithQuery = $path;
178
        if (!empty($query)) {
179
            $pathWithQuery .= '?' . $query;
180
        }
181
        if (!empty($fragment)) {
182
            $pathWithQuery .= '#' . $fragment;
183
        }
184
185
        return $pathWithQuery;
186
    }
187
188
    /**
189
     * @param string $pathWithQuery
190
     * @param array $headers
191
     * @return string
192
     */
193
    private function getHeaders(string $pathWithQuery, array $headers): string
194
    {
195
        return 'GET ' . $pathWithQuery . " HTTP/1.1\r\n"
196
            . implode(
197
                "\r\n", array_map(
198
                    function ($key, $value) {
199
                        return "$key: $value";
200
                    }, array_keys($headers), $headers
201
                )
202
            )
203
            . "\r\n\r\n";
204
    }
205
206
    /**
207
     * @return string
208
     */
209
    public function getLastOpcode(): string
210
    {
211
        return $this->lastOpcode;
212
    }
213
214
    /**
215
     * @return int
216
     */
217
    public function getCloseStatus(): int
218
    {
219
        return $this->closeStatus;
220
    }
221
222
    /**
223
     * @return bool
224
     */
225
    public function isConnected(): bool
226
    {
227
        return $this->isConnected;
228
    }
229
230
    /**
231
     * @param int $timeout
232
     * @param null $microSecs
233
     * @return WscMain
234
     */
235
    public function setTimeout(int $timeout, $microSecs = NULL): WscMain
236
    {
237
        $this->config->setTimeout($timeout);
238
        if ($this->socket && get_resource_type($this->socket) === 'stream') {
239
            stream_set_timeout($this->socket, $timeout, $microSecs);
240
        }
241
242
        return $this;
243
    }
244
245
    /**
246
     * Sends message to opened socket connection client->server
247
     *
248
     * @param $payload
249
     * @param string $opcode
250
     * @throws \InvalidArgumentException
251
     * @throws BadOpcodeException
252
     * @throws BadUriException
253
     * @throws ConnectionException
254
     * @throws \Exception
255
     */
256
    public function send($payload, $opcode = CommonsContract::EVENT_TYPE_TEXT)
257
    {
258
        if (!$this->isConnected) {
259
            $this->connect();
260
        }
261
        if (array_key_exists($opcode, self::$opcodes) === false) {
262
            throw new BadOpcodeException("Bad opcode '$opcode'.  Try 'text' or 'binary'.");
263
        }
264
        // record the length of the payload
265
        $payloadLength = strlen($payload);
266
267
        $fragmentCursor = 0;
268
        // while we have data to send
269
        while ($payloadLength > $fragmentCursor) {
270
            // get a fragment of the payload
271
            $sub_payload = substr($payload, $fragmentCursor, $this->config->getFragmentSize());
272
273
            // advance the cursor
274
            $fragmentCursor += $this->config->getFragmentSize();
275
276
            // is this the final fragment to send?
277
            $final = $payloadLength <= $fragmentCursor;
278
279
            // send the fragment
280
            $this->sendFragment($final, $sub_payload, $opcode, true);
281
282
            // all fragments after the first will be marked a continuation
283
            $opcode = 'continuation';
284
        }
285
    }
286
287
    /**
288
     * @param $final
289
     * @param $payload
290
     * @param $opcode
291
     * @param $masked
292
     * @throws ConnectionException
293
     * @throws \Exception
294
     */
295
    protected function sendFragment($final, $payload, $opcode, $masked)
296
    {
297
        // Binary string for header.
298
        $frameHeadBin = '';
299
        // Write FIN, final fragment bit.
300
        $frameHeadBin .= (bool)$final ? '1' : '0';
301
        // RSV 1, 2, & 3 false and unused.
302
        $frameHeadBin .= '000';
303
        // Opcode rest of the byte.
304
        $frameHeadBin .= sprintf('%04b', self::$opcodes[$opcode]);
305
        // Use masking?
306
        $frameHeadBin .= $masked ? '1' : '0';
307
308
        // 7 bits of payload length...
309
        $payloadLen = strlen($payload);
310
        if ($payloadLen > self::MAX_BYTES_READ) {
311
            $frameHeadBin .= decbin(self::MASK_127);
312
            $frameHeadBin .= sprintf('%064b', $payloadLen);
313
        } else if ($payloadLen > self::MASK_125) {
314
            $frameHeadBin .= decbin(self::MASK_126);
315
            $frameHeadBin .= sprintf('%016b', $payloadLen);
316
        } else {
317
            $frameHeadBin .= sprintf('%07b', $payloadLen);
318
        }
319
320
        $frame = '';
321
322
        // Write frame head to frame.
323
        foreach (str_split($frameHeadBin, 8) as $binstr) {
324
            $frame .= chr(bindec($binstr));
325
        }
326
        // Handle masking
327
        if ($masked) {
328
            // generate a random mask:
329
            $mask = '';
330 View Code Duplication
            for ($i = 0; $i < 4; $i++) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
331
                $mask .= chr(random_int(0, 255));
332
            }
333
            $frame .= $mask;
334
        }
335
336
        // Append payload to frame:
337 View Code Duplication
        for ($i = 0; $i < $payloadLen; $i++) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
338
            $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i];
0 ignored issues
show
Bug introduced by
The variable $mask does not seem to be defined for all execution paths leading up to this point.

If you define a variable conditionally, it can happen that it is not defined for all execution paths.

Let’s take a look at an example:

function myFunction($a) {
    switch ($a) {
        case 'foo':
            $x = 1;
            break;

        case 'bar':
            $x = 2;
            break;
    }

    // $x is potentially undefined here.
    echo $x;
}

In the above example, the variable $x is defined if you pass “foo” or “bar” as argument for $a. However, since the switch statement has no default case statement, if you pass any other value, the variable $x would be undefined.

Available Fixes

  1. Check for existence of the variable explicitly:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        if (isset($x)) { // Make sure it's always set.
            echo $x;
        }
    }
    
  2. Define a default value for the variable:

    function myFunction($a) {
        $x = ''; // Set a default which gets overridden for certain paths.
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
        }
    
        echo $x;
    }
    
  3. Add a value for the missing path:

    function myFunction($a) {
        switch ($a) {
            case 'foo':
                $x = 1;
                break;
    
            case 'bar':
                $x = 2;
                break;
    
            // We add support for the missing case.
            default:
                $x = '';
                break;
        }
    
        echo $x;
    }
    
Loading history...
339
        }
340
341
        $this->write($frame);
342
    }
343
344
    /**
345
     * Receives message client<-server
346
     *
347
     * @return null|string
348
     * @throws \InvalidArgumentException
349
     * @throws BadOpcodeException
350
     * @throws BadUriException
351
     * @throws ConnectionException
352
     * @throws \Exception
353
     */
354
    public function receive()
355
    {
356
        if (!$this->isConnected) {
357
            $this->connect();
358
        }
359
        $this->hugePayload = '';
360
361
        $response = NULL;
362
        while (NULL === $response) {
363
            $response = $this->receiveFragment();
364
        }
365
366
        return $response;
367
    }
368
369
    /**
370
     * @return null|string
371
     * @throws \InvalidArgumentException
372
     * @throws BadOpcodeException
373
     * @throws BadUriException
374
     * @throws ConnectionException
375
     * @throws \Exception
376
     */
377
    protected function receiveFragment()
378
    {
379
        // Just read the main fragment information first.
380
        $data = $this->read(2);
381
382
        // Is this the final fragment?  // Bit 0 in byte 0
383
        /// @todo Handle huge payloads with multiple fragments.
384
        $final = (bool)(ord($data[0]) & 1 << 7);
385
386
        // Parse opcode
387
        $opcode_int = ord($data[0]) & 31; // Bits 4-7
388
        $opcode_ints = array_flip(self::$opcodes);
389
        if (!array_key_exists($opcode_int, $opcode_ints)) {
390
            throw new ConnectionException("Bad opcode in websocket frame: $opcode_int");
391
        }
392
        $opcode = $opcode_ints[$opcode_int];
393
394
        // record the opcode if we are not receiving a continutation fragment
395
        if ($opcode !== 'continuation') {
396
            $this->lastOpcode = $opcode;
397
        }
398
399
        $payloadLength = $this->getPayloadLength($data);
400
        $payload = $this->getPayloadData($data, $payloadLength);
401
402
        if ($opcode === CommonsContract::EVENT_TYPE_CLOSE) {
403
            // Get the close status.
404
            if ($payloadLength >= 2) {
405
                $statusBin = $payload[0] . $payload[1];
406
                $status = bindec(sprintf('%08b%08b', ord($payload[0]), ord($payload[1])));
407
                $this->closeStatus = $status;
408
                $payload = substr($payload, 2);
409
410
                if (!$this->isClosing) {
411
                    $this->send($statusBin . 'Close acknowledged: ' . $status,
412
                        CommonsContract::EVENT_TYPE_CLOSE); // Respond.
413
                }
414
            }
415
416
            if ($this->isClosing) {
417
                $this->isClosing = false; // A close response, all done.
418
            }
419
420
            fclose($this->socket);
421
            $this->isConnected = false;
422
        }
423
424
        if (!$final) {
425
            $this->hugePayload .= $payload;
426
427
            return NULL;
428
        } // this is the last fragment, and we are processing a huge_payload
429
430
        if ($this->hugePayload) {
431
            $payload = $this->hugePayload .= $payload;
432
            $this->hugePayload = NULL;
433
        }
434
435
        return $payload;
436
    }
437
438
    /**
439
     * @param string $data
440
     * @param int $payloadLength
441
     * @return string
442
     * @throws ConnectionException
443
     */
444
    private function getPayloadData(string $data, int $payloadLength): string
445
    {
446
        // Masking?
447
        $mask = (bool)(ord($data[1]) >> 7);  // Bit 0 in byte 1
448
        $payload = '';
449
        $maskingKey = '';
450
451
        // Get masking key.
452
        if ($mask) {
453
            $maskingKey = $this->read(4);
454
        }
455
456
        // Get the actual payload, if any (might not be for e.g. close frames.
457
        if ($payloadLength > 0) {
458
            $data = $this->read($payloadLength);
459
460
            if ($mask) {
461
                // Unmask payload.
462
                for ($i = 0; $i < $payloadLength; $i++) {
463
                    $payload .= ($data[$i] ^ $maskingKey[$i % 4]);
464
                }
465
            } else {
466
                $payload = $data;
467
            }
468
        }
469
470
        return $payload;
471
    }
472
473
    /**
474
     * @param string $data
475
     * @return float|int
476
     * @throws ConnectionException
477
     */
478
    private function getPayloadLength(string $data)
479
    {
480
        $payloadLength = (int)ord($data[1]) & self::MASK_127; // Bits 1-7 in byte 1
481
        if ($payloadLength > self::MASK_125) {
482
            if ($payloadLength === self::MASK_126) {
483
                $data = $this->read(2); // 126: Payload is a 16-bit unsigned int
484
            } else {
485
                $data = $this->read(8); // 127: Payload is a 64-bit unsigned int
486
            }
487
            $payloadLength = bindec(self::sprintB($data));
488
        }
489
490
        return $payloadLength;
491
    }
492
493
    /**
494
     * Tell the socket to close.
495
     *
496
     * @param integer $status http://tools.ietf.org/html/rfc6455#section-7.4
497
     * @param string $message A closing message, max 125 bytes.
498
     * @return bool|null|string
499
     * @throws BadOpcodeException
500
     */
501
    public function close(int $status = 1000, string $message = 'ttfn')
502
    {
503
        $statusBin = sprintf('%016b', $status);
504
        $status_str = '';
505
506
        foreach (str_split($statusBin, 8) as $binstr) {
507
            $status_str .= chr(bindec($binstr));
508
        }
509
510
        $this->send($status_str . $message, CommonsContract::EVENT_TYPE_CLOSE);
511
        $this->isClosing = true;
512
513
        return $this->receive(); // Receiving a close frame will close the socket now.
514
    }
515
516
    /**
517
     * @param $data
518
     * @throws ConnectionException
519
     */
520
    protected function write(string $data)
521
    {
522
        $written = fwrite($this->socket, $data);
523
524
        if ($written < strlen($data)) {
525
            throw new ConnectionException(
526
                "Could only write $written out of " . strlen($data) . ' bytes.'
527
            );
528
        }
529
    }
530
531
    /**
532
     * @param int $len
533
     * @return string
534
     * @throws ConnectionException
535
     */
536
    protected function read(int $len): string
537
    {
538
        $data = '';
539
        while (($dataLen = strlen($data)) < $len) {
540
            $buff = fread($this->socket, $len - $dataLen);
541
542
            if ($buff === false) {
543
                $metadata = stream_get_meta_data($this->socket);
544
                throw new ConnectionException(
545
                    'Broken frame, read ' . strlen($data) . ' of stated '
546
                    . $len . ' bytes.  Stream state: '
547
                    . json_encode($metadata)
548
                );
549
            }
550
551
            if ($buff === '') {
552
                $metadata = stream_get_meta_data($this->socket);
553
                throw new ConnectionException(
554
                    'Empty read; connection dead?  Stream state: ' . json_encode($metadata)
555
                );
556
            }
557
            $data .= $buff;
558
        }
559
560
        return $data;
561
    }
562
563
    /**
564
     * Helper to convert a binary to a string of '0' and '1'.
565
     *
566
     * @param $string
567
     * @return string
568
     */
569
    protected static function sprintB(string $string): string
570
    {
571
        $return = '';
572
        $strLen = strlen($string);
573
        for ($i = 0; $i < $strLen; $i++) {
574
            $return .= sprintf('%08b', ord($string[$i]));
575
        }
576
577
        return $return;
578
    }
579
580
    /**
581
     * Sec-WebSocket-Key generator
582
     *
583
     * @return string   the 16 character length key
584
     * @throws \Exception
585
     */
586
    private function generateKey(): string
587
    {
588
        $chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"$&/()=[]{}0123456789';
589
        $key = '';
590
        $chLen = strlen($chars);
591
        for ($i = 0; $i < self::KEY_GEN_LENGTH; $i++) {
592
            $key .= $chars[random_int(0, $chLen - 1)];
593
        }
594
595
        return base64_encode($key);
596
    }
597
598
}
599