GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( b9fd49...bfd6a8 )
by Charlotte
02:33
created

ProtocolParser::markCommandAsFinished()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 1
dl 0
loc 6
ccs 4
cts 4
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Protocol Parser.
14
 * @internal
15
 */
16
class ProtocolParser implements \Evenement\EventEmitterInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var int
21
     */
22
    const STATE_INIT = 0;
23
    
24
    /**
25
     * @var int
26
     */
27
    const STATE_HANDSHAKE = 1;
28
    
29
    /**
30
     * @var int
31
     */
32
    const STATE_HANDSHAKE_ERROR = 2;
33
    
34
    /**
35
     * @var int
36
     */
37
    const STATE_AUTH = 5;
38
    
39
    /**
40
     * @var int
41
     */
42
    const STATE_AUTH_SENT = 6;
43
    
44
    /**
45
     * @var int
46
     */
47
    const STATE_AUTH_ERROR = 7;
48
    
49
    /**
50
     * @var int
51
     */
52
    const STATE_OK = 9;
53
    
54
    /**
55
     * @var int
56
     */
57
    const CLIENT_CAPABILITIES = (
58
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_FOUND_ROWS |
59
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_PASSWORD |
60
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_FLAG |
61
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LOCAL_FILES |
62
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_INTERACTIVE |
63
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_TRANSACTIONS |
64
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SECURE_CONNECTION |
65
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PROTOCOL_41 |
66
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF
67
    );
68
    
69
    /**
70
     * @var int
71
     */
72
    const CLIENT_MAX_PACKET_SIZE = 0x1000000;
73
    
74
    /**
75
     * @var int
76
     */
77
    const CLIENT_CHARSET_NUMBER = 0x21;
78
    
79
    /**
80
     * @var \Plasma\Drivers\MySQL\Driver
81
     */
82
    protected $driver;
83
    
84
    /**
85
     * @var \React\Socket\ConnectionInterface
86
     */
87
    protected $connection;
88
    
89
    /**
90
     * @var int
91
     */
92
    protected $maxAllowedPacket;
93
    
94
    /**
95
     * @var int
96
     */
97
    protected $state = ProtocolParser::STATE_INIT;
98
    
99
    /**
100
     * @var \Plasma\BinaryBuffer
101
     */
102
    protected $buffer;
103
    
104
    /**
105
     * @var \Plasma\BinaryBuffer
106
     */
107
    protected $messageBuffer;
108
    
109
    /**
110
     * The sequence ID is incremented with each packet and may wrap around.
111
     * It starts at 0 and is reset to 0 when a new command begins in the Command Phase.
112
     * @var int
113
     * @see https://dev.mysql.com/doc/internals/en/sequence-id.html
114
     */
115
    protected $sequenceID = -1;
116
    
117
    /**
118
     * Whether we use compression.
119
     * @var bool
120
     */
121
    protected $compressionEnabled = false;
122
    
123
    /**
124
     * The compression ID is incremented with each packet and may wrap around.
125
     * The compression ID is independent to the sequence ID.
126
     * @var int
127
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
128
     */
129
    protected $compressionID = -1;
130
    
131
    /**
132
     * Small packets should not be compressed. This defines a minimum size for compression.
133
     * @var int
134
     * @see https://dev.mysql.com/doc/internals/en/uncompressed-payload.html
135
     */
136
    protected $compressionSizeThreshold = 50;
137
    
138
    /**
139
     * @var \Plasma\BinaryBuffer
140
     */
141
    protected $compressionBuffer;
142
    
143
    /**
144
     * @var \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
145
     */
146
    protected $handshakeMessage;
147
    
148
    /**
149
     * @var \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
150
     */
151
    protected $lastOkMessage;
152
    
153
    /**
154
     * @var \Plasma\CommandInterface|null
155
     */
156
    protected $currentCommand;
157
    
158
    /**
159
     * @var callable|null
160
     */
161
    protected $parseCallback;
162
    
