GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( bfd6a8...307248 )
by Charlotte
09:25
created

Driver::close()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 34
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 4.0027

Importance

Changes 0
Metric Value
cc 4
eloc 18
nc 3
nop 0
dl 0
loc 34
ccs 17
cts 18
cp 0.9444
crap 4.0027
rs 9.6666
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => true,
31
        'packet.maxAllowedSize' => \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
32
        'tls.context' => array(),
33
        'tls.force' => true,
34
        'tls.forceLocal' => false
35
    );
36
    
37
    /**
38
     * @var string[]
39
     */
40
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
41
    
42
    /**
43
     * @var \React\Socket\ConnectorInterface
44
     */
45
    protected $connector;
46
    
47
    /**
48
     * Internal class is intentionally used, as there's no other way currently.
49
     * @var \React\Socket\StreamEncryption
50
     * @see https://github.com/reactphp/socket/issues/180
51
     */
52
    protected $encryption;
53
    
54
    /**
55
     * @var \React\Promise\Promise|null
56
     */
57
    protected $connectPromise;
58
    
59
    /**
60
     * @var \React\Socket\Connection
61
     */
62
    protected $connection;
63
    
64
    /**
65
     * @var int
66
     */
67
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
68
    
69
    /**
70
     * @var \Plasma\Drivers\MySQL\ProtocolParser
71
     */
72
    protected $parser;
73
    
74
    /**
75
     * @var array
76
     */
77
    protected $queue;
78
    
79
    /**
80
     * @var int
81
     */
82
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
83
    
84
    /**
85
     * @var bool
86
     */
87
    protected $transaction = false;
88
    
89
    /**
90
     * @var \React\Promise\Deferred
91
     */
92
    protected $goingAway;
93
    
94
    /**
95
     * @var string|null
96
     */
97
    protected $charset;
98
    
99
    /**
100
     * Constructor.
101
     * @param \React\EventLoop\LoopInterface  $loop
102
     * @param array                           $options
103
     */
104 83
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        // Validate the user supplied options before any state is touched.
        $this->validateOptions($options);
        
        $this->loop = $loop;
        $this->queue = array();
        
        // User supplied options override the defaults declared on the property.
        $this->options = \array_merge($this->options, $options);
        
        // Use the given connector, or fall back to the default React connector.
        if(isset($options['connector'])) {
            $this->connector = $options['connector'];
        } else {
            $this->connector = new \React\Socket\Connector($loop);
        }
        
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
    }
114
    
115
    /**
116
     * Destructor.
117
     * @return void
118
     * @internal
119
     */
120 23
    function __destruct() {
        // Delegate to close(); the promise it returns is discarded here.
        $this->close();
    }
123
    
124
    /**
125
     * Returns the event loop.
126
     * @return \React\EventLoop\LoopInterface
127
     */
128 57
    function getLoop(): \React\EventLoop\LoopInterface {
        // The loop is the one handed to the constructor.
        $loop = $this->loop;
        
        return $loop;
    }
131
    
132
    /**
133
     * Retrieves the current connection state.
134
     * @return int
135
     */
136 9
    function getConnectionState(): int {
        // One of the CONNECTION_* constants the driver assigns to this property.
        $state = $this->connectionState;
        
        return $state;
    }
139
    
140
    /**
141
     * Retrieves the current busy state.
142
     * @return int
143
     */
144 1
    function getBusyState(): int {
        // One of the STATE_* constants the driver assigns to this property.
        $state = $this->busy;
        
        return $state;
    }
147
    
148
    /**
149
     * Get the length of the driver backlog queue.
150
     * @return int
151
     */
152 1
    function getBacklogLength(): int {
        // Number of commands which are queued but not yet handed to the parser.
        $pending = \count($this->queue);
        
        return $pending;
    }
155
    
156
    /**
157
     * Connects to the given URI.
158
     * @param string  $uri
159
     * @return \React\Promise\PromiseInterface
160
     */
