GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Failed Conditions
Pull Request — master (#8)
by Charlotte
03:18
created

src/ProtocolParser.php (3 issues)

1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Protocol Parser.
14
 * @internal
15
 */
16
class ProtocolParser implements \Evenement\EventEmitterInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var int
21
     */
22
    const STATE_INIT = 0;
23
    
24
    /**
25
     * @var int
26
     */
27
    const STATE_HANDSHAKE = 1;
28
    
29
    /**
30
     * @var int
31
     */
32
    const STATE_HANDSHAKE_ERROR = 2;
33
    
34
    /**
35
     * @var int
36
     */
37
    const STATE_AUTH = 5;
38
    
39
    /**
40
     * @var int
41
     */
42
    const STATE_AUTH_SENT = 6;
43
    
44
    /**
45
     * @var int
46
     */
47
    const STATE_AUTH_ERROR = 7;
48
    
49
    /**
50
     * @var int
51
     */
52
    const STATE_OK = 9;
53
    
54
    /**
55
     * @var int
56
     */
57
    const CLIENT_CAPABILITIES = (
58
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_FOUND_ROWS |
59
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_PASSWORD |
60
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_FLAG |
61
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LOCAL_FILES |
62
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_INTERACTIVE |
63
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_TRANSACTIONS |
64
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SECURE_CONNECTION |
65
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PROTOCOL_41 |
66
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF
67
    );
68
    
69
    /**
70
     * @var int
71
     */
72
    const CLIENT_MAX_PACKET_SIZE = 0x1000000;
73
    
74
    /**
75
     * @var int
76
     */
77
    const CLIENT_CHARSET_NUMBER = 0x21;
78
    
79
    /**
80
     * @var \Plasma\Drivers\MySQL\Driver
81
     */
82
    protected $driver;
83
    
84
    /**
85
     * @var \React\Socket\ConnectionInterface
86
     */
87
    protected $connection;
88
    
89
    /**
90
     * @var int
91
     */
92
    protected $state = ProtocolParser::STATE_INIT;
93
    
94
    /**
95
     * @var \Plasma\BinaryBuffer
96
     */
97
    protected $buffer;
98
    
99
    /**
100
     * @var \Plasma\BinaryBuffer
101
     */
102
    protected $messageBuffer;
103
    
104
    /**
105
     * The sequence ID is incremented with each packet and may wrap around.
106
     * It starts at 0 and is reset to 0 when a new command begins in the Command Phase.
107
     * @var int
108
     * @see https://dev.mysql.com/doc/internals/en/sequence-id.html
109
     */
110
    protected $sequenceID = -1;
111
    
112
    /**
113
     * Whether we use compression.
114
     * @var bool
115
     */
116
    protected $compressionEnabled = false;
117
    
118
    /**
119
     * The compression ID is incremented with each packet and may wrap around.
120
     * The compression ID is independent to the sequence ID.
121
     * @var int
122
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
123
     */
124
    protected $compressionID = -1;
125
    
126
    /**
127
     * Small packets should not be compressed. This defines a minimum size for compression.
128
     * @var int
129
     * @see https://dev.mysql.com/doc/internals/en/uncompressed-payload.html
130
     */
131
    protected $compressionSizeThreshold = 50;
132
    
133
    /**
134
     * @var \Plasma\BinaryBuffer
135
     */
136
    protected $compressionBuffer;
137
    
138
    /**
139
     * @var \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
140
     */
141
    protected $handshakeMessage;
142
    
143
    /**
144
     * @var \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
145
     */
146
    protected $lastOkMessage;
147
    
148
    /**
149
     * @var \Plasma\CommandInterface|null
150
     */
151
    protected $currentCommand;
152
    
153
    /**
154
     * @var callable|null
155
     */
156
    protected $parseCallback;
157
    
158
    /**
159
     * Constructor.
160
     * @param \Plasma\Drivers\MySQL\Driver       $driver
161
     * @param \React\Socket\ConnectionInterface  $connection
162
     */
163 60
    function __construct(\Plasma\Drivers\MySQL\Driver $driver, \React\Socket\ConnectionInterface $connection) {
164 60
        $this->driver = $driver;
165 60
        $this->connection = $connection;
166
        
167 60
        $this->buffer = new \Plasma\BinaryBuffer();
168 60
        $this->messageBuffer = new \Plasma\BinaryBuffer();
169 60
        $this->compressionBuffer = new \Plasma\BinaryBuffer();
170
        
171 60
        $this->addEvents();
172 60
    }
173
    
174
    /**
175
     * Invoke a command to execute.
176
     * @param \Plasma\CommandInterface|null  $command
177
     * @return void
178
     */
179 59
    function invokeCommand(?\Plasma\CommandInterface $command): void {
180 59
        if($command === null) {
181 56
            return;
182
        }
183
        
184 59
        $this->currentCommand = $command;
185 59
        $this->processCommand();
186 59
    }
187
    
188
    /**
189
     * Executes a command, without handling any aftermath.
190
     * The `onComplete` callback will be immediately invoked, regardless of the `waitForCompletion` value.
191
     * @param \Plasma\CommandInterface  $command
192
     * @return void
193
     */
194
    function executeCommand(\Plasma\CommandInterface $command): void {
195
        $this->processCommand($command);
196
    }
197
    
198
    /**
199
     * Marks the command itself as finished, if currently running.
200
     * @param \Plasma\Drivers\MySQL\Commands\CommandInterface  $command
201
     * @return void
202
     */
203 56
    function markCommandAsFinished(\Plasma\CommandInterface $command): void {
204 56
        if($command === $this->currentCommand) {
205 56
            $this->currentCommand = null;
206
        }
207
        
208 56
        $command->onComplete();
209 56
    }
210
    
211
    /**
212
     * Get the parser state.
213
     * @return int
214
     */
215 1
    function getState(): int {
216 1
        return $this->state;
217
    }
218
    
219
    /**
220
     * Get the handshake message, or null.
221
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
222
     */
223 59
    function getHandshakeMessage(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
224 59
        return $this->handshakeMessage;
225
    }
226
    
227
    /**
228
     * Get the last ok response message, or null.
229
     * @return \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
230
     */
231 4
    function getLastOkMessage(): ?\Plasma\Drivers\MySQL\Messages\OkResponseMessage {
232 4
        return $this->lastOkMessage;
233
    }
234
    
235
    /**
236
     * Enables compression.
237
     * @return void
238
     */
239 4
    function enableCompression(): void {
240 4
        $this->compressionEnabled = true;
241 4
    }
242
    
243
    /**
244
     * Sends a packet to the server.
245
     * @param string  $packet
246
     * @return void
247
     */
248 60
    function sendPacket(string $packet): void {
249 60
        $initPacklen = \strlen($packet);
250
        
251
        do {
252 60
            $partial = \substr($packet, 0, static::CLIENT_MAX_PACKET_SIZE);
253 60
            $partlen = \strlen($partial);
254
            
255 60
            $packet = \substr($packet, static::CLIENT_MAX_PACKET_SIZE);
256 60
            $packlen = \strlen($packet);
257
            
258 60
            $length = \Plasma\BinaryBuffer::writeInt3($partlen);
259 60
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));
260
            
261 60
            $packet = $length.$sequence.$partial;
262
            
263 60
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
264 2
                $packet = $this->compressPacket($packet);
265
            }
