GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 8abffd...754ed9 )
by Charlotte
03:13
created

Driver::connect()   C

Complexity

Conditions 12
Paths 6

Size

Total Lines 81
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 44
CRAP Score 12.2487

Importance

Changes 0
Metric Value
cc 12
eloc 50
nc 6
nop 1
dl 0
loc 81
ccs 44
cts 50
cp 0.88
crap 12.2487
rs 6.9666
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'tls.context' => array(),
31
        'tls.force' => true,
32
        'tls.forceLocal' => false
33
    );
34
    
35
    /**
36
     * @var \React\Socket\ConnectorInterface
37
     */
38
    protected $connector;
39
    
40
    /**
41
     * An internal class is intentionally used here, as there is currently no other way.
42
     * @var \React\Socket\StreamEncryption
43
     * @see https://github.com/reactphp/socket/issues/180
44
     */
45
    protected $encryption;
46
    
47
    /**
48
     * @var \React\Promise\Promise|null
49
     */
50
    protected $connectPromise;
51
    
52
    /**
53
     * @var \React\Socket\Connection
54
     */
55
    protected $connection;
56
    
57
    /**
58
     * @var int
59
     */
60
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
61
    
62
    /**
63
     * @var \Plasma\Drivers\MySQL\ProtocolParser
64
     */
65
    protected $parser;
66
    
67
    /**
68
     * @var array
69
     */
70
    protected $queue;
71
    
72
    /**
73
     * @var int
74
     */
75
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
76
    
77
    /**
78
     * @var bool
79
     */
80
    protected $transaction = false;
81
    
82
    /**
83
     * @var \React\Promise\Deferred
84
     */
85
    protected $goingAway;
86
    
87
    /**
88
     * @var string|null
89
     */
90
    protected $charset;
91
    
92
    /**
     * Creates the driver and sets up its collaborators.
     *
     * The passed options are validated first and then merged over the built-in defaults.
     * A custom connector can be supplied through the `connector` option; without it,
     * a `\React\Socket\Connector` bound to the given loop is created.
     * @param \React\EventLoop\LoopInterface  $loop
     * @param array                           $options
     */
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        $this->validateOptions($options);
        
        $this->loop = $loop;
        $this->queue = [];
        $this->options = \array_merge($this->options, $options);
        
        if(isset($options['connector'])) {
            $this->connector = $options['connector'];
        } else {
            $this->connector = new \React\Socket\Connector($loop);
        }
        
        // Internal ReactPHP class, see the note on the $encryption property.
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
    }
107
    
108
    /**
     * Gives access to the event loop this driver was constructed with.
     * @return \React\EventLoop\LoopInterface
     */
    function getLoop(): \React\EventLoop\LoopInterface {
        $loop = $this->loop;
        return $loop;
    }
115
    
116
    /**
     * Reports the current connection state (one of the `DriverInterface::CONNECTION_*` constants).
     * @return int
     */
    function getConnectionState(): int {
        $state = $this->connectionState;
        return $state;
    }
123
    
124
    /**
     * Reports the current busy state (one of the `DriverInterface::STATE_*` constants).
     * @return int
     */
    function getBusyState(): int {
        $state = $this->busy;
        return $state;
    }
131
    
132
    /**
     * Counts the commands that are queued in the driver and not yet handed to the parser.
     * @return int
     */
    function getBacklogLength(): int {
        $backlog = $this->queue;
        return \count($backlog);
    }
139
    