161 62
    function connect(string $uri): \React\Promise\PromiseInterface {
        // A driver that is shutting down or marked unusable can not be connected again.
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        // Already connected, nothing left to do.
        if($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        }
        
        // A connection attempt has been started before, share its promise.
        if($this->connectPromise !== null) {
            return $this->connectPromise;
        }
        
        // URIs without an explicit scheme are treated as plain TCP.
        if(\strpos($uri, '://') === false) {
            $uri = 'tcp://'.$uri;
        }
        
        $parts = \parse_url($uri);
        $acceptable = isset($parts['scheme'], $parts['host']) && \in_array($parts['scheme'], $this->allowedSchemes);
        if(!$acceptable) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }
        
        // mysql:// is accepted as an alias of tcp://
        if($parts['scheme'] === 'mysql') {
            $parts['scheme'] = 'tcp';
        }
        
        // Unix sockets get no port, every other scheme defaults to port 3306.
        $target = $parts['scheme'].'://'.$parts['host'];
        if($parts['scheme'] !== 'unix') {
            $target .= ':'.($parts['port'] ?? 3306);
        }
        
        $this->connectionState = static::CONNECTION_STARTED;
        
        // Flipped to true once the handshake is done; shared by reference with the closures below.
        $resolved = false;
        
        $this->connectPromise = $this->connector->connect($target)->then(function (\React\Socket\ConnectionInterface $socket) use ($parts, &$resolved) {
            // See description of property encryption
            if(!($socket instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }
            
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $socket;
            
            // Connection errors are forwarded to the listeners of the driver.
            $socket->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });
            
            // A closed connection renders the driver unusable.
            $socket->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;
                
                $this->emit('close');
            });
            
            $handshake = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $socket, $this->options['packet.maxAllowedSize']);
            
            // Before the handshake is done parser errors fail the connect promise,
            // afterwards they are emitted as regular driver errors.
            $this->parser->on('error', function (\Throwable $error) use ($handshake, &$resolved) {
                if(!$resolved) {
                    $handshake->reject($error);
                    return;
                }
                
                $this->emit('error', array($error));
            });
            
            $credentials = array(
                'user' => ($parts['user'] ?? 'root'),
                'password' => ($parts['pass'] ?? ''),
                'db' => (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '')
            );
            
            $this->startHandshake($credentials, $handshake);
            
            return $handshake->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;
                
                // Start working on commands which were queued in the meantime.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });
        
        // With a configured character set, a SET NAMES query is chained onto the connect promise.
        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            
            $this->connectPromise = $this->connectPromise->then(function () {
                $sql = 'SET NAMES "'.$this->options['characters.set'].'"';
                if($this->options['characters.collate']) {
                    $sql .= ' COLLATE "'.$this->options['characters.collate'].'"';
                }
                
                $setNames = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $sql);
                $this->executeCommand($setNames);
                
                return $setNames->getPromise();
            });
        }
        
        return $this->connectPromise;
    }
251
    
252
    /**
253
     * Pauses the underlying stream I/O consumption.
254
     * If consumption is already paused, this will do nothing.
255
     * @return bool  Whether the operation was successful.
256
     */
257 1
    function pauseStreamConsumption(): bool {
        // Pausing needs a connection and a driver which is not shutting down.
        $usable = ($this->connection !== null && !$this->goingAway);
        
        if($usable) {
            $this->connection->pause();
        }
        
        return $usable;
    }
265
    
266
    /**
267
     * Resumes the underlying stream I/O consumption.
268
     * If consumption is not paused, this will do nothing.
269
     * @return bool  Whether the operation was successful.
270
     */
271 1
    function resumeStreamConsumption(): bool {
        // Resuming needs a connection and a driver which is not shutting down.
        $usable = ($this->connection !== null && !$this->goingAway);
        
        if($usable) {
            $this->connection->resume();
        }
        
        return $usable;
    }
279
    
280
    /**
281
     * Closes all connections gracefully after processing all outstanding requests.
282
     * @return \React\Promise\PromiseInterface
283
     */