266
            
267 60
            $this->connection->write($packet);
268 60
        } while($packlen > static::CLIENT_MAX_PACKET_SIZE);
269
        
270
        // If the packet is exactly the max size, we have to send two packets
271 60
        if($initPacklen === static::CLIENT_MAX_PACKET_SIZE) {
272 1
            $length = \Plasma\BinaryBuffer::writeInt3(0);
273 1
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));
274 1
            $this->connection->write($length.$sequence);
275
        }
276 60
    }
277
    
278
    /**
279
     * Sets the parse callback.
280
     * @param callable $callback
281
     * @return void
282
     */
283
    function setParseCallback(callable $callback): void {
284
        $this->parseCallback = $callback;
285
    }
286
    
287
    /**
288
     * Processes a command.
289
     * @param \Plasma\CommandInterface|null  $command
290
     * @return void
291
     */
292 59
    protected function processCommand(?\Plasma\CommandInterface $command = null) {
293 59
        if($command === null && $this->currentCommand instanceof \Plasma\CommandInterface) {
294 59
            $command = $this->currentCommand;
295
            
296 59
            if($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) {
297 59
                $state = $command->setParserState();
298 59
                if($state !== -1) {
299 59
                    $this->state = $state;
300
                }
301
            }
302
        }
303
        
304 59
        if($command === null) {
305
            return;
306
        }
307
        
308 59
        if(!($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) || $command->resetSequence()) {
309 56
            $this->sequenceID = -1;
310 56
            $this->compressionID = -1;
311
        }
312
        
313 59
        $this->sendPacket($command->getEncodedMessage());
314
        
315 59
        if($command !== $this->currentCommand || !$command->waitForCompletion()) {
316 35
            $command->onComplete();
317
            
318 35
            if($command === $this->currentCommand) {
319 35
                $this->currentCommand = null;
320
            }
321
        }
322 59
    }
