GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( 34d0ea...b9fd49 )
by Charlotte
02:48
created

ProtocolParser::getState()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 2
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Protocol Parser.
14
 * @internal
15
 */
16
class ProtocolParser implements \Evenement\EventEmitterInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var int
21
     */
22
    const STATE_INIT = 0;
23
    
24
    /**
25
     * @var int
26
     */
27
    const STATE_HANDSHAKE = 1;
28
    
29
    /**
30
     * @var int
31
     */
32
    const STATE_HANDSHAKE_ERROR = 2;
33
    
34
    /**
35
     * @var int
36
     */
37
    const STATE_AUTH = 5;
38
    
39
    /**
40
     * @var int
41
     */
42
    const STATE_AUTH_SENT = 6;
43
    
44
    /**
45
     * @var int
46
     */
47
    const STATE_AUTH_ERROR = 7;
48
    
49
    /**
50
     * @var int
51
     */
52
    const STATE_OK = 9;
53
    
54
    /**
55
     * @var int
56
     */
57
    const CLIENT_CAPABILITIES = (
58
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_FOUND_ROWS |
59
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_PASSWORD |
60
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_FLAG |
61
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LOCAL_FILES |
62
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_INTERACTIVE |
63
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_TRANSACTIONS |
64
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SECURE_CONNECTION |
65
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PROTOCOL_41 |
66
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF
67
    );
68
    
69
    /**
70
     * @var int
71
     */
72
    const CLIENT_MAX_PACKET_SIZE = 0x1000000;
73
    
74
    /**
75
     * @var int
76
     */
77
    const CLIENT_CHARSET_NUMBER = 0x21;
78
    
79
    /**
80
     * @var \Plasma\Drivers\MySQL\Driver
81
     */
82
    protected $driver;
83
    
84
    /**
85
     * @var \React\Socket\ConnectionInterface
86
     */
87
    protected $connection;
88
    
89
    /**
90
     * @var int
91
     */
92
    protected $state = ProtocolParser::STATE_INIT;
93
    
94
    /**
95
     * @var \Plasma\BinaryBuffer
96
     */
97
    protected $buffer;
98
    
99
    /**
100
     * @var \Plasma\BinaryBuffer
101
     */
102
    protected $messageBuffer;
103
    
104
    /**
105
     * The sequence ID is incremented with each packet and may wrap around.
106
     * It starts at 0 and is reset to 0 when a new command begins in the Command Phase.
107
     * @var int
108
     * @see https://dev.mysql.com/doc/internals/en/sequence-id.html
109
     */
110
    protected $sequenceID = -1;
111
    
112
    /**
113
     * Whether we use compression.
114
     * @var bool
115
     */
116
    protected $compressionEnabled = false;
117
    
118
    /**
119
     * The compression ID is incremented with each packet and may wrap around.
120
     * The compression ID is independent of the sequence ID.
121
     * @var int
122
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
123
     */
124
    protected $compressionID = -1;
125
    
126
    /**
127
     * Small packets should not be compressed. This defines a minimum size for compression.
128
     * @var int
129
     * @see https://dev.mysql.com/doc/internals/en/uncompressed-payload.html
130
     */
131
    protected $compressionSizeThreshold = 50;
132
    
133
    /**
134
     * @var \Plasma\BinaryBuffer
135
     */
136
    protected $compressionBuffer;
137
    
138
    /**
139
     * @var \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
140
     */
141
    protected $handshakeMessage;
142
    
143
    /**
144
     * @var \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
145
     */
146
    protected $lastOkMessage;
147
    
148
    /**
149
     * @var \Plasma\CommandInterface|null
150
     */
151
    protected $currentCommand;
152
    
153
    /**
154
     * @var callable|null
155
     */
156
    protected $parseCallback;
157
    
158
    /**
     * Creates a parser bound to a driver and a connection.
     *
     * Allocates the receive, message and compression buffers, stores both
     * collaborators and finally registers the connection listeners through `addEvents()`.
     * @param \Plasma\Drivers\MySQL\Driver       $driver      The driver this parser belongs to.
     * @param \React\Socket\ConnectionInterface  $connection  The connection packets are written to and read from.
     */
    function __construct(\Plasma\Drivers\MySQL\Driver $driver, \React\Socket\ConnectionInterface $connection) {
        $this->buffer = new \Plasma\BinaryBuffer();
        $this->messageBuffer = new \Plasma\BinaryBuffer();
        $this->compressionBuffer = new \Plasma\BinaryBuffer();

        $this->driver = $driver;
        $this->connection = $connection;

        // Listeners are attached last, once every property they touch is initialised.
        $this->addEvents();
    }