284 24
    function close(): \React\Promise\PromiseInterface {
        // A shutdown has been requested before, share its promise.
        if($this->goingAway) {
            return $this->goingAway->promise();
        }
        
        // Remember the state we were in before marking the driver as unusable.
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        $this->goingAway = new \React\Promise\Deferred();
        
        // With an empty backlog the shutdown can proceed right away. Otherwise the
        // deferred gets resolved by the command handling once the queue is empty.
        if(\count($this->queue) === 0) {
            $this->goingAway->resolve();
        }
        
        return $this->goingAway->promise()->then(function () use ($previousState) {
            // Only a fully established connection gets a QUIT command.
            if($previousState === static::CONNECTION_OK) {
                $closed = new \React\Promise\Deferred();
                $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
                
                // The returned promise resolves once the connection reports its close.
                $this->connection->once('close', function () use ($closed) {
                    $closed->resolve();
                });
                
                // Close the connection as soon as the QUIT command has ended.
                $quit->once('end', function () {
                    $this->connection->close();
                });
                
                $this->parser->invokeCommand($quit);
                
                return $closed->promise();
            }
        });
    }
320
    
321
    /**
322
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
323
     * @return void
324
     */
325 10
    function quit(): void {
        // Nothing to do if a shutdown is already in progress.
        if($this->goingAway) {
            return;
        }
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        // The going away deferred is resolved immediately, nothing is waited for.
        $this->goingAway = new \React\Promise\Deferred();
        $this->goingAway->resolve();
        
        // Fail every command which is still waiting in the backlog.
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $pending */
        while($pending = \array_shift($this->queue)) {
            $pending->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }
        
        // Only a fully established connection needs to be torn down.
        if($previousState !== static::CONNECTION_OK) {
            return;
        }
        
        $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
        $this->connection->close();
    }
348
    
349
    /**
350
     * Whether this driver is currently in a transaction.
351
     * @return bool
352
     */
353 2
    function isInTransaction(): bool {
        // The flag is set by beginTransaction() and cleared by endTransaction().
        $active = $this->transaction;
        
        return $active;
    }
356
    
357
    /**
358
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
359
     * When the command is done, the driver must check itself back into the client.
360
     * @param \Plasma\ClientInterface  $client
361
     * @param string                   $query
362
     * @return \React\Promise\PromiseInterface
363
     * @throws \Plasma\Exception
364
     * @see \Plasma\QueryResultInterface
365
     */
366 48
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            // Without a connect promise there is nothing we could wait for.
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Retry the query once the connect promise has resolved.
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->query($client, $query);
            });
        }
        
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($command);
        
        // Outside of a transaction the driver goes back to the client when the command ends.
        if(!$this->transaction) {
            $command->once('end', function () use ($client) {
                $client->checkinConnection($this);
            });
        }
        
        return $command->getPromise();
    }
390
    
391
    /**
392
     * Prepares a query. Resolves with a `StatementInterface` instance.
393
     * When the command is done, the driver must check itself back into the client.
394
     * @param \Plasma\ClientInterface  $client
395
     * @param string                   $query
396
     * @return \React\Promise\PromiseInterface
397
     * @throws \Plasma\Exception
398
     * @see \Plasma\StatementInterface
399
     */
400 38
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            // Without a connect promise there is nothing we could wait for.
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Retry the prepare once the connect promise has resolved.
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->prepare($client, $query);
            });
        }
        
        // The client is passed on to the prepare command.
        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);
        
        return $prepareCommand->getPromise();
    }
418
    
419
    /**
420
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
421
     * This is equivalent to prepare -> execute -> close.
422
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
423
     * @param \Plasma\ClientInterface  $client
424
     * @param string                   $query
425
     * @param array                    $params
426
     * @return \React\Promise\PromiseInterface
427
     * @throws \Plasma\Exception
428
     * @see \Plasma\StatementInterface
429
     */
