GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( 1b0f9b...f731b0 )
by Charlotte
04:44 queued 02:01
created

ProtocolParser::sendPacket()   A

Complexity

Conditions 5
Paths 4

Size

Total Lines 27
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 5.005

Importance

Changes 0
Metric Value
cc 5
eloc 17
nc 4
nop 1
dl 0
loc 27
ccs 16
cts 17
cp 0.9412
crap 5.005
rs 9.3888
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Protocol Parser.
14
 * @internal
15
 */
16
class ProtocolParser implements \Evenement\EventEmitterInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var int
21
     */
22
    const STATE_INIT = 0;
23
    
24
    /**
25
     * @var int
26
     */
27
    const STATE_HANDSHAKE = 1;
28
    
29
    /**
30
     * @var int
31
     */
32
    const STATE_HANDSHAKE_ERROR = 2;
33
    
34
    /**
35
     * @var int
36
     */
37
    const STATE_AUTH = 5;
38
    
39
    /**
40
     * @var int
41
     */
42
    const STATE_AUTH_SENT = 6;
43
    
44
    /**
45
     * @var int
46
     */
47
    const STATE_AUTH_ERROR = 7;
48
    
49
    /**
50
     * @var int
51
     */
52
    const STATE_OK = 9;
53
    
54
    /**
55
     * @var int
56
     */
57
    const CLIENT_CAPABILITIES = (
58
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_FOUND_ROWS |
59
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_PASSWORD |
60
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_FLAG |
61
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LOCAL_FILES |
62
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_INTERACTIVE |
63
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_TRANSACTIONS |
64
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SECURE_CONNECTION |
65
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PROTOCOL_41 |
66
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF
67
    );
68
    
69
    /**
70
     * @var int
71
     */
72
    const CLIENT_MAX_PACKET_SIZE = 0x1000000;
73
    
74
    /**
75
     * @var int
76
     */
77
    const CLIENT_CHARSET_NUMBER = 0x21;
78
    
79
    /**
80
     * @var \Plasma\Drivers\MySQL\Driver
81
     */
82
    protected $driver;
83
    
84
    /**
85
     * @var \React\Socket\ConnectionInterface
86
     */
87
    protected $connection;
88
    
89
    /**
90
     * @var int
91
     */
92
    protected $state = ProtocolParser::STATE_INIT;
93
    
94
    /**
95
     * @var \Plasma\BinaryBuffer
96
     */
97
    protected $buffer;
98
    
99
    /**
100
     * @var \Plasma\BinaryBuffer
101
     */
102
    protected $messageBuffer;
103
    
104
    /**
105
     * The sequence ID is incremented with each packet and may wrap around.
106
     * It starts at 0 and is reset to 0 when a new command begins in the Command Phase.
107
     * @var int
108
     * @see https://dev.mysql.com/doc/internals/en/sequence-id.html
109
     */
110
    protected $sequenceID = -1;
111
    
112
    /**
113
     * Whether we use compression.
114
     * @var bool
115
     */
116
    protected $compressionEnabled = false;
117
    
118
    /**
119
     * The compression ID is incremented with each packet and may wrap around.
120
     * The compression ID is independent to the sequence ID.
121
     * @var int
122
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
123
     */
124
    protected $compressionID = -1;
125
    
126
    /**
127
     * Small packets should not be compressed. This defines a minimum size for compression.
128
     * @var int
129
     * @see https://dev.mysql.com/doc/internals/en/uncompressed-payload.html
130
     */
131
    protected $compressionSizeThreshold = 50;
132
    
133
    /**
134
     * @var \Plasma\BinaryBuffer
135
     */
136
    protected $compressionBuffer;
137
    
138
    /**
139
     * @var \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
140
     */
141
    protected $handshakeMessage;
142
    
143
    /**
144
     * @var \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
145
     */
146
    protected $lastOkMessage;
147
    
148
    /**
149
     * @var \Plasma\CommandInterface|null
150
     */
151
    protected $currentCommand;
152
    
153
    /**
154
     * @var callable|null
155
     */
156
    protected $parseCallback;
157
    
158
    /**
159
     * Constructor.
160
     * @param \Plasma\Drivers\MySQL\Driver       $driver
161
     * @param \React\Socket\ConnectionInterface  $connection
162
     */
163 1
    function __construct(\Plasma\Drivers\MySQL\Driver $driver, \React\Socket\ConnectionInterface $connection) {
164 1
        $this->driver = $driver;
165 1
        $this->connection = $connection;
166
        
167 1
        $this->buffer = new \Plasma\BinaryBuffer();
168 1
        $this->messageBuffer = new \Plasma\BinaryBuffer();
169 1
        $this->compressionBuffer = new \Plasma\BinaryBuffer();
170
        
171 1
        $this->addEvents();
172 1
    }
