GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( 0c5a9c...6047c1 )
by Charlotte
02:52 queued 44s
created

Driver::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 6
nc 1
nop 2
dl 0
loc 9
ccs 7
cts 7
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => !true,
31
        'tls.context' => array(),
32
        'tls.force' => true,
33
        'tls.forceLocal' => false
34
    );
35
    
36
    /**
37
     * @var string[]
38
     */
39
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
40
    
41
    /**
42
     * @var \React\Socket\ConnectorInterface
43
     */
44
    protected $connector;
45
    
46
    /**
47
     * Internal class is intentional used, as there's no other way currently.
48
     * @var \React\Socket\StreamEncryption
49
     * @see https://github.com/reactphp/socket/issues/180
50
     */
51
    protected $encryption;
52
    
53
    /**
54
     * @var \React\Promise\Promise|null
55
     */
56
    protected $connectPromise;
57
    
58
    /**
59
     * @var \React\Socket\Connection
60
     */
61
    protected $connection;
62
    
63
    /**
64
     * @var int
65
     */
66
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
67
    
68
    /**
69
     * @var \Plasma\Drivers\MySQL\ProtocolParser
70
     */
71
    protected $parser;
72
    
73
    /**
74
     * @var array
75
     */
76
    protected $queue;
77
    
78
    /**
79
     * @var int
80
     */
81
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
82
    
83
    /**
84
     * @var bool
85
     */
86
    protected $transaction = false;
87
    
88
    /**
89
     * @var \React\Promise\Deferred
90
     */
91
    protected $goingAway;
92
    
93
    /**
94
     * @var string|null
95
     */
96
    protected $charset;
97
    
98
    /**
99
     * Constructor.
100
     * @param \React\EventLoop\LoopInterface  $loop
101
     * @param array                           $options
102
     */
103 84
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
104 84
        $this->validateOptions($options);
105
        
106 84
        $this->loop = $loop;
107 84
        $this->options = \array_merge($this->options, $options);
108
        
109 84
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
110 84
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
111 84
        $this->queue = array();
112 84
    }
113
    
114
    /**
115
     * Destructor.
116
     * @return void
117
     * @internal
118
     */
119 23
    function __destruct() {
120 23
        $this->close();
121 23
    }
122
    
123
    /**
124
     * Returns the event loop.
125
     * @return \React\EventLoop\LoopInterface
126
     */
127 58
    function getLoop(): \React\EventLoop\LoopInterface {
128 58
        return $this->loop;
129
    }
130
    
131
    /**
132
     * Retrieves the current connection state.
133
     * @return int
134
     */
135 10
    function getConnectionState(): int {
136 10
        return $this->connectionState;
137
    }
138
    
139
    /**
140
     * Retrieves the current busy state.
141
     * @return int
142
     */
143 1
    function getBusyState(): int {
144 1
        return $this->busy;
145
    }
146
    
147
    /**
148
     * Get the length of the driver backlog queue.
149
     * @return int
150
     */
151 1
    function getBacklogLength(): int {
152 1
        return \count($this->queue);
153
    }
154
    
155
    /**
156
     * Connects to the given URI.
157
     * @param string  $uri
158
     * @return \React\Promise\PromiseInterface
159
     */
160 63
    function connect(string $uri): \React\Promise\PromiseInterface {
161 63
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
162 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
163 62
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
164 1
            return \React\Promise\resolve();
165 62
        } elseif($this->connectPromise !== null) {
166 1
            return $this->connectPromise;
167
        }
168
        
169 62
        $pos = \strpos($uri, '://');
170 62
        $unix = false;
0 ignored issues
show
Unused Code introduced by
The assignment to $unix is dead and can be removed.
Loading history...
171
        
172 62
        if($pos === false) {
173 2
            $uri = 'tcp://'.$uri;
174 60
        } elseif(\substr($uri, 0, $pos) === 'unix') {
175 1
            $defaultSocket = '/tmp/mysql.sock';
176
            
177 1
            $apos = \strpos($uri, '@');
178 1
            $spos = \strrpos($uri, '/');
179
            
180 1
            if($apos !== false) {
181 1
                $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
182 1
                $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
183
            } else {
184
                $socket = \substr($uri, ($pos + 3), ($spos - ($pos + 3)));
185
                $uri = 'unix://localhost'.\substr($uri, $spos);
186
            }
187
            
188 1
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
189
                $socket = $defaultSocket;
190
            }