430 37
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            // Retry once the connect promise has resolved, if there is one.
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use ($client, $query, $params) {
                    return $this->execute($client, $query, $params);
                });
            }
            
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        // prepare -> execute -> close
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use ($statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    // Streaming results are returned without waiting for the statement to close.
                    // The rejection handler has to be attached to the promise returned by close():
                    // passing it as an argument to close() itself never registers it, so
                    // failures of the close would go unnoticed.
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });
                    
                    return $result;
                }
                
                // Buffered results are handed out after the statement has been closed.
                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use ($statement) {
                // Close the statement on failure as well, then rethrow the original error.
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
463
    
464
    /**
465
     * Quotes the string for use in the query.
466
     * @param string  $str
467
     * @param int     $type  For types, see the constants.
468
     * @return string
469
     * @throws \LogicException  Thrown if the driver does not support quoting.
470
     * @throws \Plasma\Exception
471
     */
472 5
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        // Prefer the last OK message, fall back to the handshake message.
        $message = $this->parser->getLastOkMessage() ?? $this->parser->getHandshakeMessage();
        if($message === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }
        
        // Only the part of the charset name in front of the first underscore is used.
        $charset = $this->charset;
        $underscore = \strpos($charset, '_');
        $length = ($underscore !== false ? $underscore : \strlen($charset));
        
        $realCharset = $this->getRealCharset(\substr($charset, 0, $length));
        
        // The status flags of the message decide which escaping strategy is used.
        $noBackslashEscapes = (($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0);
        if($noBackslashEscapes) {
            return $this->escapeUsingQuotes($realCharset, $str, $type);
        }
        
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
    }
496
    
497
    /**
498
     * Escapes using quotes.
499
     * @param string  $realCharset
500
     * @param string  $str
501
     * @param int     $type
502
     * @return string
503
     */
504 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        // Identifiers are wrapped in backticks, embedded backticks get doubled.
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // quote() picks this method when the server reports NO_BACKSLASH_ESCAPES.
        // In that SQL mode the backslash is an ordinary character, so it must be
        // left untouched - doubling it would store two backslashes instead of one.
        // Only the enclosing quote character needs to be escaped, by doubling it.
        // NOTE(review): $realCharset is not used by this method.
        return '"'.\str_replace('"', '""', $str).'"';
    }
521
    
522
    /**
523
     * Escapes using backslashes.
524
     * @param string  $realCharset
525
     * @param string  $str
526
     * @param int     $type
527
     * @return string
528
     */
529 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        // Inside a backtick quoted identifier MySQL only recognizes a doubled backtick
        // as an escaped backtick. A backslash has no special meaning there, so escaping
        // with a backslash would let an embedded backtick terminate the identifier.
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // The backslash has to be replaced first, otherwise the backslashes
        // added for the double quotes would get escaped again.
        // NOTE(review): $realCharset is not used by this method - confirm that
        // byte-wise replacing is safe for every supported connection charset.
        $escapeChars = array(
            '\\',
            '"',
        );
        
        $escapeReplace = array(
            '\\\\',
            '\"',
        );
        
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }
546
    
547
    /**
548
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
549
     *
550
     * Checks out a connection until the transaction gets committed or rolled back.
551
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
552
     * check the connection back into the pool, as long as the transaction is not finished.
553
     *
554
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
555
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
556
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
557
     * @param \Plasma\ClientInterface  $client
558
     * @param int                      $isolation  See the `TransactionInterface` constants.
559
     * @return \React\Promise\PromiseInterface
560
     * @throws \Plasma\Exception
561
     * @see \Plasma\TransactionInterface
562
     */
563 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }
        
        // Translate the isolation level constant into the matching SQL statement.
        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }
        
        // The flag is raised before querying, so that query() does not
        // check the driver back into the client after each command.
        $this->transaction = true;
        
        try {
            $promise = $this->query($client, $query);
        } catch (\Throwable $e) {
            // query() throws synchronously when there is no connection to wait for.
            // Without the reset the driver would claim to be in a transaction forever.
            $this->transaction = false;
            throw $e;
        }
        
        return $promise->then(function () use ($client) {
            return $this->query($client, 'START TRANSACTION');
        })->then(function () use ($client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        })->then(null, function (\Throwable $e) {
            // Any failure on the way means no transaction has been started.
            $this->transaction = false;
            throw $e;
        });
    }
601
    
602
    /**
603
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
604
     * @return void
605
     */
606 1
    function endTransaction(): void {
        // Purely informational: only the flag is cleared, no query is sent from here.
        $this->transaction = false;
    }
609
    
