GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

ProtocolParser::markCommandAsFinished()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 1
dl 0
loc 6
ccs 4
cts 4
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Protocol Parser.
14
 * @internal
15
 */
16
class ProtocolParser implements \Evenement\EventEmitterInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var int
21
     */
22
    const STATE_INIT = 0;
23
    
24
    /**
25
     * @var int
26
     */
27
    const STATE_HANDSHAKE = 1;
28
    
29
    /**
30
     * @var int
31
     */
32
    const STATE_HANDSHAKE_ERROR = 2;
33
    
34
    /**
35
     * @var int
36
     */
37
    const STATE_AUTH = 5;
38
    
39
    /**
40
     * @var int
41
     */
42
    const STATE_AUTH_SENT = 6;
43
    
44
    /**
45
     * @var int
46
     */
47
    const STATE_AUTH_ERROR = 7;
48
    
49
    /**
50
     * @var int
51
     */
52
    const STATE_OK = 9;
53
    
54
    /**
55
     * @var int
56
     */
57
    const CLIENT_CAPABILITIES = (
58
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_FOUND_ROWS |
59
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_PASSWORD |
60
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_FLAG |
61
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LOCAL_FILES |
62
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_INTERACTIVE |
63
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_TRANSACTIONS |
64
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SECURE_CONNECTION |
65
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PROTOCOL_41 |
66
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF |
67
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PS_MULTI_RESULTS
68
    );
69
    
70
    /**
71
     * @var int
72
     */
73
    const CLIENT_MAX_PACKET_SIZE = 0xFFFFFF;
74
    
75
    /**
76
     * @var int
77
     */
78
    const CLIENT_CHARSET_NUMBER = 0x21;
79
    
80
    /**
81
     * @var \Plasma\Drivers\MySQL\Driver
82
     */
83
    protected $driver;
84
    
85
    /**
86
     * @var \React\Socket\ConnectionInterface
87
     */
88
    protected $connection;
89
    
90
    /**
91
     * @var int
92
     */
93
    protected $state = ProtocolParser::STATE_INIT;
94
    
95
    /**
96
     * @var \Plasma\BinaryBuffer
97
     */
98
    protected $buffer;
99
    
100
    /**
101
     * @var \Plasma\BinaryBuffer
102
     */
103
    protected $messageBuffer;
104
    
105
    /**
106
     * The sequence ID is incremented with each packet and may wrap around.
107
     * It starts at 0 and is reset to 0 when a new command begins in the Command Phase.
108
     * @var int
109
     * @see https://dev.mysql.com/doc/internals/en/sequence-id.html
110
     */
111
    protected $sequenceID = -1;
112
    
113
    /**
114
     * Whether we use compression.
115
     * @var bool
116
     */
117
    protected $compressionEnabled = false;
118
    
119
    /**
120
     * The compression ID is incremented with each packet and may wrap around.
121
     * The compression ID is independent to the sequence ID.
122
     * @var int
123
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
124
     */
125
    protected $compressionID = -1;
126
    
127
    /**
128
     * Small packets should not be compressed. This defines a minimum size for compression.
129
     * @var int
130
     * @see https://dev.mysql.com/doc/internals/en/uncompressed-payload.html
131
     */
132
    protected $compressionSizeThreshold = 50;
133
    
134
    /**
135
     * @var \Plasma\BinaryBuffer
136
     */
137
    protected $compressionBuffer;
138
    
139
    /**
140
     * @var \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
141
     */
142
    protected $handshakeMessage;
143
    
144
    /**
145
     * @var \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
146
     */
147
    protected $lastOkMessage;
148
    
149
    /**
150
     * @var \Plasma\CommandInterface|null
151
     */
152
    protected $currentCommand;
153
    
154
    /**
155
     * @var callable|null
156
     */
157
    protected $parseCallback;
158
    
159
    /**
160
     * Constructor.
161
     * @param \Plasma\Drivers\MySQL\Driver       $driver
162
     * @param \React\Socket\ConnectionInterface  $connection
163
     */
164 66
    function __construct(\Plasma\Drivers\MySQL\Driver $driver, \React\Socket\ConnectionInterface $connection) {
165 66
        $this->driver = $driver;
166 66
        $this->connection = $connection;
167
        
168 66
        $this->buffer = new \Plasma\BinaryBuffer();
169 66
        $this->messageBuffer = new \Plasma\BinaryBuffer();
170 66
        $this->compressionBuffer = new \Plasma\BinaryBuffer();
171
        
172 66
        $this->addEvents();
173 66
    }
