GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — master ( 12f59d...0fd036 )
by Charlotte
02:19
created

Driver::quit()   A

Complexity

Conditions 4
Paths 8

Size

Total Lines 20
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 11
nc 8
nop 0
dl 0
loc 20
ccs 12
cts 12
cp 1
crap 4
rs 9.9
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => !true,
31
        'tls.context' => array(),
32
        'tls.force' => true,
33
        'tls.forceLocal' => false
34
    );
35
    
36
    /**
37
     * @var string[]
38
     */
39
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
40
    
41
    /**
42
     * @var \React\Socket\ConnectorInterface
43
     */
44
    protected $connector;
45
    
46
    /**
47
     * Internal class is intentional used, as there's no other way currently.
48
     * @var \React\Socket\StreamEncryption
49
     * @see https://github.com/reactphp/socket/issues/180
50
     */
51
    protected $encryption;
52
    
53
    /**
54
     * @var \React\Promise\Promise|null
55
     */
56
    protected $connectPromise;
57
    
58
    /**
59
     * @var \React\Socket\Connection
60
     */
61
    protected $connection;
62
    
63
    /**
64
     * @var int
65
     */
66
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
67
    
68
    /**
69
     * @var \Plasma\Drivers\MySQL\ProtocolParser
70
     */
71
    protected $parser;
72
    
73
    /**
74
     * @var array
75
     */
76
    protected $queue;
77
    
78
    /**
79
     * @var int
80
     */
81
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
82
    
83
    /**
84
     * @var bool
85
     */
86
    protected $transaction = false;
87
    
88
    /**
89
     * @var \React\Promise\Deferred
90
     */
91
    protected $goingAway;
92
    
93
    /**
94
     * @var string|null
95
     */
96
    protected $charset;
97
    
98
    /**
99
     * Constructor.
100
     * @param \React\EventLoop\LoopInterface  $loop
101
     * @param array                           $options
102
     */
103 87
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
104 87
        $this->validateOptions($options);
105
        
106 87
        $this->loop = $loop;
107 87
        $this->options = \array_merge($this->options, $options);
108
        
109 87
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
110 87
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
111 87
        $this->queue = array();
112 87
    }
113
    
114
    /**
115
     * Destructor.
116
     * @return void
117
     * @internal
118
     */
119 24
    function __destruct() {
120 24
        $this->close();
121 24
    }
122
    
123
    /**
124
     * Returns the event loop.
125
     * @return \React\EventLoop\LoopInterface
126
     */
127 60
    function getLoop(): \React\EventLoop\LoopInterface {
128 60
        return $this->loop;
129
    }
130
    
131
    /**
132
     * Retrieves the current connection state.
133
     * @return int
134
     */
135 10
    function getConnectionState(): int {
136 10
        return $this->connectionState;
137
    }
138
    
139
    /**
140
     * Retrieves the current busy state.
141
     * @return int
142
     */
143 1
    function getBusyState(): int {
144 1
        return $this->busy;
145
    }
146
    
147
    /**
148
     * Get the length of the driver backlog queue.
149
     * @return int
150
     */
151 1
    function getBacklogLength(): int {
152 1
        return \count($this->queue);
153
    }
154
    
155
    /**
156
     * Connects to the given URI.
157
     * @param string  $uri
158
     * @return \React\Promise\PromiseInterface
159
     */
160 65
    function connect(string $uri): \React\Promise\PromiseInterface {
161 65
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
162 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
163 64
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
164 1
            return \React\Promise\resolve();
165 64
        } elseif($this->connectPromise !== null) {
166 1
            return $this->connectPromise;
167
        }
168
        
169 64
        $pos = \strpos($uri, '://');
170 64
        $unix = false;
0 ignored issues
show
Unused Code introduced by
The assignment to $unix is dead and can be removed.
Loading history...
171
        
