GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Failed Conditions
Push — master ( 956460...cbcf1f )
by Charlotte
04:08
created

src/Driver.php (2 issues)

1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'tls.context' => array(),
31
        'tls.force' => true,
32
        'tls.forceLocal' => false
33
    );
34
    
35
    /**
36
     * @var \React\Socket\ConnectorInterface
37
     */
38
    protected $connector;
39
    
40
    /**
41
     * An internal class is intentionally used, as there is currently no other way.
42
     * @var \React\Socket\StreamEncryption
43
     * @see https://github.com/reactphp/socket/issues/180
44
     */
45
    protected $encryption;
46
    
47
    /**
48
     * @var \React\Promise\Promise|null
49
     */
50
    protected $connectPromise;
51
    
52
    /**
53
     * @var \React\Socket\Connection
54
     */
55
    protected $connection;
56
    
57
    /**
58
     * @var int
59
     */
60
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
61
    
62
    /**
63
     * @var \Plasma\Drivers\MySQL\ProtocolParser
64
     */
65
    protected $parser;
66
    
67
    /**
68
     * @var array
69
     */
70
    protected $queue;
71
    
72
    /**
73
     * @var int
74
     */
75
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
76
    
77
    /**
78
     * @var bool
79
     */
80
    protected $transaction = false;
81
    
82
    /**
83
     * @var \React\Promise\Deferred
84
     */
85
    protected $goingAway;
86
    
87
    /**
88
     * @var string|null
89
     */
90
    protected $charset;
91
    
92
    /**
93
     * Constructor.
94
     * @param \React\EventLoop\LoopInterface  $loop
95
     * @param array                           $options
96
     */
97 36
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        // Validation runs before any property is assigned.
        $this->validateOptions($options);
        
        $this->queue = array();
        $this->loop = $loop;
        
        // Caller-supplied options override the defaults declared on the property.
        $this->options = \array_merge($this->options, $options);
        
        // Use the injected connector if one was given, otherwise build the default one.
        if(isset($options['connector'])) {
            $this->connector = $options['connector'];
        } else {
            $this->connector = new \React\Socket\Connector($loop);
        }
        
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
    }
107
    
108
    /**
109
     * Returns the event loop.
110
     * @return \React\EventLoop\LoopInterface
111
     */
112 10
    function getLoop(): \React\EventLoop\LoopInterface {
        // Same loop instance that was handed to the constructor.
        $loop = $this->loop;
        return $loop;
    }
115
    
116
    /**
117
     * Retrieves the current connection state.
118
     * @return int
119
     */
120 6
    function getConnectionState(): int {
        // One of the DriverInterface::CONNECTION_* values tracked by this driver.
        $state = $this->connectionState;
        return $state;
    }
123
    
124
    /**
125
     * Retrieves the current busy state.
126
     * @return int
127
     */
128 1
    function getBusyState(): int {
        // STATE_IDLE or STATE_BUSY, as maintained by the command queue handling.
        $state = $this->busy;
        return $state;
    }
131
    
132
    /**
133
     * Get the length of the driver backlog queue.
134
     * @return int
135
     */
136 1
    function getBacklogLength(): int {
        // Number of commands queued but not yet handed to the parser.
        $pending = \count($this->queue);
        return $pending;
    }
139
    
140
    /**
141
     * Connects to the given URI.
142
     * @param string  $uri
143
     * @return \React\Promise\PromiseInterface
144
     */
145 17
    function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            // A connect attempt was already started; hand out the same promise.
            return $this->connectPromise;
        }
        
        // Ensure the URI carries the mysql:// scheme so parse_url() can split it.
        // ltrim($uri, 'mysql://') must NOT be used for this: its second argument is a
        // character mask, so it also strips leading host characters made up of
        // m, y, s, q, l, ':' or '/' (e.g. "localhost" would become "ocalhost").
        if(\strncmp($uri, 'mysql://', 8) !== 0) {
            $uri = 'mysql://'.$uri;
        }
        
        $parts = \parse_url($uri);
        if(!isset($parts['host'])) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }
        
        $host = $parts['host'].':'.($parts['port'] ?? 3306);
        $this->connectionState = static::CONNECTION_STARTED;
        
        // Flipped to true once the handshake succeeded; decides whether later parser
        // errors reject the connect promise or are emitted as 'error' events.
        $resolved = false;
        
        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }
            
            // Stay busy until the handshake is done, so queued commands are not invoked early.
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;
            
            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });
            
            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;
                
                $this->emit('close');
            });
            
            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
            
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });
            
            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '');
            
            $credentials = \compact('user', 'password', 'db');
            
            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;
                
                // Commands queued while connecting can start now.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });
        
        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            $this->connectPromise = $this->connectPromise->then(function () {
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
                
                // The command is only queued here; its completion is not awaited.
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);
            });
        }
        
        return $this->connectPromise;
    }