140
    /**
     * Connects to the given URI.
     *
     * The URI may be given with or without the `mysql://` scheme. The port defaults to 3306,
     * the user to `root` and the password to an empty string; the path (if any) is used as database name.
     *
     * The returned promise rejects if the driver is going away or unusable, or if no host can be
     * parsed from the URI. It resolves immediately if the connection is already established,
     * and a pending connect attempt is shared between callers.
     * If the `characters.set` option is set, a `SET NAMES` query is queued after the handshake.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface
     */
    function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            return $this->connectPromise;
        }
        
        // Remove the scheme as a literal prefix only. \ltrim() interprets its second argument as a
        // list of characters, so it would also strip leading host characters ("localhost" -> "ocalhost").
        if(\strpos($uri, 'mysql://') === 0) {
            $uri = \substr($uri, 8);
        }
        
        $parts = \parse_url('mysql://'.$uri);
        if(!isset($parts['host'])) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }
        
        $host = $parts['host'].':'.($parts['port'] ?? 3306);
        $this->connectionState = static::CONNECTION_STARTED;
        
        // Flipped to true once the handshake finished; decides whether parser errors
        // reject the connect promise or get emitted as driver errors.
        $resolved = false;
        
        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }
            
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;
            
            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });
            
            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;
                
                $this->emit('close');
            });
            
            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
            
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });
            
            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '');
            
            $credentials = \compact('user', 'password', 'db');
            
            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;
                
                // Commands may have been queued while the handshake was in progress.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });
        
        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            $this->connectPromise = $this->connectPromise->then(function () {
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
                
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);
            });
        }
        
        return $this->connectPromise;
    }
227
    
228
    /**
     * Pauses reading from the underlying connection.
     *
     * Nothing is done (and `false` is returned) when there is no connection or the driver is going away.
     * @return bool  Whether the operation was successful.
     */
    function pauseStreamConsumption(): bool {
        if($this->connection !== null && !$this->goingAway) {
            $this->connection->pause();
            return true;
        }
        
        return false;
    }
241
    
242
    /**
     * Resumes reading from the underlying connection.
     *
     * Nothing is done (and `false` is returned) when there is no connection or the driver is going away.
     * @return bool  Whether the operation was successful.
     */
    function resumeStreamConsumption(): bool {
        if($this->connection !== null && !$this->goingAway) {
            $this->connection->resume();
            return true;
        }
        
        return false;
    }
255
    
256
    /**
     * Gracefully shuts the driver down.
     *
     * The driver is marked unusable right away. Once the `goingAway` deferred resolves
     * (immediately if the queue is empty), a quit command is sent, provided the connection
     * was established when `close()` was called. The returned promise then waits for the
     * connection's `close` event. Calling this again returns the pending promise.
     * @return \React\Promise\PromiseInterface
     */
    function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        $this->goingAway = new \React\Promise\Deferred();
        
        if(empty($this->queue)) {
            $this->goingAway->resolve();
        }
        
        $sendQuit = function () use ($previousState) {
            // Without an established connection there is nothing to say goodbye to.
            if($previousState !== static::CONNECTION_OK) {
                return;
            }
            
            $closed = new \React\Promise\Deferred();
            $this->connection->once('close', function () use ($closed) {
                $closed->resolve();
            });
            
            $quitCommand = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
            $quitCommand->once('end', function () {
                $this->connection->close();
            });
            
            $this->parser->invokeCommand($quitCommand);
            return $closed->promise();
        };
        
        return $this->goingAway->promise()->then($sendQuit);
    }
296
    
297
    /**
     * Forcefully shuts the driver down without waiting for outstanding commands.
     *
     * Every queued command gets an `error` event with a "going away" exception.
     * If the connection was established, a quit command is sent and the connection is closed.
     * Does nothing if the driver is already going away.
     * @return void
     */
    function quit(): void {
        if($this->goingAway) {
            return;
        }
        
        $wasConnected = ($this->connectionState === static::CONNECTION_OK);
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        $this->goingAway = new \React\Promise\Deferred();
        $this->goingAway->resolve();
        
        // Drain the backlog, failing each pending command.
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $pending */
        while(($pending = \array_shift($this->queue)) !== null) {
            $pending->emit('error', [(new \Plasma\Exception('Connection is going away'))]);
        }
        
        if(!$wasConnected) {
            return;
        }
        
        $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
        $this->connection->close();
    }
324
    
325
    /**
     * Tells whether the driver currently considers itself inside a transaction.
     * @return bool
     */
    function isInTransaction(): bool {
        $inTransaction = $this->transaction;
        return $inTransaction;
    }
332
    
