GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 6047c1...12f59d )
by Charlotte
02:28
created

Driver::getConnectionState()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 2
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => !true,
31
        'tls.context' => array(),
32
        'tls.force' => true,
33
        'tls.forceLocal' => false
34
    );
35
    
36
    /**
37
     * @var string[]
38
     */
39
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
40
    
41
    /**
42
     * @var \React\Socket\ConnectorInterface
43
     */
44
    protected $connector;
45
    
46
    /**
47
     * Internal class is intentional used, as there's no other way currently.
48
     * @var \React\Socket\StreamEncryption
49
     * @see https://github.com/reactphp/socket/issues/180
50
     */
51
    protected $encryption;
52
    
53
    /**
54
     * @var \React\Promise\Promise|null
55
     */
56
    protected $connectPromise;
57
    
58
    /**
59
     * @var \React\Socket\Connection
60
     */
61
    protected $connection;
62
    
63
    /**
64
     * @var int
65
     */
66
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
67
    
68
    /**
69
     * @var \Plasma\Drivers\MySQL\ProtocolParser
70
     */
71
    protected $parser;
72
    
73
    /**
74
     * @var array
75
     */
76
    protected $queue;
77
    
78
    /**
79
     * @var int
80
     */
81
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
82
    
83
    /**
84
     * @var bool
85
     */
86
    protected $transaction = false;
87
    
88
    /**
89
     * @var \React\Promise\Deferred
90
     */
91
    protected $goingAway;
92
    
93
    /**
94
     * @var string|null
95
     */
96
    protected $charset;
97
    
98
    /**
99
     * Constructor.
100
     * @param \React\EventLoop\LoopInterface  $loop
101
     * @param array                           $options
102
     */
103 87
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
104 87
        $this->validateOptions($options);
105
        
106 87
        $this->loop = $loop;
107 87
        $this->options = \array_merge($this->options, $options);
108
        
109 87
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
110 87
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
111 87
        $this->queue = array();
112 87
    }
113
    
114
    /**
     * Destructor. Calls `close()` when the driver instance is destroyed;
     * the promise returned by `close()` is not used.
     * @return void
     * @internal
     */
    function __destruct() {
        $this->close();
    }
122
    
123
    /**
     * Returns the event loop instance that was passed to the constructor.
     * @return \React\EventLoop\LoopInterface
     */
    function getLoop(): \React\EventLoop\LoopInterface {
        return $this->loop;
    }
130
    
131
    /**
     * Returns the current connection state, one of the `\Plasma\DriverInterface::CONNECTION_*` constants.
     * @return int
     */
    function getConnectionState(): int {
        return $this->connectionState;
    }
138
    
139
    /**
     * Returns the current busy state, one of the `\Plasma\DriverInterface::STATE_*` constants.
     * @return int
     */
    function getBusyState(): int {
        return $this->busy;
    }
146
    
147
    /**
     * Returns the number of commands currently waiting in the driver queue.
     * @return int
     */
    function getBacklogLength(): int {
        return \count($this->queue);
    }
154
    
