GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — master ( 0fd036...ae5f5e )
by Charlotte
02:20
created

Driver::prepare()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4.0961

Importance

Changes 0
Metric Value
cc 4
eloc 10
nc 4
nop 2
dl 0
loc 17
ccs 9
cts 11
cp 0.8182
crap 4.0961
rs 9.9332
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => !true,
31
        'tls.context' => array(),
32
        'tls.force' => true,
33
        'tls.forceLocal' => false
34
    );
35
    
36
    /**
37
     * @var string[]
38
     */
39
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
40
    
41
    /**
42
     * @var \React\Socket\ConnectorInterface
43
     */
44
    protected $connector;
45
    
46
    /**
47
     * Internal class is intentional used, as there's no other way currently.
48
     * @var \React\Socket\StreamEncryption
49
     * @see https://github.com/reactphp/socket/issues/180
50
     */
51
    protected $encryption;
52
    
53
    /**
54
     * @var \React\Promise\Promise|null
55
     */
56
    protected $connectPromise;
57
    
58
    /**
59
     * @var \React\Socket\Connection
60
     */
61
    protected $connection;
62
    
63
    /**
64
     * @var int
65
     */
66
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
67
    
68
    /**
69
     * @var \Plasma\Drivers\MySQL\ProtocolParser
70
     */
71
    protected $parser;
72
    
73
    /**
74
     * @var array
75
     */
76
    protected $queue;
77
    
78
    /**
79
     * @var int
80
     */
81
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
82
    
83
    /**
84
     * @var bool
85
     */
86
    protected $transaction = false;
87
    
88
    /**
89
     * @var \React\Promise\Deferred
90
     */
91
    protected $goingAway;
92
    
93
    /**
94
     * @var string|null
95
     */
96
    protected $charset;
97
    
98
    /**
99
     * Constructor.
100
     * @param \React\EventLoop\LoopInterface  $loop
101
     * @param array                           $options
102
     */
103 87
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
104 87
        $this->validateOptions($options);
105
        
106 87
        $this->loop = $loop;
107 87
        $this->options = \array_merge($this->options, $options);
108
        
109 87
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
110 87
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
111 87
        $this->queue = array();
112 87
    }
113
    
114
    /**
115
     * Destructor.
116
     * @return void
117
     * @internal
118
     */
119 24
    function __destruct() {
120 24
        $this->close();
121 24
    }
122
    
123
    /**
124
     * Returns the event loop.
125
     * @return \React\EventLoop\LoopInterface
126
     */
127 60
    function getLoop(): \React\EventLoop\LoopInterface {
128 60
        return $this->loop;
129
    }
130
    
131
    /**
132
     * Retrieves the current connection state.
133
     * @return int
134
     */
135 10
    function getConnectionState(): int {
136 10
        return $this->connectionState;
137
    }
138
    
139
    /**
140
     * Retrieves the current busy state.
141
     * @return int
142
     */
143 1
    function getBusyState(): int {
144 1
        return $this->busy;
145
    }
146
    
147
    /**
148
     * Get the length of the driver backlog queue.
149
     * @return int
150
     */
151 1
    function getBacklogLength(): int {
152 1
        return \count($this->queue);
153
    }
154
    
155
    /**
156
     * Connects to the given URI.
157
     * @param string  $uri
158
     * @return \React\Promise\PromiseInterface
159
     */
160 65
    function connect(string $uri): \React\Promise\PromiseInterface {
161 65
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
162 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
163 64
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
164 1
            return \React\Promise\resolve();
165 64
        } elseif($this->connectPromise !== null) {
166 1
            return $this->connectPromise;
167
        }
168
        
169 64
        $pos = \strpos($uri, '://');
170 64
        $unix = false;
0 ignored issues
show
Unused Code introduced by
The assignment to $unix is dead and can be removed.
Loading history...
171
        