610
    /**
611
     * Runs the given command.
612
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
613
     * or rejects with the `Throwable` of the `error` event.
614
     * When the command is done, the driver must check itself back into the client.
615
     * @param \Plasma\ClientInterface   $client
616
     * @param \Plasma\CommandInterface  $command
617
     * @return \React\Promise\PromiseInterface
618
     */
619 6
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        // Hands the driver back to the client, unless a transaction is running.
        $checkin = function () use ($client) {
            if(!$this->transaction) {
                $client->checkinConnection($this);
            }
        };
        
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use ($command, $checkin) {
            // The promise settles with whatever the command reports first.
            $command->once('end', function ($value = null) use ($checkin, $resolve) {
                $checkin();
                $resolve($value);
            });
            
            $command->once('error', function (\Throwable $error) use ($checkin, $reject) {
                $checkin();
                $reject($error);
            });
            
            // The listeners are attached before the command is queued.
            $this->executeCommand($command);
        }));
    }
644
    
645
    /**
646
     * Executes a command.
647
     * @param \Plasma\CommandInterface  $command
648
     * @return void
649
     * @internal
650
     */
651 57
    function executeCommand(\Plasma\CommandInterface $command): void {
        \array_push($this->queue, $command);
        
        // Without a parser, or while the driver is not idle, the command stays queued.
        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }
        
        $this->parser->invokeCommand($this->getNextCommand());
    }
658
    
659
    /**
660
     * Get the handshake message, or null if none received yet.
661
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
662
     */
663 56
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        // Without a parser there can not be a handshake message.
        return ($this->parser ? $this->parser->getHandshakeMessage() : null);
    }
670
    
671
    /**
672
     * Get the next command, or null.
673
     * @return \Plasma\CommandInterface|null
674
     * @internal
675
     */
676 56
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            // An empty queue lets a pending shutdown proceed.
            if($this->goingAway) {
                $this->goingAway->resolve();
            }
            
            return null;
        }
        
        // Nothing is handed out while the driver is busy.
        if($this->busy === static::STATE_BUSY) {
            return null;
        }
        
        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);
        
        // Commands which do not wait for completion are finished up right away.
        if(!$next->waitForCompletion()) {
            $this->endCommand();
            return $next;
        }
        
        // The driver stays busy until the command reports an error or its end.
        $this->busy = static::STATE_BUSY;
        
        $next->once('error', function () {
            $this->busy = static::STATE_IDLE;
            $this->endCommand();
        });
        
        $next->once('end', function () {
            $this->busy = static::STATE_IDLE;
            $this->endCommand();
        });
        
        return $next;
    }
710
    
711
    /**
712
     * Finishes up a command.
713
     * @return void
714
     */
715 56
    protected function endCommand() {
        // The follow-up work is deferred to a future tick of the event loop.
        $this->loop->futureTick(function () {
            $drained = ($this->goingAway && \count($this->queue) === 0);
            
            // A pending shutdown proceeds as soon as the queue is empty.
            if($drained) {
                $this->goingAway->resolve();
                return;
            }
            
            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
724
    
725
    /**
726
     * Starts the handshake process.
727
     * @param array                    $credentials
728
     * @param \React\Promise\Deferred  $deferred
729
     * @return void
730
     */
731 59
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
732
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
733 59
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
734 59
                $this->parser->removeListener('message', $listener);
735
                
736 59
                $this->connectionState = static::CONNECTION_SETENV;
737 59
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
738
                
739 59
                \extract($credentials);
740
                
741 59
                if($db !== '') {
742 46
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
743
                }
744
                
745 59
                if($this->charset === null) {
746 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
747
                }
748
                
749 59
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
750 4
                    $this->parser->enableCompression();
751 4
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
752
                }
753
                
754
                // Check if we support auth plugins
755 59
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
756 59
                $plugin = null;
757
                
758 59
                foreach($plugins as $key => $plug) {
759 59
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
760 59
                        $plugin = $plug;
761 59
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
762 59
                        break;
763
                    } elseif($key === $message->authPluginName) {
764
                        $plugin = $plug;
765
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
766
                        break;
767
                    }
768
                }
769
                
770 59
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
771
                
772 59
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
773
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
774
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
775
                        
776
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
777
                        
778
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
779
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
780
                            
781
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
782
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
783
                            }, function (\Throwable $error) use (&$deferred) {
784
                                $deferred->reject($$error);
785
                                $this->connection->close();
786
                            });