155
    /**
     * Connects to the given URI.
     *
     * The URI is normalized before it is used: a URI without a scheme gets `tcp://` prepended,
     * the `mysql` scheme is mapped to `tcp`, and for `unix` URIs the socket path is cut out of
     * the URI (replaced by `localhost` for parsing) and used as host afterwards.
     * For all schemes except `unix` the port defaults to 3306.
     *
     * Rejects if the driver is going away or unusable, resolves immediately if the connection
     * state is already OK, and returns the existing promise if a connect attempt was already started.
     * Rejects with an `\InvalidArgumentException` if the URI has no scheme, no host or a scheme
     * that is not in the list of allowed schemes.
     * If the option `characters.set` is set, a `SET NAMES` query is chained onto the connect promise.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface
     */
    function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            return $this->connectPromise;
        }

        $pos = \strpos($uri, '://');
        $socket = null;

        if($pos === false) {
            $uri = 'tcp://'.$uri;
        } elseif(\substr($uri, 0, $pos) === 'unix') {
            $defaultSocket = '/tmp/mysql.sock';

            $apos = \strpos($uri, '@');
            $spos = \strrpos($uri, '/');

            // The socket path sits between the credentials (or the scheme) and the last slash.
            // It is swapped for "localhost" so that parse_url() can handle the rest of the URI.
            if($apos !== false) {
                $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
                $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
            } else {
                $socket = \substr($uri, ($pos + 3), ($spos - ($pos + 3)));
                $uri = 'unix://localhost'.\substr($uri, $spos);
            }

            // A local hostname instead of a path selects the default socket path.
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
                $socket = $defaultSocket;
            }
        }

        $parts = \parse_url($uri);
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes, true)) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }

        if($socket !== null) {
            $parts['host'] = $socket;
        }

        if($parts['scheme'] === 'mysql') {
            $parts['scheme'] = 'tcp';
        }

        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
        $this->connectionState = static::CONNECTION_STARTED;

        // Set to true once the handshake promise resolved; parser errors are emitted on the driver from then on.
        $resolved = false;

        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }

            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;

            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });

            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;

                $this->emit('close');
            });

            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);

            // Before the handshake is done a parser error fails the connect attempt, afterwards it is forwarded as event.
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });

            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = \ltrim(($parts['path'] ?? ''), '/');

            $credentials = \compact('user', 'password', 'db');

            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;

                // Commands queued while connecting are started now.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });

        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            $this->connectPromise = $this->connectPromise->then(function () {
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');

                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);

                return $cmd->getPromise();
            });
        }

        return $this->connectPromise;
    }
274
    
275
    /**
     * Pauses the underlying stream I/O consumption by calling `pause()` on the connection.
     * Nothing is paused if there is no connection or the driver is going away.
     * @return bool  Whether the operation was successful.
     */
    function pauseStreamConsumption(): bool {
        if($this->connection !== null && !$this->goingAway) {
            $this->connection->pause();
            return true;
        }

        return false;
    }
288
    
289
    /**
     * Resumes the underlying stream I/O consumption by calling `resume()` on the connection.
     * Nothing is resumed if there is no connection or the driver is going away.
     * @return bool  Whether the operation was successful.
     */
    function resumeStreamConsumption(): bool {
        if($this->connection !== null && !$this->goingAway) {
            $this->connection->resume();
            return true;
        }

        return false;
    }
302
    
303
    /**
     * Closes the connection gracefully after all queued commands were processed.
     *
     * Marks the driver as unusable and going away. Once the queue is empty and the
     * connection state was OK when `close()` got called, a quit command is sent
     * and the connection is closed after the command ended. The returned promise then
     * resolves when the connection emits `close`.
     * Calling this method again returns the promise of the pending going away deferred.
     * @return \React\Promise\PromiseInterface
     */
    function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }

        $previousState = $this->connectionState;

        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        $this->goingAway = new \React\Promise\Deferred();

        // Nothing queued: continue right away. Otherwise the deferred gets resolved once the queue is drained.
        if(\count($this->queue) === 0) {
            $this->goingAway->resolve();
        }

        return $this->goingAway->promise()->then(function () use ($previousState) {
            if($previousState === static::CONNECTION_OK) {
                $closed = new \React\Promise\Deferred();
                $quitCommand = new \Plasma\Drivers\MySQL\Commands\QuitCommand();

                $this->connection->once('close', function () use ($closed) {
                    $closed->resolve();
                });

                $quitCommand->once('end', function () {
                    $this->connection->close();
                });

                $this->parser->invokeCommand($quitCommand);

                return $closed->promise();
            }
        });
    }
343
    