333
    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     *
     * If the driver is not connected yet but a connect is pending, the query is retried after the connect.
     * Outside of a transaction the driver checks itself back into the client when the command ends.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect.
     * @see \Plasma\QueryResultInterface
     */
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Still connecting: run the query as soon as the connect promise fulfills.
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->query($client, $query);
            });
        }
        
        $queryCommand = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($queryCommand);
        
        if(!$this->transaction) {
            $queryCommand->once('end', function () use ($client) {
                $client->checkinConnection($this);
            });
        }
        
        return $queryCommand->getPromise();
    }
366
    
367
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     *
     * If the driver is not connected yet but a connect is pending, preparing is retried after the connect.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect.
     * @see \Plasma\StatementInterface
     */
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Still connecting: prepare as soon as the connect promise fulfills.
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->prepare($client, $query);
            });
        }
        
        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);
        
        return $prepareCommand->getPromise();
    }
394
    
395
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     *
     * The statement is closed on success as well as on failure. For streaming results the
     * result is returned without waiting for the close; a failing close is emitted as `error` event.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect.
     * @see \Plasma\StatementInterface
     */
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use ($client, $query, $params) {
                    return $this->execute($client, $query, $params);
                });
            }
            
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use ($statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    // The rejection handler has to be attached to the promise returned by close().
                    // Passing it as arguments to close() would silently drop it.
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });
                    
                    return $result;
                }
                
                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use ($statement) {
                // Close the statement first, then surface the original execution error.
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
439
    
440
    /**
     * Quotes the string for use in the query.
     *
     * The escaping strategy is picked from the status flags of the last OK message
     * (falling back to the handshake message): quote doubling when the server reports
     * `NO_BACKSLASH_ESCAPES`, backslash escaping otherwise.
     * @param string  $str
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception  Thrown if there is no parser yet or no handshake happened.
     */
    function quote(string $str): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        $message = ($this->parser->getLastOkMessage() ?? $this->parser->getHandshakeMessage());
        if($message === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }
        
        // Only the part in front of the first underscore (if any) names the character set.
        $dbCharset = \explode('_', $this->charset, 2)[0];
        $realCharset = $this->getRealCharset($dbCharset);
        
        $noBackslashEscapes = (($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0);
        
        return ($noBackslashEscapes ? $this->escapeUsingQuotes($realCharset, $str) : $this->escapeUsingBackslashes($realCharset, $str));
    }
471
    
472
    /**
     * Escapes using quotes, for servers running with `NO_BACKSLASH_ESCAPES`.
     *
     * In that SQL mode the backslash is an ordinary character, so it must be passed through
     * untouched; only the enclosing quote character gets doubled.
     * The result is wrapped in double quotes.
     * @param string  $realCharset  Currently not used by this method.
     * @param string  $str
     * @return string
     */
    function escapeUsingQuotes(string $realCharset, string $str): string {
        // Doubling backslashes here would store two backslashes for each one given.
        return '"'.\str_replace('"', '""', $str).'"';
    }
491
    
492
    /**
     * Escapes using backslashes.
     *
     * Backslashes and double quotes get a backslash prepended,
     * and the result is wrapped in double quotes.
     * @param string  $realCharset  Currently not used by this method.
     * @param string  $str
     * @return string
     */
    function escapeUsingBackslashes(string $realCharset, string $str): string {
        $replacements = [
            '\\' => '\\\\',
            '"' => '\"',
        ];
        
        $escaped = \strtr($str, $replacements);
        return '"'.$escaped.'"';
    }
511
    
512
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     *
     * The isolation level is applied with `SET SESSION TRANSACTION ISOLATION LEVEL` before `START TRANSACTION` is sent.
     * If starting the transaction fails, the driver leaves the "in transaction" state again.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\TransactionInterface
     */
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }
        
        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }
        
        // Set before querying, so that query() does not check the connection back into the client.
        $this->transaction = true;
        
        try {
            $promise = $this->query($client, $query);
        } catch (\Throwable $e) {
            // query() throws synchronously without a connection. Without this reset the flag
            // would stay set and every later call would fail with "already in transaction".
            $this->transaction = false;
            throw $e;
        }
        
        return $promise->then(function () use ($client) {
            return $this->query($client, 'START TRANSACTION');
        })->then(function () use ($client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        })->then(null, function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        });
    }