163
    /**
     * Creates a parser for the given driver and connection.
     *
     * Stores both collaborators, creates the (initially empty) receive, message and
     * compression buffers and finally attaches the connection listeners through `addEvents()`.
     * @param \Plasma\Drivers\MySQL\Driver       $driver
     * @param \React\Socket\ConnectionInterface  $connection
     * @param int|null                           $maxAllowedPacket  `CLIENT_MAX_PACKET_SIZE` is used when null is given.
     */
    public function __construct(\Plasma\Drivers\MySQL\Driver $driver, \React\Socket\ConnectionInterface $connection, ?int $maxAllowedPacket = null) {
        if($maxAllowedPacket === null) {
            $maxAllowedPacket = static::CLIENT_MAX_PACKET_SIZE;
        }

        $this->buffer = new \Plasma\BinaryBuffer();
        $this->messageBuffer = new \Plasma\BinaryBuffer();
        $this->compressionBuffer = new \Plasma\BinaryBuffer();

        $this->maxAllowedPacket = $maxAllowedPacket;
        $this->connection = $connection;
        $this->driver = $driver;

        // The listeners use the buffers and the connection, so they are registered last.
        $this->addEvents();
    }
180
    
181
    /**
     * Makes the given command the current command and starts processing it.
     * Passing null is a no-op.
     * @param \Plasma\CommandInterface|null  $command
     * @return void
     */
    public function invokeCommand(?\Plasma\CommandInterface $command): void {
        if($command !== null) {
            $this->currentCommand = $command;
            $this->processCommand();
        }
    }
194
    
195
    /**
     * Sends a command without registering it as the current command.
     *
     * The command is handed straight to `processCommand()`. As long as it is not the
     * current command, its `onComplete` callback is invoked right after the packet has
     * been written, no matter what `waitForCompletion` returns.
     * @param \Plasma\CommandInterface  $command
     * @return void
     */
    public function executeCommand(\Plasma\CommandInterface $command): void {
        $this->processCommand($command);
    }
204
    
205
    /**
     * Finishes a command.
     *
     * If the command is the one currently running, the parser forgets it first.
     * The command's `onComplete` callback is invoked in either case.
     * @param \Plasma\CommandInterface  $command
     * @return void
     */
    public function markCommandAsFinished(\Plasma\CommandInterface $command): void {
        $isCurrent = ($this->currentCommand === $command);

        if($isCurrent) {
            $this->currentCommand = null;
        }

        $command->onComplete();
    }
217
    
218
    /**
     * Returns the state the parser is currently in.
     * The state starts as `STATE_INIT` and is updated from the values commands and messages report.
     * @return int
     */
    public function getState(): int {
        return $this->state;
    }
225
    
226
    /**
     * Returns the handshake message stored while processing the buffer,
     * or null if no handshake message has been stored yet.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    public function getHandshakeMessage(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        return $this->handshakeMessage;
    }
233
    
234
    /**
     * Returns the most recent ok response message created by the parser,
     * or null if there has not been one yet.
     * @return \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
     */
    public function getLastOkMessage(): ?\Plasma\Drivers\MySQL\Messages\OkResponseMessage {
        return $this->lastOkMessage;
    }
241
    
242
    /**
     * Turns compression on.
     *
     * Only the flag is set here. Packets are compressed and decompressed
     * only while the parser is additionally in `STATE_OK`.
     * @return void
     */
    public function enableCompression(): void {
        $this->compressionEnabled = true;
    }
249
    
250
    /**
     * Sends a packet to the server.
     *
     * The payload is split into chunks of at most `$maxAllowedPacket` bytes (never more
     * than 0xFFFFFF, the largest value the 3 byte length field can hold). Every chunk is
     * prefixed with its length and the next sequence ID, compressed if compression is
     * active, and written to the connection. If the last chunk fills a packet completely,
     * an additional empty packet is sent to mark the end of the payload.
     * An empty payload results in exactly one empty packet.
     * @param string  $packet  The unframed payload.
     * @return void
     * @see https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
     */
    public function sendPacket(string $packet): void {
        // Never exceed what the 3 byte length field can express, and never use a
        // chunk size below 1 as the loop would not make any progress otherwise.
        $chunkSize = \max(1, \min($this->maxAllowedPacket, 0xFFFFFF));

        do {
            // substr() returns false instead of '' on PHP 7 when nothing is left, hence the casts.
            $partial = (string) \substr($packet, 0, $chunkSize);
            $packet = (string) \substr($packet, $chunkSize);
            $partlen = \strlen($partial);

            $length = \Plasma\BinaryBuffer::writeInt3($partlen);
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));

            // Keep the frame separate from $packet, which still holds the unsent remainder.
            $frame = $length.$sequence.$partial;

            if($this->compressionEnabled && $this->state === static::STATE_OK) {
                $frame = $this->compressPacket($frame);
            }

            $this->connection->write($frame);

            // A completely filled packet announces that another packet follows,
            // so a full final chunk must be followed by an empty packet.
            $more = ($packet !== '' || $partlen === $chunkSize);
        } while($more);
    }