172 64
        if($pos === false) {
173 2
            $uri = 'tcp://'.$uri;
174 62
        } elseif(\substr($uri, 0, $pos) === 'unix') {
175 1
            $defaultSocket = '/tmp/mysql.sock';
176
            
177 1
            $apos = \strpos($uri, '@');
178 1
            $spos = \strrpos($uri, '/');
179
            
180 1
            if($apos !== false) {
181 1
                $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
182 1
                $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
183
            } else {
184
                $socket = \substr($uri, ($pos + 3), ($spos - ($pos + 3)));
185
                $uri = 'unix://localhost'.\substr($uri, $spos);
186
            }
187
            
188 1
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
189
                $socket = $defaultSocket;
190
            }
191
        }
192
        
193 64
        $parts = \parse_url($uri);
194 64
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes)) {
195 2
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
196
        }
197
        
198 62
        if(isset($socket)) {
199 1
            $parts['host'] = $socket;
200
        }
201
        
202 62
        if($parts['scheme'] === 'mysql') {
203 1
            $parts['scheme'] = 'tcp';
204
        }
205
        
206 62
        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
207 62
        $this->connectionState = static::CONNECTION_STARTED;
208 62
        $resolved = false;
209
        
210
        $this->connectPromise =  $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
211
            // See description of property encryption
212 62
            if(!($connection instanceof \React\Socket\Connection)) {
213
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
214
            }
215
            
216 62
            $this->busy = static::STATE_BUSY;
217 62
            $this->connectionState = static::CONNECTION_MADE;
218 62
            $this->connection = $connection;
219
            
220
            $this->connection->on('error', function (\Throwable $error) {
221
                $this->emit('error', array($error));
222 62
            });
223
            
224
            $this->connection->on('close', function () {
225 3
                $this->connection = null;
226 3
                $this->connectionState = static::CONNECTION_UNUSABLE;
227
                
228 3
                $this->emit('close');
229 62
            });
230
            
231 62
            $deferred = new \React\Promise\Deferred();
232 62
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
233
            
234
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
235
                if($resolved) {
236
                    $this->emit('error', array($error));
237
                } else {
238
                    $deferred->reject($error);
239
                }
240 62
            });
241
            
242 62
            $user = ($parts['user'] ?? 'root');
243 62
            $password = ($parts['pass'] ?? '');
244 62
            $db = \ltrim(($parts['path'] ?? ''), '/');
245
            
246 62
            $credentials = \compact('user', 'password', 'db');
247
            
248 62
            $this->startHandshake($credentials, $deferred);
249
            return $deferred->promise()->then(function () use (&$resolved) {
250 61
                $this->busy = static::STATE_IDLE;
251 61
                $resolved = true;
252
                
253 61
                if(\count($this->queue) > 0) {
254
                    $this->parser->invokeCommand($this->getNextCommand());
255
                }
256 62
            });
257 62
        });
258
        
259 62
        if($this->options['characters.set']) {
260 60
            $this->charset = $this->options['characters.set'];
261
            $this->connectPromise = $this->connectPromise->then(function () {
262 59
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
263 59
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
264
                
265 59
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
266 59
                $this->executeCommand($cmd);
267
                
268 59
                return $cmd->getPromise();
269 60
            });
270
        }
271
        
272 62
        return $this->connectPromise;
273
    }
274
    
275
    /**
276
     * Pauses the underlying stream I/O consumption.
277
     * If consumption is already paused, this will do nothing.
278
     * @return bool  Whether the operation was successful.
279
     */
280 1
    function pauseStreamConsumption(): bool {
281 1
        if($this->connection === null || $this->goingAway) {
282 1
            return false;
283
        }
284
        
285
        $this->connection->pause();
286
        return true;
287
    }
288
    
289
    /**
290
     * Resumes the underlying stream I/O consumption.
291
     * If consumption is not paused, this will do nothing.
292
     * @return bool  Whether the operation was successful.
293
     */