191
        }
192
        
193 62
        $parts = \parse_url($uri);
194 62
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes)) {
195 2
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
196
        }
197
        
198 60
        if(isset($socket)) {
199 1
            $parts['host'] = $socket;
200
        }
201
        
202 60
        if($parts['scheme'] === 'mysql') {
203 1
            $parts['scheme'] = 'tcp';
204
        }
205
        
206 60
        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
207 60
        $this->connectionState = static::CONNECTION_STARTED;
208 60
        $resolved = false;
209
        
210
        $this->connectPromise =  $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
211
            // See description of property encryption
212 60
            if(!($connection instanceof \React\Socket\Connection)) {
213
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
214
            }
215
            
216 60
            $this->busy = static::STATE_BUSY;
217 60
            $this->connectionState = static::CONNECTION_MADE;
218 60
            $this->connection = $connection;
219
            
220
            $this->connection->on('error', function (\Throwable $error) {
221
                $this->emit('error', array($error));
222 60
            });
223
            
224
            $this->connection->on('close', function () {
225 3
                $this->connection = null;
226 3
                $this->connectionState = static::CONNECTION_UNUSABLE;
227
                
228 3
                $this->emit('close');
229 60
            });
230
            
231 60
            $deferred = new \React\Promise\Deferred();
232 60
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
233
            
234
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
235
                if($resolved) {
236
                    $this->emit('error', array($error));
237
                } else {
238
                    $deferred->reject($error);
239
                }
240 60
            });
241
            
242 60
            $user = ($parts['user'] ?? 'root');
243 60
            $password = ($parts['pass'] ?? '');
244 60
            $db = \ltrim(($parts['path'] ?? ''), '/');
245
            
246 60
            $credentials = \compact('user', 'password', 'db');
247
            
248 60
            $this->startHandshake($credentials, $deferred);
249
            return $deferred->promise()->then(function () use (&$resolved) {
250 59
                $this->busy = static::STATE_IDLE;
251 59
                $resolved = true;
252
                
253 59
                if(\count($this->queue) > 0) {
254
                    $this->parser->invokeCommand($this->getNextCommand());
255
                }
256 60
            });
257 60
        });
258
        
259 60
        if($this->options['characters.set']) {
260 58
            $this->charset = $this->options['characters.set'];
261
            $this->connectPromise = $this->connectPromise->then(function () {
262 57
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
263 57
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
264
                
265 57
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
266 57
                $this->executeCommand($cmd);
267
                
268 57
                return $cmd->getPromise();
269 58
            });
270
        }
271
        
272 60
        return $this->connectPromise;
273
    }
274
    
275
    /**
276
     * Pauses the underlying stream I/O consumption.
277
     * If consumption is already paused, this will do nothing.
278
     * @return bool  Whether the operation was successful.
279
     */
280 1
    function pauseStreamConsumption(): bool {
281 1
        if($this->connection === null || $this->goingAway) {
282 1
            return false;
283
        }
284
        
285
        $this->connection->pause();
286
        return true;
287
    }
288
    
289
    /**
290
     * Resumes the underlying stream I/O consumption.
291
     * If consumption is not paused, this will do nothing.
292
     * @return bool  Whether the operation was successful.
293
     */
294 1
    function resumeStreamConsumption(): bool {
295 1
        if($this->connection === null || $this->goingAway) {
296 1
            return false;
297
        }
298
        
299
        $this->connection->resume();
300
        return true;
301
    }
302
    
303
    /**
304
     * Closes all connections gracefully after processing all outstanding requests.
305
     * @return \React\Promise\PromiseInterface
306
     */
307 24
    function close(): \React\Promise\PromiseInterface {
308 24
        if($this->goingAway) {
309 10
            return $this->goingAway->promise();
310
        }
311
        
312 15
        $state = $this->connectionState;
313 15
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
314
        
315 15
        $this->goingAway = new \React\Promise\Deferred();
316
        
317 15
        if(\count($this->queue) === 0) {
318 14
            $this->goingAway->resolve();
319
        }
320
        
321
        return $this->goingAway->promise()->then(function () use ($state) {
322 15
            if($state !== static::CONNECTION_OK) {
323 14
                return;
324
            }
325
            
326 1
            $deferred = new \React\Promise\Deferred();
327
            
328 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
329
            
330
            $this->connection->once('close', function () use (&$deferred) {
331 1
                $deferred->resolve();
332 1
            });
333
            
334
            $quit->once('end', function () {
335
                $this->connection->close();
336 1
            });
337
            
338 1
            $this->parser->invokeCommand($quit);
339
            
340 1
            return $deferred->promise();
341 15
        });
342
    }
