GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 1b0f9b...f731b0 )
by Charlotte
04:44 queued 02:01
created

Driver::query()   A

Complexity

Conditions 5
Paths 5

Size

Total Lines 23
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
cc 5
eloc 13
nc 5
nop 2
dl 0
loc 23
ccs 0
cts 14
cp 0
crap 30
rs 9.5222
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => true,
31
        'tls.context' => array(),
32
        'tls.force' => true,
33
        'tls.forceLocal' => false
34
    );
35
    
36
    /**
37
     * @var string[]
38
     */
39
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
40
    
41
    /**
42
     * @var \React\Socket\ConnectorInterface
43
     */
44
    protected $connector;
45
    
46
    /**
47
     * An internal class is intentionally used here, as there is currently no other way.
48
     * @var \React\Socket\StreamEncryption
49
     * @see https://github.com/reactphp/socket/issues/180
50
     */
51
    protected $encryption;
52
    
53
    /**
54
     * @var \React\Promise\Promise|null
55
     */
56
    protected $connectPromise;
57
    
58
    /**
59
     * @var \React\Socket\Connection
60
     */
61
    protected $connection;
62
    
63
    /**
64
     * @var int
65
     */
66
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
67
    
68
    /**
69
     * @var \Plasma\Drivers\MySQL\ProtocolParser
70
     */
71
    protected $parser;
72
    
73
    /**
74
     * @var array
75
     */
76
    protected $queue;
77
    
78
    /**
79
     * @var int
80
     */
81
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
82
    
83
    /**
84
     * @var bool
85
     */
86
    protected $transaction = false;
87
    
88
    /**
89
     * @var \React\Promise\Deferred
90
     */
91
    protected $goingAway;
92
    
93
    /**
94
     * @var string|null
95
     */
96
    protected $charset;
97
    
98
    /**
     * Constructor.
     *
     * Validates the given options, merges them over the default options and prepares
     * the socket connector, the stream encryption helper and an empty command queue.
     * @param \React\EventLoop\LoopInterface  $loop
     * @param array                           $options  An optional `connector` entry replaces the default socket connector.
     */
    public function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        $this->validateOptions($options);
        
        $this->loop = $loop;
        $this->options = \array_merge($this->options, $options);
        
        // Use the connector supplied by the user, otherwise fall back to the default one
        if(isset($options['connector'])) {
            $this->connector = $options['connector'];
        } else {
            $this->connector = new \React\Socket\Connector($loop);
        }
        
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
        $this->queue = array();
    }
113
    
114
    /**
     * Returns the event loop this driver was constructed with.
     * @return \React\EventLoop\LoopInterface
     */
    public function getLoop(): \React\EventLoop\LoopInterface {
        return $this->loop;
    }
121
    
122
    /**
     * Retrieves the current connection state.
     * @return int  One of the `\Plasma\DriverInterface::CONNECTION_*` constants.
     */
    public function getConnectionState(): int {
        return $this->connectionState;
    }
129
    
130
    /**
     * Retrieves the current busy state.
     * @return int  One of the `\Plasma\DriverInterface::STATE_*` constants.
     */
    public function getBusyState(): int {
        return $this->busy;
    }
137
    
138
    /**
     * Get the length of the driver backlog queue, i.e. the number of commands still waiting to be invoked.
     * @return int
     */
    public function getBacklogLength(): int {
        return \count($this->queue);
    }
145
    