566
    
567
    /**
     * Marks the current transaction as finished.
     *
     * This only updates the driver's own bookkeeping and does not send anything to the server.
     * It is meant to be called by `Transaction` once the transaction ended.
     * @return void
     */
    function endTransaction(): void {
        // Purely informational, the flag is all that changes.
        $this->transaction = false;
    }
574
    
575
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * Outside of a transaction, the driver checks itself back into the client in both cases.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface
     */
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        $resolver = function (callable $resolve, callable $reject) use ($client, $command) {
            // Hands the driver back to the client, unless a transaction still owns it.
            $checkin = function () use ($client) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
            };
            
            $command->once('end', function ($value = null) use ($checkin, $resolve) {
                $checkin();
                $resolve($value);
            });
            
            $command->once('error', function (\Throwable $error) use ($checkin, $reject) {
                $checkin();
                $reject($error);
            });
            
            $this->executeCommand($command);
        };
        
        return (new \React\Promise\Promise($resolver));
    }
609
    
610
    /**
     * Queues a command and, if a parser exists and the driver is idle, starts the next queued command.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;
        
        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            // No parser yet or a command is in flight, so the command stays in the backlog.
            return;
        }
        
        $this->parser->invokeCommand($this->getNextCommand());
    }
625
    
626
    /**
     * Fetches the handshake message from the parser.
     * Returns null if there is no parser yet, otherwise whatever the parser reports.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        return ($this->parser ? $this->parser->getHandshakeMessage() : null);
    }
637
    
638
    /**
     * Takes the next command off the queue, or returns null.
     *
     * Null is returned when the queue is empty (which also resolves a pending `goingAway` deferred)
     * or when the driver is busy. A command that waits for completion marks the driver busy until
     * it emits `end` or `error`; any other command schedules the following command right away.
     * @return \Plasma\CommandInterface|null
     * @internal
     */
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(empty($this->queue)) {
            if($this->goingAway) {
                $this->goingAway->resolve();
            }
            
            return null;
        }
        
        if($this->busy === static::STATE_BUSY) {
            return null;
        }
        
        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);
        
        if(!$next->waitForCompletion()) {
            $this->endCommand();
            return $next;
        }
        
        $this->busy = static::STATE_BUSY;
        
        // Both outcomes free the driver again and move on to the next command.
        $release = function () {
            $this->busy = static::STATE_IDLE;
            $this->endCommand();
        };
        
        $next->once('error', $release);
        $next->once('end', $release);
        
        return $next;
    }
681
    
682
    /**
     * Finishes up a command.
     *
     * On the next loop tick, either resolves the `goingAway` deferred (when going away with
     * an empty queue) or passes the next command to the parser.
     * @return void
     */
    protected function endCommand() {
        $advance = function () {
            $drained = ($this->goingAway && \count($this->queue) === 0);
            if($drained) {
                $this->goingAway->resolve();
                return;
            }
            
            $this->parser->invokeCommand($this->getNextCommand());
        };
        
        $this->loop->futureTick($advance);
    }
695
    