174
    
175
    /**
176
     * Invoke a command to execute.
177
     * @param \Plasma\CommandInterface|null  $command
178
     * @return void
179
     */
180 64
    function invokeCommand(?\Plasma\CommandInterface $command): void {
181 64
        if($command === null) {
182 60
            return;
183
        }
184
        
185 64
        $this->currentCommand = $command;
186 64
        $this->processCommand();
187 64
    }
188
    
189
    /**
190
     * Executes a command, without handling any aftermath.
191
     * The `onComplete` callback will be immediately invoked, regardless of the `waitForCompletion` value.
192
     * @param \Plasma\CommandInterface  $command
193
     * @return void
194
     */
195
    function executeCommand(\Plasma\CommandInterface $command): void {
196
        $this->processCommand($command);
197
    }
198
    
199
    /**
200
     * Marks the command itself as finished, if currently running.
201
     * @param \Plasma\Drivers\MySQL\Commands\CommandInterface  $command
202
     * @return void
203
     */
204 60
    function markCommandAsFinished(\Plasma\CommandInterface $command): void {
205 60
        if($command === $this->currentCommand) {
206 60
            $this->currentCommand = null;
207
        }
208
        
209 60
        $command->onComplete();
210 60
    }
211
    
212
    /**
213
     * Get the parser state.
214
     * @return int
215
     */
216 1
    function getState(): int {
217 1
        return $this->state;
218
    }
219
    
220
    /**
221
     * Get the handshake message, or null.
222
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
223
     */
224 63
    function getHandshakeMessage(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
225 63
        return $this->handshakeMessage;
226
    }
227
    
228
    /**
229
     * Get the last ok response message, or null.
230
     * @return \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
231
     */
232 4
    function getLastOkMessage(): ?\Plasma\Drivers\MySQL\Messages\OkResponseMessage {
233 4
        return $this->lastOkMessage;
234
    }
235
    
236
    /**
237
     * Enables compression.
238
     * @return void
239
     */
240 4
    function enableCompression(): void {
241 4
        $this->compressionEnabled = true;
242 4
    }
243
    
244
    /**
245
     * Sends a packet to the server.
246
     * @param string  $packet
247
     * @return void
248
     */
249 64
    function sendPacket(string $packet): void {
250 64
        $initPacklen = \strlen($packet);
251
        
252
        do {
253 64
            $partial = \substr($packet, 0, static::CLIENT_MAX_PACKET_SIZE);
254 64
            $partlen = \strlen($partial);
255
            
256 64
            $packet = \substr($packet, static::CLIENT_MAX_PACKET_SIZE);
257 64
            $packlen = \strlen($packet);
258
            
259 64
            $length = \Plasma\BinaryBuffer::writeInt3($partlen);
260 64
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));
261
            
262 64
            $packet = $length.$sequence.$partial;
263
            
264 64
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
265 2
                $packet = $this->compressPacket($packet);
266
            }
267
            
268 64
            $this->connection->write($packet);
269 64
        } while($packlen > static::CLIENT_MAX_PACKET_SIZE);
270
        
271
        // If the packet is exactly the max size, we have to send two packets
272 64
        if($initPacklen === static::CLIENT_MAX_PACKET_SIZE) {
273 1
            $length = \Plasma\BinaryBuffer::writeInt3(0);
274 1
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));
275 1
            $packet = $length.$sequence;
276
            
277 1
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
278
                $packet = $this->compressPacket($packet);
279
            }
280
            
281 1
            $this->connection->write($packet);
282
        }
283 64
    }
284
    
285
    /**
286
     * Sets the parse callback.
287
     * @param callable $callback
288
     * @return void
289
     */
290
    function setParseCallback(callable $callback): void {
291
        $this->parseCallback = $callback;
292
    }
293
    
294
    /**
295
     * Processes a command.
296
     * @param \Plasma\CommandInterface|null  $command
297
     * @return void
298
     */