343
    
344
    /**
345
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
346
     * @return void
347
     */
348 10
    function quit(): void {
349 10
        if($this->goingAway) {
350 1
            return;
351
        }
352
        
353 10
        $state = $this->connectionState;
354 10
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
355
        
356 10
        $this->goingAway = new \React\Promise\Deferred();
357 10
        $this->goingAway->resolve();
358
        
359
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
360 10
        while($command = \array_shift($this->queue)) {
361 1
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
362
        }
363
        
364 10
        if($state === static::CONNECTION_OK) {
365 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
366 1
            $this->parser->invokeCommand($quit);
367
            
368 1
            $this->connection->close();
369
        }
370 10
    }
371
    
372
    /**
373
     * Whether this driver is currently in a transaction.
374
     * @return bool
375
     */
376 2
    function isInTransaction(): bool {
377 2
        return $this->transaction;
378
    }
379
    
380
    /**
381
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
382
     * When the command is done, the driver must check itself back into the client.
383
     * @param \Plasma\ClientInterface  $client
384
     * @param string                   $query
385
     * @return \React\Promise\PromiseInterface
386
     * @throws \Plasma\Exception
387
     * @see \Plasma\QueryResultInterface
388
     */
389 48
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
390 48
        if($this->goingAway) {
391 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
392 47
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
393 1
            if($this->connectPromise !== null) {
394
                return $this->connectPromise->then(function () use (&$client, &$query) {
395
                    return $this->query($client, $query);
396
                });
397
            }
398
            
399 1
            throw new \Plasma\Exception('Unable to continue without connection');
400
        }
401
        
402 46
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
403 46
        $this->executeCommand($command);
404
        
405 46
        if(!$this->transaction) {
406
            $command->once('end', function () use (&$client) {
407 44
                $client->checkinConnection($this);
408 44
            });
409
        }
410
        
411 46
        return $command->getPromise();
412
    }
413
    
414
    /**
415
     * Prepares a query. Resolves with a `StatementInterface` instance.
416
     * When the command is done, the driver must check itself back into the client.
417
     * @param \Plasma\ClientInterface  $client
418
     * @param string                   $query
419
     * @return \React\Promise\PromiseInterface
420
     * @throws \Plasma\Exception
421
     * @see \Plasma\StatementInterface
422
     */
423 38
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
424 38
        if($this->goingAway) {
425 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
426 37
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
427 1
            if($this->connectPromise !== null) {
428
                return $this->connectPromise->then(function () use (&$client, &$query) {
429
                    return $this->prepare($client, $query);
430
                });
431
            }
432
            
433 1
            throw new \Plasma\Exception('Unable to continue without connection');
434
        }
435
        
436 36
        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
437 36
        $this->executeCommand($command);
438
        
439 36
        return $command->getPromise();
440
    }
441
    
442
    /**
443
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
444
     * This is equivalent to prepare -> execute -> close.
445
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
446
     * @param \Plasma\ClientInterface  $client
447
     * @param string                   $query
448
     * @param array                    $params
449
     * @return \React\Promise\PromiseInterface
450
     * @throws \Plasma\Exception
451
     * @see \Plasma\StatementInterface
452
     */
453 37
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
454 37
        if($this->goingAway) {
455 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
456 36
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
457 1
            if($this->connectPromise !== null) {
458
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
459
                    return $this->execute($client, $query, $params);
460
                });
461
            }
462
            
463 1
            throw new \Plasma\Exception('Unable to continue without connection');
464
        }
465
        
466
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
467
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
468 34
                if($result instanceof \Plasma\StreamQueryResultInterface) {
469
                    $statement->close()->then(null, function (\Throwable $error) {
470
                        $this->emit('error', array($error));
471 34
                    });
472
                    
473 34
                    return $result;
474
                }
475
                