787
                        });
788
                        
789
                        return $this->parser->invokeCommand($ssl);
790
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
791
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
792
                        $this->connection->close();
793
                        return;
794
                    }
795
                }
796
                
797 59
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
798
            }
799 59
        };
800
        
801 59
        $this->parser->on('message', $listener);
802
        
803
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
804 59
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
805 58
                $this->connectionState = static::CONNECTION_OK;
806
            }
807
            
808 59
            $this->emit('eventRelay', array('message', $message));
809 59
        });
810 59
    }
811
    
812
    /**
     * Enables TLS on the connection.
     *
     * Every entry of the `tls.context` option is applied to the connection's stream
     * as an `ssl` stream context option, then the encryption helper is asked to
     * enable encryption on the connection.
     *
     * If enabling encryption fails, the connection is closed and a `\RuntimeException`
     * is thrown from the rejection handler. The original error is attached as the
     * previous exception, so its type and stack trace are not lost.
     *
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();
            
            // getCode() is not guaranteed to be an int for every throwable,
            // but the exception constructor requires one.
            throw new \RuntimeException(
                'Connection failed during TLS handshake: '.$error->getMessage(),
                (int) $error->getCode(),
                $error
            );
        });
    }
827
    
828
    /**
     * Sends the auth command.
     *
     * Builds the handshake response command from the given credentials, wires the
     * deferred to the command's `end` and `error` events and invokes the command
     * on the parser. When an auth plugin is in use, a temporary `message` listener
     * is registered on the parser which answers auth switch requests and auth more
     * data packets, and removes itself once an OK response arrives.
     *
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  Read keys: `user`, `password`, `db`.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Read the needed keys explicitly instead of extract()ing the array,
        // so that no credentials key can overwrite the method's own arguments.
        $user = $credentials['user'] ?? null;
        $password = $credentials['password'] ?? null;
        $db = $credentials['db'] ?? null;
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        $auth->once('end', function () use ($deferred) {
            // The deferred is resolved on a future tick of the loop, not synchronously.
            $this->loop->futureTick(function () use ($deferred) {
                $deferred->resolve();
            });
        });
        
        $auth->once('error', function (\Throwable $error) use ($deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            // $listener is captured by reference, because the closure needs to remove itself.
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, $deferred, &$listener) {
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $authPlugin */
                static $authPlugin; // Plugin instance created by an auth switch, kept across invocations
                
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $authPlugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $authPlugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    
                    // Authentication can not continue, close the connection like every other failure path does.
                    $this->connection->close();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($authPlugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $authPlugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // Authentication is done, the listener is not needed anymore.
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
900
    
901
    /**
     * Get the real charset from the DB charset.
     *
     * Any charset beginning with `utf8` maps to `UTF-8`. Otherwise the first
     * mbstring encoding whose name starts with the given charset
     * (case-insensitive) is returned. If nothing matches, or the charset is
     * an empty string, `UTF-8` is returned as fallback.
     *
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        // An empty needle makes stripos() report a match at offset 0 on PHP 8+,
        // which would return the first listed encoding instead of the fallback.
        if($charset === '' || \strncmp($charset, 'utf8', 4) === 0) {
            return 'UTF-8';
        }
        
        foreach(\mb_list_encodings() as $encoding) {
            if(\stripos($encoding, $charset) === 0) {
                return $encoding;
            }
        }
        
        return 'UTF-8';
    }
921
    
922
    /**
     * Validates the given options.
     *
     * Each known option name is paired with a validation rule and the whole
     * set is handed to the validator, which throws on failure.
     *
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        $maxPacketSize = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE;
        
        $rules = array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'compression.enable' => 'boolean',
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
            'packet.maxAllowedSize' => 'integer|min:0|max:'.$maxPacketSize,
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        // NOTE(review): the third argument presumably enables the validator's strict mode - confirm against the Validator docs.
        $validator = \CharlotteDunois\Validation\Validator::make($options, $rules, true);
        $validator->throw(\InvalidArgumentException::class);
    }
940
}
941