Completed
Pull Request — master (#32)
by Charlotte
02:14
created

ProtocolParser   F

Complexity

Total Complexity 95

Size/Duplication

Total Lines 653
Duplicated Lines 0 %

Test Coverage

Coverage 19.32%

Importance

Changes 10
Bugs 1 Features 0
Metric Value
eloc 286
c 10
b 1
f 0
dl 0
loc 653
ccs 51
cts 264
cp 0.1932
rs 2
wmc 95

17 Methods

Rating   Name   Duplication   Size   Complexity  
A executeCommand() 0 2 1
A setParseCallback() 0 2 1
A __construct() 0 9 1
D processBuffer() 0 140 33
A invokeCommand() 0 7 2
A getLastOkMessage() 0 2 1
A getState() 0 2 1
A enableCompression() 0 2 1
A markCommandAsFinished() 0 6 2
A getHandshakeMessage() 0 2 1
F handleMessage() 0 73 17
B sendPacket() 0 33 7
A handleClose() 0 8 3
A compressPacket() 0 13 2
B decompressBuffer() 0 38 6
C processCommand() 0 36 12
A addEvents() 0 17 4

How to fix   Complexity   

Complex Class

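Complex classes like ProtocolParser often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes. In ProtocolParser, for example, compressionEnabled, compressionID, compressionSizeThreshold, compressionBuffer, compressPacket() and decompressBuffer() all share the "compress" stem.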

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
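
As a rough illustration of Extract Class, the sketch below pulls the outgoing compression logic into its own class. PacketCompressor, its constructor parameter, reset(), compress() and int3() are hypothetical names that do not exist in the driver; the byte layout follows compressPacket() in the listing, and pack() stands in for \Plasma\BinaryBuffer::writeInt3() so the example runs on its own (it assumes the zlib extension is loaded).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical class extracted from ProtocolParser. It owns the
 * compression ID counter and the size threshold, and nothing else.
 */
final class PacketCompressor {
    /** @var int Last used compression ID; -1 means none has been sent yet. */
    private $compressionID = -1;

    /** @var int Packets shorter than this are sent uncompressed. */
    private $sizeThreshold;

    public function __construct(int $sizeThreshold = 50) {
        $this->sizeThreshold = $sizeThreshold;
    }

    /** Resets the counter, as processCommand() does when a new command starts. */
    public function reset(): void {
        $this->compressionID = -1;
    }

    /** Wraps one already-framed packet in a compressed-packet header. */
    public function compress(string $packet): string {
        $length = \strlen($packet);
        $id = \chr(++$this->compressionID & 0xFF);

        if ($length < $this->sizeThreshold) {
            // Uncompressed payload: the "length before compression" field is 0.
            return self::int3($length) . $id . self::int3(0) . $packet;
        }

        $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);

        return self::int3(\strlen($compressed)) . $id . self::int3($length) . $compressed;
    }

    /** Encodes an integer as 3 little-endian bytes. */
    private static function int3(int $value): string {
        return \substr(\pack('V', $value), 0, 3);
    }
}
```

With something like this in place, ProtocolParser would hold one PacketCompressor instance and call compress() from sendPacket(), which removes two fields and one method from the parser.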

While breaking up the class, it is a good idea to analyze how other classes use ProtocolParser, and based on these observations, apply Extract Interface, too.
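
A minimal sketch of Extract Interface, assuming that some collaborators only need to send packets and others only need to read the parser state. PacketSenderInterface, ParserStateInterface, RecordingParser and sendPing() are hypothetical names used for illustration; only sendPacket() and getState() mirror methods from the listing.

```php
<?php
declare(strict_types=1);

/** Hypothetical narrow interface: what a caller needs in order to send packets. */
interface PacketSenderInterface {
    public function sendPacket(string $packet): void;
}

/** Hypothetical read-only view of the parser state. */
interface ParserStateInterface {
    public function getState(): int;
}

/** In-memory stand-in used here in place of the real ProtocolParser. */
final class RecordingParser implements PacketSenderInterface, ParserStateInterface {
    /** @var string[] Packets passed to sendPacket(), in order. */
    public $sent = array();

    public function sendPacket(string $packet): void {
        $this->sent[] = $packet;
    }

    public function getState(): int {
        return 9; // same value as STATE_OK in the listing
    }
}

/** A collaborator that depends only on the narrow interface. */
function sendPing(PacketSenderInterface $sender): void {
    $sender->sendPacket("\x0E"); // 0x0E is COM_PING in the MySQL protocol
}
```

Code written against the small interfaces can be tested with a stand-in like RecordingParser, without a socket connection or a driver instance.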

<?php
/**
 * Plasma Driver MySQL component
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
 *
 * Website: https://github.com/PlasmaPHP
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
 */

namespace Plasma\Drivers\MySQL;

/**
 * The MySQL Protocol Parser.
 * @internal
 */
class ProtocolParser implements \Evenement\EventEmitterInterface {
    use \Evenement\EventEmitterTrait;

    /**
     * @var int
     */
    const STATE_INIT = 0;

    /**
     * @var int
     */
    const STATE_HANDSHAKE = 1;

    /**
     * @var int
     */
    const STATE_HANDSHAKE_ERROR = 2;

    /**
     * @var int
     */
    const STATE_AUTH = 5;

    /**
     * @var int
     */
    const STATE_AUTH_SENT = 6;

    /**
     * @var int
     */
    const STATE_AUTH_ERROR = 7;

    /**
     * @var int
     */
    const STATE_OK = 9;

    /**
     * @var int
     */
    const CLIENT_CAPABILITIES = (
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_FOUND_ROWS |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_PASSWORD |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LONG_FLAG |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_LOCAL_FILES |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_INTERACTIVE |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_TRANSACTIONS |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SECURE_CONNECTION |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PROTOCOL_41 |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF |
        \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PS_MULTI_RESULTS
    );

    /**
     * @var int
     */
    const CLIENT_MAX_PACKET_SIZE = 0xFFFFFF;

    /**
     * @var int
     */
    const CLIENT_CHARSET_NUMBER = 0x21;

    /**
     * @var \Plasma\Drivers\MySQL\Driver
     */
    protected $driver;

    /**
     * @var \React\Socket\ConnectionInterface
     */
    protected $connection;

    /**
     * @var int
     */
    protected $state = ProtocolParser::STATE_INIT;

    /**
     * @var \Plasma\BinaryBuffer
     */
    protected $buffer;

    /**
     * @var \Plasma\BinaryBuffer
     */
    protected $messageBuffer;

    /**
     * The sequence ID is incremented with each packet and may wrap around.
     * It starts at 0 and is reset to 0 when a new command begins in the Command Phase.
     * It is stored as -1 here because sendPacket() pre-increments it before use.
     * @var int
     * @see https://dev.mysql.com/doc/internals/en/sequence-id.html
     */
    protected $sequenceID = -1;

    /**
     * Whether we use compression.
     * @var bool
     */
    protected $compressionEnabled = false;

    /**
     * The compression ID is incremented with each packet and may wrap around.
     * The compression ID is independent of the sequence ID.
     * @var int
     * @see https://dev.mysql.com/doc/internals/en/compressed-packet-header.html
     */
    protected $compressionID = -1;

    /**
     * Small packets should not be compressed. This defines a minimum size for compression.
     * @var int
     * @see https://dev.mysql.com/doc/internals/en/uncompressed-payload.html
     */
    protected $compressionSizeThreshold = 50;

    /**
     * @var \Plasma\BinaryBuffer
     */
    protected $compressionBuffer;

    /**
     * @var \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    protected $handshakeMessage;

    /**
     * @var \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
     */
    protected $lastOkMessage;

    /**
     * @var \Plasma\CommandInterface|null
     */
    protected $currentCommand;

    /**
     * @var callable|null
     */
    protected $parseCallback;

    /**
     * Constructor.
     * @param \Plasma\Drivers\MySQL\Driver       $driver
     * @param \React\Socket\ConnectionInterface  $connection
     */
    function __construct(\Plasma\Drivers\MySQL\Driver $driver, \React\Socket\ConnectionInterface $connection) {
        $this->driver = $driver;
        $this->connection = $connection;

        $this->buffer = new \Plasma\BinaryBuffer();
        $this->messageBuffer = new \Plasma\BinaryBuffer();
        $this->compressionBuffer = new \Plasma\BinaryBuffer();

        $this->addEvents();
    }

    /**
     * Invoke a command to execute.
     * @param \Plasma\CommandInterface|null  $command
     * @return void
     */
    function invokeCommand(?\Plasma\CommandInterface $command): void {
        if($command === null) {
            return;
        }

        $this->currentCommand = $command;
        $this->processCommand();
    }

    /**
     * Executes a command, without handling any aftermath.
     * The `onComplete` callback will be immediately invoked, regardless of the `waitForCompletion` value.
     * @param \Plasma\CommandInterface  $command
     * @return void
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->processCommand($command);
    }

    /**
     * Marks the command itself as finished, if currently running.
     * @param \Plasma\CommandInterface  $command
     * @return void
     */
    function markCommandAsFinished(\Plasma\CommandInterface $command): void {
        if($command === $this->currentCommand) {
            $this->currentCommand = null;
        }

        $command->onComplete();
    }

    /**
     * Get the parser state.
     * @return int
     */
    function getState(): int {
        return $this->state;
    }

    /**
     * Get the handshake message, or null.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshakeMessage(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        return $this->handshakeMessage;
    }

    /**
     * Get the last ok response message, or null.
     * @return \Plasma\Drivers\MySQL\Messages\OkResponseMessage|null
     */
    function getLastOkMessage(): ?\Plasma\Drivers\MySQL\Messages\OkResponseMessage {
        return $this->lastOkMessage;
    }

    /**
     * Enables compression.
     * @return void
     */
    function enableCompression(): void {
        $this->compressionEnabled = true;
    }

    /**
     * Sends a packet to the server.
     * @param string  $packet
     * @return void
     */
    function sendPacket(string $packet): void {
        $initPacklen = \strlen($packet);

        do {
            $partial = \substr($packet, 0, static::CLIENT_MAX_PACKET_SIZE);
            $partlen = \strlen($partial);

            // $packet keeps the unsent remainder; the framed chunk goes into $frame
            // so that it no longer overwrites the remainder.
            $packet = \substr($packet, static::CLIENT_MAX_PACKET_SIZE);
            $packlen = \strlen($packet);

            $length = \Plasma\BinaryBuffer::writeInt3($partlen);
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));

            $frame = $length.$sequence.$partial;

            if($this->compressionEnabled && $this->state === static::STATE_OK) {
                $frame = $this->compressPacket($frame);
            }

            $this->connection->write($frame);
        } while($packlen > 0); // loop until nothing is left, including a final short chunk

        // If the payload length is an exact multiple of the max size, an empty packet has to follow
        if($initPacklen > 0 && ($initPacklen % static::CLIENT_MAX_PACKET_SIZE) === 0) {
            $length = \Plasma\BinaryBuffer::writeInt3(0);
            $sequence = \Plasma\BinaryBuffer::writeInt1((++$this->sequenceID));
            $frame = $length.$sequence;

            if($this->compressionEnabled && $this->state === static::STATE_OK) {
                $frame = $this->compressPacket($frame);
            }

            $this->connection->write($frame);
        }
    }

    /**
     * Sets the parse callback.
     * @param callable $callback
     * @return void
     */
    function setParseCallback(callable $callback): void {
        $this->parseCallback = $callback;
    }

    /**
     * Processes a command.
     * @param \Plasma\CommandInterface|null  $command
     * @return void
     */
    protected function processCommand(?\Plasma\CommandInterface $command = null) {
        if($command === null && $this->currentCommand instanceof \Plasma\CommandInterface) {
            $command = $this->currentCommand;

            if($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) {
                $state = $command->setParserState();
                if($state !== -1) {
                    $this->state = $state;
                }
            }
        }

        if($command === null) {
            return;
        }

        if(!($command instanceof \Plasma\Drivers\MySQL\Commands\CommandInterface) || $command->resetSequence()) {
            $this->sequenceID = -1;
            $this->compressionID = -1;
        }

        try {
            $msg = $command->getEncodedMessage();
        } catch (\Plasma\Exception $e) {
            $this->currentCommand = null;
            $command->onError($e);
            return;
        }

        $this->sendPacket($msg);

        if($command !== $this->currentCommand || !$command->waitForCompletion()) {
            $command->onComplete();

            if($command === $this->currentCommand) {
                $this->currentCommand = null;
            }
        }
    }

    /**
     * Processes the buffer.
     * @return void
     */
    protected function processBuffer() {
        if($this->buffer->getSize() < 4) {
            return;
        }

        $buffer = clone $this->buffer;

        $length = $buffer->readInt3();
        $this->sequenceID = $buffer->readInt1();

        if($length === static::CLIENT_MAX_PACKET_SIZE) {
            $this->buffer->read(($length + 4));
            $this->messageBuffer->append($buffer->read($length));
            return;
        } elseif($this->messageBuffer->getSize() > 0) {
            $this->messageBuffer->append($buffer->read($length));
            $buffer = $this->messageBuffer;
            $this->messageBuffer = new \Plasma\BinaryBuffer();
        }

        if($buffer->getSize() < $length) {
            return;
        }

        if($length > 0) {
            $this->buffer->read(($length + 4));
            $buffer->slice(0, $length);
        } else {
            $this->buffer->slice($buffer->getSize());
        }

        if($buffer->getSize() === 0) {
            return;
        }

        /** @var \Plasma\Drivers\MySQL\Messages\MessageInterface  $message */
        $message = null;

        if($this->state === static::STATE_INIT) {
            $message = new \Plasma\Drivers\MySQL\Messages\HandshakeMessage($this);
        } else {
            $firstChar = $buffer->read(1);

            $okRespID = \Plasma\Drivers\MySQL\Messages\OkResponseMessage::getID();
            $isOkMessage = (
                (
                    $firstChar === $okRespID &&
                    (!($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\QueryCommand)
                        || \strtoupper(\substr($this->currentCommand->getQuery(), 0, 6)) !== 'SELECT') // Fix for MySQL 5.7
                ) ||
                (
                    $firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() &&
                    ($this->handshakeMessage->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_DEPRECATE_EOF) !== 0
                )
            );

            switch(true) {
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\ErrResponseMessage::getID()):
                    $message = new \Plasma\Drivers\MySQL\Messages\ErrResponseMessage($this);
                break;
                case ($this->currentCommand instanceof \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand && $firstChar === $okRespID):
                    $message = new \Plasma\Drivers\MySQL\Messages\PrepareStatementOkMessage($this);
                break;
                case $isOkMessage:
                    $message = new \Plasma\Drivers\MySQL\Messages\OkResponseMessage($this);
                    $this->lastOkMessage = $message;

                    $this->driver->getLoop()->futureTick(function () use ($message) {
                        $this->driver->emit('eventRelay', array('serverOkMessage', $message));
                    });
                    // Coding Style issue reported on the line above: Line indented incorrectly; expected 16 spaces, found 20
                break;
                case ($this->state < static::STATE_OK && $firstChar === \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage::getID()):
                    $message = new \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage($this);
                break;
                case ($this->state < static::STATE_OK && $firstChar === \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage::getID()):
                    $message = new \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage($this);
                break;
                // A second, identical AuthMoreDataMessage case followed here; it could never match and was dropped.
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\EOFMessage::getID() && $length < 6):
                    $message = new \Plasma\Drivers\MySQL\Messages\EOFMessage($this);
                break;
                case ($firstChar === \Plasma\Drivers\MySQL\Messages\LocalInFileRequestMessage::getID()):
                    if($this->driver->getOptions()['localInFile.enable']) {
                        $message = new \Plasma\Drivers\MySQL\Messages\LocalInFileRequestMessage($this);
                    } else {
                        $this->emit('error', array((new \Plasma\Exception('MySQL server requested a local file, but the driver options is disabled'))));

                        if($this->buffer->getSize() > 0) {
                            $this->driver->getLoop()->futureTick(function () {
                                $this->processBuffer();
                            });
                        }

                        return;
                    }
                break;
                default:
                    $buffer->prepend($firstChar);

                    if($this->parseCallback !== null) {
                        $parse = $this->parseCallback;
                        $this->parseCallback = null;

                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
                        $parse($caller);
                    } elseif($this->currentCommand !== null) {
                        $command = $this->currentCommand;

                        $caller = new \Plasma\Drivers\MySQL\ProtocolOnNextCaller($this, $buffer);
                        $command->onNext($caller);

                        if($command->hasFinished()) {
                            $this->currentCommand = null;
                            $command->onComplete();
                        }
                    }

                    if($this->buffer->getSize() > 0) {
                        $this->driver->getLoop()->futureTick(function () {
                            $this->processBuffer();
                        });
                    }

                    return;
                break;
            }
        }

        $state = $message->setParserState();
        if($state !== -1) {
            $this->state = $state;
        }

        if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
            $this->handshakeMessage = $message;
        }

        $this->handleMessage($buffer, $message);
    }

    /**
     * Handles an incoming message.
     * @param \Plasma\BinaryBuffer                             $buffer
     * @param \Plasma\Drivers\MySQL\Messages\MessageInterface  $message
     * @return void
     */
    function handleMessage(\Plasma\BinaryBuffer $buffer, \Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
        try {
            $buffer = $message->parseMessage($buffer);
            if(!$buffer) {
                return;
            }

            if($this->currentCommand !== null) {
                if(
                    ($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage || $message instanceof \Plasma\Drivers\MySQL\Messages\EOFMessage)
                    && $this->currentCommand->hasFinished()
                ) {
                    $command = $this->currentCommand;
                    $this->currentCommand = null;

                    $command->onComplete();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
                    $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);

                    $command = $this->currentCommand;
                    $this->currentCommand = null;

                    $command->onError($error);
                } else {
                    $command = $this->currentCommand;
                    $command->onNext($message);

                    if($command->hasFinished()) {
                        if($this->currentCommand === $command) {
                            $this->currentCommand = null;
                        }

                        $command->onComplete();
                    }
                }
            } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\ErrResponseMessage) {
                $error = new \Plasma\Exception($message->errorMessage, $message->errorCode);
                $this->emit('error', array($error));
            }

            $this->emit('message', array($message));
        } catch (\Plasma\Drivers\MySQL\Messages\ParseException $e) {
            $state = $e->getState();
            if($state !== null) {
                $this->state = $state;
            }

            $buffer = $e->getBuffer();
            if($buffer !== null) {
                $this->buffer->clear();
                $this->buffer->append($buffer);
            }

            if($this->currentCommand !== null) {
                $this->currentCommand->onError($e);
            }

            $this->emit('error', array($e));
            $this->connection->close();
        } catch (\Plasma\Exception $e) {
            if($this->currentCommand !== null) {
                $command = $this->currentCommand;
                $this->currentCommand = null;

                $command->onError($e);
            } else {
                $this->emit('error', array($e));
            }
        }

        if($this->buffer->getSize() > 0) {
            $this->driver->getLoop()->futureTick(function () {
                $this->processBuffer();
            });
        }
    }

    /**
     * Compresses a packet.
     * @param string  $packet
     * @return string
     */
    protected function compressPacket(string $packet): string {
        $length = \strlen($packet);
        $packetlen = \Plasma\BinaryBuffer::writeInt3($length);
        $id = \Plasma\BinaryBuffer::writeInt1((++$this->compressionID));

        if($length < $this->compressionSizeThreshold) {
            return $packetlen.$id.\Plasma\BinaryBuffer::writeInt3(0).$packet;
        }

        $compressed = \zlib_encode($packet, \ZLIB_ENCODING_DEFLATE);
        $compresslen = \Plasma\BinaryBuffer::writeInt3(\strlen($compressed));

        return $compresslen.$id.$packetlen.$compressed;
    }

    /**
     * Decompresses the buffer.
     * @return void
     */
    protected function decompressBuffer(): void {
        $buffer = new \Plasma\BinaryBuffer();

        // Copy packet header to new buffer
        for($i = 0; $i < 7; $i++) {
            $buffer->append($this->compressionBuffer[$i]);
        }

        $length = $buffer->readInt3();
        $this->compressionID = $buffer->readInt1();
        $uncompressedLength = $buffer->readInt3();

        if(($this->compressionBuffer->getSize() - 7) < $length) {
            return;
        }

        $this->compressionBuffer->read(7);
        $buffer = null;

        if($uncompressedLength === 0) {
            $this->buffer->append($this->compressionBuffer->read($length));
            return;
        }

        $rawPacket = $this->compressionBuffer->read($length);
        $packet = \zlib_decode($rawPacket, $uncompressedLength);

        if(\strlen($packet) !== $uncompressedLength) {
            $packet = "\xFF\x00\x00\x00     Invalid compressed packet";
            $this->connection->end($packet);

            return;
        }

        $this->buffer->append($packet);

        if($this->compressionBuffer->getSize() > 7) {
            $this->decompressBuffer();
        }
    }

    /**
     * Adds the events to the connection.
     * @return void
     */
    protected function addEvents() {
        $this->connection->on('data', function ($chunk) {
            if($this->compressionEnabled && $this->state === static::STATE_OK) {
                $this->compressionBuffer->append($chunk);

                if($this->compressionBuffer->getSize() > 7) {
                    $this->decompressBuffer();
                }
            } else {
                $this->buffer->append($chunk);
            }

            $this->processBuffer();
        });

        $this->connection->on('close', function () {
            $this->handleClose();
        });
    }

    /**
     * Connection close handler.
     * @return void
     */
    protected function handleClose() {
        if($this->state === static::STATE_AUTH || $this->state === static::STATE_AUTH_SENT) {
            $this->state = static::STATE_AUTH_ERROR;
        }

        $this->buffer->clear();
        $this->messageBuffer->clear();
        $this->compressionBuffer->clear();
    }
}
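
The framing that sendPacket() aims for (a 3-byte little-endian length, a 1-byte sequence ID, then the payload, split at CLIENT_MAX_PACKET_SIZE) can be sketched as a standalone function. frameMySqlPayload() and its parameters are hypothetical names, and pack() and chr() stand in for the \Plasma\BinaryBuffer helpers; this is an illustration of the wire format under those assumptions, not the driver's code.

```php
<?php
declare(strict_types=1);

/**
 * Splits a payload into MySQL packets.
 * Each packet is: 3-byte little-endian length, 1-byte sequence ID, payload chunk.
 * frameMySqlPayload() is a hypothetical helper written for this sketch.
 *
 * @return string[] the framed packets, in send order
 */
function frameMySqlPayload(string $payload, int $firstSequenceId = 0, int $maxSize = 0xFFFFFF): array {
    $frames = array();
    $sequenceId = $firstSequenceId;
    $offset = 0;

    do {
        // The cast covers older PHP versions where substr() may return false.
        $chunk = (string) \substr($payload, $offset, $maxSize);
        $chunkLen = \strlen($chunk);
        $offset += $chunkLen;

        $header = \substr(\pack('V', $chunkLen), 0, 3) . \chr($sequenceId & 0xFF);
        $frames[] = $header . $chunk;
        $sequenceId++;

        // A full-size chunk is always followed by another packet, even an
        // empty one, so the receiver can tell where the payload ends.
    } while ($chunkLen === $maxSize);

    return $frames;
}
```

Calling frameMySqlPayload("abcdefgh", 0, 4) with an artificially small maximum yields two full packets followed by an empty terminating packet, which is the case the "exactly the max size" comment in sendPacket() refers to.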