290
    
291
    /**
     * Registers a callback for the next packet that is not recognized as a known message.
     *
     * The callback is used once: the parser clears it before calling it with a
     * `ProtocolOnNextCaller` wrapping the packet.
     * @param callable $callback
     * @return void
     */
    public function setParseCallback(callable $callback): void {
        $this->parseCallback = $callback;
    }
299
    
300
    /**
     * Sends a command to the server.
     *
     * Without an argument the current command is used (and may set the parser state);
     * if there is none, nothing happens. The sequence and compression IDs are reset
     * unless a driver command opts out through `resetSequence()`. After the packet has
     * been written, `onComplete` is invoked immediately, except when the command is the
     * current command and wants to wait for completion.
     * @param \Plasma\CommandInterface|null  $command
     * @return void
     */
    protected function processCommand(?\Plasma\CommandInterface $command = null) {
        if($command === null) {
            if(!($this->currentCommand instanceof \Plasma\CommandInterface)) {
                return;
            }

            $command = $this->currentCommand;

            // Only driver commands know about parser states.
            if($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) {
                $newState = $command->setParserState();
                if($newState !== -1) {
                    $this->state = $newState;
                }
            }
        }

        $isDriverCommand = ($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface);

        if(!$isDriverCommand || $command->resetSequence()) {
            $this->sequenceID = -1;
            $this->compressionID = -1;
        }

        $this->sendPacket($command->getEncodedMessage());

        $isCurrent = ($command === $this->currentCommand);

        if($isCurrent && $command->waitForCompletion()) {
            return;
        }

        $command->onComplete();

        // Checked again on purpose, as it is evaluated after the callback has run.
        if($command === $this->currentCommand) {
            $this->currentCommand = null;
        }
    }
336
    
337
    /**
     * Processes the buffer.
     *
     * Takes at most one complete packet out of the receive buffer. Payloads split across
     * several packets (every packet but the last one carries exactly `$maxAllowedPacket`
     * bytes) are collected in the message buffer and handled as one message once the
     * last fragment has arrived. Nothing is consumed before a packet is complete.
     *
     * The payload is then either turned into a message and passed to `handleMessage()`,
     * or, if it is not recognized as a known message, forwarded to the parse callback or
     * the current command.
     * @return void
     */
    protected function processBuffer() {
        // A packet header consists of a 3 byte payload length and a 1 byte sequence ID.
        if($this->buffer->getSize() < 4) {
            return;
        }

        // Inspect a copy, so an incomplete packet stays untouched in the receive buffer.
        $buffer = clone $this->buffer;

        $length = $buffer->readInt3();
        $this->sequenceID = $buffer->readInt1();

        // Wait for the rest of the payload before consuming anything.
        if($buffer->getSize() < $length) {
            return;
        }

        // The packet is complete: remove header and payload from the receive buffer.
        $this->buffer->read(($length + 4));

        if($length > 0) {
            // Drop the bytes belonging to following packets from the copy.
            $buffer->slice(0, $length);
        } else {
            $buffer = new \Plasma\BinaryBuffer();
        }

        $isFragment = ($length === $this->maxAllowedPacket);

        if($isFragment || $this->messageBuffer->getSize() > 0) {
            if($length > 0) {
                $this->messageBuffer->append($buffer->read($length));
            }

            if($isFragment) {
                // More fragments follow, they may already be in the receive buffer.
                $this->scheduleBufferProcessing();
                return;
            }

            // Last fragment: continue with the whole reassembled payload.
            $buffer = $this->messageBuffer;
            $this->messageBuffer = new \Plasma\BinaryBuffer();
            $length = $buffer->getSize();
        }

        if($buffer->getSize() === 0) {
            $this->scheduleBufferProcessing();
            return;
        }

        /** @var \Plasma\Drivers\MySQL\Messages\MessageInterface  $message */
        $message = null;

        if($this->state === static::STATE_INIT) {
            $message = new \Plasma\Drivers\MySQL\Messages\HandshakeMessage($this);
        } else {
            $firstChar = $buffer->read(1);

            $okRespID = \Plasma\Drivers\MySQL\Messages\OkResponseMessage::getID();
            $isOkMessage = (
                (
                    $firstChar === $okRespID &&
                    (!($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\QueryCommand)
                        || \strtoupper(\substr($this->currentCommand->getQuery(), 0, 6)) !== 'SELECT') // Fix for MySQL 5.7
                ) ||
                (
                    $firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() &&
                    $this->handshakeMessage !== null &&
                    ($this->handshakeMessage->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF) !== 0
                )
            );

            switch(true) {
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\ErrResponseMessage::getID()):
                    $message = new \Plasma\Drivers\MySQL\Messages\ErrResponseMessage($this);
                break;
                case ($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand && $firstChar === $okRespID):
                    $message = new \Plasma\Drivers\MySQL\Messages\PrepareStatementOkMessage($this);
                break;
                case $isOkMessage:
                    $message = new \Plasma\Drivers\MySQL\Messages\OkResponseMessage($this);
                    $this->lastOkMessage = $message;
                break;
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() && $length < 6):
                    $message = new \Plasma\Drivers\MySQL\Messages\EOFMessage($this);
                break;
                default:
                    // Not a known message: hand the untouched payload to whoever is waiting for it.
                    $buffer->prepend($firstChar);

                    if($this->parseCallback !== null) {
                        // The parse callback is only used once.
                        $parse = $this->parseCallback;
                        $this->parseCallback = null;

                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
                        $parse($caller);
                    } elseif($this->currentCommand !== null) {
                        $command = $this->currentCommand;

                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
                        $command->onNext($caller);

                        if($command->hasFinished()) {
                            $this->currentCommand = null;
                            $command->onComplete();
                        }
                    }

                    $this->scheduleBufferProcessing();

                    return;
            }
        }

        $state = $message->setParserState();
        if($state !== -1) {
            $this->state = $state;
        }

        if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
            $this->handshakeMessage = $message;
        }

        $this->handleMessage($buffer, $message);
    }

    /**
     * Schedules another `processBuffer()` run on a future tick of the loop,
     * if the receive buffer still contains data.
     * @return void
     */
    private function scheduleBufferProcessing(): void {
        if($this->buffer->getSize() > 0) {
            $this->driver->getLoop()->futureTick(function () {
                $this->processBuffer();
            });
        }
    }