173
    
174
    /**
     * Makes the given command the current command and processes it.
     * Passing `null` is a no-op.
     * @param \Plasma\CommandInterface|null  $command
     * @return void
     */
    function invokeCommand(?\Plasma\CommandInterface $command): void {
        if($command !== null) {
            $this->currentCommand = $command;

            // Called without an argument, so processCommand() picks up the current command.
            $this->processCommand();
        }
    }
187
    
188
    /**
     * Sends a command without registering it as the current command.
     * Because the command is passed explicitly to `processCommand()`, its `onComplete` callback
     * is invoked right after sending, regardless of the `waitForCompletion` value.
     * @param \Plasma\CommandInterface  $command
     * @return void
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->processCommand($command);
    }
197
    
198
    /**
     * Marks a command as finished.
     * If it is the currently running command, the current command slot is cleared first;
     * the command's `onComplete` callback is invoked in either case.
     * @param \Plasma\CommandInterface  $command
     * @return void
     */
    function markCommandAsFinished(\Plasma\CommandInterface $command): void {
        $isCurrent = ($this->currentCommand === $command);
        if($isCurrent) {
            $this->currentCommand = null;
        }

        $command->onComplete();
    }
210
    
211
    /**
     * Returns the current parser state (one of the `STATE_*` values, or whatever a command or message last set).
     * @return int
     */
    function getState(): int {
        return $this->state;
    }
218
    
219
    /**
     * Returns the stored handshake message, or null if none has been stored yet.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshakeMessage(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        return $this->handshakeMessage;
    }
226
    
227
    /**
     * Returns the most recently stored OK response message, or null if none has been stored yet.
     * @return \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
     */
    function getLastOkMessage(): ?\Plasma\Drivers\MySQL\Messages\OkResponseMessage {
        return $this->lastOkMessage;
    }
234
    
235
    /**
     * Turns the compression flag on.
     * Packets are only actually (de)compressed while the parser state is `STATE_OK`.
     * @return void
     */
    function enableCompression(): void {
        $this->compressionEnabled = true;
    }
242
    
243
    /**
     * Sends a payload to the server, framing it into one or more packets.
     *
     * Each packet consists of a 3-byte payload length, a 1-byte sequence ID (incremented per packet)
     * and at most `CLIENT_MAX_PACKET_SIZE` bytes of payload. A payload that fills a packet completely
     * is always followed by another packet (possibly empty), so the receiver can tell where the
     * payload ends. An empty payload is sent as a single empty packet.
     *
     * When compression is enabled and the parser state is `STATE_OK`, every packet is wrapped
     * by `compressPacket()` before being written.
     * @param string  $packet
     * @return void
     */
    function sendPacket(string $packet): void {
        // NOTE(review): the MySQL protocol caps a single payload at 0xFFFFFF bytes, whereas
        // CLIENT_MAX_PACKET_SIZE is 0x1000000 - confirm that writeInt3() can encode a full-size chunk.
        $maxSize = static::CLIENT_MAX_PACKET_SIZE;

        do {
            // The casts guard against substr() returning false on PHP < 8.
            $partial = (string) \substr($packet, 0, $maxSize);
            $partlen = \strlen($partial);

            // Keep the unsent remainder in $packet; the frame is built in its own variable
            // so the remainder is not lost between iterations.
            $packet = (string) \substr($packet, $maxSize);

            $length = \Plasma\BinaryBuffer::writeInt3($partlen);
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));

            $frame = $length.$sequence.$partial;

            if($this->compressionEnabled && $this->state === static::STATE_OK) {
                $frame = $this->compressPacket($frame);
            }

            $this->connection->write($frame);

            // A full-size chunk means more data (or the terminating short/empty packet) must follow.
        } while($partlen === $maxSize);
    }
283
    
284
    /**
     * Stores a callback for parsing.
     * `processBuffer()` invokes it once, for the next packet that is not recognised as one of
     * the standard response messages, and clears it before the call.
     * @param callable $callback
     * @return void
     */
    function setParseCallback(callable $callback): void {
        $this->parseCallback = $callback;
    }