344
    /**
     * Forcefully closes the connection without waiting for outstanding commands.
     *
     * Every command still in the queue receives an `error` event with a `\Plasma\Exception`.
     * If the connection state was OK, a quit command is handed to the parser and the connection is closed
     * immediately afterwards. Does nothing if the driver is already going away.
     * @return void
     */
    function quit(): void {
        if($this->goingAway) {
            return;
        }

        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        $goingAway = new \React\Promise\Deferred();
        $this->goingAway = $goingAway;
        $goingAway->resolve();

        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $pending */
        while($pending = \array_shift($this->queue)) {
            $exception = new \Plasma\Exception('Connection is going away');
            $pending->emit('error', array($exception));
        }

        if($previousState !== static::CONNECTION_OK) {
            return;
        }

        $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
        $this->connection->close();
    }
371
    
372
    /**
     * Returns the transaction flag, which is set by `beginTransaction()` and cleared by `endTransaction()`.
     * @return bool
     */
    function isInTransaction(): bool {
        return $this->transaction;
    }
379
    
380
    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     *
     * If the connection is not OK yet but a connect attempt is pending, the query is retried
     * once the connect promise resolves. Outside of a transaction the driver checks itself back
     * into the client when the command emits `end`.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect attempt.
     * @see \Plasma\QueryResultInterface
     */
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }

            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->query($client, $query);
            });
        }

        $queryCommand = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($queryCommand);

        // Inside a transaction the driver stays checked out.
        if(!$this->transaction) {
            $queryCommand->once('end', function () use ($client) {
                $client->checkinConnection($this);
            });
        }

        return $queryCommand->getPromise();
    }
413
    
414
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     *
     * If the connection is not OK yet but a connect attempt is pending, preparing is retried
     * once the connect promise resolves.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect attempt.
     * @see \Plasma\StatementInterface
     */
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }

            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->prepare($client, $query);
            });
        }

        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);

        return $prepareCommand->getPromise();
    }
441
    
442
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     *
     * For a `StreamQueryResultInterface` result the statement is closed in the background and the
     * result is returned right away; a failure of that close is emitted as `error` event.
     * For any other result the statement is closed first. If executing fails, the statement is closed
     * and the error is rethrown afterwards.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect attempt.
     * @see \Plasma\StatementInterface
     */
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }

            return $this->connectPromise->then(function () use ($client, $query, $params) {
                return $this->execute($client, $query, $params);
            });
        }

        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            $onResult = function (\Plasma\QueryResultInterface $result) use ($statement) {
                if(!($result instanceof \Plasma\StreamQueryResultInterface)) {
                    return $statement->close()->then(function () use ($result) {
                        return $result;
                    });
                }

                $statement->close()->then(null, function (\Throwable $error) {
                    $this->emit('error', array($error));
                });

                return $result;
            };

            $onFailure = function (\Throwable $error) use ($statement) {
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            };

            return $statement->execute($params)->then($onResult, $onFailure);
        });
    }
486
    
487
    /**
     * Quotes the string for use in the query.
     *
     * The status flags of the last OK message (or of the handshake message, if there is no OK message)
     * decide the strategy: with `SERVER_STATUS_NO_BACKSLASH_ESCAPES` set, `escapeUsingQuotes()` is used,
     * otherwise `escapeUsingBackslashes()`. The part of the charset name before the first underscore
     * is passed through `getRealCharset()` and handed to the escape method.
     * @param string  $str
     * @param int     $type  For types, see the constants.
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception  Thrown if there is no parser or no handshake has been received yet.
     */
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }

        $message = ($this->parser->getLastOkMessage() ?? $this->parser->getHandshakeMessage());
        if($message === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }

        $underscore = \strpos($this->charset, '_');
        $length = ($underscore === false ? \strlen($this->charset) : $underscore);
        $realCharset = $this->getRealCharset(\substr($this->charset, 0, $length));

        $noBackslashEscapes = \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES;
        if(($message->statusFlags & $noBackslashEscapes) === 0) {
            return $this->escapeUsingBackslashes($realCharset, $str, $type);
        }

        return $this->escapeUsingQuotes($realCharset, $str, $type);
    }