323
    
324
    /**
325
     * Processes the buffer.
326
     * @return void
327
     */
328 59
    protected function processBuffer() {
329 59
        if($this->buffer->getSize() < 4) {
330
            return;
331
        }
332
        
333 59
        $buffer = clone $this->buffer;
334
        
335 59
        $length = $buffer->readInt3();
336 59
        $this->sequenceID = $buffer->readInt1();
337
        
338 59
        if($length === static::CLIENT_MAX_PACKET_SIZE) {
339
            $this->buffer->read(($length + 4));
340
            $this->messageBuffer->append($buffer->read($length));
341
            return;
342 59
        } elseif($this->messageBuffer->getSize() > 0) {
343
            $this->messageBuffer->append($buffer->read($length));
344
            $buffer = $this->messageBuffer;
345
            $this->messageBuffer = new \Plasma\BinaryBuffer();
346
        }
347
        
348 59
        if($buffer->getSize() < $length) {
349
            return;
350
        }
351
        
352 59
        if($length > 0) {
353 59
            $this->buffer->read(($length + 4));
354 59
            $buffer->slice(0, $length);
355
        } else {
356
            $this->buffer->slice($buffer->getSize());
357
        }
358
        
359 59
        if($buffer->getSize() === 0) {
360
            return;
361
        }
362
        
363
        /** @var \Plasma\Drivers\MySQL\Messages\MessageInterface  $message */
364 59
        $message = null;
365
        
366 59
        if($this->state === static::STATE_INIT) {
367 59
            $message = new \Plasma\Drivers\MySQL\Messages\HandshakeMessage($this);
368
        } else {
369 59
            $firstChar = $buffer->read(1);
370
            
371 59
            $okRespID = \Plasma\Drivers\MySQL\Messages\OkResponseMessage::getID();
372
            $isOkMessage = (
373
                (
374 59
                    $firstChar === $okRespID &&
375 58
                    (!($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\QueryCommand)
376 58
                        || \strtoupper(\substr($this->currentCommand->getQuery(), 0, 6)) !== 'SELECT') // Fix for MySQL 5.7
377
                ) ||
378
                (
379 47
                    $firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() &&
380 59
                    ($this->handshakeMessage->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF) !== 0
381
                )
382
            );
383
            
384
            switch(true) {
385 59
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\ErrResponseMessage::getID()):
386 1
                    $message = new \Plasma\Drivers\MySQL\Messages\ErrResponseMessage($this);
387 1
                break;
388 58
                case ($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand && $firstChar === $okRespID):
389 36
                    $message = new \Plasma\Drivers\MySQL\Messages\PrepareStatementOkMessage($this);
390 36
                break;
391 58
                case $isOkMessage:
392 58
                    $message = new \Plasma\Drivers\MySQL\Messages\OkResponseMessage($this);
393 58
                    $this->lastOkMessage = $message;
394 58
                break;
395 46
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() && $length < 6):
396
                    $message = new \Plasma\Drivers\MySQL\Messages\EOFMessage($this);
397
                break;
398
                default:
399 46
                    $buffer->prepend($firstChar);
400
                    
401 46
                    if($this->parseCallback !== null) {
402
                        $parse = $this->parseCallback;
403
                        $this->parseCallback = null;
404
                        
405
                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
406
                        $parse($caller);
407 46
                    } elseif($this->currentCommand !== null) {
408 46
                        $command = $this->currentCommand;
409
                        
410 46
                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
411 46
                        $command->onNext($caller);
412
                        
413 46
                        if($command->hasFinished()) {
414 36
                            $this->currentCommand = null;
415 36
                            $command->onComplete();
416
                        }
417
                    }
418
                    
419 46
                    if($this->buffer->getSize() > 0) {
420
                        $this->driver->getLoop()->futureTick(function () {
421 46
                            $this->processBuffer();
422 46
                        });
423
                    }
424
                    
425 46
                    return;
426
                break;
427
            }
