GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Push — master ( 754ed9...2515ed )
by Charlotte
02:29
created

Driver::close()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 34
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 4.074

Importance

Changes 0
Metric Value
cc 4
eloc 18
nc 3
nop 0
dl 0
loc 34
ccs 15
cts 18
cp 0.8333
crap 4.074
rs 9.6666
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'tls.context' => array(),
31
        'tls.force' => true,
32
        'tls.forceLocal' => false
33
    );
34
    
35
    /**
36
     * @var \React\Socket\ConnectorInterface
37
     */
38
    protected $connector;
39
    
40
    /**
41
     * An internal class is intentionally used here, as there is currently no other way.
42
     * @var \React\Socket\StreamEncryption
43
     * @see https://github.com/reactphp/socket/issues/180
44
     */
45
    protected $encryption;
46
    
47
    /**
48
     * @var \React\Promise\Promise|null
49
     */
50
    protected $connectPromise;
51
    
52
    /**
53
     * @var \React\Socket\Connection
54
     */
55
    protected $connection;
56
    
57
    /**
58
     * @var int
59
     */
60
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
61
    
62
    /**
63
     * @var \Plasma\Drivers\MySQL\ProtocolParser
64
     */
65
    protected $parser;
66
    
67
    /**
68
     * @var array
69
     */
70
    protected $queue;
71
    
72
    /**
73
     * @var int
74
     */
75
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
76
    
77
    /**
78
     * @var bool
79
     */
80
    protected $transaction = false;
81
    
82
    /**
83
     * @var \React\Promise\Deferred
84
     */
85
    protected $goingAway;
86
    
87
    /**
88
     * @var string|null
89
     */
90
    protected $charset;
91
    
92
    /**
93
     * Constructor.
94
     * @param \React\EventLoop\LoopInterface  $loop
95
     * @param array                           $options
96
     */
97 70
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
98 70
        $this->validateOptions($options);
99
        
100 70
        $this->loop = $loop;
101 70
        $this->options = \array_merge($this->options, $options);
102
        
103 70
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
104 70
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
105 70
        $this->queue = array();
106 70
    }
107
    
108
    /**
     * Returns the event loop this driver was constructed with.
     * @return \React\EventLoop\LoopInterface
     */
    public function getLoop(): \React\EventLoop\LoopInterface {
        return $this->loop;
    }
115
    
116
    /**
     * Returns the current connection state (the value of the `connectionState` property).
     * @return int
     */
    public function getConnectionState(): int {
        return $this->connectionState;
    }
123
    
124
    /**
     * Returns the current busy state (the value of the `busy` property).
     * @return int
     */
    public function getBusyState(): int {
        return $this->busy;
    }
131
    
132
    /**
     * Returns the number of commands currently waiting in the driver backlog queue.
     * @return int
     */
    public function getBacklogLength(): int {
        return \count($this->queue);
    }
139
    
140
    /**
     * Connects to the given URI.
     *
     * The URI may be given with or without the `mysql://` scheme; the scheme is
     * prepended when missing. User, password, host, port (default 3306) and
     * database (the URI path) are taken from the parsed URI.
     *
     * Rejects if the driver is going away or unusable, resolves immediately if
     * the connection is already OK, and returns the pending promise if a
     * connect attempt is already in progress. Rejects with an
     * `InvalidArgumentException` if the URI contains no host.
     *
     * If the `characters.set` option is truthy, a `SET NAMES` query is queued
     * once the handshake has finished.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface
     */
    public function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            return $this->connectPromise;
        }

        // Only prepend the scheme when it is missing. ltrim() must not be used to
        // strip it: its second argument is a character mask, so it would also eat
        // leading "m", "y", "s", "q", "l", ":" and "/" characters of the host or
        // user part (e.g. "localhost" would become "ocalhost").
        if(\strpos($uri, 'mysql://') !== 0) {
            $uri = 'mysql://'.$uri;
        }

        $parts = \parse_url($uri);
        if(!isset($parts['host'])) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }

        $host = $parts['host'].':'.($parts['port'] ?? 3306);
        $this->connectionState = static::CONNECTION_STARTED;

        // Shared by reference with the closures below: flips to true once the handshake promise resolved.
        $resolved = false;

        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }

            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;

            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });

            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;

                $this->emit('close');
            });

            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);

            // Parser errors reject the connect promise until the handshake is done,
            // afterwards they are emitted as driver errors.
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });

            $credentials = array(
                'user' => ($parts['user'] ?? 'root'),
                'password' => ($parts['pass'] ?? ''),
                'db' => (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '')
            );

            $this->startHandshake($credentials, $deferred);

            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;

                // Start working off commands that were queued while connecting.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });

        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];

            $this->connectPromise = $this->connectPromise->then(function () {
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');

                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);
            });
        }

        return $this->connectPromise;
    }