146
    /**
     * Connects to the given URI.
     *
     * A URI without a scheme is treated as `tcp://`, the `mysql` scheme is an alias for `tcp`
     * and the port defaults to 3306 for every scheme except `unix`.
     * Calling this method again while a connect is pending returns the pending promise.
     * If the `characters.set` option is set, a `SET NAMES` query is queued after the handshake.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface  Rejects if the driver is going away or the URI is invalid.
     */
    public function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        }
        
        if($this->connectPromise !== null) {
            return $this->connectPromise;
        }
        
        if(\strpos($uri, '://') === false) {
            $uri = 'tcp://'.$uri;
        }
        
        $parts = \parse_url($uri);
        $isValid = isset($parts['scheme'], $parts['host']) && \in_array($parts['scheme'], $this->allowedSchemes);
        
        if(!$isValid) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }
        
        if($parts['scheme'] === 'mysql') {
            $parts['scheme'] = 'tcp';
        }
        
        // Unix sockets have no port, everything else falls back to the MySQL default port
        $target = $parts['scheme'].'://'.$parts['host'];
        if($parts['scheme'] !== 'unix') {
            $target .= ':'.($parts['port'] ?? 3306);
        }
        
        $this->connectionState = static::CONNECTION_STARTED;
        $resolved = false;
        
        $this->connectPromise = $this->connector->connect($target)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }
            
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;
            
            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });
            
            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;
                
                $this->emit('close');
            });
            
            $handshake = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
            
            // Parser errors reject the connect promise until the handshake is done, afterwards they get emitted
            $this->parser->on('error', function (\Throwable $error) use ($handshake, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                    return;
                }
                
                $handshake->reject($error);
            });
            
            $credentials = array(
                'user' => ($parts['user'] ?? 'root'),
                'password' => ($parts['pass'] ?? ''),
                'db' => (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '')
            );
            
            $this->startHandshake($credentials, $handshake);
            
            return $handshake->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;
                
                // Start working on commands which were queued while connecting
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });
        
        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            
            $this->connectPromise = $this->connectPromise->then(function () {
                $sql = 'SET NAMES "'.$this->options['characters.set'].'"';
                if($this->options['characters.collate']) {
                    $sql .= ' COLLATE "'.$this->options['characters.collate'].'"';
                }
                
                $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $sql);
                $this->executeCommand($command);
                
                return $command->getPromise();
            });
        }
        
        return $this->connectPromise;
    }
241
    
242
    /**
     * Pauses the underlying stream I/O consumption.
     * If consumption is already paused, this will do nothing.
     * @return bool  Whether the operation was successful (`false` without a connection or when going away).
     */
    public function pauseStreamConsumption(): bool {
        $usable = ($this->connection !== null && !$this->goingAway);
        if($usable) {
            $this->connection->pause();
        }
        
        return $usable;
    }
255
    
256
    /**
     * Resumes the underlying stream I/O consumption.
     * If consumption is not paused, this will do nothing.
     * @return bool  Whether the operation was successful (`false` without a connection or when going away).
     */
    public function resumeStreamConsumption(): bool {
        $usable = ($this->connection !== null && !$this->goingAway);
        if($usable) {
            $this->connection->resume();
        }
        
        return $usable;
    }
269
    
270
    /**
     * Closes all connections gracefully after processing all outstanding requests.
     *
     * The driver is marked as unusable immediately. Once the queue is drained and the
     * connection was in the OK state, a quit command is sent and the connection gets closed.
     * @return \React\Promise\PromiseInterface
     */
    public function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        $this->goingAway = new \React\Promise\Deferred();
        
        // Nothing outstanding, so we can go away right now
        if(\count($this->queue) === 0) {
            $this->goingAway->resolve();
        }
        
        return $this->goingAway->promise()->then(function () use ($previousState) {
            if($previousState !== static::CONNECTION_OK) {
                return;
            }
            
            $closed = new \React\Promise\Deferred();
            $quitCommand = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
            
            $this->connection->once('close', function () use ($closed) {
                $closed->resolve();
            });
            
            // Close the socket once the quit command has been processed
            $quitCommand->once('end', function () {
                $this->connection->close();
            });
            
            $this->parser->invokeCommand($quitCommand);
            
            return $closed->promise();
        });
    }
310
    
311
    /**
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
     *
     * Every queued command receives an `error` event. If the connection was in the OK state,
     * a quit command is invoked and the connection gets closed right away.
     * @return void
     */
    public function quit(): void {
        if($this->goingAway) {
            return;
        }
        
        $wasOk = ($this->connectionState === static::CONNECTION_OK);
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        $this->goingAway = new \React\Promise\Deferred();
        $this->goingAway->resolve();
        
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $pending */
        while($pending = \array_shift($this->queue)) {
            $pending->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }
        
        if(!$wasOk) {
            return;
        }
        
        $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
        $this->connection->close();
    }
338
    
339
    /**
     * Whether this driver is currently in a transaction.
     * @return bool
     */
    public function isInTransaction(): bool {
        return $this->transaction;
    }
346
    