172 64
        if($pos === false) {
173 2
            $uri = 'tcp://'.$uri;
174 62
        } elseif(\substr($uri, 0, $pos) === 'unix') {
175 1
            $defaultSocket = '/tmp/mysql.sock';
176
            
177 1
            $apos = \strpos($uri, '@');
178 1
            $spos = \strrpos($uri, '/');
179
            
180 1
            if($apos !== false) {
181 1
                $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
182 1
                $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
183
            } else {
184
                $socket = \substr($uri, ($pos + 3), ($spos - ($pos + 3)));
185
                $uri = 'unix://localhost'.\substr($uri, $spos);
186
            }
187
            
188 1
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
189
                $socket = $defaultSocket;
190
            }
191
        }
192
        
193 64
        $parts = \parse_url($uri);
194 64
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes)) {
195 2
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
196
        }
197
        
198 62
        if(isset($socket)) {
199 1
            $parts['host'] = $socket;
200
        }
201
        
202 62
        if($parts['scheme'] === 'mysql') {
203 1
            $parts['scheme'] = 'tcp';
204
        }
205
        
206 62
        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
207 62
        $this->connectionState = static::CONNECTION_STARTED;
208 62
        $resolved = false;
209
        
210
        $this->connectPromise =  $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
211
            // See description of property encryption
212 62
            if(!($connection instanceof \React\Socket\Connection)) {
213
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
214
            }
215
            
216 62
            $this->busy = static::STATE_BUSY;
217 62
            $this->connectionState = static::CONNECTION_MADE;
218 62
            $this->connection = $connection;
219
            
220
            $this->connection->on('error', function (\Throwable $error) {
221
                $this->emit('error', array($error));
222 62
            });
223
            
224
            $this->connection->on('close', function () {
225 2
                $this->connection = null;
226 2
                $this->connectionState = static::CONNECTION_UNUSABLE;
227
                
228 2
                $this->emit('close');
229 62
            });
230
            
231 62
            $deferred = new \React\Promise\Deferred();
232 62
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
233
            
234
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
235
                if($resolved) {
236
                    $this->emit('error', array($error));
237
                } else {
238
                    $deferred->reject($error);
239
                }
240 62
            });
241
            
242 62
            $user = ($parts['user'] ?? 'root');
243 62
            $password = ($parts['pass'] ?? '');
244 62
            $db = \ltrim(($parts['path'] ?? ''), '/');
245
            
246 62
            $credentials = \compact('user', 'password', 'db');
247
            
248 62
            $this->startHandshake($credentials, $deferred);
249
            return $deferred->promise()->then(function () use (&$resolved) {
250 61
                $this->busy = static::STATE_IDLE;
251 61
                $resolved = true;
252
                
253 61
                if(\count($this->queue) > 0) {
254
                    $this->parser->invokeCommand($this->getNextCommand());
255
                }
256 62
            });
257 62
        });
258
        
259 62
        if($this->options['characters.set']) {
260 60
            $this->charset = $this->options['characters.set'];
261
            $this->connectPromise = $this->connectPromise->then(function () {
262 59
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
263 59
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
264
                
265 59
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
266 59
                $this->executeCommand($cmd);
267
                
268 59
                return $cmd->getPromise();
269 60
            });
270
        }
271
        
272 62
        return $this->connectPromise;
273
    }
274
    
275
    /**
276
     * Pauses the underlying stream I/O consumption.
277
     * If consumption is already paused, this will do nothing.
278
     * @return bool  Whether the operation was successful.
279
     */
280 1
    function pauseStreamConsumption(): bool {
281 1
        if($this->connection === null || $this->goingAway) {
282 1
            return false;
283
        }
284
        
285
        $this->connection->pause();
286
        return true;
287
    }
288
    
289
    /**
290
     * Resumes the underlying stream I/O consumption.
291
     * If consumption is not paused, this will do nothing.
292
     * @return bool  Whether the operation was successful.
293
     */