696
    /**
     * Starts the handshake process.
     *
     * Registers a one-off listener for the server's handshake message, which picks the client
     * flags and auth plugin and sends the handshake response. For remotes other than `127.0.0.1`
     * (or always, with `tls.forceLocal`), the connection is upgraded to TLS first if the server
     * supports it; if it does not and TLS is forced, the deferred is rejected and the connection closed.
     * A second, permanent listener marks the connection as OK on an OK response and relays all messages.
     * @param array                    $credentials  Expected keys: `user`, `password`, `db`.
     * @param \React\Promise\Deferred  $deferred     Settled with the outcome of the handshake.
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
                $this->parser->removeListener('message', $listener);
                
                $this->connectionState = static::CONNECTION_SETENV;
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
                
                // Only the database name is needed here, read it explicitly instead of extract()ing.
                $db = ($credentials['db'] ?? '');
                
                if($db !== '') {
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
                }
                
                if($this->charset === null) {
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
                }
                
                // Check if we support auth plugins
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                $plugin = null;
                
                // Integer keys are matched against the server capabilities, other keys against the plugin name.
                foreach($plugins as $key => $plug) {
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    } elseif($key === $message->authPluginName) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    }
                }
                
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
                
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
                        
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
                        
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
                            
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                            }, function (\Throwable $error) use (&$deferred) {
                                // Reject with the error itself, not with a variable variable named after it.
                                $deferred->reject($error);
                                $this->connection->close();
                            });
                        });
                        
                        return $this->parser->invokeCommand($ssl);
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                        $this->connection->close();
                        return;
                    }
                }
                
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
            }
        };
        
        $this->parser->on('message', $listener);
        
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }
            
            $this->emit('eventRelay', array('message', $message));
        });
    }
777
    
778
    /**
     * Enables TLS on the connection.
     *
     * Every entry of the `tls.context` option is first applied to the connection's
     * stream as an `ssl` stream context option. Afterwards encryption is enabled
     * on the connection through `$this->encryption`.
     *
     * If enabling encryption fails, the connection is closed and a `\RuntimeException`
     * is thrown from the rejection handler. It carries the message and code of the
     * original failure and links the original throwable as its previous exception.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();
            
            // Keep the original throwable as "previous" so the root cause is not lost.
            // The code is cast, because not every throwable reports an integer code.
            throw new \RuntimeException(
                'Connection failed during TLS handshake: '.$error->getMessage(),
                (int) $error->getCode(),
                $error
            );
        });
    }
793
    
794
    /**
     * Sends the auth command.
     *
     * Builds the handshake response command from the given credentials and hands it
     * to the parser. The deferred gets resolved once the command emits `end`. Once the
     * command emits `error`, the deferred gets rejected and the connection is closed.
     *
     * If `$plugin` is set, a `message` listener is additionally attached to the parser.
     * It answers auth switch requests and auth more data packets, and removes itself
     * again as soon as an OK response message arrives. Every failure inside of the
     * listener rejects the deferred and closes the connection.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  The keys `user`, `password` and `db` are read.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Read the credentials explicitly instead of extract()ing them, so that
        // unexpected keys can never overwrite the other locals of this method.
        $user = $credentials['user'] ?? null;
        $password = $credentials['password'] ?? null;
        $db = $credentials['db'] ?? null;
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        $auth->once('end', function () use ($deferred) {
            $deferred->resolve();
        });
        
        $auth->once('error', function (\Throwable $error) use ($deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            // The listener needs a reference to itself to be able to detach itself later.
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, $deferred, &$listener) {
                // Remembers the plugin instance created on an auth switch request,
                // so that following auth more data packets can be passed to it.
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $authPlugin */
                static $authPlugin;
                
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $authPlugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $authPlugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    // No registered plugin matches the requested one. Authentication can not
                    // continue, so close the connection like the other error paths do.
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    $this->connection->close();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($authPlugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $authPlugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // An OK response arrived, the listener is not needed anymore.
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
864
    
865
    /**
     * Get the real charset from the DB charset.
     *
     * Every charset starting with `utf8` maps to `UTF-8`. Any other charset maps to
     * the first encoding reported by `mb_list_encodings()` whose name starts with the
     * given charset (case-insensitive). If no encoding matches, or if the charset is
     * empty, `UTF-8` is returned as fallback.
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        // An empty charset must not reach stripos(): an empty needle either matches
        // every encoding at offset 0 (PHP 8) or raises a warning (PHP 7).
        if($charset === '' || \substr($charset, 0, 4) === 'utf8') {
            return 'UTF-8';
        }
        
        foreach(\mb_list_encodings() as $set) {
            // Prefix match, ignoring the case
            if(\stripos($set, $charset) === 0) {
                return $set;
            }
        }
        
        return 'UTF-8';
    }
885
    
886
    /**
     * Validates the given options.
     *
     * The options are handed to the validator together with the rule of each
     * supported option. The validator is then asked to throw an
     * `\InvalidArgumentException`.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        // Rule of each supported option, in the syntax of the validator
        $rules = array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        \CharlotteDunois\Validation\Validator::make($options, $rules)
            ->throw(\InvalidArgumentException::class);
    }
904
}
905