519
    
520
    /**
     * Escapes by doubling the delimiter. `quote()` uses this method when the server reports
     * the status flag `SERVER_STATUS_NO_BACKSLASH_ESCAPES`.
     *
     * Identifiers are wrapped in backticks with embedded backticks doubled. Values are wrapped
     * in double quotes with embedded double quotes doubled. Backslashes are left untouched:
     * with NO_BACKSLASH_ESCAPES a backslash is an ordinary character, so doubling it
     * would change the value.
     * @param string  $realCharset  Currently not used by this method.
     * @param string  $str
     * @param int     $type
     * @return string
     */
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }

        return '"'.\str_replace('"', '""', $str).'"';
    }
544
    
545
    /**
     * Escapes values using backslashes. `quote()` uses this method when the server does not report
     * the status flag `SERVER_STATUS_NO_BACKSLASH_ESCAPES`.
     *
     * Identifiers are wrapped in backticks with embedded backticks doubled. A backslash does not
     * escape a backtick inside a quoted identifier, so doubling is the only safe form.
     * Values are wrapped in double quotes with backslashes and double quotes prefixed by a backslash.
     * @param string  $realCharset  Currently not used by this method.
     * @param string  $str
     * @param int     $type
     * @return string
     */
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }

        // The backslash has to be replaced first, otherwise the backslash added for a quote would be doubled again.
        $escapeChars = array(
            '\\',
            '"',
        );

        $escapeReplace = array(
            '\\\\',
            '\\"',
        );

        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }
569
    
570
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     *
     * The isolation level is set for the session first, then `START TRANSACTION` is issued.
     * If one of the queries fails, the transaction flag is reset and the error is rethrown.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if the driver is already in a transaction or the isolation level is invalid.
     * @see \Plasma\TransactionInterface
     */
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }

        $isolationQueries = array(
            \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED => 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED',
            \Plasma\TransactionInterface::ISOLATION_COMMITTED => 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED',
            \Plasma\TransactionInterface::ISOLATION_REPEATABLE => 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ',
            \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE => 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE'
        );

        if(!isset($isolationQueries[$isolation])) {
            throw new \Plasma\Exception('Invalid isolation level given');
        }

        // Has to be set before the first query, so that query() does not check the driver back into the client.
        $this->transaction = true;

        $startTransaction = function () use ($client) {
            return $this->query($client, 'START TRANSACTION');
        };

        $createTransaction = function () use ($client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        };

        $resetOnFailure = function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        };

        return $this->query($client, $isolationQueries[$isolation])
            ->then($startTransaction)
            ->then($createTransaction)
            ->then(null, $resetOnFailure);
    }
624
    
625
    /**
     * Informationally closes a transaction by resetting the transaction flag.
     * This method is used by `Transaction` to inform the driver of the end of the transaction.
     * @return void
     */
    function endTransaction(): void {
        $this->transaction = false;
    }
632
    
633
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * In both cases the driver checks itself back into the client first, unless it is in a transaction.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface
     */
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use ($client, $command) {
            $checkin = function () use ($client) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
            };

            $command->once('end', function ($value = null) use ($checkin, $resolve) {
                $checkin();
                $resolve($value);
            });

            $command->once('error', function (\Throwable $error) use ($checkin, $reject) {
                $checkin();
                $reject($error);
            });

            // The listeners are attached before the command is queued.
            $this->executeCommand($command);
        }));
    }
667
    
668
    /**
     * Runs the given SQL querybuilder by passing its query and parameters to `execute()`.
     * Only querybuilders implementing `\Plasma\SQLQuerybuilderInterface` are supported.
     * @param \Plasma\ClientInterface           $client
     * @param \Plasma\SQLQuerybuilderInterface  $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if the given querybuilder is not a SQL querybuilder.
     */
    function runQuery(\Plasma\ClientInterface $client, \Plasma\QuerybuilderInterface $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($query instanceof \Plasma\SQLQuerybuilderInterface) {
            $sql = $query->getQuery();
            $params = $query->getParameters();

            return $this->execute($client, $sql, $params);
        }

        throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
    }