294 1
    function resumeStreamConsumption(): bool {
295 1
        if($this->connection === null || $this->goingAway) {
296 1
            return false;
297
        }
298
        
299
        $this->connection->resume();
300
        return true;
301
    }
302
    
303
    /**
304
     * Closes all connections gracefully after processing all outstanding requests.
305
     * @return \React\Promise\PromiseInterface
306
     */
307 25
    function close(): \React\Promise\PromiseInterface {
308 25
        if($this->goingAway) {
309 10
            return $this->goingAway->promise();
310
        }
311
        
312 15
        $state = $this->connectionState;
313 15
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
314
        
315
        // Connection is still pending
316 15
        if($this->connectPromise !== null) {
317 1
            $this->connectPromise->cancel();
318
            
319
            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
320 1
            while($command = \array_shift($this->queue)) {
321 1
                $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
322
            }
323
        }
324
        
325 15
        $this->goingAway = new \React\Promise\Deferred();
326
        
327 15
        if(\count($this->queue) === 0 || $state < \Plasma\DriverInterface::CONNECTION_OK) {
328 15
            $this->queue = array();
329 15
            $this->goingAway->resolve();
330
        }
331
        
332
        return $this->goingAway->promise()->then(function () use ($state) {
333 15
            if($state !== static::CONNECTION_OK) {
334 14
                return;
335 1
            } elseif(!$this->connection->isWritable()) {
336
                $this->connection->close();
337
                return;
338
            }
339
            
340 1
            $deferred = new \React\Promise\Deferred();
341
            
342 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
343
            
344
            $this->connection->once('close', function () use (&$deferred) {
345
                $deferred->resolve();
346 1
            });
347
            
348
            $quit->once('end', function () {
349
                $this->connection->close();
350 1
            });
351
            
352 1
            $this->parser->invokeCommand($quit);
353
            
354 1
            return $deferred->promise();
355 15
        });
356
    }
357
    
358
    /**
359
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
360
     * @return void
361
     */
362 11
    function quit(): void {
363 11
        if($this->goingAway === null) {
364 11
            $this->goingAway = new \React\Promise\Deferred();
365
        }
366
        
367 11
        $state = $this->connectionState;
368 11
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
369
        
370
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
371 11
        while($command = \array_shift($this->queue)) {
372 1
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
373
        }
374
        
375 11
        if($this->connectPromise !== null) {
376 1
            $this->connectPromise->cancel();
377
        }
378
        
379 11
        if($state === static::CONNECTION_OK) {
380 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
381 1
            $this->parser->invokeCommand($quit);
382
            
383 1
            $this->connection->close();
384
        }
385
        
386 11
        $this->goingAway->resolve();
387 11
    }
388
    
389
    /**
390
     * Whether this driver is currently in a transaction.
391
     * @return bool
392
     */
393 2
    function isInTransaction(): bool {
394 2
        return $this->transaction;
395
    }
396
    
397
    /**
398
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
399
     * When the command is done, the driver must check itself back into the client.
400
     * @param \Plasma\ClientInterface  $client
401
     * @param string                   $query
402
     * @return \React\Promise\PromiseInterface
403
     * @throws \Plasma\Exception
404
     * @see \Plasma\QueryResultInterface
405
     */
406 48
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
407 48
        if($this->goingAway) {
408 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
409 47
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
410 1
            if($this->connectPromise !== null) {
411
                return $this->connectPromise->then(function () use (&$client, &$query) {
412
                    return $this->query($client, $query);
413
                });
414
            }
415
            
416 1
            throw new \Plasma\Exception('Unable to continue without connection');
417
        }
418
        
419 46
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
420 46
        $this->executeCommand($command);
421
        
422 46
        if(!$this->transaction) {
423
            $command->once('end', function () use (&$client) {
424 44
                $client->checkinConnection($this);
425 44
            });
426
        }
427
        
428 46
        return $command->getPromise();
429
    }
430
    