347
    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     *
     * If a connect is still pending, the query is deferred until the connect promise resolves.
     * Outside of a transaction the driver checks itself back into the client when the command
     * ends or errors, whichever happens first.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is no connection and no connect is pending.
     * @see \Plasma\QueryResultInterface
     */
    public function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->query($client, $query);
            });
        }
        
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($command);
        
        if(!$this->transaction) {
            // A failed query is done as well, so check in on error too (like runCommand does).
            // The flag makes sure the driver is only checked in once per command.
            $checkedIn = false;
            $checkin = function () use ($client, &$checkedIn) {
                if($checkedIn) {
                    return;
                }
                
                $checkedIn = true;
                $client->checkinConnection($this);
            };
            
            $command->once('end', $checkin);
            $command->once('error', $checkin);
        }
        
        return $command->getPromise();
    }
380
    
381
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     *
     * If a connect is still pending, preparing is deferred until the connect promise resolves.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is no connection and no connect is pending.
     * @see \Plasma\StatementInterface
     */
    public function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->prepare($client, $query);
            });
        }
        
        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);
        
        return $prepareCommand->getPromise();
    }
408
    
409
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     *
     * For streaming results the statement is closed in the background and the result is returned
     * right away; a failure to close is emitted as `error` event on the driver.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is no connection and no connect is pending.
     * @see \Plasma\StatementInterface
     */
    public function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            return $this->connectPromise->then(function () use ($client, $query, $params) {
                return $this->execute($client, $query, $params);
            });
        }
        
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use ($statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    // The handler must be attached to the promise returned by close(),
                    // arguments passed to close() itself would be silently ignored.
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });
                    
                    return $result;
                }
                
                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use ($statement) {
                // Close the statement first, then let the original execute error propagate
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
453
    
454
    /**
     * Quotes the string for use in the query.
     *
     * The escaping strategy depends on the server status flags of the last OK message
     * (or the handshake message, if no OK message was received yet).
     * @param string  $str
     * @param int     $type  For types, see the constants.
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception  Thrown without a connection or without a previous handshake.
     */
    public function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        $message = ($this->parser->getLastOkMessage() ?? $this->parser->getHandshakeMessage());
        if($message === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }
        
        // Only the part before the first underscore is handed to getRealCharset
        $underscore = \strpos($this->charset, '_');
        $length = ($underscore === false ? \strlen($this->charset) : $underscore);
        $realCharset = $this->getRealCharset(\substr($this->charset, 0, $length));
        
        $noBackslashEscapes = ($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0;
        
        return ($noBackslashEscapes ?
            $this->escapeUsingQuotes($realCharset, $str, $type) :
            $this->escapeUsingBackslashes($realCharset, $str, $type));
    }
486
    
487
    /**
     * Escapes using quotes. Used when the server reports the `NO_BACKSLASH_ESCAPES` status flag.
     *
     * Identifiers are wrapped in backticks, values in double quotes. The enclosing quote
     * character is escaped by doubling it. Backslashes are left untouched, as they are
     * ordinary characters in this SQL mode and doubling them would alter the value.
     * @param string  $realCharset  Currently unused.
     * @param string  $str
     * @param int     $type
     * @return string
     */
    public function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        return '"'.\str_replace('"', '""', $str).'"';
    }
511
    
512
    /**
     * Escapes using backslashes.
     *
     * Values are wrapped in double quotes, with backslashes and double quotes escaped by a backslash.
     * Identifiers are wrapped in backticks; a backtick inside an identifier is escaped by doubling it,
     * as the backslash is not an escape character inside quoted identifiers.
     * @param string  $realCharset  Currently unused.
     * @param string  $str
     * @param int     $type
     * @return string
     */
    public function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // strtr replaces in a single pass, so already escaped characters are not escaped twice
        $escaped = \strtr($str, array(
            '\\' => '\\\\',
            '"' => '\\"'
        ));
        
        return '"'.$escaped.'"';
    }
536
    
537
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     *
     * The transaction flag is reset if setting the isolation level or starting the transaction fails,
     * regardless of whether the failure is a rejected promise or a thrown exception.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if already in a transaction, on an invalid isolation level or without a connection.
     * @see \Plasma\TransactionInterface
     */
    public function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }
        
        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }
        
        // Set the flag before querying, so query() does not check the driver back into the client
        $this->transaction = true;
        
        try {
            $promise = $this->query($client, $query);
        } catch (\Throwable $e) {
            // query() can throw synchronously (no connection); do not stay stuck in "transaction" state
            $this->transaction = false;
            throw $e;
        }
        
        return $promise->then(function () use ($client) {
            return $this->query($client, 'START TRANSACTION');
        })->then(function () use ($client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        })->then(null, function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        });
    }