454
    
455
    /**
     * Handles an incoming message.
     *
     * Lets the message parse the buffer and stops if the parser reports a falsy result.
     * Otherwise the current command, if any, is informed: it is completed on an ok/EOF
     * message once it reports to be finished, failed on an error message, or fed the
     * message through `onNext`. Without a current command an error message is emitted as
     * `error` event. Afterwards a `message` event is emitted.
     *
     * A parse exception may update the parser state and replace the receive buffer, is
     * passed to the current command and emitted as `error` event, and closes the connection.
     * Finally another buffer run is scheduled if the receive buffer is not empty.
     * @param \Plasma\BinaryBuffer                             $buffer
     * @param \Plasma\Drivers\MySQL\Messages\MessageInterface  $message
     * @return void
     */
    public function handleMessage(\Plasma\BinaryBuffer $buffer, \Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
        try {
            $buffer = $message->parseMessage($buffer);
            if(!$buffer) {
                return;
            }

            $command = $this->currentCommand;
            $isError = ($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage);

            if($command === null) {
                if($isError) {
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
                    $this->emit('error', [$error]);
                }
            } else {
                $isTerminator = (
                    $message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage
                    || $message instanceof \Plasma\Drivers\MySQL\Messages\EOFMessage
                );

                if($isTerminator && $command->hasFinished()) {
                    // Forget the command before notifying it.
                    $this->currentCommand = null;
                    $command->onComplete();
                } elseif($isError) {
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);

                    $this->currentCommand = null;
                    $command->onError($error);
                } else {
                    $command->onNext($message);

                    if($command->hasFinished()) {
                        // The current command may have changed while the command handled the message.
                        if($this->currentCommand === $command) {
                            $this->currentCommand = null;
                        }

                        $command->onComplete();
                    }
                }
            }

            $this->emit('message', [$message]);
        } catch (\Plasma\Drivers\MySQL\Messages\ParseException $e) {
            $failedState = $e->getState();
            if($failedState !== null) {
                $this->state = $failedState;
            }

            $leftover = $e->getBuffer();
            if($leftover !== null) {
                $this->buffer->clear();
                $this->buffer->append($leftover);
            }

            if($this->currentCommand !== null) {
                $this->currentCommand->onError($e);
            }

            $this->emit('error', [$e]);
            $this->connection->close();
        }

        if($this->buffer->getSize() > 0) {
            $this->driver->getLoop()->futureTick(function () {
                $this->processBuffer();
            });
        }
    }