294 1
    function resumeStreamConsumption(): bool {
295 1
        if($this->connection === null || $this->goingAway) {
296 1
            return false;
297
        }
298
        
299
        $this->connection->resume();
300
        return true;
301
    }
302
    
303
    /**
304
     * Closes all connections gracefully after processing all outstanding requests.
305
     * @return \React\Promise\PromiseInterface
306
     */
307 25
    function close(): \React\Promise\PromiseInterface {
308 25
        if($this->goingAway) {
309 11
            return $this->goingAway->promise();
310
        }
311
        
312 15
        if($this->connectionState < \Plasma\DriverInterface::CONNECTION_OK) {
313 14
            $this->goingAway = new \React\Promise\Deferred();
314
            
315 14
            if($this->connection !== null) {
316
                $this->connection->close();
317
            }
318
            
319 14
            $this->goingAway->resolve();
320 14
            return $this->goingAway->promise();
321
        }
322
        
323 1
        $state = $this->connectionState;
324 1
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
325
        
326 1
        $this->goingAway = new \React\Promise\Deferred();
327
        
328 1
        if(\count($this->queue) === 0) {
329
            $this->goingAway->resolve();
330
        }
331
        
332
        return $this->goingAway->promise()->then(function () use ($state) {
333 1
            if($state !== static::CONNECTION_OK) {
334
                return;
335
            }
336
            
337 1
            $deferred = new \React\Promise\Deferred();
338
            
339 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
340
            
341
            $this->connection->once('close', function () use (&$deferred) {
342 1
                $deferred->resolve();
343 1
            });
344
            
345
            $quit->once('end', function () {
346
                $this->connection->close();
347 1
            });
348
            
349 1
            $this->parser->invokeCommand($quit);
350
            
351 1
            return $deferred->promise();
352 1
        });
353
    }
354
    
355
    /**
356
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
357
     * @return void
358
     */
359 11
    function quit(): void {
360 11
        if($this->goingAway === null) {
361 11
            $this->goingAway = new \React\Promise\Deferred();
362
        }
363
        
364 11
        $state = $this->connectionState;
365 11
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
366
        
367 11
        $this->goingAway->resolve();
368
        
369
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
370 11
        while($command = \array_shift($this->queue)) {
371 1
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
372
        }
373
        
374 11
        if($state === static::CONNECTION_OK) {
375 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
376 1
            $this->parser->invokeCommand($quit);
377
            
378 1
            $this->connection->close();
379
        }
380 11
    }
381
    
382
    /**
383
     * Whether this driver is currently in a transaction.
384
     * @return bool
385
     */
386 2
    function isInTransaction(): bool {
387 2
        return $this->transaction;
388
    }
389
    
390
    /**
391
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
392
     * When the command is done, the driver must check itself back into the client.
393
     * @param \Plasma\ClientInterface  $client
394
     * @param string                   $query
395
     * @return \React\Promise\PromiseInterface
396
     * @throws \Plasma\Exception
397
     * @see \Plasma\QueryResultInterface
398
     */
399 48
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
400 48
        if($this->goingAway) {
401 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
402 47
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
403 1
            if($this->connectPromise !== null) {
404
                return $this->connectPromise->then(function () use (&$client, &$query) {
405
                    return $this->query($client, $query);
406
                });
407
            }
408
            
409 1
            throw new \Plasma\Exception('Unable to continue without connection');
410
        }
411
        
412 46
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
413 46
        $this->executeCommand($command);
414
        
415 46
        if(!$this->transaction) {
416
            $command->once('end', function () use (&$client) {
417 44
                $client->checkinConnection($this);
418 44
            });
419
        }
420
        
421 46
        return $command->getPromise();
422
    }
423
    