428
        }
429
        
430 59
        $state = $message->setParserState();
431 59
        if($state !== -1) {
432 59
            $this->state = $state;
433
        }
434
        
435 59
        if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
436 59
            $this->handshakeMessage = $message;
437
        }
438
        
439 59
        $this->handleMessage($buffer, $message);
440 59
    }
441
    
442
    /**
443
     * Handles an incoming message.
444
     * @param \Plasma\BinaryBuffer                             $buffer
445
     * @param \Plasma\Drivers\MySQL\Messages\MessageInterface  $message
446
     * @return void
447
     */
448 59
    function handleMessage(\Plasma\BinaryBuffer $buffer, \Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
449
        try {
450 59
            $buffer = $message->parseMessage($buffer);
451 59
            if(!$buffer) {
452
                return;
453
            }
454
            
455 59
            if($this->currentCommand !== null) {
456
                if(
457 59
                    ($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage || $message instanceof \Plasma\Drivers\MySQL\Messages\EOFMessage)
458 59
                    && $this->currentCommand->hasFinished()
459
                ) {
460 58
                    $command = $this->currentCommand;
461 58
                    $this->currentCommand = null;
462
                    
463 58
                    $command->onComplete();
464 57
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
465 1
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
466
                    
467 1
                    $command = $this->currentCommand;
468 1
                    $this->currentCommand = null;
469
                    
470 1
                    $command->onError($error);
471
                } else {
472 56
                    $command = $this->currentCommand;
473 56
                    $command->onNext($message);
474
                    
475 56
                    if($command->hasFinished()) {
476 56
                        if($this->currentCommand === $command) {
477
                            $this->currentCommand = null;
478
                        }
479
                        
480 59
                        $command->onComplete();
481
                    }
482
                }
483 59
            } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
484
                $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
485
                $this->emit('error', array($error));
486
            }
487
            
488 59
            $this->emit('message', array($message));
489
        } catch (\Plasma\Drivers\MySQL\Messages\ParseException $e) {
490
            $state = $e->getState();
491
            if($state !== null) {
492
                $this->state = $state;
493
            }
494
            
495
            $buffer = $e->getBuffer();
496
            if($buffer !== null) {
497
                $this->buffer->clear();
498
                $this->buffer->append($buffer);
499
            }
500
            
501
            if($this->currentCommand !== null) {
502
                $this->currentCommand->onError($e);
503
            }
504
            
505
            $this->emit('error', array($e));
506
            $this->connection->close();
507
        }
508
        