227
    
228
    /**
229
     * Pauses the underlying stream I/O consumption.
230
     * If consumption is already paused, this will do nothing.
231
     * @return bool  Whether the operation was successful.
232
     */
233 1
    function pauseStreamConsumption(): bool {
        // Only act on a live connection that is not being shut down.
        $usable = ($this->connection !== null && !$this->goingAway);
        
        if($usable) {
            $this->connection->pause();
        }
        
        return $usable;
    }
241
    
242
    /**
243
     * Resumes the underlying stream I/O consumption.
244
     * If consumption is not paused, this will do nothing.
245
     * @return bool  Whether the operation was successful.
246
     */
247 1
    function resumeStreamConsumption(): bool {
        // Only act on a live connection that is not being shut down.
        $usable = ($this->connection !== null && !$this->goingAway);
        
        if($usable) {
            $this->connection->resume();
        }
        
        return $usable;
    }
255
    
256
    /**
257
     * Closes all connections gracefully after processing all outstanding requests.
258
     * @return \React\Promise\PromiseInterface
259
     */
260 2
    function close(): \React\Promise\PromiseInterface {
        // A shutdown is already in progress; expose its promise.
        if($this->goingAway) {
            return $this->goingAway->promise();
        }
        
        // Remember the state we were in, then mark the driver as unusable for new work.
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        $this->goingAway = new \React\Promise\Deferred();
        
        // Nothing outstanding: the drain phase is finished right away.
        // Otherwise the deferred is resolved once the queue has run empty.
        if(\count($this->queue) === 0) {
            $this->goingAway->resolve();
        }
        
        $sendQuit = function () use ($previousState) {
            // Without an established connection there is no QUIT to send.
            if($previousState !== static::CONNECTION_OK) {
                return;
            }
            
            $closed = new \React\Promise\Deferred();
            $quitCommand = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
            
            $this->connection->once('close', function () use (&$closed) {
                $closed->resolve();
            });
            
            // Close the socket once the QUIT command has ended.
            $quitCommand->once('end', function () {
                $this->connection->close();
            });
            
            $this->parser->invokeCommand($quitCommand);
            
            return $closed->promise();
        };
        
        return $this->goingAway->promise()->then($sendQuit);
    }
296
    
297
    /**
298
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
299
     * @return void
300
     */
301 10
    function quit(): void {
        if($this->goingAway) {
            return;
        }
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        // No draining on a forceful quit: the shutdown deferred is resolved immediately.
        $this->goingAway = new \React\Promise\Deferred();
        $this->goingAway->resolve();
        
        // Fail every command that is still waiting in the queue.
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $pending */
        while($pending = \array_shift($this->queue)) {
            $pending->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }
        
        if($previousState !== static::CONNECTION_OK) {
            return;
        }
        
        // Send QUIT and close the socket right away, without waiting for the command to end.
        $quitCommand = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
        $this->parser->invokeCommand($quitCommand);
        
        $this->connection->close();
    }
324
    
325
    /**
326
     * Whether this driver is currently in a transaction.
327
     * @return bool
328
     */
329 2
    function isInTransaction(): bool {
        // Flag is set by beginTransaction() and cleared by endTransaction().
        $active = $this->transaction;
        return $active;
    }
332
    
333
    /**
334
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
335
     * When the command is done, the driver must check itself back into the client.
336
     * @param \Plasma\ClientInterface  $client
337
     * @param string                   $query
338
     * @return \React\Promise\PromiseInterface
339
     * @throws \Plasma\Exception
340
     * @see \Plasma\QueryResultInterface
341
     */
342 7
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            // Never asked to connect: nothing to wait for.
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Retry the call once the connect promise has resolved.
            return $this->connectPromise->then(function () use (&$client, &$query) {
                return $this->query($client, $query);
            });
        }
        
        $queryCommand = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($queryCommand);
        
        // Outside of a transaction the driver returns itself to the client when the command ends.
        if(!$this->transaction) {
            $queryCommand->once('end', function () use (&$client) {
                $client->checkinConnection($this);
            });
        }
        
        return $queryCommand->getPromise();
    }