424
    /**
425
     * Prepares a query. Resolves with a `StatementInterface` instance.
426
     * When the command is done, the driver must check itself back into the client.
427
     * @param \Plasma\ClientInterface  $client
428
     * @param string                   $query
429
     * @return \React\Promise\PromiseInterface
430
     * @throws \Plasma\Exception
431
     * @see \Plasma\StatementInterface
432
     */
433 39
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
434 39
        if($this->goingAway) {
435 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
436 38
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
437 1
            if($this->connectPromise !== null) {
438
                return $this->connectPromise->then(function () use (&$client, &$query) {
439
                    return $this->prepare($client, $query);
440
                });
441
            }
442
            
443 1
            throw new \Plasma\Exception('Unable to continue without connection');
444
        }
445
        
446 37
        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
447 37
        $this->executeCommand($command);
448
        
449 37
        return $command->getPromise();
450
    }
451
    
452
    /**
453
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
454
     * This is equivalent to prepare -> execute -> close.
455
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
456
     * @param \Plasma\ClientInterface  $client
457
     * @param string                   $query
458
     * @param array                    $params
459
     * @return \React\Promise\PromiseInterface
460
     * @throws \Plasma\Exception
461
     * @see \Plasma\StatementInterface
462
     */
463 38
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
464 38
        if($this->goingAway) {
465 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
466 37
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
467 1
            if($this->connectPromise !== null) {
468
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
469
                    return $this->execute($client, $query, $params);
470
                });
471
            }
472
            
473 1
            throw new \Plasma\Exception('Unable to continue without connection');
474
        }
475
        
476
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
477
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
478 35
                if($result instanceof \Plasma\StreamQueryResultInterface) {
479
                    $statement->close()->then(null, function (\Throwable $error) {
480
                        $this->emit('error', array($error));
481 35
                    });
482
                    
483 35
                    return $result;
484
                }
485
                
486
                return $statement->close()->then(function () use ($result) {
487 33
                    return $result;
488 33
                });
489
            }, function (\Throwable $error) use (&$statement) {
490
                return $statement->close()->then(function () use ($error) {
491
                    throw $error;
492
                });
493 36
            });
494 36
        });
495
    }
496
    
497
    /**
498
     * Quotes the string for use in the query.
499
     * @param string  $str
500
     * @param int     $type  For types, see the constants.
501
     * @return string
502
     * @throws \LogicException  Thrown if the driver does not support quoting.
503
     * @throws \Plasma\Exception
504
     */
505 5
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
506 5
        if($this->parser === null) {
507 1
            throw new \Plasma\Exception('Unable to continue without connection');
508
        }
509
        
510 4
        $message = $this->parser->getLastOkMessage();
511 4
        if($message === null) {
512
            $message = $this->parser->getHandshakeMessage();
513
            
514
            if($message === null) {
515
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
516
            }
517
        }
518
        
519 4
        $pos = \strpos($this->charset, '_');
520 4
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
521 4
        $realCharset = $this->getRealCharset($dbCharset);
522
        
523 4
        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
524
            return $this->escapeUsingQuotes($realCharset, $str, $type);
525
        }
526
        
527 4
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
528
    }
529
    
530
    /**
531
     * Escapes using quotes.
532
     * @param string  $realCharset
533
     * @param string  $str
534
     * @param int     $type
535
     * @return string
536
     */
537 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
538 2
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
539 1
            return '`'.\str_replace('`', '``', $str).'`';
540
        }
541
        
542
        $escapeChars = array(
543 1
            '"',
544
            '\\',
545
        );
546
        
547
        $escapeReplace = array(
548 1
            '""',
549
            '\\\\',
550
        );
551
        
552 1
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
553
    }
554
    
555
    /**
556
     * Escapes using backslashes.
557
     * @param string  $realCharset
558
     * @param string  $str
559
     * @param int     $type
560
     * @return string
561
     */
562 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
563 6
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
564 3
            return '`'.\str_replace('`', '\\`', $str).'`';
565
        }
566
        
567
        $escapeChars = array(
568 3
            '\\',
569
            '"',
570
        );