227
    
228
    /**
     * Pauses the underlying stream I/O consumption.
     * If consumption is already paused, this will do nothing.
     *
     * Nothing is paused when there is no connection or the driver is going away.
     * @return bool  Whether the operation was successful.
     */
    public function pauseStreamConsumption(): bool {
        $usable = ($this->connection !== null && !$this->goingAway);

        if($usable) {
            $this->connection->pause();
        }

        return $usable;
    }
241
    
242
    /**
     * Resumes the underlying stream I/O consumption.
     * If consumption is not paused, this will do nothing.
     *
     * Nothing is resumed when there is no connection or the driver is going away.
     * @return bool  Whether the operation was successful.
     */
    public function resumeStreamConsumption(): bool {
        $usable = ($this->connection !== null && !$this->goingAway);

        if($usable) {
            $this->connection->resume();
        }

        return $usable;
    }
255
    
256
    /**
     * Closes the connection gracefully after processing all outstanding requests.
     *
     * Marks the connection as unusable and creates the "going away" deferred,
     * which is resolved right away when the queue is empty. Once it resolves
     * and the connection was in the OK state before, a quit command is invoked
     * and the returned promise resolves when the connection emits `close`.
     * Calling this again while going away returns the existing promise.
     * @return \React\Promise\PromiseInterface
     */
    public function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }

        // Remember the state before flagging the connection as unusable.
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        $this->goingAway = new \React\Promise\Deferred();
        $pending = $this->goingAway->promise();

        if(\count($this->queue) === 0) {
            $this->goingAway->resolve();
        }

        return $pending->then(function () use ($previousState) {
            if($previousState === static::CONNECTION_OK) {
                $closed = new \React\Promise\Deferred();
                $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();

                $this->connection->once('close', function () use ($closed) {
                    $closed->resolve();
                });

                // Close the socket ourselves as soon as the quit command has ended.
                $quit->once('end', function () {
                    $this->connection->close();
                });

                $this->parser->invokeCommand($quit);

                return $closed->promise();
            }
        });
    }
296
    
297
    /**
     * Forcefully closes the connection, without waiting for any outstanding requests.
     *
     * Every queued command receives an `error` event with a "Connection is going away"
     * exception. If the connection was in the OK state, a quit command is invoked and
     * the connection is closed immediately. Does nothing if the driver is already going away.
     * @return void
     */
    public function quit(): void {
        if($this->goingAway) {
            return;
        }

        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        $goingAway = new \React\Promise\Deferred();
        $this->goingAway = $goingAway;
        $goingAway->resolve();

        // Drain the queue and fail every outstanding command.
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface|null  $pending */
        while(($pending = \array_shift($this->queue)) !== null) {
            $pending->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }

        if($previousState !== static::CONNECTION_OK) {
            return;
        }

        $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
        $this->connection->close();
    }
324
    
325
    /**
     * Tells whether this driver is currently flagged as being in a transaction.
     * @return bool
     */
    public function isInTransaction(): bool {
        return $this->transaction;
    }
332
    
333
    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     *
     * If the connection is not OK yet but a connect attempt is pending, the query
     * is retried once the connect promise resolves. Outside of a transaction the
     * driver checks itself back into the client when the command emits `end`.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect attempt.
     * @see \Plasma\QueryResultInterface
     */
    public function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }

            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->query($client, $query);
            });
        }

        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($command);

        // Registered after executeCommand() on purpose, so the listener order stays as before.
        $insideTransaction = $this->transaction;
        if(!$insideTransaction) {
            $command->once('end', function () use ($client) {
                $client->checkinConnection($this);
            });
        }

        return $command->getPromise();
    }
366
    