173
    
174
    /**
175
     * Invoke a command to execute.
176
     * @param \Plasma\CommandInterface|null  $command
177
     * @return void
178
     */
179
    function invokeCommand(?\Plasma\CommandInterface $command): void {
180
        if($command === null) {
181
            return;
182
        }
183
        
184
        $this->currentCommand = $command;
185
        $this->processCommand();
186
    }
187
    
188
    /**
189
     * Executes a command, without handling any aftermath.
190
     * The `onComplete` callback will be immediately invoked, regardless of the `waitForCompletion` value.
191
     * @param \Plasma\CommandInterface  $command
192
     * @return void
193
     */
194
    function executeCommand(\Plasma\CommandInterface $command): void {
195
        $this->processCommand($command);
196
    }
197
    
198
    /**
199
     * Marks the command itself as finished, if currently running.
200
     * @param \Plasma\Drivers\MySQL\Commands\CommandInterface  $command
201
     * @return void
202
     */
203
    function markCommandAsFinished(\Plasma\CommandInterface $command): void {
204
        if($command === $this->currentCommand) {
205
            $this->currentCommand = null;
206
        }
207
        
208
        $command->onComplete();
209
    }
210
    
211
    /**
212
     * Get the parser state.
213
     * @return int
214
     */
215
    function getState(): int {
216
        return $this->state;
217
    }
218
    
219
    /**
220
     * Get the handshake message, or null.
221
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
222
     */
223
    function getHandshakeMessage(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
224
        return $this->handshakeMessage;
225
    }
226
    
227
    /**
228
     * Get the last ok response message, or null.
229
     * @return \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
230
     */
231
    function getLastOkMessage(): ?\Plasma\Drivers\MySQL\Messages\OkResponseMessage {
232
        return $this->lastOkMessage;
233
    }
234
    
235
    /**
236
     * Enables compression.
237
     * @return void
238
     */
239
    function enableCompression(): void {
240
        $this->compressionEnabled = true;
241
    }
242
    
243
    /**
244
     * Sends a packet to the server.
245
     * @param string  $packet
246
     * @return void
247
     */
248 1
    function sendPacket(string $packet): void {
249 1
        $initPacklen = \strlen($packet);
250
        
251
        do {
252 1
            $partial = \substr($packet, 0, static::CLIENT_MAX_PACKET_SIZE);
253 1
            $partlen = \strlen($partial);
254
            
255 1
            $packet = \substr($packet, static::CLIENT_MAX_PACKET_SIZE);
256 1
            $packlen = \strlen($packet);
257
            
258 1
            $length = \Plasma\BinaryBuffer::writeInt3($partlen);
259 1
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));
260
            
261 1
            $packet = $length.$sequence.$partial;
262
            
263 1
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
264
                $packet = $this->compressPacket($packet);
265
            }
266
            
267 1
            $this->connection->write($packet);
268 1
        } while($packlen > static::CLIENT_MAX_PACKET_SIZE);
269
        
270
        // If the packet is exactly the max size, we have to send two packets
271 1
        if($initPacklen === static::CLIENT_MAX_PACKET_SIZE) {
272 1
            $length = \Plasma\BinaryBuffer::writeInt3(0);
273 1
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));
274 1
            $this->connection->write($length.$sequence);
275
        }
276 1
    }
277
    
278
    /**
279
     * Sets the parse callback.
280
     * @param callable $callback
281
     * @return void
282
     */
283
    function setParseCallback(callable $callback): void {
284
        $this->parseCallback = $callback;
285
    }
286
    
287
    /**
288
     * Processes a command.
289
     * @param \Plasma\CommandInterface|null  $command
290
     * @return void
291
     */
292
    protected function processCommand(?\Plasma\CommandInterface $command = null) {
293
        if($command === null && $this->currentCommand instanceof \Plasma\CommandInterface) {
294
            $command = $this->currentCommand;
295
            
296
            if($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) {
297
                $state = $command->setParserState();
298
                if($state !== -1) {
299
                    $this->state = $state;
300
                }
301
            }
302
        }
303
        
304
        if($command === null) {
305
            return;
306
        }
307
        
308
        if(!($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) || $command->resetSequence()) {
309
            $this->sequenceID = -1;
310
            $this->compressionID = -1;
311
        }
312
        
313
        $this->sendPacket($command->getEncodedMessage());
314
        
315
        if($command !== $this->currentCommand || !$command->waitForCompletion()) {
316
            $command->onComplete();
317
            
318
            if($command === $this->currentCommand) {
319
                $this->currentCommand = null;
320
            }
321
        }
322
    }
323
    
324
    /**
325
     * Processes the buffer.
326
     * @return void
327
     */