528
    
529
    /**
     * Compresses a packet.
     *
     * The result always starts with a 7 byte header: 3 bytes length of the payload that
     * follows, 1 byte compression ID and 3 bytes length of the payload before compression.
     * Packets smaller than the compression size threshold are sent as they are, which is
     * signalled by an uncompressed length of 0. The same happens if compressing fails.
     * @param string  $packet  A complete packet, including its own packet header.
     * @return string
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
     */
    protected function compressPacket(string $packet): string {
        $length = \strlen($packet);
        $id = \Plasma\BinaryBuffer::writeInt1((++$this->compressionID));

        $compressed = false;
        if($length >= $this->compressionSizeThreshold) {
            $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);
        }

        if($compressed === false) {
            // Uncompressed payload: the "length before compression" field is 0.
            return \Plasma\BinaryBuffer::writeInt3($length).$id.\Plasma\BinaryBuffer::writeInt3(0).$packet;
        }

        $compresslen = \Plasma\BinaryBuffer::writeInt3(\strlen($compressed));
        $uncompressedlen = \Plasma\BinaryBuffer::writeInt3($length);

        return $compresslen.$id.$uncompressedlen.$compressed;
    }
548
    
549
    /**
     * Decompresses the buffer.
     *
     * Moves every complete frame from the compression buffer to the receive buffer.
     * A frame starts with a 7 byte header (3 bytes payload length, 1 byte compression ID,
     * 3 bytes length before compression). An uncompressed length of 0 means the payload
     * was not compressed and is copied as it is. An incomplete frame is left in the
     * compression buffer until more data has arrived.
     *
     * If a payload can not be decompressed to the announced length, the connection is ended.
     * @return void
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
     */
    protected function decompressBuffer(): void {
        while($this->compressionBuffer->getSize() > 7) {
            // Copy the header to a scratch buffer, so nothing is consumed yet.
            $header = new \Plasma\BinaryBuffer();
            for($i = 0; $i < 7; $i++) {
                $header->append($this->compressionBuffer[$i]);
            }

            $length = $header->readInt3();
            $this->compressionID = $header->readInt1();
            $uncompressedLength = $header->readInt3();

            // Wait for the rest of the frame.
            if(($this->compressionBuffer->getSize() - 7) < $length) {
                return;
            }

            $this->compressionBuffer->read(7);
            $rawPacket = $this->compressionBuffer->read($length);

            if($uncompressedLength === 0) {
                $this->buffer->append($rawPacket);
                continue;
            }

            $packet = \zlib_decode($rawPacket, $uncompressedLength);

            if($packet === false || \strlen($packet) !== $uncompressedLength) {
                $packet = "\xFF\x00\x00\x00     Invalid compressed packet";
                $this->connection->end($packet);

                return;
            }

            $this->buffer->append($packet);
        }
    }
593
    
594
    /**
     * Attaches the `data` and `close` listeners to the connection.
     *
     * Incoming data is appended to the receive buffer, or, while compression is active
     * and the parser is in `STATE_OK`, to the compression buffer, which is decompressed
     * once it holds more than 7 bytes. The receive buffer is processed after every chunk.
     * A closed connection is handled by `handleClose()`.
     * @return void
     */
    protected function addEvents() {
        $onData = function ($chunk) {
            $isCompressed = ($this->compressionEnabled && $this->state === static::STATE_OK);

            if(!$isCompressed) {
                $this->buffer->append($chunk);
            } else {
                $this->compressionBuffer->append($chunk);

                if($this->compressionBuffer->getSize() > 7) {
                    $this->decompressBuffer();
                }
            }

            $this->processBuffer();
        };

        $onClose = function () {
            $this->handleClose();
        };

        $this->connection->on('data', $onData);
        $this->connection->on('close', $onClose);
    }
617
    
618
    /**
     * Reacts to the connection being closed.
     *
     * A close during `STATE_AUTH` or `STATE_AUTH_SENT` moves the parser to
     * `STATE_AUTH_ERROR`. All three buffers are emptied in any case.
     * @return void
     */
    protected function handleClose() {
        $authStates = array(static::STATE_AUTH, static::STATE_AUTH_SENT);

        if(\in_array($this->state, $authStates, true)) {
            $this->state = static::STATE_AUTH_ERROR;
        }

        $this->buffer->clear();
        $this->messageBuffer->clear();
        $this->compressionBuffer->clear();
    }
631
}
632