476
                return $statement->close()->then(function () use ($result) {
477 33
                    return $result;
478 33
                });
479
            }, function (\Throwable $error) use (&$statement) {
480
                return $statement->close()->then(function () use ($error) {
481
                    throw $error;
482
                });
483 35
            });
484 35
        });
485
    }
486
    
487
    /**
488
     * Quotes the string for use in the query.
489
     * @param string  $str
490
     * @param int     $type  For types, see the constants.
491
     * @return string
492
     * @throws \LogicException  Thrown if the driver does not support quoting.
493
     * @throws \Plasma\Exception
494
     */
495 5
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
496 5
        if($this->parser === null) {
497 1
            throw new \Plasma\Exception('Unable to continue without connection');
498
        }
499
        
500 4
        $message = $this->parser->getLastOkMessage();
501 4
        if($message === null) {
502
            $message = $this->parser->getHandshakeMessage();
503
            
504
            if($message === null) {
505
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
506
            }
507
        }
508
        
509 4
        $pos = \strpos($this->charset, '_');
510 4
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
511 4
        $realCharset = $this->getRealCharset($dbCharset);
512
        
513 4
        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
514
            return $this->escapeUsingQuotes($realCharset, $str, $type);
515
        }
516
        
517 4
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
518
    }
519
    
520
    /**
521
     * Escapes using quotes.
522
     * @param string  $realCharset
523
     * @param string  $str
524
     * @param int     $type
525
     * @return string
526
     */
527 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
528 2
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
529 1
            return '`'.\str_replace('`', '``', $str).'`';
530
        }
531
        
532
        $escapeChars = array(
533 1
            '"',
534
            '\\',
535
        );
536
        
537
        $escapeReplace = array(
538 1
            '""',
539
            '\\\\',
540
        );
541
        
542 1
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
543
    }
544
    
545
    /**
546
     * Escapes using backslashes.
547
     * @param string  $realCharset
548
     * @param string  $str
549
     * @param int     $type
550
     * @return string
551
     */
552 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
553 6
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
554 3
            return '`'.\str_replace('`', '\\`', $str).'`';
555
        }
556
        
557
        $escapeChars = array(
558 3
            '\\',
559
            '"',
560
        );
561
        
562
        $escapeReplace = array(
563 3
            '\\\\',
564
            '\"',
565
        );
566
        
567 3
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
568
    }
569
    
570
    /**
571
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
572
     *
573
     * Checks out a connection until the transaction gets committed or rolled back.
574
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
575
     * check the connection back into the pool, as long as the transaction is not finished.
576
     *
577
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
578
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
579
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
580
     * @param \Plasma\ClientInterface  $client
581
     * @param int                      $isolation  See the `TransactionInterface` constants.
582
     * @return \React\Promise\PromiseInterface
583
     * @throws \Plasma\Exception
584
     * @see \Plasma\TransactionInterface
585
     */
586 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
587 3
        if($this->goingAway) {
588 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
589
        }
590
        
591 2
        if($this->transaction) {
592 1
            throw new \Plasma\Exception('Driver is already in transaction');
593
        }
594
        
595
        switch ($isolation) {
596 2
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
597
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
598
            break;
599 2
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
600 2
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
601 2
            break;
602
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
603
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
604
            break;
605
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
606
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
607
            break;
608
            default:
609
                throw new \Plasma\Exception('Invalid isolation level given');
610
            break;
611
        }
612
        
613 2
        $this->transaction = true;
614
        
615
        return $this->query($client, $query)->then(function () use (&$client) {
616 2
            return $this->query($client, 'START TRANSACTION');
617
        })->then(function () use (&$client, $isolation) {
618 2
            return (new \Plasma\Transaction($client, $this, $isolation));
619
        })->then(null, function (\Throwable $e) {
620
            $this->transaction = false;
621
            throw $e;
622 2
        });
623
    }
624
    
625
    /**
626
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
627
     * @return void
628
     */
629 1
    function endTransaction(): void {
630 1
        $this->transaction = false;
631 1
    }
632
    
633
    /**
634
     * Runs the given command.
635
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
636
     * or rejects with the `Throwable` of the `error` event.
637
     * When the command is done, the driver must check itself back into the client.
638
     * @param \Plasma\ClientInterface   $client
639
     * @param \Plasma\CommandInterface  $command
640
     * @return \React\Promise\PromiseInterface
641
     */