431
    /**
432
     * Prepares a query. Resolves with a `StatementInterface` instance.
433
     * When the command is done, the driver must check itself back into the client.
434
     * @param \Plasma\ClientInterface  $client
435
     * @param string                   $query
436
     * @return \React\Promise\PromiseInterface
437
     * @throws \Plasma\Exception
438
     * @see \Plasma\StatementInterface
439
     */
440 39
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
441 39
        if($this->goingAway) {
442 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
443 38
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
444 1
            if($this->connectPromise !== null) {
445
                return $this->connectPromise->then(function () use (&$client, &$query) {
446
                    return $this->prepare($client, $query);
447
                });
448
            }
449
            
450 1
            throw new \Plasma\Exception('Unable to continue without connection');
451
        }
452
        
453 37
        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
454 37
        $this->executeCommand($command);
455
        
456 37
        return $command->getPromise();
457
    }
458
    
459
    /**
460
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
461
     * This is equivalent to prepare -> execute -> close.
462
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
463
     * @param \Plasma\ClientInterface  $client
464
     * @param string                   $query
465
     * @param array                    $params
466
     * @return \React\Promise\PromiseInterface
467
     * @throws \Plasma\Exception
468
     * @see \Plasma\StatementInterface
469
     */
470 38
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
471 38
        if($this->goingAway) {
472 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
473 37
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
474 1
            if($this->connectPromise !== null) {
475
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
476
                    return $this->execute($client, $query, $params);
477
                });
478
            }
479
            
480 1
            throw new \Plasma\Exception('Unable to continue without connection');
481
        }
482
        
483
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
484
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
485 35
                if($result instanceof \Plasma\StreamQueryResultInterface) {
486
                    $statement->close()->then(null, function (\Throwable $error) {
487
                        $this->emit('error', array($error));
488 35
                    });
489
                    
490 35
                    return $result;
491
                }
492
                
493
                return $statement->close()->then(function () use ($result) {
494 33
                    return $result;
495 33
                });
496
            }, function (\Throwable $error) use (&$statement) {
497
                return $statement->close()->then(function () use ($error) {
498
                    throw $error;
499
                });
500 36
            });
501 36
        });
502
    }
503
    
504
    /**
505
     * Quotes the string for use in the query.
506
     * @param string  $str
507
     * @param int     $type  For types, see the constants.
508
     * @return string
509
     * @throws \LogicException  Thrown if the driver does not support quoting.
510
     * @throws \Plasma\Exception
511
     */
512 5
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
513 5
        if($this->parser === null) {
514 1
            throw new \Plasma\Exception('Unable to continue without connection');
515
        }
516
        
517 4
        $message = $this->parser->getLastOkMessage();
518 4
        if($message === null) {
519
            $message = $this->parser->getHandshakeMessage();
520
            
521
            if($message === null) {
522
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
523
            }
524
        }
525
        
526 4
        $pos = \strpos($this->charset, '_');
527 4
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
528 4
        $realCharset = $this->getRealCharset($dbCharset);
529
        
530 4
        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
531
            return $this->escapeUsingQuotes($realCharset, $str, $type);
532
        }
533
        
534 4
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
535
    }
536
    
537
    /**
538
     * Escapes using quotes.
539
     * @param string  $realCharset
540
     * @param string  $str
541
     * @param int     $type
542
     * @return string
543
     */
544 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
545 2
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
546 1
            return '`'.\str_replace('`', '``', $str).'`';
547
        }
548
        
549
        $escapeChars = array(
550 1
            '"',
551
            '\\',
552
        );
553
        
554
        $escapeReplace = array(
555 1
            '""',
556
            '\\\\',
557
        );
558
        
559 1
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
560
    }
561
    
562
    /**
563
     * Escapes using backslashes.
564
     * @param string  $realCharset
565
     * @param string  $str
566
     * @param int     $type
567
     * @return string
568
     */
569 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
570 6
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
571 3
            return '`'.\str_replace('`', '\\`', $str).'`';