299 64
    protected function processCommand(?\Plasma\CommandInterface $command = null) {
300 64
        if($command === null && $this->currentCommand instanceof \Plasma\CommandInterface) {
301 64
            $command = $this->currentCommand;
302
            
303 64
            if($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) {
304 64
                $state = $command->setParserState();
305 64
                if($state !== -1) {
306 63
                    $this->state = $state;
307
                }
308
            }
309
        }
310
        
311 64
        if($command === null) {
312
            return;
313
        }
314
        
315 64
        if(!($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) || $command->resetSequence()) {
316 61
            $this->sequenceID = -1;
317 61
            $this->compressionID = -1;
318
        }
319
        
320
        try {
321 64
            $msg = $command->getEncodedMessage();
322 2
        } catch (\Plasma\Exception $e) {
323 2
            $this->currentCommand = null;
324 2
            $command->onError($e);
325 2
            return;
326
        }
327
        
328 63
        $this->sendPacket($msg);
329
        
330 63
        if($command !== $this->currentCommand || !$command->waitForCompletion()) {
331 37
            $command->onComplete();
332
            
333 37
            if($command === $this->currentCommand) {
334 37
                $this->currentCommand = null;
335
            }
336
        }
337 63
    }
338
    
339
    /**
340
     * Processes the buffer.
341
     * @return void
342
     */
343 63
    protected function processBuffer() {
344 63
        if($this->buffer->getSize() < 4) {
345
            return;
346
        }
347
        
348 63
        $buffer = clone $this->buffer;
349
        
350 63
        $length = $buffer->readInt3();
351 63
        $this->sequenceID = $buffer->readInt1();
352
        
353 63
        if($length === static::CLIENT_MAX_PACKET_SIZE) {
354
            $this->buffer->read(($length + 4));
355
            $this->messageBuffer->append($buffer->read($length));
356
            return;
357 63
        } elseif($this->messageBuffer->getSize() > 0) {
358
            $this->messageBuffer->append($buffer->read($length));
359
            $buffer = $this->messageBuffer;
360
            $this->messageBuffer = new \Plasma\BinaryBuffer();
361
        }
362
        
363 63
        if($buffer->getSize() < $length) {
364
            return;
365
        }
366
        
367 63
        if($length > 0) {
368 63
            $this->buffer->read(($length + 4));
369 63
            $buffer->slice(0, $length);
370
        } else {
371
            $this->buffer->slice($buffer->getSize());
372
        }
373
        
374 63
        if($buffer->getSize() === 0) {
375
            return;
376
        }
377
        
378
        /** @var \Plasma\Drivers\MySQL\Messages\MessageInterface  $message */
379 63
        $message = null;
380
        
381 63
        if($this->state === static::STATE_INIT) {
382 63
            $message = new \Plasma\Drivers\MySQL\Messages\HandshakeMessage($this);
383
        } else {
384 63
            $firstChar = $buffer->read(1);
385
            
386 63
            $okRespID = \Plasma\Drivers\MySQL\Messages\OkResponseMessage::getID();
387
            $isOkMessage = (
388
                (
389 63
                    $firstChar === $okRespID &&
390 62
                    (!($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\QueryCommand)
391 62
                        || \strtoupper(\substr($this->currentCommand->getQuery(), 0, 6)) !== 'SELECT') // Fix for MySQL 5.7
392
                ) ||
393
                (
394 49
                    $firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() &&
395 63
                    ($this->handshakeMessage->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF) !== 0
396
                )
397
            );
398
            
399
            switch(true) {
400 63
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\ErrResponseMessage::getID()):
401 1
                    $message = new \Plasma\Drivers\MySQL\Messages\ErrResponseMessage($this);
402 1
                break;
403 62
                case ($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand && $firstChar === $okRespID):
404 38
                    $message = new \Plasma\Drivers\MySQL\Messages\PrepareStatementOkMessage($this);
405 38
                break;
406 62
                case $isOkMessage:
407 62
                    $message = new \Plasma\Drivers\MySQL\Messages\OkResponseMessage($this);
408 62
                    $this->lastOkMessage = $message;
409
                    
410
                    $this->driver->getLoop()->futureTick(function () use ($message) {
411 62
                        $this->driver->emit('eventRelay', array('serverOkMessage', $message));
412 62
                    });
0 ignored issues
show
Coding Style introduced by
Line indented incorrectly; expected 16 spaces, found 20
Loading history...
413 62
                break;
414 48
                case ($this->state < static::STATE_OK && $firstChar === \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage::getID()):
415
                    $message = new \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage($this);
416
                break;
417 48
                case ($this->state < static::STATE_OK && $firstChar === \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage::getID()):
418
                    $message = new \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage($this);
419
                break;
420 48
                case ($this->state < static::STATE_OK && $firstChar === \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage::getID()):
421
                    $message = new \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage($this);
422
                break;
423 48
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() && $length < 6):
424
                    $message = new \Plasma\Drivers\MySQL\Messages\EOFMessage($this);
425
                break;
426 48
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\LocalInFileRequestMessage::getID()):
427
                    if($this->driver->getOptions()['localInFile.enable']) {
428
                        $message = new \Plasma\Drivers\MySQL\Messages\LocalInFileRequestMessage($this);
429
                    } else {
430
                        $this->emit('error', array((new \Plasma\Exception('MySQL server requested a local file, but the driver options is disabled'))));
431
                        
432
                        if($this->buffer->getSize() > 0) {
433
                            $this->driver->getLoop()->futureTick(function () {
434
                                $this->processBuffer();
435
                            });
436
                        }
437
                        
438
                        return;
439
                    }