691
    
692
    /**
     * Executes a command.
     * The command is appended to the queue. If a parser exists and the driver is idle,
     * the next command of the queue is handed to the parser right away.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;

        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }

        $this->parser->invokeCommand($this->getNextCommand());
    }
705
    
706
    /**
     * Get the handshake message of the parser, or null if there is no parser or none received yet.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        return ($this->parser ? $this->parser->getHandshakeMessage() : null);
    }
717
    
718
    /**
     * Get the next command, or null.
     *
     * Returns null if the queue is empty (resolving the going away deferred, if there is one)
     * or if the driver is busy. Otherwise the first command is taken off the queue.
     * If the command waits for completion, the driver is marked busy until the command emits
     * `error` or `end`; otherwise `endCommand()` is called right away.
     * @return \Plasma\CommandInterface|null
     * @internal
     */
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            if($this->goingAway) {
                $this->goingAway->resolve();
            }

            return null;
        }

        if($this->busy === static::STATE_BUSY) {
            return null;
        }

        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);

        if(!$next->waitForCompletion()) {
            $this->endCommand();
            return $next;
        }

        $this->busy = static::STATE_BUSY;

        // Both outcomes release the driver and schedule the next command.
        $release = function () {
            $this->busy = static::STATE_IDLE;

            $this->endCommand();
        };

        $next->once('error', $release);
        $next->once('end', $release);

        return $next;
    }
757
    