328
    protected function processBuffer() {
329
        if($this->buffer->getSize() < 4) {
330
            return;
331
        }
332
        
333
        $buffer = clone $this->buffer;
334
        
335
        $length = $buffer->readInt3();
336
        $this->sequenceID = $buffer->readInt1();
337
        
338
        if($length === static::CLIENT_MAX_PACKET_SIZE) {
339
            $this->buffer->read(($length + 4));
340
            $this->messageBuffer->append($buffer->read($length));
341
            return;
342
        } elseif($this->messageBuffer->getSize() > 0) {
343
            $this->messageBuffer->append($buffer->read($length));
344
            $buffer = $this->messageBuffer;
345
            $this->messageBuffer = new \Plasma\BinaryBuffer();
346
        }
347
        
348
        if($buffer->getSize() < $length) {
349
            return;
350
        }
351
        
352
        if($length > 0) {
353
            $this->buffer->read(($length + 4));
354
            $buffer->slice(0, $length);
355
        } else {
356
            $this->buffer->slice($buffer->getSize());
357
        }
358
        
359
        if($buffer->getSize() === 0) {
360
            return;
361
        }
362
        
363
        /** @var \Plasma\Drivers\MySQL\Messages\MessageInterface  $message */
364
        $message = null;
365
        
366
        if($this->state === static::STATE_INIT) {
367
            $message = new \Plasma\Drivers\MySQL\Messages\HandshakeMessage($this);
368
        } else {
369
            $firstChar = $buffer->read(1);
370
            
371
            $okRespID = \Plasma\Drivers\MySQL\Messages\OkResponseMessage::getID();
372
            $isOkMessage = (
373
                (
374
                    $firstChar === $okRespID &&
375
                    (!($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\QueryCommand)
376
                        || \strtoupper(\substr($this->currentCommand->getQuery(), 0, 6)) !== 'SELECT') // Fix for MySQL 5.7
377
                ) ||
378
                (
379
                    $firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() &&
380
                    ($this->handshakeMessage->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF) !== 0
381
                )
382
            );
383
            
384
            switch(true) {
385
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\ErrResponseMessage::getID()):
386
                    $message = new \Plasma\Drivers\MySQL\Messages\ErrResponseMessage($this);
387
                break;
388
                case ($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand && $firstChar === $okRespID):
389
                    $message = new \Plasma\Drivers\MySQL\Messages\PrepareStatementOkMessage($this);
390
                break;
391
                case $isOkMessage:
392
                    $message = new \Plasma\Drivers\MySQL\Messages\OkResponseMessage($this);
393
                    $this->lastOkMessage = $message;
394
                break;
395
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() && $length < 6):
396
                    $message = new \Plasma\Drivers\MySQL\Messages\EOFMessage($this);
397
                break;
398
                default:
399
                    $buffer->prepend($firstChar);
400
                    
401
                    if($this->parseCallback !== null) {
402
                        $parse = $this->parseCallback;
403
                        $this->parseCallback = null;
404
                        
405
                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
406
                        $parse($caller);
407
                    } elseif($this->currentCommand !== null) {
408
                        $command = $this->currentCommand;
409
                        
410
                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
411
                        $command->onNext($caller);
412
                        
413
                        if($command->hasFinished()) {
414
                            $this->currentCommand = null;
415
                            $command->onComplete();
416
                        }
417
                    }
418
                    
419
                    if($this->buffer->getSize() > 0) {
420
                        $this->driver->getLoop()->futureTick(function () {
421
                            $this->processBuffer();
422
                        });
423
                    }
424
                    
425
                    return;
426
                break;
427
            }
428
        }
429
        
430
        $state = $message->setParserState();
431
        if($state !== -1) {
432
            $this->state = $state;
433
        }
434
        
435
        if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
436
            $this->handshakeMessage = $message;
437
        }
438
        
439
        $this->handleMessage($buffer, $message);
440
    }
441
    
442
    /**
443
     * Handles an incoming message.
444
     * @param \Plasma\BinaryBuffer                             $buffer
445
     * @param \Plasma\Drivers\MySQL\Messages\MessageInterface  $message
446
     * @return void
447
     */