440
                break;
441
                default:
442 48
                    $buffer->prepend($firstChar);
443
                    
444 48
                    if($this->parseCallback !== null) {
445
                        $parse = $this->parseCallback;
446
                        $this->parseCallback = null;
447
                        
448
                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
449
                        $parse($caller);
450 48
                    } elseif($this->currentCommand !== null) {
451 48
                        $command = $this->currentCommand;
452
                        
453 48
                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
454 48
                        $command->onNext($caller);
455
                        
456 48
                        if($command->hasFinished()) {
457 38
                            $this->currentCommand = null;
458 38
                            $command->onComplete();
459
                        }
460
                    }
461
                    
462 48
                    if($this->buffer->getSize() > 0) {
463
                        $this->driver->getLoop()->futureTick(function () {
464 48
                            $this->processBuffer();
465 48
                        });
466
                    }
467
                    
468 48
                    return;
469
                break;
470
            }
471
        }
472
        
473 63
        $state = $message->setParserState();
474 63
        if($state !== -1) {
475 63
            $this->state = $state;
476
        }
477
        
478 63
        if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
479 63
            $this->handshakeMessage = $message;
480
        }
481
        
482 63
        $this->handleMessage($buffer, $message);
483 63
    }
484
    
485
    /**
486
     * Handles an incoming message.
487
     * @param \Plasma\BinaryBuffer                             $buffer
488
     * @param \Plasma\Drivers\MySQL\Messages\MessageInterface  $message
489
     * @return void
490
     */
491 63
    function handleMessage(\Plasma\BinaryBuffer $buffer, \Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
492
        try {
493 63
            $buffer = $message->parseMessage($buffer);
494 63
            if(!$buffer) {
495
                return;
496
            }
497
            
498 63
            if($this->currentCommand !== null) {
499
                if(
500 63
                    ($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage || $message instanceof \Plasma\Drivers\MySQL\Messages\EOFMessage)
501 63
                    && $this->currentCommand->hasFinished()
502
                ) {
503 62
                    $command = $this->currentCommand;
504 62
                    $this->currentCommand = null;
505
                    
506 62
                    $command->onComplete();
507 61
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
508 1
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
509
                    
510 1
                    $command = $this->currentCommand;
511 1
                    $this->currentCommand = null;
512
                    
513 1
                    $command->onError($error);
514
                } else {
515 60
                    $command = $this->currentCommand;
516 60
                    $command->onNext($message);
517
                    
518 60
                    if($command->hasFinished()) {
519 60
                        if($this->currentCommand === $command) {
520
                            $this->currentCommand = null;
521
                        }
522
                        
523 63
                        $command->onComplete();
524
                    }
525
                }
526 63
            } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
527
                $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
528
                $this->emit('error', array($error));
529
            }
530
            
531 63
            $this->emit('message', array($message));
532
        } catch (\Plasma\Drivers\MySQL\Messages\ParseException $e) {
533
            $state = $e->getState();
534
            if($state !== null) {
535
                $this->state = $state;
536
            }
537
            
538
            $buffer = $e->getBuffer();
539
            if($buffer !== null) {
540
                $this->buffer->clear();
541
                $this->buffer->append($buffer);
542
            }
543
            
544
            if($this->currentCommand !== null) {
545
                $this->currentCommand->onError($e);
546
            }
547
            
548
            $this->emit('error', array($e));
549
            $this->connection->close();
550
        } catch (\Plasma\Exception $e) {
551
            if($this->currentCommand !== null) {
552
                $command = $this->currentCommand;
553
                $this->currentCommand = null;
554
                
555
                $command->onError($e);
556
            } else {
557
                $this->emit('error', array($e));
558
            }
559
        }