571
        
572
        $escapeReplace = array(
573 3
            '\\\\',
574
            '\"',
575
        );
576
        
577 3
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
578
    }
579
    
580
    /**
581
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
582
     *
583
     * Checks out a connection until the transaction gets committed or rolled back.
584
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
585
     * check the connection back into the pool, as long as the transaction is not finished.
586
     *
587
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
588
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
589
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
590
     * @param \Plasma\ClientInterface  $client
591
     * @param int                      $isolation  See the `TransactionInterface` constants.
592
     * @return \React\Promise\PromiseInterface
593
     * @throws \Plasma\Exception
594
     * @see \Plasma\TransactionInterface
595
     */
596 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
597 3
        if($this->goingAway) {
598 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
599
        }
600
        
601 2
        if($this->transaction) {
602 1
            throw new \Plasma\Exception('Driver is already in transaction');
603
        }
604
        
605
        switch ($isolation) {
606 2
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
607
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
608
            break;
609 2
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
610 2
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
611 2
            break;
612
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
613
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
614
            break;
615
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
616
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
617
            break;
618
            default:
619
                throw new \Plasma\Exception('Invalid isolation level given');
620
            break;
621
        }
622
        
623 2
        $this->transaction = true;
624
        
625
        return $this->query($client, $query)->then(function () use (&$client) {
626 2
            return $this->query($client, 'START TRANSACTION');
627
        })->then(function () use (&$client, $isolation) {
628 2
            return (new \Plasma\Transaction($client, $this, $isolation));
629
        })->then(null, function (\Throwable $e) {
630
            $this->transaction = false;
631
            throw $e;
632 2
        });
633
    }
634
    
635
    /**
636
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
637
     * @return void
638
     */
639 1
    function endTransaction(): void {
640 1
        $this->transaction = false;
641 1
    }
642
    
643
    /**
644
     * Runs the given command.
645
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
646
     * or rejects with the `Throwable` of the `error` event.
647
     * When the command is done, the driver must check itself back into the client.
648
     * @param \Plasma\ClientInterface   $client
649
     * @param \Plasma\CommandInterface  $command
650
     * @return \React\Promise\PromiseInterface
651
     */
652 6
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
653 6
        if($this->goingAway) {
654 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
655
        }
656
        
657
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
658
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
659 3
                if(!$this->transaction) {
660 3
                    $client->checkinConnection($this);
661
                }
662
                
663 3
                $resolve($value);
664 5
            });
665
            
666
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
667 2
                if(!$this->transaction) {
668 2
                    $client->checkinConnection($this);
669
                }
670
                
671 2
                $reject($error);
672 5
            });
673
            
674 5
            $this->executeCommand($command);
675 5
        }));
676
    }
677
    
678
    /**
679
     * Runs the given SQL querybuilder.
680
     * The driver CAN throw an exception if the given querybuilder is not supported.
681
     * An example would be a SQL querybuilder and a Cassandra driver.
682
     * @param \Plasma\ClientInterface           $client
683
     * @param \Plasma\SQLQuerybuilderInterface  $query
684
     * @return \React\Promise\PromiseInterface
685
     * @throws \Plasma\Exception
686
     */
687 3
    function runQuery(\Plasma\ClientInterface $client, \Plasma\QuerybuilderInterface $query): \React\Promise\PromiseInterface {
688 3
        if($this->goingAway) {
689 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
690
        }
691
        
692 2
        if(!($query instanceof \Plasma\SQLQuerybuilderInterface)) {
693 1
            throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
694
        }
695
        
696 1
        $sql = $query->getQuery();
697 1
        $params = $query->getParameters();
698
        
699 1
        return $this->execute($client, $sql, $params);
700
    }
701
    
702
    /**
703
     * Executes a command.
704
     * @param \Plasma\CommandInterface  $command
705
     * @return void
706
     * @internal
707
     */