292
    
293
    /**
     * Sends a command to the server.
     *
     * Without an argument the current command is used, and - if it is a MySQL driver command -
     * it may set the parser state (a return value of -1 leaves the state untouched).
     * If there is no command at all, nothing happens.
     *
     * The sequence and compression IDs are reset unless a MySQL driver command says otherwise.
     * After sending, `onComplete` is invoked immediately when the command is not the current
     * command or does not wait for completion.
     * @param \Plasma\CommandInterface|null  $command
     * @return void
     */
    protected function processCommand(?\Plasma\CommandInterface $command = null) {
        if($command === null) {
            if(!($this->currentCommand instanceof \Plasma\CommandInterface)) {
                return;
            }

            $command = $this->currentCommand;

            if($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) {
                $newState = $command->setParserState();
                if($newState !== -1) {
                    $this->state = $newState;
                }
            }
        }

        $resetIDs = (!($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) || $command->resetSequence());
        if($resetIDs) {
            // Both counters are pre-incremented on use, so -1 makes the next ID 0.
            $this->sequenceID = -1;
            $this->compressionID = -1;
        }

        $this->sendPacket($command->getEncodedMessage());

        // The current command stays pending only if it asks to wait for completion.
        if($command === $this->currentCommand && $command->waitForCompletion()) {
            return;
        }

        $command->onComplete();

        // Checked again after onComplete(), which runs before the slot is cleared.
        if($command === $this->currentCommand) {
            $this->currentCommand = null;
        }
    }
329
    
330
    /**
     * Tries to take one packet out of the receive buffer and dispatch it.
     *
     * The 3-byte payload length and the 1-byte sequence ID are read from a clone of the receive buffer.
     * - Fewer than 4 buffered bytes: nothing happens.
     * - A payload length equal to `CLIENT_MAX_PACKET_SIZE`: the payload is moved to the message buffer
     *   and the method returns.
     * - A non-empty message buffer: the payload is appended to it and the message buffer is parsed instead.
     *
     * In `STATE_INIT` the packet is treated as a handshake message. Otherwise the first payload byte
     * selects an error, prepare-statement-ok, ok or EOF message. Anything else is handed to the parse
     * callback or to the current command's `onNext()`.
     * @return void
     */
    protected function processBuffer() {
        if($this->buffer->getSize() < 4) {
            return;
        }

        $packet = clone $this->buffer;

        $payloadLength = $packet->readInt3();
        $this->sequenceID = $packet->readInt1();

        if($payloadLength === static::CLIENT_MAX_PACKET_SIZE) {
            $this->buffer->read(($payloadLength + 4));
            $this->messageBuffer->append($packet->read($payloadLength));
            return;
        }

        if($this->messageBuffer->getSize() > 0) {
            $this->messageBuffer->append($packet->read($payloadLength));
            $packet = $this->messageBuffer;
            $this->messageBuffer = new \Plasma\BinaryBuffer();
        }

        if($packet->getSize() < $payloadLength) {
            return;
        }

        if($payloadLength > 0) {
            $this->buffer->read(($payloadLength + 4));
            $packet->slice(0, $payloadLength);
        } else {
            $this->buffer->slice($packet->getSize());
        }

        if($packet->getSize() === 0) {
            return;
        }

        /** @var \Plasma\Drivers\MySQL\Messages\MessageInterface  $message */
        if($this->state === static::STATE_INIT) {
            $message = new \Plasma\Drivers\MySQL\Messages\HandshakeMessage($this);
        } else {
            $marker = $packet->read(1);

            $okRespID = \Plasma\Drivers\MySQL\Messages\OkResponseMessage::getID();

            // Kept as one short-circuiting expression: the handshake capability is only
            // consulted when the first alternative does not already match.
            $isOkMessage = (
                (
                    $marker === $okRespID &&
                    (!($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\QueryCommand)
                        || \strtoupper(\substr($this->currentCommand->getQuery(), 0, 6)) !== 'SELECT') // Fix for MySQL 5.7
                ) ||
                (
                    $marker === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() &&
                    ($this->handshakeMessage->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF) !== 0
                )
            );

            if($marker === \Plasma\Drivers\MySQL\Messages\ErrResponseMessage::getID()) {
                $message = new \Plasma\Drivers\MySQL\Messages\ErrResponseMessage($this);
            } elseif($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand && $marker === $okRespID) {
                $message = new \Plasma\Drivers\MySQL\Messages\PrepareStatementOkMessage($this);
            } elseif($isOkMessage) {
                $message = new \Plasma\Drivers\MySQL\Messages\OkResponseMessage($this);
                $this->lastOkMessage = $message;
            } elseif($marker === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() && $payloadLength < 6) {
                $message = new \Plasma\Drivers\MySQL\Messages\EOFMessage($this);
            } else {
                // Not a standard response: give the whole payload (marker byte restored) to the consumer.
                $packet->prepend($marker);

                if($this->parseCallback !== null) {
                    $parse = $this->parseCallback;
                    $this->parseCallback = null;

                    $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $packet);
                    $parse($caller);
                } elseif($this->currentCommand !== null) {
                    $active = $this->currentCommand;

                    $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $packet);
                    $active->onNext($caller);

                    if($active->hasFinished()) {
                        $this->currentCommand = null;
                        $active->onComplete();
                    }
                }

                // More data is waiting: continue on a later loop tick.
                if($this->buffer->getSize() > 0) {
                    $this->driver->getLoop()->futureTick(function () {
                        $this->processBuffer();
                    });
                }

                return;
            }
        }

        $newState = $message->setParserState();
        if($newState !== -1) {
            $this->state = $newState;
        }

        if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
            $this->handshakeMessage = $message;
        }

        $this->handleMessage($packet, $message);
    }