642 6
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
643 6
        if($this->goingAway) {
644 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
645
        }
646
        
647
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
648
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
649 3
                if(!$this->transaction) {
650 3
                    $client->checkinConnection($this);
651
                }
652
                
653 3
                $resolve($value);
654 5
            });
655
            
656
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
657 2
                if(!$this->transaction) {
658 2
                    $client->checkinConnection($this);
659
                }
660
                
661 2
                $reject($error);
662 5
            });
663
            
664 5
            $this->executeCommand($command);
665 5
        }));
666
    }
667
    
668
    /**
669
     * Executes a command.
670
     * @param \Plasma\CommandInterface  $command
671
     * @return void
672
     * @internal
673
     */
674 58
    function executeCommand(\Plasma\CommandInterface $command): void {
675 58
        $this->queue[] = $command;
676
        
677 58
        if($this->parser && $this->busy === static::STATE_IDLE) {
678 57
            $this->parser->invokeCommand($this->getNextCommand());
679
        }
680 58
    }
681
    
682
    /**
683
     * Get the handshake message, or null if none received yet.
684
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
685
     */
686 57
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
687 57
        if($this->parser) {
688 57
            return $this->parser->getHandshakeMessage();
689
        }
690
        
691
        return null;
692
    }
693
    
694
    /**
695
     * Get the next command, or null.
696
     * @return \Plasma\CommandInterface|null
697
     * @internal
698
     */
699 57
    function getNextCommand(): ?\Plasma\CommandInterface {
700 57
        if(\count($this->queue) === 0) {
701 57
            if($this->goingAway) {
702
                $this->goingAway->resolve();
703
            }
704
            
705 57
            return null;
706 57
        } elseif($this->busy === static::STATE_BUSY) {
707
            return null;
708
        }
709
        
710
        /** @var \Plasma\CommandInterface  $command */
711 57
        $command =  \array_shift($this->queue);
712
        
713 57
        if($command->waitForCompletion()) {
714 57
            $this->busy = static::STATE_BUSY;
715
            
716
            $command->once('error', function () use (&$command) {
717 2
                $this->busy = static::STATE_IDLE;
718
                
719 2
                $this->endCommand();
720 57
            });
721
            
722
            $command->once('end', function () use (&$command) {
723 57
                $this->busy = static::STATE_IDLE;
724
                
725 57
                $this->endCommand();
726 57
            });
727
        } else {
728 35
            $this->endCommand();
729
        }
730
        
731 57
        return $command;
732
    }
733
    
734
    /**
735
     * Finishes up a command.
736
     * @return void
737
     */
738 57
    protected function endCommand() {
739
        $this->loop->futureTick(function () {
740 57
            if($this->goingAway && \count($this->queue) === 0) {
741 1
                return $this->goingAway->resolve();
742
            }
743
            
744 57
            $this->parser->invokeCommand($this->getNextCommand());
745 57
        });
746 57
    }
747
    
748
    /**
749
     * Starts the handshake process.
750
     * @param array                    $credentials
751
     * @param \React\Promise\Deferred  $deferred
752
     * @return void
753
     */
754 60
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
755
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
756 60
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
757 60
                $this->parser->removeListener('message', $listener);
758
                
759 60
                $this->connectionState = static::CONNECTION_SETENV;
760 60
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
761
                
762 60
                \extract($credentials);
763
                
764 60
                if($db !== '') {
765 46
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
766
                }
767
                
768 60
                if($this->charset === null) {
769 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
770
                }
771
                
772 60
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
773 2
                    $this->parser->enableCompression();
774 2
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
775
                }
776
                
777
                // Check if we support auth plugins
778 60
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
779 60
                $plugin = null;
780
                
781 60
                foreach($plugins as $key => $plug) {
782 60
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
783 60
                        $plugin = $plug;
784 60
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
785 60
                        break;
786
                    } elseif($key === $message->authPluginName) {
787
                        $plugin = $plug;
788
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
789
                        break;
790
                    }
791
                }
792
                
793 60
                $remote = \parse_url($this->connection->getRemoteAddress());
794
                