572
        }
573
        
574
        $escapeChars = array(
575 3
            '\\',
576
            '"',
577
        );
578
        
579
        $escapeReplace = array(
580 3
            '\\\\',
581
            '\"',
582
        );
583
        
584 3
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
585
    }
586
    
587
    /**
588
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
589
     *
590
     * Checks out a connection until the transaction gets committed or rolled back.
591
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
592
     * check the connection back into the pool, as long as the transaction is not finished.
593
     *
594
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
595
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
596
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
597
     * @param \Plasma\ClientInterface  $client
598
     * @param int                      $isolation  See the `TransactionInterface` constants.
599
     * @return \React\Promise\PromiseInterface
600
     * @throws \Plasma\Exception
601
     * @see \Plasma\TransactionInterface
602
     */
603 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
604 3
        if($this->goingAway) {
605 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
606
        }
607
        
608 2
        if($this->transaction) {
609 1
            throw new \Plasma\Exception('Driver is already in transaction');
610
        }
611
        
612
        switch ($isolation) {
613 2
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
614
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
615
            break;
616 2
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
617 2
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
618 2
            break;
619
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
620
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
621
            break;
622
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
623
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
624
            break;
625
            default:
626
                throw new \Plasma\Exception('Invalid isolation level given');
627
            break;
628
        }
629
        
630 2
        $this->transaction = true;
631
        
632
        return $this->query($client, $query)->then(function () use (&$client) {
633 2
            return $this->query($client, 'START TRANSACTION');
634
        })->then(function () use (&$client, $isolation) {
635 2
            return (new \Plasma\Transaction($client, $this, $isolation));
636
        })->then(null, function (\Throwable $e) {
637
            $this->transaction = false;
638
            throw $e;
639 2
        });
640
    }
641
    
642
    /**
643
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
644
     * @return void
645
     */
646 1
    function endTransaction(): void {
647 1
        $this->transaction = false;
648 1
    }
649
    
650
    /**
651
     * Runs the given command.
652
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
653
     * or rejects with the `Throwable` of the `error` event.
654
     * When the command is done, the driver must check itself back into the client.
655
     * @param \Plasma\ClientInterface   $client
656
     * @param \Plasma\CommandInterface  $command
657
     * @return \React\Promise\PromiseInterface
658
     */
659 6
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
660 6
        if($this->goingAway) {
661 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
662
        }
663
        
664
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
665
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
666 2
                if(!$this->transaction) {
667 2
                    $client->checkinConnection($this);
668
                }
669
                
670 2
                $resolve($value);
671 5
            });
672
            
673
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
674 3
                if(!$this->transaction) {
675 3
                    $client->checkinConnection($this);
676
                }
677
                
678 3
                $reject($error);
679 5
            });
680
            
681 5
            $this->executeCommand($command);
682 5
        }));
683
    }
684
    
685
    /**
686
     * Runs the given SQL querybuilder.
687
     * The driver CAN throw an exception if the given querybuilder is not supported.
688
     * An example would be a SQL querybuilder and a Cassandra driver.
689
     * @param \Plasma\ClientInterface           $client
690
     * @param \Plasma\SQLQuerybuilderInterface  $query
691
     * @return \React\Promise\PromiseInterface
692
     * @throws \Plasma\Exception
693
     */
694 3
    function runQuery(\Plasma\ClientInterface $client, \Plasma\QuerybuilderInterface $query): \React\Promise\PromiseInterface {
695 3
        if($this->goingAway) {
696 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
697
        }
698
        
699 2
        if(!($query instanceof \Plasma\SQLQuerybuilderInterface)) {
700 1
            throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
701
        }
702
        
703 1
        $sql = $query->getQuery();
704 1
        $params = $query->getParameters();
705
        
706 1
        return $this->execute($client, $sql, $params);
707
    }
708
    
709
    /**
710
     * Executes a command.
711
     * @param \Plasma\CommandInterface  $command
712
     * @return void
713
     * @internal
714
     */