758
    /**
     * Finishes up a command.
     * On the next tick of the loop either the going away deferred is resolved (if the driver is
     * going away and the queue is empty) or the next command is handed to the parser.
     * @return void
     */
    protected function endCommand() {
        $this->loop->futureTick(function () {
            $drained = (\count($this->queue) === 0);

            if($this->goingAway && $drained) {
                $this->goingAway->resolve();
                return;
            }

            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
771
    
772
    /**
     * Starts the handshake process.
     *
     * Registers two `message` listeners on the parser:
     *  - a self-removing listener that reacts to the server's HandshakeMessage,
     *    negotiates the client capability flags (database, compression, auth plugin, TLS)
     *    and finally sends the handshake response, optionally after upgrading to TLS,
     *  - a permanent listener that marks the connection as OK on an OkResponseMessage
     *    and relays every message through the `eventRelay` event.
     *
     * The deferred is rejected (and the connection closed) if TLS is required but
     * not offered by the server, or if enabling TLS fails.
     * @param array                    $credentials  Extracted into local variables; `$db` is read here.
     * @param \React\Promise\Deferred  $deferred     Settled once the handshake succeeded or failed.
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
                // The handshake message is only handled once.
                $this->parser->removeListener('message', $listener);
                
                $this->connectionState = static::CONNECTION_SETENV;
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
                
                // NOTE(review): presumably provides $user, $password and $db - confirm against the caller.
                \extract($credentials);
                
                if($db !== '') {
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
                }
                
                if($this->charset === null) {
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
                }
                
                // Compression needs server support, the zlib extension and the option to be enabled.
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
                    $this->parser->enableCompression();
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
                }
                
                // Check if we support auth plugins
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                $plugin = null;
                
                foreach($plugins as $key => $plug) {
                    // Integer keys are matched against the server capability bits, any other key against the plugin name.
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    } elseif($key === $message->authPluginName) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    }
                }
                
                $remote = \parse_url($this->connection->getRemoteAddress());
                
                // parse_url() may return an array without a "host" component, so read it defensively.
                if($remote !== false && (($remote['host'] ?? null) !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
                        
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
                        
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
                            
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                            }, function (\Throwable $error) use (&$deferred) {
                                // Reject with the actual error (a variable-variable lookup here would yield null).
                                $deferred->reject($error);
                                $this->connection->close();
                            });
                        });
                        
                        return $this->parser->invokeCommand($ssl);
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                        $this->connection->close();
                        return;
                    }
                }
                
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
            }
        };
        
        $this->parser->on('message', $listener);
        
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }
            
            $this->emit('eventRelay', array('message', $message));
        });
    }
858
    
859
    /**
     * Enables TLS on the connection.
     *
     * Applies every entry of the `tls.context` option as an `ssl` stream context
     * option on the connection's stream and then enables encryption on the connection.
     * If enabling encryption fails, the connection is closed and the returned
     * promise rejects with a \RuntimeException that wraps the original error.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();
            
            // Keep the original error as "previous" so its stack trace is not lost.
            // The code is cast because some Throwable implementations return non-integer codes.
            throw new \RuntimeException(
                'Connection failed during TLS handshake: '.$error->getMessage(),
                (int) $error->getCode(),
                $error
            );
        });
    }
874
    
875
    /**
     * Sends the auth command.
     *
     * Builds the handshake response command, wires its `end` and `error` events to
     * the deferred and, when an auth plugin is in use, installs a `message`
     * listener that handles auth switch requests and additional auth data until an
     * OK response arrives. Finally the command is invoked and the connection state
     * is set to "awaiting response".
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  Extracted into $user, $password and $db.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message,
        array $credentials,
        int $clientFlags,
        ?string $plugin,
        \React\Promise\Deferred $deferred
    ) {
        \extract($credentials);
        
        $response = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand(
            $this->parser,
            $message,
            $clientFlags,
            $plugin,
            $user,
            $password,
            $db
        );
        
        // Success is reported on the next loop tick.
        $response->once('end', function () use (&$deferred) {
            $this->loop->futureTick(function () use (&$deferred) {
                $deferred->resolve();
            });
        });
        
        $response->once('error', function (\Throwable $error) use (&$deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            $authListener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$authListener) {
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $activePlugin */
                static $activePlugin;
                
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $requested = $message->authPluginName;
                    
                    if($requested !== null) {
                        foreach(\Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins() as $pluginKey => $pluginClass) {
                            if($pluginKey !== $requested) {
                                continue;
                            }
                            
                            $activePlugin = new $pluginClass($this->parser, $this->parser->getHandshakeMessage());
                            $switchResponse = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $activePlugin, $password);
                            
                            return $this->parser->invokeCommand($switchResponse);
                        }
                    }
                    
                    // NOTE(review): unlike the other failure paths, the connection is not closed here - confirm this is intended.
                    $quotedName = ($requested ? '"'.$requested.'" ' : '');
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.$quotedName.'is not supported')));
                    
                    return;
                }
                
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($activePlugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $followUp = $activePlugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($followUp);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                    
                    return;
                }
                
                // Authentication finished, the listener is no longer needed.
                if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    $this->parser->removeListener('message', $authListener);
                }
            };
            
            $this->parser->on('message', $authListener);
        }
        
        $this->parser->invokeCommand($response);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
947
    
948
    /**
     * Get the real charset from the DB charset.
     *
     * Any charset starting with `utf8` maps to `UTF-8`. Otherwise the first
     * mbstring encoding whose name starts with the given charset
     * (case-insensitive) is returned, with `UTF-8` as the fallback.
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        if(\strncmp($charset, 'utf8', 4) === 0) {
            return 'UTF-8';
        }
        
        foreach(\mb_list_encodings() as $encoding) {
            // Only a match at offset 0 (a prefix match) counts.
            if(\stripos($encoding, $charset) === 0) {
                return $encoding;
            }
        }
        
        return 'UTF-8';
    }
968
    
969
    /**
     * Validates the given options.
     *
     * Runs the options through the validator with one rule per known option key
     * and throws if validation fails.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        $rules = array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'compression.enable' => 'boolean',
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        $validator = \CharlotteDunois\Validation\Validator::make($options, $rules, true);
        $validator->throw(\InvalidArgumentException::class);
    }
987
}
988