366
    
367
    /**
368
     * Prepares a query. Resolves with a `StatementInterface` instance.
369
     * When the command is done, the driver must check itself back into the client.
370
     * @param \Plasma\ClientInterface  $client
371
     * @param string                   $query
372
     * @return \React\Promise\PromiseInterface
373
     * @throws \Plasma\Exception
374
     * @see \Plasma\StatementInterface
375
     */
376 4
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            // Never asked to connect: nothing to wait for.
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Retry the call once the connect promise has resolved.
            return $this->connectPromise->then(function () use (&$client, &$query) {
                return $this->prepare($client, $query);
            });
        }
        
        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);
        
        return $prepareCommand->getPromise();
    }
394
    
395
    /**
396
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
397
     * This is equivalent to prepare -> execute -> close.
398
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
399
     * @param \Plasma\ClientInterface  $client
400
     * @param string                   $query
401
     * @param array                    $params
402
     * @return \React\Promise\PromiseInterface
403
     * @throws \Plasma\Exception
404
     * @see \Plasma\StatementInterface
405
     */
406 3
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                // Retry the call once the connect promise has resolved.
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
                    return $this->execute($client, $query, $params);
                });
            }
            
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    // Streaming results are returned immediately, so the statement is closed
                    // without being awaited. A failing close can therefore only be reported
                    // through the 'error' event. The handler must be attached to the promise
                    // returned by close(); passing it as an argument to close() never attaches it.
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });
                    
                    return $result;
                }
                
                // Non-streaming result: close the statement first, then hand out the result.
                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use (&$statement) {
                // Execution failed: still close the statement, then re-throw the original error.
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
439
    
440
    /**
441
     * Quotes the string for use in the query.
442
     * @param string  $str
443
     * @return string
444
     * @throws \LogicException  Thrown if the driver does not support quoting.
445
     * @throws \Plasma\Exception
446
     */
447 3
    function quote(string $str): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        // Prefer the status flags of the latest OK message, fall back to the handshake.
        $message = $this->parser->getLastOkMessage() ?? $this->parser->getHandshakeMessage();
        if($message === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }
        
        // Keep only the part of the charset name before the first underscore.
        $underscore = \strpos($this->charset, '_');
        $prefixLength = ($underscore === false ? \strlen($this->charset) : $underscore);
        $realCharset = $this->getRealCharset(\substr($this->charset, 0, $prefixLength));
        
        $backslashEscapes = (($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) === 0);
        
        if($backslashEscapes) {
            return $this->escapeUsingBackslashes($realCharset, $str);
        }
        
        return $this->escapeUsingQuotes($realCharset, $str);
    }
471
    
472
    /**
473
     * Escapes using quotes.
474
     * @param string  $realCharset
475
     * @param string  $str
476
     * @return string
477
     */
478 1
    function escapeUsingQuotes(string $realCharset, string $str): string {
        // This variant is used when the server reports NO_BACKSLASH_ESCAPES. In that mode a
        // backslash is an ordinary character inside a string literal, so it must be passed
        // through untouched - doubling it would store two backslashes instead of one.
        // The only character that needs treatment is the enclosing quote, which is doubled.
        // $realCharset is currently unused and only kept for signature compatibility.
        return '"'.\str_replace('"', '""', $str).'"';
    }
491
    
492
    /**
493
     * Escapes using backslashes.
494
     * @param string  $realCharset
495
     * @param string  $str
496
     * @return string
497
     */
498 3
    function escapeUsingBackslashes(string $realCharset, string $str): string {
        // Single pass over the input: every backslash and every double quote gets a
        // backslash in front of it. $realCharset is not used by this implementation.
        $replacements = array(
            '\\' => '\\\\',
            '"' => '\"'
        );
        
        $escaped = \strtr($str, $replacements);
        
        return '"'.$escaped.'"';
    }
511
    
512
    /**
513
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
514
     *
515
     * Checks out a connection until the transaction gets committed or rolled back.
516
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
517
     * check the connection back into the pool, as long as the transaction is not finished.
518
     *
519
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
520
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
521
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
522
     * @param \Plasma\ClientInterface  $client
523
     * @param int                      $isolation  See the `TransactionInterface` constants.
524
     * @return \React\Promise\PromiseInterface
525
     * @throws \Plasma\Exception
526
     * @see \Plasma\TransactionInterface
527
     */