715 60
    function executeCommand(\Plasma\CommandInterface $command): void {
716 60
        $this->queue[] = $command;
717
        
718 60
        if($this->parser && $this->busy === static::STATE_IDLE) {
719 59
            $this->parser->invokeCommand($this->getNextCommand());
720
        }
721 60
    }
722
    
723
    /**
724
     * Get the handshake message, or null if none received yet.
725
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
726
     */
727 59
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
728 59
        if($this->parser) {
729 59
            return $this->parser->getHandshakeMessage();
730
        }
731
        
732
        return null;
733
    }
734
    
735
    /**
736
     * Get the next command, or null.
737
     * @return \Plasma\CommandInterface|null
738
     * @internal
739
     */
740 59
    function getNextCommand(): ?\Plasma\CommandInterface {
741 59
        if(\count($this->queue) === 0) {
742 59
            if($this->goingAway) {
743
                $this->goingAway->resolve();
744
            }
745
            
746 59
            return null;
747 59
        } elseif($this->busy === static::STATE_BUSY) {
748
            return null;
749
        }
750
        
751
        /** @var \Plasma\CommandInterface  $command */
752 59
        $command =  \array_shift($this->queue);
753
        
754 59
        if($command->waitForCompletion()) {
755 59
            $this->busy = static::STATE_BUSY;
756
            
757
            $command->once('error', function () use (&$command) {
758 3
                $this->busy = static::STATE_IDLE;
759
                
760 3
                $this->endCommand();
761 59
            });
762
            
763
            $command->once('end', function () use (&$command) {
764 59
                $this->busy = static::STATE_IDLE;
765
                
766 59
                $this->endCommand();
767 59
            });
768
        } else {
769 35
            $this->endCommand();
770
        }
771
        
772 59
        return $command;
773
    }
774
    
775
    /**
776
     * Finishes up a command.
777
     * @return void
778
     */
779 59
    protected function endCommand() {
780
        $this->loop->futureTick(function () {
781 59
            if($this->goingAway && \count($this->queue) === 0) {
782
                return $this->goingAway->resolve();
783
            }
784
            
785 59
            $this->parser->invokeCommand($this->getNextCommand());
786 59
        });
787 59
    }
788
    
789
    /**
790
     * Starts the handshake process.
791
     * @param array                    $credentials
792
     * @param \React\Promise\Deferred  $deferred
793
     * @return void
794
     */
795 62
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
796
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
797 62
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
798 62
                $this->parser->removeListener('message', $listener);
799
                
800 62
                $this->connectionState = static::CONNECTION_SETENV;
801 62
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
802
                
803 62
                \extract($credentials);
804
                
805 62
                if($db !== '') {
806 46
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
807
                }
808
                
809 62
                if($this->charset === null) {
810 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
811
                }
812
                
813 62
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
814 2
                    $this->parser->enableCompression();
815 2
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
816
                }
817
                
818
                // Check if we support auth plugins
819 62
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
820 62
                $plugin = null;
821
                
822 62
                foreach($plugins as $key => $plug) {
823 62
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
824 62
                        $plugin = $plug;
825 62
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
826 62
                        break;
827
                    } elseif($key === $message->authPluginName) {
828
                        $plugin = $plug;
829
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
830
                        break;
831
                    }
832
                }
833
                
834 62
                $remote = \parse_url($this->connection->getRemoteAddress());
835
                
836 62
                if($remote !== false && ($remote['host'] !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
837
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
838
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
839
                        
840
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
841
                        
842
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
843
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
844
                            
845
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
846
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
847
                            }, function (\Throwable $error) use (&$deferred) {
848
                                $deferred->reject($$error);
849
                                $this->connection->close();
850
                            });
851
                        });
852
                        
853
                        return $this->parser->invokeCommand($ssl);
854
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
855
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
856
                        $this->connection->close();
857
                        return;
858
                    }