447
    
448
    /**
     * Lets a message parse the given buffer and routes the result.
     *
     * With a current command:
     * - an OK/EOF message on an already finished command completes it,
     * - an error message fails it with a `\Plasma\Exception`,
     * - any other message is passed to its `onNext()`, followed by completion if it reports being finished.
     * Without a current command, an error message is emitted as an `error` event.
     * Every successfully parsed message is emitted as a `message` event.
     *
     * A `ParseException` may update the parser state and the receive buffer, is reported to the
     * current command and as an `error` event, and closes the connection.
     * Afterwards, if the receive buffer still holds data, `processBuffer()` is scheduled on a later loop tick.
     * @param \Plasma\BinaryBuffer                             $buffer
     * @param \Plasma\Drivers\MySQL\Messages\MessageInterface  $message
     * @return void
     */
    function handleMessage(\Plasma\BinaryBuffer $buffer, \Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
        try {
            $parsed = $message->parseMessage($buffer);
            if(!$parsed) {
                return;
            }

            $isError = ($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage);

            if($this->currentCommand === null) {
                if($isError) {
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
                    $this->emit('error', [$error]);
                }
            } else {
                $isTerminal = (
                    $message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage
                    || $message instanceof \Plasma\Drivers\MySQL\Messages\EOFMessage
                );

                if($isTerminal && $this->currentCommand->hasFinished()) {
                    // Clear the slot before the callback runs.
                    $active = $this->currentCommand;
                    $this->currentCommand = null;

                    $active->onComplete();
                } elseif($isError) {
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);

                    $active = $this->currentCommand;
                    $this->currentCommand = null;

                    $active->onError($error);
                } else {
                    $active = $this->currentCommand;
                    $active->onNext($message);

                    if($active->hasFinished()) {
                        // onNext() may have changed the current command; only clear our own.
                        if($this->currentCommand === $active) {
                            $this->currentCommand = null;
                        }

                        $active->onComplete();
                    }
                }
            }

            $this->emit('message', [$message]);
        } catch (\Plasma\Drivers\MySQL\Messages\ParseException $e) {
            $failState = $e->getState();
            if($failState !== null) {
                $this->state = $failState;
            }

            $leftover = $e->getBuffer();
            if($leftover !== null) {
                $this->buffer->clear();
                $this->buffer->append($leftover);
            }

            if($this->currentCommand !== null) {
                $this->currentCommand->onError($e);
            }

            $this->emit('error', [$e]);
            $this->connection->close();
        }

        if($this->buffer->getSize() > 0) {
            $this->driver->getLoop()->futureTick(function () {
                $this->processBuffer();
            });
        }
    }
521
    