591
    
592
    /**
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
     * It only resets the transaction flag of the driver.
     * @return void
     */
    public function endTransaction(): void {
        $this->transaction = false;
    }
599
    
600
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * When the command is done, the driver must check itself back into the client.
     *
     * The check-in is skipped while the driver is in a transaction.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface
     */
    public function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        $resolver = function (callable $resolve, callable $reject) use ($client, $command) {
            // Shared by both outcomes, evaluated when the event fires
            $checkin = function () use ($client) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
            };
            
            $command->once('end', function ($value = null) use ($checkin, $resolve) {
                $checkin();
                $resolve($value);
            });
            
            $command->once('error', function (\Throwable $error) use ($checkin, $reject) {
                $checkin();
                $reject($error);
            });
            
            $this->executeCommand($command);
        };
        
        return (new \React\Promise\Promise($resolver));
    }
634
    
635
    /**
     * Executes a command.
     *
     * The command is appended to the queue. If a parser exists and the driver is idle,
     * the next queued command gets invoked immediately.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    public function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;
        
        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }
        
        $this->parser->invokeCommand($this->getNextCommand());
    }
648
    
649
    /**
     * Get the handshake message, or null if none received yet (or no parser exists yet).
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    public function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        return ($this->parser ? $this->parser->getHandshakeMessage() : null);
    }
660
    
661
    /**
     * Get the next command, or null.
     *
     * Returns null if the queue is empty (resolving a pending going away deferred) or if the driver is busy.
     * A command which waits for completion marks the driver as busy until it emits `end` or `error`.
     * @return \Plasma\CommandInterface|null
     * @internal
     */
    public function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            if($this->goingAway) {
                $this->goingAway->resolve();
            }
            
            return null;
        }
        
        if($this->busy === static::STATE_BUSY) {
            return null;
        }
        
        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);
        
        if(!$next->waitForCompletion()) {
            $this->endCommand();
            return $next;
        }
        
        $this->busy = static::STATE_BUSY;
        
        // Both outcomes free the driver and schedule the next command
        $release = function () {
            $this->busy = static::STATE_IDLE;
            
            $this->endCommand();
        };
        
        $next->once('error', $release);
        $next->once('end', $release);
        
        return $next;
    }
700
    