859
                }
860
                
861 62
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
862
            }
863 62
        };
864
        
865 62
        $this->parser->on('message', $listener);
866
        
867
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
868 62
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
869 61
                $this->connectionState = static::CONNECTION_OK;
870
            }
871
            
872 62
            $this->emit('eventRelay', array('message', $message));
873 62
        });
874 62
    }
875
    
876
    /**
877
     * Enables TLS on the connection.
878
     * @return \React\Promise\PromiseInterface
879
     */
880
    protected function enableTLS(): \React\Promise\PromiseInterface {
881
        // Set required SSL/TLS context options
882
        foreach($this->options['tls.context'] as $name => $value) {
883
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
884
        }
885
        
886
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
887
            $this->connection->close();
888
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
889
        });
890
    }
891
    
892
    /**
893
     * Sends the auth command.
894
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
895
     * @param array                                            $credentials
896
     * @param int                                              $clientFlags
897
     * @param string|null                                      $plugin
898
     * @param \React\Promise\Deferred                          $deferred
899
     * @return void
900
     */
901 62
    protected function createHandshakeResponse(
902
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
903
    ) {
904 62
        \extract($credentials);
905
        
906 62
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
907
        
908
        $auth->once('end', function () use (&$deferred) {
909
            $this->loop->futureTick(function () use (&$deferred) {
910 61
                $deferred->resolve();
911 61
            });
912 62
        });
913
        
914
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
915 1
            $deferred->reject($error);
916 1
            $this->connection->close();
917 62
        });
918
        
919 62
        if($plugin) {
920
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
921
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
922 62
                static $plugin;
923
                
924 62
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
925
                    $name = $message->authPluginName;
926
                    
927
                    if($name !== null) {
928
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
929
                        foreach($plugins as $key => $plug) {
930
                            if($key === $name) {
931
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
932
                                
933
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
934
                                return $this->parser->invokeCommand($command);
935
                            }
936
                        }
937
                    }
938
                    
939
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
940 62
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
941
                    if($plugin === null) {
942
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
943
                        return $this->connection->close();
944
                    }
945
                    
946
                    try {
947
                        $command = $plugin->receiveMoreData($message);
948
                        return $this->parser->invokeCommand($command);
949
                    } catch (\Plasma\Exception $e) {
950
                        $deferred->reject($e);
951
                        $this->connection->close();
952
                    }
953 62
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
954 61
                    $this->parser->removeListener('message', $listener);
955
                }
956 62
            };
957
            
958 62
            $this->parser->on('message', $listener);
959
        }
960
        
961 62
        $this->parser->invokeCommand($auth);
962 62
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
963 62
    }
964
    
965
    /**
966
     * Get the real charset from the DB charset.
967
     * @param string  $charset
968
     * @return string
969
     */
970 4
    protected function getRealCharset(string $charset): string {
971 4
        if(\substr($charset, 0, 4) === 'utf8') {
972 2
            return 'UTF-8';
973
        }
974
        
975 2
        $charsets = \mb_list_encodings();
976
        
977 2
        foreach($charsets as $set) {
978 2
            if(\stripos($set, $charset) === 0) {
979 2
                return $set;
980
            }
981
        }
982
        
983 2
        return 'UTF-8';
984
    }
985
    
986
    /**
987
     * Validates the given options.
988
     * @param array  $options
989
     * @return void
990
     * @throws \InvalidArgumentException
991
     */
992 87
    protected function validateOptions(array $options) {
993 87
        \CharlotteDunois\Validation\Validator::make($options, array(
994 87
            'characters.set' => 'string',
995
            'characters.collate' => 'string',
996
            'compression.enable' => 'boolean',
997
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
998
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
999
            'tls.context' => 'array',
1000
            'tls.force' => 'boolean',
1001
            'tls.forceLocal' => 'boolean'
1002 87
        ), true)->throw(\InvalidArgumentException::class);
1003 87
    }
1004
}
1005