795 60
                if($remote !== false && ($remote['host'] !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
796
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
797
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
798
                        
799
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
800
                        
801
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
802
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
803
                            
804
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
805
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
806
                            }, function (\Throwable $error) use (&$deferred) {
807
                                $deferred->reject($$error);
808
                                $this->connection->close();
809
                            });
810
                        });
811
                        
812
                        return $this->parser->invokeCommand($ssl);
813
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
814
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
815
                        $this->connection->close();
816
                        return;
817
                    }
818
                }
819
                
820 60
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
821
            }
822 60
        };
823
        
824 60
        $this->parser->on('message', $listener);
825
        
826
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
827 60
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
828 59
                $this->connectionState = static::CONNECTION_OK;
829
            }
830
            
831 60
            $this->emit('eventRelay', array('message', $message));
832 60
        });
833 60
    }
834
    
835
    /**
836
     * Enables TLS on the connection.
837
     * @return \React\Promise\PromiseInterface
838
     */
839
    protected function enableTLS(): \React\Promise\PromiseInterface {
840
        // Set required SSL/TLS context options
841
        foreach($this->options['tls.context'] as $name => $value) {
842
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
843
        }
844
        
845
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
846
            $this->connection->close();
847
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
848
        });
849
    }
850
    
851
    /**
852
     * Sends the auth command.
853
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
854
     * @param array                                            $credentials
855
     * @param int                                              $clientFlags
856
     * @param string|null                                      $plugin
857
     * @param \React\Promise\Deferred                          $deferred
858
     * @return void
859
     */
860 60
    protected function createHandshakeResponse(
861
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
862
    ) {
863 60
        \extract($credentials);
864
        
865 60
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
866
        
867
        $auth->once('end', function () use (&$deferred) {
868
            $this->loop->futureTick(function () use (&$deferred) {
869 59
                $deferred->resolve();
870 59
            });
871 60
        });
872
        
873
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
874 1
            $deferred->reject($error);
875 1
            $this->connection->close();
876 60
        });
877
        
878 60
        if($plugin) {
879
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
880
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
881 60
                static $plugin;
882
                
883 60
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
884
                    $name = $message->authPluginName;
885
                    
886
                    if($name !== null) {
887
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
888
                        foreach($plugins as $key => $plug) {
889
                            if($key === $name) {
890
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
891
                                
892
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
893
                                return $this->parser->invokeCommand($command);
894
                            }
895
                        }
896
                    }
897
                    
898
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
899 60
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
900
                    if($plugin === null) {
901
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
902
                        return $this->connection->close();
903
                    }
904
                    
905
                    try {
906
                        $command = $plugin->receiveMoreData($message);
907
                        return $this->parser->invokeCommand($command);
908
                    } catch (\Plasma\Exception $e) {
909
                        $deferred->reject($e);
910
                        $this->connection->close();
911
                    }
912 60
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
913 59
                    $this->parser->removeListener('message', $listener);
914
                }
915 60
            };
916
            
917 60
            $this->parser->on('message', $listener);
918
        }
919
        
920 60
        $this->parser->invokeCommand($auth);
921 60
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
922 60
    }
923
    
924
    /**
925
     * Get the real charset from the DB charset.
926
     * @param string  $charset
927
     * @return string
928
     */
929 4
    protected function getRealCharset(string $charset): string {
930 4
        if(\substr($charset, 0, 4) === 'utf8') {
931 2
            return 'UTF-8';
932
        }
933
        
934 2
        $charsets = \mb_list_encodings();
935
        
936 2
        foreach($charsets as $set) {
937 2
            if(\stripos($set, $charset) === 0) {
938 2
                return $set;
939
            }
940
        }
941
        
942 2
        return 'UTF-8';
943
    }
944
    
945
    /**
946
     * Validates the given options.
947
     * @param array  $options
948
     * @return void
949
     * @throws \InvalidArgumentException
950
     */
951 84
    protected function validateOptions(array $options) {
952 84
        \CharlotteDunois\Validation\Validator::make($options, array(
953 84
            'characters.set' => 'string',
954
            'characters.collate' => 'string',
955
            'compression.enable' => 'boolean',
956
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
957
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
958
            'tls.context' => 'array',
959
            'tls.force' => 'boolean',
960
            'tls.forceLocal' => 'boolean'
961 84
        ), true)->throw(\InvalidArgumentException::class);
962 84
    }
963
}
964