448
    function handleMessage(\Plasma\BinaryBuffer $buffer, \Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
449
        try {
450
            $buffer = $message->parseMessage($buffer);
451
            if(!$buffer) {
452
                return;
453
            }
454
            
455
            if($this->currentCommand !== null) {
456
                if(
457
                    ($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage || $message instanceof \Plasma\Drivers\MySQL\Messages\EOFMessage)
458
                    && $this->currentCommand->hasFinished()
459
                ) {
460
                    $command = $this->currentCommand;
461
                    $this->currentCommand = null;
462
                    
463
                    $command->onComplete();
464
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
465
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
466
                    
467
                    $command = $this->currentCommand;
468
                    $this->currentCommand = null;
469
                    
470
                    $command->onError($error);
471
                } else {
472
                    $command = $this->currentCommand;
473
                    $command->onNext($message);
474
                    
475
                    if($command->hasFinished()) {
476
                        if($this->currentCommand === $command) {
477
                            $this->currentCommand = null;
478
                        }
479
                        
480
                        $command->onComplete();
481
                    }
482
                }
483
            } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
484
                $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
485
                $this->emit('error', array($error));
486
            }
487
            
488
            $this->emit('message', array($message));
489
        } catch (\Plasma\Drivers\MySQL\Messages\ParseException $e) {
490
            $state = $e->getState();
491
            if($state !== null) {
492
                $this->state = $state;
493
            }
494
            
495
            $buffer = $e->getBuffer();
496
            if($buffer !== null) {
497
                $this->buffer->clear();
498
                $this->buffer->append($buffer);
499
            }
500
            
501
            if($this->currentCommand !== null) {
502
                $this->currentCommand->onError($e);
503
            }
504
            
505
            $this->emit('error', array($e));
506
            $this->connection->close();
507
        }
508
        
509
        if($this->buffer->getSize() > 0) {
510
            $this->driver->getLoop()->futureTick(function () {
511
                $this->processBuffer();
512
            });
513
        }
514
    }
515
    
516
    /**
517
     * Compresses a packet.
518
     * @param string  $packet
519
     * @return string
520
     */
521
    protected function compressPacket(string $packet): string {
522
        $length = \strlen($packet);
523
        $packetlen = \Plasma\BinaryBuffer::writeInt3($length);
524
        $id = \Plasma\BinaryBuffer::writeInt1((++$this->compressionID));
525
        
526
        if($length < $this->compressionSizeThreshold) {
527
            return $packetlen.$id.\Plasma\BinaryBuffer::writeInt3(0).$packet;
528
        }
529
        
530
        $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);
531
        $compresslen = \Plasma\BinaryBuffer::writeInt3(\strlen($compressed));
532
        
533
        return $compresslen.$id.$compressed;
534
    }
535
    
536
    /**
537
     * Decompresses the buffer.
538
     * @return void
539
     */
540
    protected function decompressBuffer(): void {
541
        $buffer = new \Plasma\BinaryBuffer();
542
        
543
        // Copy packet header to new buffer
544
        for($i = 0; $i < 7; $i++) {
545
            $buffer->append($this->compressionBuffer[$i]);
546
        }
547
        
548
        $length = $buffer->readInt3();
549
        $this->compressionID = $buffer->readInt1();
550
        $uncompressedLength = $buffer->readInt3();
551
        
552
        if(($this->compressionBuffer->getSize() - 7) < $length) {
553
            return;
554
        }
555
        
556
        $this->compressionBuffer->read(7);
557
        $buffer = null;
558
        
559
        if($uncompressedLength === 0) {
560
            $this->buffer->append($this->compressionBuffer->read($length));
561
            return;
562
        }
563
        
564
        $rawPacket = $this->compressionBuffer->read($length);
565
        $packet = \zlib_decode($rawPacket, $uncompressedLength);
566
        
567
        if(\strlen($packet) !== $uncompressedLength) {
568
            $packet = "\xFF\x00\x00\x00     Invalid compressed packet";
569
            $this->connection->end($packet);
570
            
571
            return;
572
        }
573
        
574
        $this->buffer->append($packet);
575
        
576
        if($this->compressionBuffer->getSize() > 7) {
577
            $this->decompressBuffer();
578
        }
579
    }
580
    
581
    /**
582
     * Adds the events to the connection.
583
     * @return void
584
     */
585 1
    protected function addEvents() {
586
        $this->connection->on('data', function ($chunk) {
587
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
588
                $this->compressionBuffer->append($chunk);
589
                
590
                if($this->compressionBuffer->getSize() > 7) {
591
                    $this->decompressBuffer();
592
                }
593
            } else {
594
                $this->buffer->append($chunk);
595
            }
596
            
597
            $this->processBuffer();
598 1
        });
599
        
600
        $this->connection->on('close', function () {
601
            $this->handleClose();
602 1
        });
603 1
    }
604
    
605
    /**
606
     * Connection close handler.
607
     * @return void
608
     */
609
    protected function handleClose() {
610
        if($this->state === static::STATE_AUTH || $this->state === static::STATE_AUTH_SENT) {
611
            $this->state = static::STATE_AUTH_ERROR;
612
        }
613
        
614
        $this->buffer->clear();
615
        $this->messageBuffer->clear();
616
        $this->compressionBuffer->clear();
617
    }
618
}
619