708 60
    function executeCommand(\Plasma\CommandInterface $command): void {
709 60
        $this->queue[] = $command;
710
        
711 60
        if($this->parser && $this->busy === static::STATE_IDLE) {
712 59
            $this->parser->invokeCommand($this->getNextCommand());
713
        }
714 60
    }
715
    
716
    /**
717
     * Get the handshake message, or null if none received yet.
718
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
719
     */
720 59
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
721 59
        if($this->parser) {
722 59
            return $this->parser->getHandshakeMessage();
723
        }
724
        
725
        return null;
726
    }
727
    
728
    /**
729
     * Get the next command, or null.
730
     * @return \Plasma\CommandInterface|null
731
     * @internal
732
     */
733 59
    function getNextCommand(): ?\Plasma\CommandInterface {
734 59
        if(\count($this->queue) === 0) {
735 59
            if($this->goingAway) {
736
                $this->goingAway->resolve();
737
            }
738
            
739 59
            return null;
740 59
        } elseif($this->busy === static::STATE_BUSY) {
741
            return null;
742
        }
743
        
744
        /** @var \Plasma\CommandInterface  $command */
745 59
        $command =  \array_shift($this->queue);
746
        
747 59
        if($command->waitForCompletion()) {
748 59
            $this->busy = static::STATE_BUSY;
749
            
750
            $command->once('error', function () use (&$command) {
751 2
                $this->busy = static::STATE_IDLE;
752
                
753 2
                $this->endCommand();
754 59
            });
755
            
756
            $command->once('end', function () use (&$command) {
757 59
                $this->busy = static::STATE_IDLE;
758
                
759 59
                $this->endCommand();
760 59
            });
761
        } else {
762 35
            $this->endCommand();
763
        }
764
        
765 59
        return $command;
766
    }
767
    
768
    /**
769
     * Finishes up a command.
770
     * @return void
771
     */
772 59
    protected function endCommand() {
773
        $this->loop->futureTick(function () {
774 59
            if($this->goingAway && \count($this->queue) === 0) {
775 1
                return $this->goingAway->resolve();
776
            }
777
            
778 59
            $this->parser->invokeCommand($this->getNextCommand());
779 59
        });
780 59
    }
781
    
782
    /**
783
     * Starts the handshake process.
784
     * @param array                    $credentials
785
     * @param \React\Promise\Deferred  $deferred
786
     * @return void
787
     */
788 62
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
789
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
790 62
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
791 62
                $this->parser->removeListener('message', $listener);
792
                
793 62
                $this->connectionState = static::CONNECTION_SETENV;
794 62
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
795
                
796 62
                \extract($credentials);
797
                
798 62
                if($db !== '') {
799 46
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
800
                }
801
                
802 62
                if($this->charset === null) {
803 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
804
                }
805
                
806 62
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
807 2
                    $this->parser->enableCompression();
808 2
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
809
                }
810
                
811
                // Check if we support auth plugins
812 62
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
813 62
                $plugin = null;
814
                
815 62
                foreach($plugins as $key => $plug) {
816 62
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
817 62
                        $plugin = $plug;
818 62
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
819 62
                        break;
820
                    } elseif($key === $message->authPluginName) {
821
                        $plugin = $plug;
822
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
823
                        break;
824
                    }
825
                }
826
                
827 62
                $remote = \parse_url($this->connection->getRemoteAddress());
828
                
829 62
                if($remote !== false && ($remote['host'] !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
830
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
831
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
832
                        
833
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
834
                        
835
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
836
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
837
                            
838
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
839
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
840
                            }, function (\Throwable $error) use (&$deferred) {
841
                                $deferred->reject($$error);
842
                                $this->connection->close();
843
                            });
844
                        });
845
                        
846
                        return $this->parser->invokeCommand($ssl);
847
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
848
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
849
                        $this->connection->close();
850
                        return;
851
                    }
852
                }
853
                