522
    /**
     * Wraps a packet into a compressed-protocol frame.
     *
     * The frame header is 7 bytes, matching what `decompressBuffer()` reads:
     * 3-byte payload length, 1-byte compression ID (incremented per frame) and the
     * 3-byte length of the payload before compression.
     *
     * Packets shorter than the compression size threshold - or packets that zlib fails to
     * encode - are sent as-is, signalled by an uncompressed length of 0.
     * @param string  $packet
     * @return string
     */
    protected function compressPacket(string $packet): string {
        $length = \strlen($packet);
        $packetlen = \Plasma\BinaryBuffer::writeInt3($length);
        $id = \Plasma\BinaryBuffer::writeInt1((++$this->compressionID));

        if($length >= $this->compressionSizeThreshold) {
            $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);

            if($compressed !== false) {
                $compresslen = \Plasma\BinaryBuffer::writeInt3(\strlen($compressed));

                // The third header field carries the original (uncompressed) length.
                return $compresslen.$id.$packetlen.$compressed;
            }
        }

        return $packetlen.$id.\Plasma\BinaryBuffer::writeInt3(0).$packet;
    }
541
    
542
    /**
     * Moves every complete frame from the compression buffer into the receive buffer.
     *
     * Each frame starts with a 7-byte header: 3-byte payload length, 1-byte compression ID
     * and 3-byte uncompressed length. An uncompressed length of 0 means the payload is
     * appended as-is; otherwise it is inflated with zlib first.
     *
     * Processing stops when the next frame is not yet completely buffered. If an inflated
     * payload does not have the announced length, the connection is ended.
     * @return void
     */
    protected function decompressBuffer(): void {
        // Loop so that several frames received in one chunk are all handled,
        // regardless of whether they were compressed or not.
        while($this->compressionBuffer->getSize() > 7) {
            $header = new \Plasma\BinaryBuffer();

            // Copy the frame header without consuming it from the compression buffer
            for($i = 0; $i < 7; $i++) {
                $header->append($this->compressionBuffer[$i]);
            }

            $length = $header->readInt3();
            $this->compressionID = $header->readInt1();
            $uncompressedLength = $header->readInt3();

            // Payload not fully received yet: wait for more data.
            if(($this->compressionBuffer->getSize() - 7) < $length) {
                return;
            }

            $this->compressionBuffer->read(7);
            $rawPacket = $this->compressionBuffer->read($length);

            if($uncompressedLength === 0) {
                $this->buffer->append($rawPacket);
                continue;
            }

            $packet = \zlib_decode($rawPacket, $uncompressedLength);

            // zlib_decode() returns false on failure, which yields a length of 0 here.
            if(\strlen($packet) !== $uncompressedLength) {
                $packet = "\xFF\x00\x00\x00     Invalid compressed packet";
                $this->connection->end($packet);

                return;
            }

            $this->buffer->append($packet);
        }
    }
586
    
587
    /**
     * Registers the `data` and `close` listeners on the connection.
     *
     * Incoming data goes to the compression buffer (and through `decompressBuffer()` once more than
     * 7 bytes are buffered) while compression is enabled and the state is `STATE_OK`; otherwise it goes
     * straight to the receive buffer. `processBuffer()` runs after every chunk.
     * A close event is forwarded to `handleClose()`.
     * @return void
     */
    protected function addEvents() {
        $onData = function ($chunk) {
            $compressed = ($this->compressionEnabled && $this->state === static::STATE_OK);

            if(!$compressed) {
                $this->buffer->append($chunk);
            } else {
                $this->compressionBuffer->append($chunk);

                if($this->compressionBuffer->getSize() > 7) {
                    $this->decompressBuffer();
                }
            }

            $this->processBuffer();
        };

        $onClose = function () {
            $this->handleClose();
        };

        $this->connection->on('data', $onData);
        $this->connection->on('close', $onClose);
    }
610
    
611
    /**
     * Reacts to the connection being closed.
     * A close during `STATE_AUTH` or `STATE_AUTH_SENT` switches the state to `STATE_AUTH_ERROR`.
     * All three internal buffers are cleared.
     * @return void
     */
    protected function handleClose() {
        $authPending = ($this->state === static::STATE_AUTH || $this->state === static::STATE_AUTH_SENT);
        if($authPending) {
            $this->state = static::STATE_AUTH_ERROR;
        }

        foreach(array($this->buffer, $this->messageBuffer, $this->compressionBuffer) as $pending) {
            $pending->clear();
        }
    }
624
}
625