Failed Conditions
Push: master (ba34d5...ca5af1) by Charlotte, created 07:36

ProtocolParser   F

Complexity

Total Complexity 95

Size/Duplication

Total Lines 648
Duplicated Lines 0 %

Test Coverage

Coverage 73.66%

Importance

Changes 0
Metric Value
eloc 283
dl 0
loc 648
ccs 193
cts 262
cp 0.7366
rs 2
c 0
b 0
f 0
wmc 95

17 Methods

Rating  Name                     Duplication  Size  Complexity
A       invokeCommand()          0            7     2
A       __construct()            0            9     1
A       executeCommand()         0            2     1
A       getLastOkMessage()       0            2     1
A       getState()               0            2     1
A       enableCompression()      0            2     1
A       markCommandAsFinished()  0            6     2
A       getHandshakeMessage()    0            2     1
A       setParseCallback()       0            2     1
B       sendPacket()             0            33    7
D       processBuffer()          0            136   33
C       processCommand()         0            36    12
F       handleMessage()          0            73    17
A       handleClose()            0            8     3
A       compressPacket()         0            13    2
B       decompressBuffer()       0            38    6
A       addEvents()              0            17    4

How to fix   Complexity   

Complex Class

Complex classes like ProtocolParser often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
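In ProtocolParser, the members prefixed with "compression" (compressionEnabled, compressionID, compressionSizeThreshold, compressionBuffer, compressPacket(), decompressBuffer()) look like one such cohesive component. The following is a minimal sketch of an Extract Class step for the sending side only. The class name PacketCompressor, its reset() method and the local writeInt3() helper are hypothetical and not part of the driver; the header layout (3-byte compressed length, 1-byte compression ID, 3-byte uncompressed length) follows what decompressBuffer() reads, and the little-endian encoding is an assumption based on the MySQL protocol.

```php
<?php

/**
 * Hypothetical class extracted from the compression-related members of ProtocolParser.
 */
final class PacketCompressor {
    /** @var int Compression ID of the last packet, -1 if none was sent yet. */
    private $compressionID = -1;

    /** @var int Packets shorter than this are sent uncompressed. */
    private $sizeThreshold;

    public function __construct(int $sizeThreshold = 50) {
        $this->sizeThreshold = $sizeThreshold;
    }

    /** Resets the compression ID, e.g. when a new command begins. */
    public function reset(): void {
        $this->compressionID = -1;
    }

    /** Wraps an already framed packet in the 7-byte compressed packet header. */
    public function compress(string $packet): string {
        $length = \strlen($packet);
        $this->compressionID = ($this->compressionID + 1) & 0xFF; // wraps around after 255
        $id = \chr($this->compressionID);

        if($length < $this->sizeThreshold) {
            // Small packets stay uncompressed: the "uncompressed length" field is 0.
            return self::writeInt3($length).$id.self::writeInt3(0).$packet;
        }

        $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);
        return self::writeInt3(\strlen($compressed)).$id.self::writeInt3($length).$compressed;
    }

    /** Encodes an integer as 3 little-endian bytes. */
    private static function writeInt3(int $value): string {
        return \substr(\pack('V', $value), 0, 3);
    }
}

$compressor = new PacketCompressor();
$framed = $compressor->compress("\x01\x00\x00\x00\x0e"); // 5 bytes, below the threshold
echo \bin2hex($framed), \PHP_EOL;
```

With such a class in place, ProtocolParser would only hold a PacketCompressor instance and delegate to it, which removes two fields and one method from the parser itself.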

While breaking up the class, it is a good idea to analyze how other classes use ProtocolParser, and based on these observations, apply Extract Interface, too.
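As an illustration of Extract Interface, the sketch below assumes that some collaborators only ever call sendPacket() on the parser. The names PacketSenderInterface, RecordingPacketSender and sendPing() are hypothetical and exist only for this example; which methods other classes really use would have to be checked in the code base first.

```php
<?php

/**
 * Hypothetical narrow interface: the only parser capability this collaborator needs.
 */
interface PacketSenderInterface {
    function sendPacket(string $packet): void;
}

/**
 * Test double that records packets instead of writing them to a socket.
 */
final class RecordingPacketSender implements PacketSenderInterface {
    /** @var string[] */
    public $packets = array();

    function sendPacket(string $packet): void {
        $this->packets[] = $packet;
    }
}

/**
 * Hypothetical collaborator that depends on the interface, not on ProtocolParser.
 */
function sendPing(PacketSenderInterface $sender): void {
    $sender->sendPacket("\x0e"); // 0x0e is the MySQL COM_PING command byte
}

$sender = new RecordingPacketSender();
sendPing($sender);
echo \count($sender->packets), \PHP_EOL;
```

Because ProtocolParser::sendPacket() already has this signature, the parser could declare that it implements such an interface without further changes, and callers could then be tested without a real connection.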

<?php
/**
 * Plasma Driver MySQL component
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
 *
 * Website: https://github.com/PlasmaPHP
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
 */

namespace Plasma\Drivers\MySQL;

/**
 * The MySQL Protocol Parser.
 * @internal
 */
class ProtocolParser implements \Evenement\EventEmitterInterface {
    use \Evenement\EventEmitterTrait;

    /**
     * @var int
     */
    const STATE_INIT = 0;

    /**
     * @var int
     */
    const STATE_HANDSHAKE = 1;

    /**
     * @var int
     */
    const STATE_HANDSHAKE_ERROR = 2;

    /**
     * @var int
     */
    const STATE_AUTH = 5;

    /**
     * @var int
     */
    const STATE_AUTH_SENT = 6;

    /**
     * @var int
     */
    const STATE_AUTH_ERROR = 7;

    /**
     * @var int
     */
    const STATE_OK = 9;

    /**
     * @var int
     */
    const CLIENT_CAPABILITIES = (
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_FOUND_ROWS |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_PASSWORD |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_FLAG |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LOCAL_FILES |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_INTERACTIVE |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_TRANSACTIONS |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SECURE_CONNECTION |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PROTOCOL_41 |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF
    );

    /**
     * @var int
     */
    const CLIENT_MAX_PACKET_SIZE = 0xFFFFFF;

    /**
     * @var int
     */
    const CLIENT_CHARSET_NUMBER = 0x21;

    /**
     * @var \Plasma\Drivers\MySQL\Driver
     */
    protected $driver;

    /**
     * @var \React\Socket\ConnectionInterface
     */
    protected $connection;

    /**
     * @var int
     */
    protected $state = ProtocolParser::STATE_INIT;

    /**
     * @var \Plasma\BinaryBuffer
     */
    protected $buffer;

    /**
     * @var \Plasma\BinaryBuffer
     */
    protected $messageBuffer;

    /**
     * The sequence ID is incremented with each packet and may wrap around.
     * It starts at 0 and is reset to 0 when a new command begins in the Command Phase.
     * The property holds -1 until the first packet, because it is incremented before use.
     * @var int
     * @see https://dev.mysql.com/doc/internals/en/sequence-id.html
     */
    protected $sequenceID = -1;

    /**
     * Whether we use compression.
     * @var bool
     */
    protected $compressionEnabled = false;

    /**
     * The compression ID is incremented with each packet and may wrap around.
     * The compression ID is independent of the sequence ID.
     * @var int
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
     */
    protected $compressionID = -1;

    /**
     * Small packets should not be compressed. This defines a minimum size for compression.
     * @var int
     * @see https://dev.mysql.com/doc/internals/en/uncompressed-payload.html
     */
    protected $compressionSizeThreshold = 50;

    /**
     * @var \Plasma\BinaryBuffer
     */
    protected $compressionBuffer;

    /**
     * @var \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    protected $handshakeMessage;

    /**
     * @var \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
     */
    protected $lastOkMessage;

    /**
     * @var \Plasma\CommandInterface|null
     */
    protected $currentCommand;

    /**
     * @var callable|null
     */
    protected $parseCallback;

    /**
     * Constructor.
     * @param \Plasma\Drivers\MySQL\Driver       $driver
     * @param \React\Socket\ConnectionInterface  $connection
     */
    function __construct(\Plasma\Drivers\MySQL\Driver $driver, \React\Socket\ConnectionInterface $connection) {
        $this->driver = $driver;
        $this->connection = $connection;

        $this->buffer = new \Plasma\BinaryBuffer();
        $this->messageBuffer = new \Plasma\BinaryBuffer();
        $this->compressionBuffer = new \Plasma\BinaryBuffer();

        $this->addEvents();
    }

    /**
     * Invoke a command to execute.
     * @param \Plasma\CommandInterface|null  $command
     * @return void
     */
    function invokeCommand(?\Plasma\CommandInterface $command): void {
        if($command === null) {
            return;
        }

        $this->currentCommand = $command;
        $this->processCommand();
    }

    /**
     * Executes a command, without handling any aftermath.
     * The `onComplete` callback will be immediately invoked, regardless of the `waitForCompletion` value.
     * @param \Plasma\CommandInterface  $command
     * @return void
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->processCommand($command);
    }

    /**
     * Marks the command itself as finished, if currently running.
     * @param \Plasma\CommandInterface  $command
     * @return void
     */
    function markCommandAsFinished(\Plasma\CommandInterface $command): void {
        if($command === $this->currentCommand) {
            $this->currentCommand = null;
        }

        $command->onComplete();
    }

    /**
     * Get the parser state.
     * @return int
     */
    function getState(): int {
        return $this->state;
    }

    /**
     * Get the handshake message, or null.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshakeMessage(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        return $this->handshakeMessage;
    }

    /**
     * Get the last ok response message, or null.
     * @return \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
     */
    function getLastOkMessage(): ?\Plasma\Drivers\MySQL\Messages\OkResponseMessage {
        return $this->lastOkMessage;
    }

    /**
     * Enables compression.
     * @return void
     */
    function enableCompression(): void {
        $this->compressionEnabled = true;
    }

    /**
     * Sends a packet to the server.
     * @param string  $packet
     * @return void
     */
    function sendPacket(string $packet): void {
        do {
            $partial = (string) \substr($packet, 0, static::CLIENT_MAX_PACKET_SIZE);
            $partlen = \strlen($partial);

            // Keep the unsent remainder in $packet and build the framed packet separately,
            // so that the remainder is not overwritten before the next iteration
            $packet = (string) \substr($packet, static::CLIENT_MAX_PACKET_SIZE);

            $length = \Plasma\BinaryBuffer::writeInt3($partlen);
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));

            $frame = $length.$sequence.$partial;

            if($this->compressionEnabled && $this->state === static::STATE_OK) {
                $frame = $this->compressPacket($frame);
            }

            $this->connection->write($frame);

            // A packet of exactly the max size has to be followed by another packet,
            // which is empty if no payload is left
        } while($partlen === static::CLIENT_MAX_PACKET_SIZE);
    }

    /**
     * Sets the parse callback.
     * @param callable $callback
     * @return void
     */
    function setParseCallback(callable $callback): void {
        $this->parseCallback = $callback;
    }

    /**
     * Processes a command.
     * @param \Plasma\CommandInterface|null  $command
     * @return void
     */
    protected function processCommand(?\Plasma\CommandInterface $command = null) {
        if($command === null && $this->currentCommand instanceof \Plasma\CommandInterface) {
            $command = $this->currentCommand;

            if($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) {
                $state = $command->setParserState();
                if($state !== -1) {
                    $this->state = $state;
                }
            }
        }

        if($command === null) {
            return;
        }

        if(!($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) || $command->resetSequence()) {
            $this->sequenceID = -1;
            $this->compressionID = -1;
        }

        try {
            $msg = $command->getEncodedMessage();
        } catch (\Plasma\Exception $e) {
            $this->currentCommand = null;
            $command->onError($e);
            return;
        }

        $this->sendPacket($msg);

        if($command !== $this->currentCommand || !$command->waitForCompletion()) {
            $command->onComplete();

            if($command === $this->currentCommand) {
                $this->currentCommand = null;
            }
        }
    }

    /**
     * Processes the buffer.
     * @return void
     */
    protected function processBuffer() {
        if($this->buffer->getSize() < 4) {
            return;
        }

        $buffer = clone $this->buffer;

        $length = $buffer->readInt3();
        $this->sequenceID = $buffer->readInt1();

        if($length === static::CLIENT_MAX_PACKET_SIZE) {
            $this->buffer->read(($length + 4));
            $this->messageBuffer->append($buffer->read($length));
            return;
        } elseif($this->messageBuffer->getSize() > 0) {
            $this->messageBuffer->append($buffer->read($length));
            $buffer = $this->messageBuffer;
            $this->messageBuffer = new \Plasma\BinaryBuffer();
        }

        if($buffer->getSize() < $length) {
            return;
        }

        if($length > 0) {
            $this->buffer->read(($length + 4));
            $buffer->slice(0, $length);
        } else {
            $this->buffer->slice($buffer->getSize());
        }

        if($buffer->getSize() === 0) {
            return;
        }

        /** @var \Plasma\Drivers\MySQL\Messages\MessageInterface  $message */
        $message = null;

        if($this->state === static::STATE_INIT) {
            $message = new \Plasma\Drivers\MySQL\Messages\HandshakeMessage($this);
        } else {
            $firstChar = $buffer->read(1);

            $okRespID = \Plasma\Drivers\MySQL\Messages\OkResponseMessage::getID();
            $isOkMessage = (
                (
                    $firstChar === $okRespID &&
                    (!($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\QueryCommand)
                        || \strtoupper(\substr($this->currentCommand->getQuery(), 0, 6)) !== 'SELECT') // Fix for MySQL 5.7
                ) ||
                (
                    $firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() &&
                    ($this->handshakeMessage->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF) !== 0
                )
            );

            // switch(true) picks the first case whose condition holds, so the order matters
            switch(true) {
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\ErrResponseMessage::getID()):
                    $message = new \Plasma\Drivers\MySQL\Messages\ErrResponseMessage($this);
                break;
                case ($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand && $firstChar === $okRespID):
                    $message = new \Plasma\Drivers\MySQL\Messages\PrepareStatementOkMessage($this);
                break;
                case $isOkMessage:
                    $message = new \Plasma\Drivers\MySQL\Messages\OkResponseMessage($this);
                    $this->lastOkMessage = $message;
                break;
                case ($this->state < static::STATE_OK && $firstChar === \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage::getID()):
                    $message = new \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage($this);
                break;
                case ($this->state < static::STATE_OK && $firstChar === \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage::getID()):
                    $message = new \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage($this);
                break;
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() && $length < 6):
                    $message = new \Plasma\Drivers\MySQL\Messages\EOFMessage($this);
                break;
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\LocalInFileRequestMessage::getID()):
                    if($this->driver->getOptions()['localInFile.enable']) {
                        $message = new \Plasma\Drivers\MySQL\Messages\LocalInFileRequestMessage($this);
                    } else {
                        $this->emit('error', array((new \Plasma\Exception('MySQL server requested a local file, but the driver option is disabled'))));

                        if($this->buffer->getSize() > 0) {
                            $this->driver->getLoop()->futureTick(function () {
                                $this->processBuffer();
                            });
                        }

                        return;
                    }
                break;
                default:
                    $buffer->prepend($firstChar);

                    if($this->parseCallback !== null) {
                        $parse = $this->parseCallback;
                        $this->parseCallback = null;

                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
                        $parse($caller);
                    } elseif($this->currentCommand !== null) {
                        $command = $this->currentCommand;

                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
                        $command->onNext($caller);

                        if($command->hasFinished()) {
                            $this->currentCommand = null;
                            $command->onComplete();
                        }
                    }

                    if($this->buffer->getSize() > 0) {
                        $this->driver->getLoop()->futureTick(function () {
                            $this->processBuffer();
                        });
                    }

                    return;
                break;
            }
        }

        $state = $message->setParserState();
        if($state !== -1) {
            $this->state = $state;
        }

        if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
            $this->handshakeMessage = $message;
        }

        $this->handleMessage($buffer, $message);
    }

    /**
     * Handles an incoming message.
     * @param \Plasma\BinaryBuffer                             $buffer
     * @param \Plasma\Drivers\MySQL\Messages\MessageInterface  $message
     * @return void
     */
    function handleMessage(\Plasma\BinaryBuffer $buffer, \Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
        try {
            $buffer = $message->parseMessage($buffer);
            if(!$buffer) {
                return;
            }

            if($this->currentCommand !== null) {
                if(
                    ($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage || $message instanceof \Plasma\Drivers\MySQL\Messages\EOFMessage)
                    && $this->currentCommand->hasFinished()
                ) {
                    $command = $this->currentCommand;
                    $this->currentCommand = null;

                    $command->onComplete();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);

                    $command = $this->currentCommand;
                    $this->currentCommand = null;

                    $command->onError($error);
                } else {
                    $command = $this->currentCommand;
                    $command->onNext($message);

                    if($command->hasFinished()) {
                        if($this->currentCommand === $command) {
                            $this->currentCommand = null;
                        }

                        $command->onComplete();
                    }
                }
            } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
                $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
                $this->emit('error', array($error));
            }

            $this->emit('message', array($message));
        } catch (\Plasma\Drivers\MySQL\Messages\ParseException $e) {
            $state = $e->getState();
            if($state !== null) {
                $this->state = $state;
            }

            $buffer = $e->getBuffer();
            if($buffer !== null) {
                $this->buffer->clear();
                $this->buffer->append($buffer);
            }

            if($this->currentCommand !== null) {
                $this->currentCommand->onError($e);
            }

            $this->emit('error', array($e));
            $this->connection->close();
        } catch (\Plasma\Exception $e) {
            if($this->currentCommand !== null) {
                $command = $this->currentCommand;
                $this->currentCommand = null;

                // Pass the caught exception: $error is not defined on every path
                // that leads to this catch block (flagged by the inspection)
                $command->onError($e);
            } else {
                $this->emit('error', array($e));
            }
        }

        if($this->buffer->getSize() > 0) {
            $this->driver->getLoop()->futureTick(function () {
                $this->processBuffer();
            });
        }
    }

    /**
     * Compresses a packet.
     * @param string  $packet
     * @return string
     */
    protected function compressPacket(string $packet): string {
        $length = \strlen($packet);
        $packetlen = \Plasma\BinaryBuffer::writeInt3($length);
        $id = \Plasma\BinaryBuffer::writeInt1((++$this->compressionID));

        if($length < $this->compressionSizeThreshold) {
            // Uncompressed payload: the "uncompressed length" header field is 0
            return $packetlen.$id.\Plasma\BinaryBuffer::writeInt3(0).$packet;
        }

        $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);
        $compresslen = \Plasma\BinaryBuffer::writeInt3(\strlen($compressed));

        // The 7-byte header is: compressed length, compression ID, uncompressed length
        return $compresslen.$id.$packetlen.$compressed;
    }

    /**
     * Decompresses the buffer.
     * @return void
     */
    protected function decompressBuffer(): void {
        $buffer = new \Plasma\BinaryBuffer();

        // Copy packet header to new buffer
        for($i = 0; $i < 7; $i++) {
            $buffer->append($this->compressionBuffer[$i]);
        }

        $length = $buffer->readInt3();
        $this->compressionID = $buffer->readInt1();
        $uncompressedLength = $buffer->readInt3();

        // Wait for more data until the whole compressed payload has arrived
        if(($this->compressionBuffer->getSize() - 7) < $length) {
            return;
        }

        $this->compressionBuffer->read(7);
        $buffer = null;

        // An uncompressed length of 0 means the payload was sent uncompressed
        if($uncompressedLength === 0) {
            $this->buffer->append($this->compressionBuffer->read($length));
            return;
        }

        $rawPacket = $this->compressionBuffer->read($length);
        $packet = \zlib_decode($rawPacket, $uncompressedLength);

        if(\strlen($packet) !== $uncompressedLength) {
            $packet = "\xFF\x00\x00\x00     Invalid compressed packet";
            $this->connection->end($packet);

            return;
        }

        $this->buffer->append($packet);

        if($this->compressionBuffer->getSize() > 7) {
            $this->decompressBuffer();
        }
    }

    /**
     * Adds the events to the connection.
     * @return void
     */
    protected function addEvents() {
        $this->connection->on('data', function ($chunk) {
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
                $this->compressionBuffer->append($chunk);

                if($this->compressionBuffer->getSize() > 7) {
                    $this->decompressBuffer();
                }
            } else {
                $this->buffer->append($chunk);
            }

            $this->processBuffer();
        });

        $this->connection->on('close', function () {
            $this->handleClose();
        });
    }

    /**
     * Connection close handler.
     * @return void
     */
    protected function handleClose() {
        if($this->state === static::STATE_AUTH || $this->state === static::STATE_AUTH_SENT) {
            $this->state = static::STATE_AUTH_ERROR;
        }

        $this->buffer->clear();
        $this->messageBuffer->clear();
        $this->compressionBuffer->clear();
    }
}
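
The packet framing that sendPacket() performs (a 3-byte length, a 1-byte sequence ID, then the payload, split at CLIENT_MAX_PACKET_SIZE) can be tried in isolation. The sketch below is not the driver's code: framePayload() and the local writeInt3() helper are hypothetical, the little-endian encoding is an assumption based on the MySQL protocol, and the $maxSize parameter exists only so that the splitting can be exercised with small values instead of 0xFFFFFF.

```php
<?php

/**
 * Encodes an integer as 3 little-endian bytes.
 */
function writeInt3(int $value): string {
    return \substr(\pack('V', $value), 0, 3);
}

/**
 * Splits a payload into framed packets: 3-byte length, 1-byte sequence ID, payload.
 * The sequence ID is passed by reference and updated for every packet.
 *
 * @return string[]
 */
function framePayload(string $payload, int &$sequenceId, int $maxSize = 0xFFFFFF): array {
    $frames = array();

    do {
        // The casts guard against older PHP versions, where substr() may return false
        $partial = (string) \substr($payload, 0, $maxSize);
        $payload = (string) \substr($payload, $maxSize);
        $partlen = \strlen($partial);

        $sequenceId = ($sequenceId + 1) & 0xFF; // the ID wraps around after 255
        $frames[] = writeInt3($partlen).\chr($sequenceId).$partial;

        // A full-sized packet is always followed by another one (possibly empty)
    } while($partlen === $maxSize);

    return $frames;
}

$sequenceId = -1;
$frames = framePayload('abcdef', $sequenceId, 3);

foreach($frames as $frame) {
    echo \bin2hex($frame), \PHP_EOL;
}
```

With a maximum size of 3, the six-byte payload yields two full packets and one empty terminating packet, which mirrors the rule that a payload of exactly the maximum size has to be followed by a second packet.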