560
        
561 63
        if($this->buffer->getSize() > 0) {
562
            $this->driver->getLoop()->futureTick(function () {
563 38
                $this->processBuffer();
564 38
            });
565
        }
566 63
    }
567
    
568
    /**
569
     * Compresses a packet.
570
     * @param string  $packet
571
     * @return string
572
     */
573 2
    protected function compressPacket(string $packet): string {
574 2
        $length = \strlen($packet);
575 2
        $packetlen = \Plasma\BinaryBuffer::writeInt3($length);
576 2
        $id = \Plasma\BinaryBuffer::writeInt1((++$this->compressionID));
577
        
578 2
        if($length < $this->compressionSizeThreshold) {
579 2
            return $packetlen.$id.\Plasma\BinaryBuffer::writeInt3(0).$packet;
580
        }
581
        
582 1
        $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);
583 1
        $compresslen = \Plasma\BinaryBuffer::writeInt3(\strlen($compressed));
584
        
585 1
        return $compresslen.$id.$packetlen.$compressed;
586
    }
587
    
588
    /**
589
     * Decompresses the buffer.
590
     * @return void
591
     */
592 2
    protected function decompressBuffer(): void {
593 2
        $buffer = new \Plasma\BinaryBuffer();
594
        
595
        // Copy packet header to new buffer
596 2
        for($i = 0; $i < 7; $i++) {
597 2
            $buffer->append($this->compressionBuffer[$i]);
598
        }
599
        
600 2
        $length = $buffer->readInt3();
601 2
        $this->compressionID = $buffer->readInt1();
602 2
        $uncompressedLength = $buffer->readInt3();
603
        
604 2
        if(($this->compressionBuffer->getSize() - 7) < $length) {
605
            return;
606
        }
607
        
608 2
        $this->compressionBuffer->read(7);
609 2
        $buffer = null;
610
        
611 2
        if($uncompressedLength === 0) {
612 2
            $this->buffer->append($this->compressionBuffer->read($length));
613 2
            return;
614
        }
615
        
616 1
        $rawPacket = $this->compressionBuffer->read($length);
617 1
        $packet = \zlib_decode($rawPacket, $uncompressedLength);
618
        
619 1
        if(\strlen($packet) !== $uncompressedLength) {
620
            $packet = "\xFF\x00\x00\x00     Invalid compressed packet";
621
            $this->connection->end($packet);
622
            
623
            return;
624
        }
625
        
626 1
        $this->buffer->append($packet);
627
        
628 1
        if($this->compressionBuffer->getSize() > 7) {
629
            $this->decompressBuffer();
630
        }
631 1
    }
632
    
633
    /**
634
     * Adds the events to the connection.
635
     * @return void
636
     */
637 66
    protected function addEvents() {
638
        $this->connection->on('data', function ($chunk) {
639 63
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
640 2
                $this->compressionBuffer->append($chunk);
641
                
642 2
                if($this->compressionBuffer->getSize() > 7) {
643 2
                    $this->decompressBuffer();
644
                }
645
            } else {
646 63
                $this->buffer->append($chunk);
647
            }
648
            
649 63
            $this->processBuffer();
650 66
        });
651
        
652
        $this->connection->on('close', function () {
653 3
            $this->handleClose();
654 66
        });
655 66
    }
656
    
657
    /**
658
     * Connection close handler.
659
     * @return void
660
     */
661 3
    protected function handleClose() {
662 3
        if($this->state === static::STATE_AUTH || $this->state === static::STATE_AUTH_SENT) {
663
            $this->state = static::STATE_AUTH_ERROR;
664
        }
665
        
666 3
        $this->buffer->clear();
667 3
        $this->messageBuffer->clear();
668 3
        $this->compressionBuffer->clear();
669 3
    }
670
}
671