701
    /**
     * Finishes up a command.
     *
     * On the next loop tick this either resolves the going away deferred (if the queue is empty)
     * or hands the next queued command to the parser.
     * @return void
     */
    protected function endCommand() {
        $this->loop->futureTick(function () {
            $queueDrained = (\count($this->queue) === 0);
            
            if($this->goingAway && $queueDrained) {
                $this->goingAway->resolve();
                return;
            }
            
            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
714
    
715
    /**
     * Starts the handshake process.
     *
     * Registers a listener which, on the server's handshake message, removes itself, negotiates the
     * client capability flags (database, compression, auth plugin, TLS) and hands over to
     * `createHandshakeResponse`. A second listener marks the connection as OK on every
     * OK response message and relays all messages through the `eventRelay` event.
     * @param array                    $credentials  Array with the keys `user`, `password` and `db`.
     * @param \React\Promise\Deferred  $deferred     Gets rejected if TLS is required but unsupported, or if enabling TLS fails.
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, $deferred, &$listener) {
            if(!($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage)) {
                return;
            }
            
            $this->parser->removeListener('message', $listener);
            
            $this->connectionState = static::CONNECTION_SETENV;
            $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
            
            // Read the key explicitly instead of extract(), which could overwrite local variables
            if(($credentials['db'] ?? '') !== '') {
                $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
            }
            
            if($this->charset === null) {
                $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
            }
            
            if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
                $this->parser->enableCompression();
                $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
            }
            
            // Check if we support auth plugins
            $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
            $plugin = null;
            
            foreach($plugins as $key => $plug) {
                // Integer keys are matched against the capability flags, other keys against the plugin name
                $byCapability = (\is_int($key) && ($message->capability & $key) !== 0);
                
                if($byCapability || $key === $message->authPluginName) {
                    $plugin = $plug;
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                    break;
                }
            }
            
            // A remote address without host component (null) is treated like a non-local address
            $remote = \parse_url($this->connection->getRemoteAddress())['host'] ?? null;
            
            if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
                    
                    $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
                    
                    $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, $deferred, $message) {
                        $this->connectionState = static::CONNECTION_SSL_STARTUP;
                        
                        $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, $deferred, $message) {
                            $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                        }, function (\Throwable $error) use ($deferred) {
                            $deferred->reject($error);
                            
                            // enableTLS closes the connection on failure, which resets the property to null
                            if($this->connection !== null) {
                                $this->connection->close();
                            }
                        });
                    });
                    
                    $this->parser->invokeCommand($ssl);
                    return;
                } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                    $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                    $this->connection->close();
                    return;
                }
            }
            
            $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
        };
        
        $this->parser->on('message', $listener);
        
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }
            
            $this->emit('eventRelay', array('message', $message));
        });
    }
801
    
802
    /**
     * Enables TLS on the connection.
     *
     * Every entry of the `tls.context` option is applied as an `ssl` stream context option
     * on the connection stream before encryption gets enabled. If enabling encryption fails,
     * the connection is closed and the returned promise rejects with a `\RuntimeException`,
     * which carries the original error as its previous exception.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();
            
            // The exception constructor only accepts integer codes, but a throwable may carry a
            // non-integer one, so cast it. The original error is chained to keep its stack trace.
            throw new \RuntimeException(
                'Connection failed during TLS handshake: '.$error->getMessage(),
                (int) $error->getCode(),
                $error
            );
        });
    }
817
    
818
    /**
     * Sends the auth command.
     *
     * Builds the handshake response command and invokes it on the parser. The deferred is
     * resolved on the next loop tick once the command ends, and rejected (followed by closing
     * the connection) when the command errors. If an auth plugin is given, a parser message
     * listener is registered which answers auth switch requests and auth more data packets.
     * That listener removes itself once an OK response arrives.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  Read keys: `user`, `password` and `db`.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Read the credentials explicitly instead of extract()ing them, so that an
        // unexpected key can never overwrite a local variable such as $message or $deferred.
        $user = $credentials['user'] ?? null;
        $password = $credentials['password'] ?? null;
        $db = $credentials['db'] ?? null;
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        $auth->once('end', function () use ($deferred) {
            // Resolution is deferred to the next loop tick
            $this->loop->futureTick(function () use ($deferred) {
                $deferred->resolve();
            });
        });
        
        $auth->once('error', function (\Throwable $error) use ($deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            // $listener is captured by reference, so the closure can remove itself later
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, $deferred, &$listener) {
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
                static $plugin;
                
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        
                        // Strict key comparison: integer keys must never match a plugin name
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    
                    // Authentication can not continue, so close the connection
                    // like every other failure branch does.
                    $this->connection->close();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($plugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $plugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // Authentication succeeded, this listener is not needed anymore
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
890
    
891
    /**
     * Get the real charset from the DB charset.
     *
     * Any charset starting with `utf8` (such as `utf8mb4`) maps to `UTF-8`. Otherwise the
     * first mbstring encoding whose name starts with the given charset (case-insensitive)
     * is returned. An empty or unknown charset falls back to `UTF-8`.
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        // An empty needle matches every encoding at offset 0 on PHP 8+ (and triggers a warning
        // per encoding on PHP 7), which would return whichever encoding mbstring lists first.
        if($charset === '') {
            return 'UTF-8';
        }
        
        if(\substr($charset, 0, 4) === 'utf8') {
            return 'UTF-8';
        }
        
        foreach(\mb_list_encodings() as $encoding) {
            if(\stripos($encoding, $charset) === 0) {
                return $encoding;
            }
        }
        
        return 'UTF-8';
    }
911
    
912
    /**
     * Validates the given options.
     *
     * Each known option key is checked against its validation rule. The validator is
     * instructed to throw an `\InvalidArgumentException`.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        \CharlotteDunois\Validation\Validator::make($options, array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            // Uses the same rule name as the other boolean flags below
            'compression.enable' => 'boolean',
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        ))->throw(\InvalidArgumentException::class);
    }
929
}
930