367
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     *
     * If the connection is not OK yet but a connect attempt is pending, preparing
     * is retried once the connect promise resolves.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect attempt.
     * @see \Plasma\StatementInterface
     */
    public function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }

            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->prepare($client, $query);
            });
        }

        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);

        return $prepareCommand->getPromise();
    }
394
    
395
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     *
     * For streaming results the statement is closed in the background and a failure
     * to close is emitted as an `error` event on the driver. For all other results
     * the returned promise waits for the statement to be closed first. If executing
     * fails, the statement is closed and the original error is re-thrown.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is neither a connection nor a pending connect attempt.
     * @see \Plasma\StatementInterface
     */
    public function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use ($client, $query, $params) {
                    return $this->execute($client, $query, $params);
                });
            }

            throw new \Plasma\Exception('Unable to continue without connection');
        }

        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use ($statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    // The rejection handler has to be attached to the promise returned by close().
                    // Passing it as an argument to close() silently discarded it.
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });

                    return $result;
                }

                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use ($statement) {
                // Always release the statement, then surface the execution error.
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
439
    
440
    /**
     * Quotes the string for use in the query.
     *
     * The escaping strategy is picked from the status flags of the last OK
     * message (or of the handshake message if no OK message is available):
     * with `SERVER_STATUS_NO_BACKSLASH_ESCAPES` set, quotes are doubled,
     * otherwise backslash escaping is used.
     * @param string  $str
     * @param int     $type  For types, see the constants.
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception  Thrown if there is no parser yet or no handshake has happened.
     */
    public function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }

        $message = ($this->parser->getLastOkMessage() ?? $this->parser->getHandshakeMessage());
        if($message === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }

        // Everything before the first underscore, or the whole charset name if there is none.
        $dbCharset = \explode('_', $this->charset, 2)[0];
        $realCharset = $this->getRealCharset($dbCharset);

        $noBackslashEscapes = (($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0);

        return ($noBackslashEscapes
            ? $this->escapeUsingQuotes($realCharset, $str, $type)
            : $this->escapeUsingBackslashes($realCharset, $str, $type));
    }
472
    
473
    /**
     * Escapes using quotes.
     *
     * Identifiers are wrapped in backticks with embedded backticks doubled.
     * Values are wrapped in double quotes, with embedded double quotes doubled
     * and backslashes doubled.
     * @param string  $realCharset  Currently not used by this method.
     * @param string  $str
     * @param int     $type
     * @return string
     */
    public function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }

        // NOTE(review): quote() picks this method when the server reports NO_BACKSLASH_ESCAPES;
        // doubling backslashes in that mode looks suspicious - confirm against the MySQL manual.
        $replacements = array(
            '"' => '""',
            '\\' => '\\\\'
        );

        return '"'.\strtr($str, $replacements).'"';
    }
497
    
498
    /**
     * Escapes using backslashes.
     *
     * Identifiers are wrapped in backticks with embedded backticks doubled:
     * MySQL does not treat the backslash as an escape character inside quoted
     * identifiers, so a backslash-escaped backtick would terminate the identifier.
     * Values are wrapped in double quotes, with backslashes and double quotes
     * escaped by a preceding backslash.
     * @param string  $realCharset  Currently not used by this method.
     * @param string  $str
     * @param int     $type
     * @return string
     */
    public function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }

        // Backslashes come first, so the backslashes added for the quotes are not escaped again.
        $escapeChars = array(
            '\\',
            '"',
        );

        $escapeReplace = array(
            '\\\\',
            '\\"',
        );

        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }
522
    
523
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     *
     * The session isolation level is set first, then `START TRANSACTION` is issued. The driver is flagged as
     * being in a transaction before the first query is sent; the flag is reset if starting the transaction fails.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if already in a transaction, on an invalid isolation level or without connection.
     * @see \Plasma\TransactionInterface
     */
    public function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }

        switch($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }

        // Set before querying, so query() does not check the connection back into the client.
        $this->transaction = true;

        try {
            $started = $this->query($client, $query);
        } catch (\Throwable $e) {
            // query() throws synchronously when there is no connection; without this reset
            // the driver would stay flagged as "in transaction" forever.
            $this->transaction = false;
            throw $e;
        }

        return $started->then(function () use ($client) {
            return $this->query($client, 'START TRANSACTION');
        })->then(function () use ($client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        })->then(null, function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        });
    }