854 62
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
855
            }
856 62
        };
857
        
858 62
        $this->parser->on('message', $listener);
859
        
860
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
861 62
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
862 61
                $this->connectionState = static::CONNECTION_OK;
863
            }
864
            
865 62
            $this->emit('eventRelay', array('message', $message));
866 62
        });
867 62
    }
868
    
869
    /**
870
     * Enables TLS on the connection.
871
     * @return \React\Promise\PromiseInterface
872
     */
873
    protected function enableTLS(): \React\Promise\PromiseInterface {
874
        // Set required SSL/TLS context options
875
        foreach($this->options['tls.context'] as $name => $value) {
876
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
877
        }
878
        
879
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
880
            $this->connection->close();
881
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
882
        });
883
    }
884
    
885
    /**
886
     * Sends the auth command.
887
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
888
     * @param array                                            $credentials
889
     * @param int                                              $clientFlags
890
     * @param string|null                                      $plugin
891
     * @param \React\Promise\Deferred                          $deferred
892
     * @return void
893
     */
894 62
    protected function createHandshakeResponse(
895
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
896
    ) {
897 62
        \extract($credentials);
898
        
899 62
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
900
        
901
        $auth->once('end', function () use (&$deferred) {
902
            $this->loop->futureTick(function () use (&$deferred) {
903 61
                $deferred->resolve();
904 61
            });
905 62
        });
906
        
907
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
908 1
            $deferred->reject($error);
909 1
            $this->connection->close();
910 62
        });
911
        
912 62
        if($plugin) {
913
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
914
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
915 62
                static $plugin;
916
                
917 62
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
918
                    $name = $message->authPluginName;
919
                    
920
                    if($name !== null) {
921
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
922
                        foreach($plugins as $key => $plug) {
923
                            if($key === $name) {
924
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
925
                                
926
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
927
                                return $this->parser->invokeCommand($command);
928
                            }
929
                        }
930
                    }
931
                    
932
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
933 62
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
934
                    if($plugin === null) {
935
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
936
                        return $this->connection->close();
937
                    }
938
                    
939
                    try {
940
                        $command = $plugin->receiveMoreData($message);
941
                        return $this->parser->invokeCommand($command);
942
                    } catch (\Plasma\Exception $e) {
943
                        $deferred->reject($e);
944
                        $this->connection->close();
945
                    }
946 62
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
947 61
                    $this->parser->removeListener('message', $listener);
948
                }
949 62
            };
950
            
951 62
            $this->parser->on('message', $listener);
952
        }
953
        
954 62
        $this->parser->invokeCommand($auth);
955 62
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
956 62
    }
957
    
958
    /**
959
     * Get the real charset from the DB charset.
960
     * @param string  $charset
961
     * @return string
962
     */
963 4
    protected function getRealCharset(string $charset): string {
964 4
        if(\substr($charset, 0, 4) === 'utf8') {
965 2
            return 'UTF-8';
966
        }
967
        
968 2
        $charsets = \mb_list_encodings();
969
        
970 2
        foreach($charsets as $set) {
971 2
            if(\stripos($set, $charset) === 0) {
972 2
                return $set;
973
            }
974
        }
975
        
976 2
        return 'UTF-8';
977
    }
978
    
979
    /**
980
     * Validates the given options.
981
     * @param array  $options
982
     * @return void
983
     * @throws \InvalidArgumentException
984
     */
985 87
    protected function validateOptions(array $options) {
986 87
        \CharlotteDunois\Validation\Validator::make($options, array(
987 87
            'characters.set' => 'string',
988
            'characters.collate' => 'string',
989
            'compression.enable' => 'boolean',
990
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
991
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
992
            'tls.context' => 'array',
993
            'tls.force' => 'boolean',
994
            'tls.forceLocal' => 'boolean'
995 87
        ), true)->throw(\InvalidArgumentException::class);
996 87
    }
997
}
998