528 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }
        
        // Map the isolation constant to the statement that configures it for the session.
        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }
        
        // Set the flag before querying, so query() does not check the connection
        // back into the client between the two statements.
        $this->transaction = true;
        
        try {
            return $this->query($client, $query)->then(function () use (&$client) {
                return $this->query($client, 'START TRANSACTION');
            })->then(function () use (&$client, $isolation) {
                return (new \Plasma\Transaction($client, $this, $isolation));
            })->then(null, function (\Throwable $e) {
                // Asynchronous failure: the transaction never started.
                $this->transaction = false;
                throw $e;
            });
        } catch (\Throwable $e) {
            // query() can throw synchronously (no connection). Without this reset the
            // driver would stay flagged as "in transaction" forever.
            $this->transaction = false;
            throw $e;
        }
    }
566
    
567
    /**
568
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
569
     * @return void
570
     */
571 1
    function endTransaction(): void {
        // Bookkeeping only: clears the flag, nothing is sent to the server from here.
        $this->transaction = false;
    }
574
    
575
    /**
576
     * Runs the given command.
577
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
578
     * or rejects with the `Throwable` of the `error` event.
579
     * When the command is done, the driver must check itself back into the client.
580
     * @param \Plasma\ClientInterface   $client
581
     * @param \Plasma\CommandInterface  $command
582
     * @return \React\Promise\PromiseInterface
583
     */
584 5
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        $resolver = function (callable $resolve, callable $reject) use (&$client, &$command) {
            // Success path: check the driver back in (unless in a transaction), then resolve.
            $onEnd = function ($value = null) use (&$client, &$resolve) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
                
                $resolve($value);
            };
            
            // Failure path: same check-in rule, then reject with the command's error.
            $onError = function (\Throwable $error) use (&$client, &$reject) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
                
                $reject($error);
            };
            
            $command->once('end', $onEnd);
            $command->once('error', $onError);
            
            $this->executeCommand($command);
        };
        
        return (new \React\Promise\Promise($resolver));
    }
609
    
610
    /**
611
     * Executes a command.
612
     * @param \Plasma\CommandInterface  $command
613
     * @return void
614
     * @internal
615
     */
616 14
    function executeCommand(\Plasma\CommandInterface $command): void {
        \array_push($this->queue, $command);
        
        // Without a parser, or while another command is running, the command just waits in the queue.
        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }
        
        $this->parser->invokeCommand($this->getNextCommand());
    }
625
    
626
    /**
627
     * Get the handshake message, or null if none received yet.
628
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
629
     */
630 13
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        // No parser means no connection was made yet, hence no handshake.
        return ($this->parser ? $this->parser->getHandshakeMessage() : null);
    }
637
    
638
    /**
639
     * Get the next command, or null.
640
     * @return \Plasma\CommandInterface|null
641
     * @internal
642
     */
643 13
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            // Queue drained: a pending graceful shutdown may proceed.
            if($this->goingAway) {
                $this->goingAway->resolve();
            }
            
            return null;
        }
        
        if($this->busy === static::STATE_BUSY) {
            return null;
        }
        
        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);
        
        if(!$next->waitForCompletion()) {
            // Fire-and-forget command: schedule the following one right away.
            $this->endCommand();
            return $next;
        }
        
        $this->busy = static::STATE_BUSY;
        
        // Either outcome frees the driver and schedules the following command.
        $release = function () {
            $this->busy = static::STATE_IDLE;
            
            $this->endCommand();
        };
        
        $next->once('error', $release);
        $next->once('end', $release);
        
        return $next;
    }
681
    
682
    /**
683
     * Finishes up a command.
684
     * @return void
685
     */