577
    
578
    /**
     * Informationally closes a transaction by clearing the driver's transaction flag.
     * This method is used by `Transaction` to inform the driver of the end of the transaction.
     * @return void
     */
    public function endTransaction(): void {
        $this->transaction = false;
    }
585
    
586
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * When the command is done, the driver must check itself back into the client.
     *
     * The check-in is skipped while the driver is in a transaction.
     * Rejects right away if the driver is going away.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface
     */
    public function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        // Hands the driver back to the client, unless a transaction is still open.
        $checkin = function () use ($client) {
            if(!$this->transaction) {
                $client->checkinConnection($this);
            }
        };

        $resolver = function (callable $resolve, callable $reject) use ($command, $checkin) {
            $command->once('end', function ($value = null) use ($checkin, $resolve) {
                $checkin();
                $resolve($value);
            });

            $command->once('error', function (\Throwable $error) use ($checkin, $reject) {
                $checkin();
                $reject($error);
            });

            $this->executeCommand($command);
        };

        return (new \React\Promise\Promise($resolver));
    }
620
    
621
    /**
     * Executes a command.
     *
     * The command is appended to the queue. If a parser exists and the driver
     * is idle, the next queued command is handed to the parser right away.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    public function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;

        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }

        $this->parser->invokeCommand($this->getNextCommand());
    }
636
    
637
    /**
     * Get the handshake message from the parser, or null if there is no parser
     * or no handshake message has been received yet.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    public function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        return ($this->parser ? $this->parser->getHandshakeMessage() : null);
    }
648
    
649
    /**
     * Get the next command, or null.
     *
     * Returns null when the queue is empty (resolving the "going away" deferred,
     * if there is one) or when the driver is busy. Otherwise the first queued
     * command is removed from the queue and returned. If the command wants to be
     * waited for, the driver is marked busy until the command emits `end` or
     * `error`; otherwise the next command is scheduled right away.
     * @return \Plasma\CommandInterface|null
     * @internal
     */
    public function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            if($this->goingAway) {
                $this->goingAway->resolve();
            }

            return null;
        }

        if($this->busy === static::STATE_BUSY) {
            return null;
        }

        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);

        if(!$next->waitForCompletion()) {
            $this->endCommand();
            return $next;
        }

        $this->busy = static::STATE_BUSY;

        // Success and failure are handled the same way: free the driver and move on.
        $release = function () {
            $this->busy = static::STATE_IDLE;
            $this->endCommand();
        };

        $next->once('error', $release);
        $next->once('end', $release);

        return $next;
    }
692
    