509 59
        if($this->buffer->getSize() > 0) {
510
            $this->driver->getLoop()->futureTick(function () {
511 36
                $this->processBuffer();
512 36
            });
513
        }
514 59
    }
515
    
516
    /**
517
     * Compresses a packet.
518
     * @param string  $packet
519
     * @return string
520
     */
521 2
    protected function compressPacket(string $packet): string {
522 2
        $length = \strlen($packet);
523 2
        $packetlen = \Plasma\BinaryBuffer::writeInt3($length);
524 2
        $id = \Plasma\BinaryBuffer::writeInt1((++$this->compressionID));
525
        
526 2
        if($length < $this->compressionSizeThreshold) {
527 2
            return $packetlen.$id.\Plasma\BinaryBuffer::writeInt3(0).$packet;
528
        }
529
        
530
        $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);
531
        $compresslen = \Plasma\BinaryBuffer::writeInt3(\strlen($compressed));
0 ignored issues
show
The assignment to $compresslen is dead and can be removed.
Loading history...
532
        
533
        return $comrpesslen.$id.$compressed;
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $comrpesslen does not exist. Did you maybe mean $compresslen?
Loading history...
534
    }
535
    
536
    /**
537
     * Decompresses the buffer.
538
     * @return void
539
     */
540 2
    protected function decompressBuffer(): void {
541 2
        $buffer = new \Plasma\BinaryBuffer();
542
        
543
        // Copy packet header to new buffer
544 2
        for($i = 0; $i < 7; $i++) {
545 2
            $buffer->append($this->compressionBuffer[$i]);
546
        }
547
        
548 2
        $length = $buffer->readInt3();
549 2
        $this->compressionID = $buffer->readInt1();
550 2
        $uncompressedLength = $buffer->readInt3();
551
        
552 2
        if(($this->compressionBuffer->getSize() - 7) < $length) {
553
            return;
554
        }
555
        
556 2
        $this->compressionBuffer->read(7);
557 2
        $buffer = null;
1 ignored issue
show
The assignment to $buffer is dead and can be removed.
Loading history...
558
        
559 2
        if($uncompressedLength === 0) {
560 2
            $this->buffer->append($this->compressionBuffer->read($length));
561 2
            return;
562
        }
563
        
564 1
        $rawPacket = $this->compressionBuffer->read($length);
565 1
        $packet = \zlib_decode($rawPacket, $uncompressedLength);
566
        
567 1
        if(\strlen($packet) !== $uncompressedLength) {
568
            $packet = "\xFF\x00\x00\x00     Invalid compressed packet";
569
            $this->connection->end($packet);
570
            
571
            return;
572
        }
573
        
574 1
        $this->buffer->append($packet);
575
        
576 1
        if($this->compressionBuffer->getSize() > 7) {
577
            $this->decompressBuffer();
578
        }
579 1
    }
580
    
581
    /**
582
     * Adds the events to the connection.
583
     * @return void
584
     */
585 60
    protected function addEvents() {
586
        $this->connection->on('data', function ($chunk) {
587 59
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
588 2
                $this->compressionBuffer->append($chunk);
589
                
590 2
                if($this->compressionBuffer->getSize() > 7) {
591 2
                    $this->decompressBuffer();
592
                }
593
            } else {
594 59
                $this->buffer->append($chunk);
595
            }
596
            
597 59
            $this->processBuffer();
598 60
        });
599
        
600
        $this->connection->on('close', function () {
601 3
            $this->handleClose();
602 60
        });
603 60
    }
604
    
605
    /**
606
     * Connection close handler.
607
     * @return void
608
     */
609 3
    protected function handleClose() {
610 3
        if($this->state === static::STATE_AUTH || $this->state === static::STATE_AUTH_SENT) {
611
            $this->state = static::STATE_AUTH_ERROR;
612
        }
613
        
614 3
        $this->buffer->clear();
615 3
        $this->messageBuffer->clear();
616 3
        $this->compressionBuffer->clear();
617 3
    }
618
}
619