686 9
    protected function endCommand() {
        // Deferred to the next loop tick rather than run inline.
        $this->loop->futureTick(function () {
            $drained = ($this->goingAway && \count($this->queue) === 0);
            
            if($drained) {
                $this->goingAway->resolve();
                return;
            }
            
            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
695
    
696
    /**
697
     * Starts the handshake process.
698
     * @param array                    $credentials
699
     * @param \React\Promise\Deferred  $deferred
700
     * @return void
701
     */
702 15
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        // Listener for the server's handshake message; it removes itself once that message arrived.
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
                $this->parser->removeListener('message', $listener);
                
                $this->connectionState = static::CONNECTION_SETENV;
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
                
                // Only the database name is needed here; read it explicitly instead of
                // extract()-ing the whole credentials array into the local scope.
                $db = ($credentials['db'] ?? '');
                
                if($db !== '') {
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
                }
                
                // No charset configured: adopt the one announced by the server.
                if($this->charset === null) {
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
                }
                
                // Check if we support auth plugins
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                $plugin = null;
                
                // Plugins are keyed either by a capability bit (int) or by plugin name.
                foreach($plugins as $key => $plug) {
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    } elseif($key === $message->authPluginName) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    }
                }
                
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
                
                // TLS is attempted for every non-loopback peer, and for loopback when forced.
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
                        
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
                        
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
                            
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                            }, function (\Throwable $error) use (&$deferred) {
                                // Reject with the error itself. `$$error` would be a variable
                                // variable, so the original TLS failure would be lost.
                                $deferred->reject($error);
                                $this->connection->close();
                            });
                        });
                        
                        return $this->parser->invokeCommand($ssl);
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                        $this->connection->close();
                        return;
                    }
                }
                
                // Plain (non-TLS) path.
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
            }
        };
        
        $this->parser->on('message', $listener);
        
        // Permanent listener: tracks the OK state and relays every message to driver listeners.
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }
            
            $this->emit('eventRelay', array('message', $message));
        });
    }
777
    
778
    /**
     * Enables TLS on the connection.
     *
     * Applies every entry of the `tls.context` option as an `ssl` stream context option
     * to the connection's stream and then asks the stream encryption helper to enable encryption.
     * If enabling fails, the connection is closed and a `\RuntimeException` is thrown from the
     * rejection handler, carrying the original error as its previous exception.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();
            
            // Keep the original throwable as previous exception; the cast guards against non-integer codes.
            throw new \RuntimeException(
                'Connection failed during TLS handshake: '.$error->getMessage(),
                (int) $error->getCode(),
                $error
            );
        });
    }
793
    
794
    /**
     * Sends the auth command.
     *
     * Builds the handshake response command from the given credentials and invokes it on the parser.
     * The deferred is resolved once the command ends, and rejected (with the connection being closed)
     * if the command errors. When an auth plugin was selected, a message listener is registered which
     * answers auth switch requests and auth more data packets, and removes itself on an OK response.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  Read keys: `user`, `password`, `db`.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Read the credentials explicitly; extract() could silently overwrite the parameters of this method.
        $user = $credentials['user'];
        $password = $credentials['password'];
        $db = $credentials['db'];
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        $auth->once('end', function () use ($deferred) {
            $deferred->resolve();
        });
        
        $auth->once('error', function (\Throwable $error) use ($deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            // $listener is captured by reference, so the closure can remove itself from the parser.
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, $deferred, &$listener) {
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
                static $plugin;
                
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                // Remember the instance, it is needed for following auth more data packets.
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    // No matching plugin: fail the connect attempt and close the connection, like every other failure path here.
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    return $this->connection->close();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($plugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $plugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // Authentication is done, the listener is not needed anymore.
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
864
    
865
    /**
     * Get the real charset from the DB charset.
     *
     * Any charset starting with `utf8` maps to `UTF-8`. Otherwise the first mbstring encoding
     * whose name starts with the given charset (case-insensitive) is returned.
     * Falls back to `UTF-8` when the charset is empty or no encoding name matches.
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        // An empty needle would match the very first mbstring encoding on PHP 8 (and emit a warning on PHP 7).
        if($charset === '' || \substr($charset, 0, 4) === 'utf8') {
            return 'UTF-8';
        }
        
        // NOTE(review): only encoding names are compared, not their aliases - confirm whether
        // DB charsets that are merely mbstring aliases should resolve to something other than the fallback.
        foreach(\mb_list_encodings() as $set) {
            if(\stripos($set, $charset) === 0) {
                return $set;
            }
        }
        
        return 'UTF-8';
    }
885
    
886
    /**
     * Validates the given options.
     *
     * Hands the options, together with one type rule per known option key,
     * to the validator and lets it throw on failure.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        // Rule set: option key => expected type.
        $rules = array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        \CharlotteDunois\Validation\Validator::make($options, $rules)
            ->throw(\InvalidArgumentException::class);
    }
904
}
905