693
    /**
     * Finishes up a command.
     *
     * On the next loop tick, either resolves the "going away" deferred (when
     * the driver is going away and the queue is empty) or hands the next
     * queued command to the parser.
     * @return void
     */
    protected function endCommand() {
        $this->loop->futureTick(function () {
            $drained = (\count($this->queue) === 0);

            if($this->goingAway && $drained) {
                $this->goingAway->resolve();
                return;
            }

            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
706
    
707
    /**
     * Starts the handshake process.
     *
     * Registers a one-time listener that reacts to the server's handshake message:
     * it computes the client capability flags, picks an auth plugin and, unless the
     * remote address is 127.0.0.1 (or `tls.forceLocal` is set), upgrades the
     * connection to TLS when the server supports it. If TLS is unsupported but
     * forced through the options, the deferred is rejected and the connection closed.
     * Finally the handshake response is created.
     *
     * A second, permanent listener marks the connection as OK on every OK response
     * message and relays all messages through the `eventRelay` event.
     * @param array                    $credentials  Expected keys: user, password, db.
     * @param \React\Promise\Deferred  $deferred     Settled with the outcome of the handshake.
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if(!($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage)) {
                return;
            }

            // Only the first handshake message is of interest.
            $this->parser->removeListener('message', $listener);

            $this->connectionState = static::CONNECTION_SETENV;
            $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;

            // Read the key directly instead of extract(), which would import arbitrary
            // keys into the local scope and could overwrite local variables.
            if(($credentials['db'] ?? '') !== '') {
                $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
            }

            if($this->charset === null) {
                $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
            }

            // Check if we support auth plugins
            $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
            $plugin = null;

            foreach($plugins as $key => $plug) {
                // Integer keys are matched against the server capabilities, other keys against the plugin name.
                $byCapability = (\is_int($key) && ($message->capability & $key) !== 0);

                if($byCapability || $key === $message->authPluginName) {
                    $plugin = $plug;
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                    break;
                }
            }

            $remote = \parse_url($this->connection->getRemoteAddress())['host'];

            if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;

                    $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);

                    $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                        $this->connectionState = static::CONNECTION_SSL_STARTUP;

                        $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                        }, function (\Throwable $error) use (&$deferred) {
                            // Reject with the actual error. The former "$$error" was a variable
                            // variable, which evaluated to null and lost the failure reason.
                            $deferred->reject($error);
                            $this->connection->close();
                        });
                    });

                    $this->parser->invokeCommand($ssl);
                    return;
                } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                    $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                    $this->connection->close();
                    return;
                }
            }

            $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
        };

        $this->parser->on('message', $listener);

        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }

            $this->emit('eventRelay', array('message', $message));
        });
    }
788
    
789
    /**
     * Enables TLS on the connection.
     *
     * Every entry of the `tls.context` option is applied to the connection stream as a `ssl`
     * stream context option, afterwards the encryption helper is asked to enable encryption
     * on the connection. If that fails, the connection gets closed and the failure is rethrown
     * as a `\RuntimeException`, which carries the original error as previous exception.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();
            
            // Exception codes are not guaranteed to be integers, so cast before passing it on.
            // The original error is chained to keep its stack trace available for debugging.
            throw new \RuntimeException(
                'Connection failed during TLS handshake: '.$error->getMessage(),
                (int) $error->getCode(),
                $error
            );
        });
    }
804
    
805
    /**
     * Sends the auth command.
     *
     * Creates the handshake response command from the given credentials and invokes it on the parser.
     * The deferred gets resolved once the command ends, and rejected (with the connection being closed)
     * once the command errors. If an auth plugin is given, a message listener is registered, which
     * answers auth switch requests and auth more data packets, until an OK response message arrives.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  The keys `user`, `password` and `db` are read.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Read the credentials explicitly instead of using extract(),
        // so unexpected array keys can never overwrite local variables (such as $message or $deferred).
        $user = $credentials['user'] ?? null;
        $password = $credentials['password'] ?? null;
        $db = $credentials['db'] ?? null;
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        $auth->once('end', function () use ($deferred) {
            $deferred->resolve();
        });
        
        $auth->once('error', function (\Throwable $error) use ($deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            /**
             * The auth plugin instance created by an auth switch request, shared between listener invocations.
             * @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $authPlugin
             */
            $authPlugin = null;
            
            // $listener is captured by reference, because the closure needs to remove itself later on.
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, $deferred, &$listener, &$authPlugin) {
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $authPlugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $authPlugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    // No registered plugin matches the requested method. The request can not be answered,
                    // so close the connection, just like all other authentication failures do.
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    $this->connection->close();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($authPlugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $authPlugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // An OK response arrived, this listener is not needed anymore.
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
875
    
876
    /**
     * Get the real charset from the DB charset.
     *
     * Any charset starting with `utf8` maps to `UTF-8`. Otherwise the first mbstring encoding,
     * whose name starts with the given charset (case-insensitive), is returned.
     * An empty or unknown charset falls back to `UTF-8`.
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        // An empty charset must not reach stripos(): an empty needle raises a warning on PHP 7
        // and matches every encoding at offset 0 on PHP 8 (which would return an arbitrary encoding).
        if($charset === '' || \substr($charset, 0, 4) === 'utf8') {
            return 'UTF-8';
        }
        
        foreach(\mb_list_encodings() as $encoding) {
            if(\stripos($encoding, $charset) === 0) {
                return $encoding;
            }
        }
        
        return 'UTF-8';
    }
896
    
897
    /**
     * Checks the given driver options against the validation rule of each known option key.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        // One validation rule per known option key.
        $rules = array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        \CharlotteDunois\Validation\Validator::make($options, $rules)
            ->throw(\InvalidArgumentException::class);
    }
915
}
916