GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( ae5f5e...f4b4f6 )
by Charlotte
02:14
created

Driver::__destruct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 2
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => !true,
31
        'tls.context' => array(),
32
        'tls.force' => true,
33
        'tls.forceLocal' => false
34
    );
35
    
36
    /**
37
     * @var string[]
38
     */
39
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
40
    
41
    /**
42
     * @var \React\Socket\ConnectorInterface
43
     */
44
    protected $connector;
45
    
46
    /**
47
     * An internal class is intentionally used, as there is currently no other way.
48
     * @var \React\Socket\StreamEncryption
49
     * @see https://github.com/reactphp/socket/issues/180
50
     */
51
    protected $encryption;
52
    
53
    /**
54
     * @var \React\Promise\Promise|null
55
     */
56
    protected $connectPromise;
57
    
58
    /**
59
     * @var \React\Socket\Connection
60
     */
61
    protected $connection;
62
    
63
    /**
64
     * @var int
65
     */
66
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
67
    
68
    /**
69
     * @var \Plasma\Drivers\MySQL\ProtocolParser
70
     */
71
    protected $parser;
72
    
73
    /**
74
     * @var array
75
     */
76
    protected $queue;
77
    
78
    /**
79
     * @var int
80
     */
81
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
82
    
83
    /**
84
     * @var bool
85
     */
86
    protected $transaction = false;
87
    
88
    /**
89
     * @var \React\Promise\Deferred
90
     */
91
    protected $goingAway;
92
    
93
    /**
94
     * @var string|null
95
     */
96
    protected $charset;
97
    
98
    /**
99
     * Constructor.
100
     * @param \React\EventLoop\LoopInterface  $loop
101
     * @param array                           $options
102
     */
103 87
    public function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        // Reject malformed options before any state is touched.
        $this->validateOptions($options);

        $this->loop = $loop;
        $this->queue = [];

        // Caller-supplied options take precedence over the defaults declared on the property.
        $this->options = \array_merge($this->options, $options);

        // Use the injected connector when present, otherwise build a stock one on the same loop.
        $this->connector = $options['connector'] ?? new \React\Socket\Connector($loop);
        $this->encryption = new \React\Socket\StreamEncryption($loop, false);
    }
113
    
114
    /**
115
     * Destructor.
116
     * @return void
117
     * @internal
118
     */
119 24
    public function __destruct() {
        // Shut down via close(); the promise it returns is deliberately discarded here.
        $this->close();
    }
122
    
123
    /**
124
     * Returns the event loop.
125
     * @return \React\EventLoop\LoopInterface
126
     */
127 60
    public function getLoop(): \React\EventLoop\LoopInterface {
        // The loop instance handed to the constructor.
        return $this->loop;
    }
130
    
131
    /**
132
     * Retrieves the current connection state.
133
     * @return int
134
     */
135 10
    public function getConnectionState(): int {
        // One of the DriverInterface::CONNECTION_* values tracked by this driver.
        return $this->connectionState;
    }
138
    
139
    /**
140
     * Retrieves the current busy state.
141
     * @return int
142
     */
143 1
    public function getBusyState(): int {
        // One of the DriverInterface::STATE_* values tracked by this driver.
        return $this->busy;
    }
146
    
147
    /**
148
     * Get the length of the driver backlog queue.
149
     * @return int
150
     */
151 1
    public function getBacklogLength(): int {
        // Number of commands queued but not yet handed to the parser.
        $pending = \count($this->queue);

        return $pending;
    }
154
    
155
    /**
156
     * Connects to the given URI.
157
     * @param string  $uri
158
     * @return \React\Promise\PromiseInterface
159
     */
160 65
    function connect(string $uri): \React\Promise\PromiseInterface {
        // Short-circuit for states in which no new connection attempt may be started:
        // shutting down / unusable -> reject, already connected -> resolve, attempt pending -> share it.
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            return $this->connectPromise;
        }
        
        $pos = \strpos($uri, '://');
        
        if($pos === false) {
            // No scheme given: default to plain TCP.
            $uri = 'tcp://'.$uri;
        } elseif(\substr($uri, 0, $pos) === 'unix') {
            // parse_url() cannot digest a socket path in the host position, so the path is cut out,
            // replaced by the placeholder host "localhost", and re-inserted after parsing.
            $defaultSocket = '/tmp/mysql.sock';
            
            $apos = \strpos($uri, '@');
            $spos = \strrpos($uri, '/');
            
            if($apos !== false) {
                // Credentials present: the socket path sits between '@' and the last '/'.
                $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
                $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
            } else {
                // No credentials: the socket path sits between '://' and the last '/'.
                $socket = \substr($uri, ($pos + 3), ($spos - ($pos + 3)));
                $uri = 'unix://localhost'.\substr($uri, $spos);
            }
            
            // A loopback "host" in a unix URI means: use the default socket path.
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
                $socket = $defaultSocket;
            }
        }
        
        $parts = \parse_url($uri);
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes, true)) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }
        
        // Only set in the unix branch above.
        if(isset($socket)) {
            $parts['host'] = $socket;
        }
        
        // "mysql://" is an alias for a plain TCP connection.
        if($parts['scheme'] === 'mysql') {
            $parts['scheme'] = 'tcp';
        }
        
        // Unix sockets carry no port; everything else defaults to 3306.
        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
        $this->connectionState = static::CONNECTION_STARTED;
        
        // Flipped once the handshake promise resolves; decides whether parser errors
        // reject the connect promise or are emitted as 'error' events on the driver.
        $resolved = false;
        
        // NOTE(review): if the connector rejects, connectPromise and connectionState are not reset here,
        // so a later connect() call returns the same rejected promise — confirm whether retry is intended.
        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            $this->connectPromise = null;
            
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }
            
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;
            
            // Forward socket errors to the driver's listeners.
            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });
            
            // A closed socket makes this driver permanently unusable.
            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;
                
                $this->emit('close');
            });
            
            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
            
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });
            
            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = \ltrim(($parts['path'] ?? ''), '/');
            
            $credentials = \compact('user', 'password', 'db');
            
            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;
                
                // Commands may have been queued while the handshake was running.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });
        
        // When a character set is configured, the connect promise only resolves after SET NAMES succeeded.
        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            $this->connectPromise = $this->connectPromise->then(function () {
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
                
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);
                
                return $cmd->getPromise();
            });
        }
        
        return $this->connectPromise;
    }
276
    
277
    /**
278
     * Pauses the underlying stream I/O consumption.
279
     * If consumption is already paused, this will do nothing.
280
     * @return bool  Whether the operation was successful.
281
     */
282 1
    public function pauseStreamConsumption(): bool {
        // Pausing only makes sense with a live socket that is not being shut down.
        $usable = ($this->connection !== null && !$this->goingAway);

        if($usable) {
            $this->connection->pause();
        }

        return $usable;
    }
290
    
291
    /**
292
     * Resumes the underlying stream I/O consumption.
293
     * If consumption is not paused, this will do nothing.
294
     * @return bool  Whether the operation was successful.
295
     */
296 1
    public function resumeStreamConsumption(): bool {
        // Resuming only makes sense with a live socket that is not being shut down.
        $usable = ($this->connection !== null && !$this->goingAway);

        if($usable) {
            $this->connection->resume();
        }

        return $usable;
    }
304
    
305
    /**
306
     * Closes all connections gracefully after processing all outstanding requests.
307
     * @return \React\Promise\PromiseInterface
308
     */
309 25
    function close(): \React\Promise\PromiseInterface {
        // A shutdown is already in progress: every caller shares the same promise.
        if($this->goingAway) {
            return $this->goingAway->promise();
        }
        
        // Remember the state we had on entry; the driver is unusable from now on.
        $state = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        // Connection is still pending
        if($this->connectPromise !== null) {
            $this->connectPromise->cancel();
            
            // Nothing queued can ever run, so fail every queued command right away.
            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
            while($command = \array_shift($this->queue)) {
                $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
            }
        }
        
        $this->goingAway = new \React\Promise\Deferred();
        
        // Nothing to wait for: either the queue is empty or the connection never became usable.
        // Otherwise goingAway is resolved elsewhere once the queue has drained.
        if(\count($this->queue) === 0 || $state < \Plasma\DriverInterface::CONNECTION_OK) {
            $this->queue = array();
            $this->goingAway->resolve();
        }
        
        return $this->goingAway->promise()->then(function () use ($state) {
            if($state !== static::CONNECTION_OK) {
                return;
            } elseif($this->connection === null) {
                // The socket closed while the queue was draining (its 'close' handler nulls
                // the property), so there is nothing left to shut down.
                return;
            } elseif(!$this->connection->isWritable()) {
                $this->connection->close();
                return;
            }
            
            $deferred = new \React\Promise\Deferred();
            
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
            
            // The returned promise settles once the socket is really closed.
            $this->connection->once('close', function () use (&$deferred) {
                $deferred->resolve();
            });
            
            $quit->once('end', function () {
                // The socket may already be gone by the time the quit command ends.
                if($this->connection !== null) {
                    $this->connection->close();
                }
            });
            
            $this->parser->invokeCommand($quit);
            
            return $deferred->promise();
        });
    }
359
    
360
    /**
361
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
362
     * @return void
363
     */
364 11
    public function quit(): void {
        // Reuse the deferred of a close() already in progress, otherwise create one.
        $this->goingAway = $this->goingAway ?? new \React\Promise\Deferred();

        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        // Fail everything that is still waiting in the backlog.
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $pending */
        while(!empty($this->queue)) {
            $pending = \array_shift($this->queue);
            $pending->emit('error', [new \Plasma\Exception('Connection is going away')]);
        }

        // Abort a connection attempt that has not finished yet.
        if($this->connectPromise !== null) {
            $this->connectPromise->cancel();
        }

        // Only an established connection gets a quit command before the socket is closed.
        if($previousState === static::CONNECTION_OK) {
            $this->parser->invokeCommand(new \Plasma\Drivers\MySQL\Commands\QuitCommand());
            $this->connection->close();
        }

        $this->goingAway->resolve();
    }
390
    
391
    /**
392
     * Whether this driver is currently in a transaction.
393
     * @return bool
394
     */
395 2
    public function isInTransaction(): bool {
        // Flag is raised by beginTransaction() and lowered by endTransaction().
        return $this->transaction;
    }
398
    
399
    /**
400
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
401
     * When the command is done, the driver must check itself back into the client.
402
     * @param \Plasma\ClientInterface  $client
403
     * @param string                   $query
404
     * @return \React\Promise\PromiseInterface
405
     * @throws \Plasma\Exception
406
     * @see \Plasma\QueryResultInterface
407
     */
408 48
    public function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject(new \Plasma\Exception('Connection is going away'));
        }

        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            // Without a pending connection attempt there is nothing to wait for.
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }

            // Retry this call once the pending connection attempt has finished.
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->query($client, $query);
            });
        }

        $queryCommand = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($queryCommand);

        // Outside of a transaction the driver returns itself to the client when the command ends.
        if(!$this->transaction) {
            $queryCommand->once('end', function () use ($client) {
                $client->checkinConnection($this);
            });
        }

        return $queryCommand->getPromise();
    }
432
    
433
    /**
434
     * Prepares a query. Resolves with a `StatementInterface` instance.
435
     * When the command is done, the driver must check itself back into the client.
436
     * @param \Plasma\ClientInterface  $client
437
     * @param string                   $query
438
     * @return \React\Promise\PromiseInterface
439
     * @throws \Plasma\Exception
440
     * @see \Plasma\StatementInterface
441
     */
442 39
    public function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject(new \Plasma\Exception('Connection is going away'));
        }

        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            // Without a pending connection attempt there is nothing to wait for.
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }

            // Retry this call once the pending connection attempt has finished.
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->prepare($client, $query);
            });
        }

        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);

        return $prepareCommand->getPromise();
    }
460
    
461
    /**
462
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
463
     * This is equivalent to prepare -> execute -> close.
464
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
465
     * @param \Plasma\ClientInterface  $client
466
     * @param string                   $query
467
     * @param array                    $params
468
     * @return \React\Promise\PromiseInterface
469
     * @throws \Plasma\Exception
470
     * @see \Plasma\StatementInterface
471
     */
472 38
    public function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject(new \Plasma\Exception('Connection is going away'));
        }

        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            // Without a pending connection attempt there is nothing to wait for.
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }

            // Retry this call once the pending connection attempt has finished.
            return $this->connectPromise->then(function () use ($client, $query, $params) {
                return $this->execute($client, $query, $params);
            });
        }

        // prepare -> execute -> close
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            $onResult = function (\Plasma\QueryResultInterface $result) use ($statement) {
                // Streaming results are handed out immediately; a failing close is
                // reported through the driver's 'error' event instead.
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', [$error]);
                    });

                    return $result;
                }

                // Regular results are delivered only after the statement has been closed.
                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            };

            $onFailure = function (\Throwable $error) use ($statement) {
                // Close the statement first, then surface the execution error.
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            };

            return $statement->execute($params)->then($onResult, $onFailure);
        });
    }
505
    
506
    /**
507
     * Quotes the string for use in the query.
508
     * @param string  $str
509
     * @param int     $type  For types, see the constants.
510
     * @return string
511
     * @throws \LogicException  Thrown if the driver does not support quoting.
512
     * @throws \Plasma\Exception
513
     */
514 5
    public function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }

        // Prefer the latest OK message; fall back to the handshake message.
        $message = $this->parser->getLastOkMessage() ?? $this->parser->getHandshakeMessage();
        if($message === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }

        // Keep only the part of the charset name before the first underscore.
        $underscore = \strpos($this->charset, '_');
        $length = ($underscore === false ? \strlen($this->charset) : $underscore);
        $realCharset = $this->getRealCharset(\substr($this->charset, 0, $length));

        // The status flags of the chosen message decide which escaping routine is used.
        $noBackslashEscapes = ($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0;

        return ($noBackslashEscapes
            ? $this->escapeUsingQuotes($realCharset, $str, $type)
            : $this->escapeUsingBackslashes($realCharset, $str, $type));
    }
538
    
539
    /**
540
     * Escapes using quotes.
541
     * @param string  $realCharset
542
     * @param string  $str
543
     * @param int     $type
544
     * @return string
545
     */
546 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        // Escapes $str by doubling the enclosing quote character.
        // quote() selects this routine when the server reports NO_BACKSLASH_ESCAPES.
        // $realCharset is accepted for signature parity with escapeUsingBackslashes() and is not used.
        
        // Identifiers: wrap in backticks, double any embedded backtick.
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // Values: with NO_BACKSLASH_ESCAPES the backslash is an ordinary character, so it must be
        // passed through untouched - doubling it would store two backslashes instead of one.
        // Only the enclosing double quote needs escaping, which is done by doubling it.
        return '"'.\str_replace('"', '""', $str).'"';
    }
563
    
564
    /**
565
     * Escapes using backslashes.
566
     * @param string  $realCharset
567
     * @param string  $str
568
     * @param int     $type
569
     * @return string
570
     */
571 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        // Escapes $str with backslash escapes; quote() selects this routine when the
        // server does NOT report NO_BACKSLASH_ESCAPES.
        // NOTE(review): $realCharset is not used; presumably intended for multi-byte aware escaping - confirm.
        
        // Identifiers: a backtick inside a backtick-quoted identifier has to be doubled.
        // A backslash does not escape it, so the previous "\`" form let a crafted
        // identifier terminate the quoting early.
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // Values: the backslash is listed first so that every original backslash is doubled
        // before the quote escape introduces new backslashes.
        $escapeChars = array(
            '\\',
            '"',
        );
        
        $escapeReplace = array(
            '\\\\',
            '\"',
        );
        
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }
588
    
589
    /**
590
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
591
     *
592
     * Checks out a connection until the transaction gets committed or rolled back.
593
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
594
     * check the connection back into the pool, as long as the transaction is not finished.
595
     *
596
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
597
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
598
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
599
     * @param \Plasma\ClientInterface  $client
600
     * @param int                      $isolation  See the `TransactionInterface` constants.
601
     * @return \React\Promise\PromiseInterface
602
     * @throws \Plasma\Exception
603
     * @see \Plasma\TransactionInterface
604
     */
605 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        // Sets the session isolation level, issues START TRANSACTION and resolves with a Transaction object.
        // Throws \Plasma\Exception if a transaction is already running, the isolation level is unknown,
        // or query() cannot run without a connection.
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }
        
        // Map the isolation constant to the matching SET statement.
        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }
        
        // Raised before the first query so that query() does not check the driver back into the client.
        $this->transaction = true;
        
        try {
            $promise = $this->query($client, $query);
        } catch (\Throwable $e) {
            // query() throws synchronously when there is no usable connection; without this
            // reset the flag would stay raised and every later beginTransaction() would fail.
            $this->transaction = false;
            throw $e;
        }
        
        return $promise->then(function () use (&$client) {
            return $this->query($client, 'START TRANSACTION');
        })->then(function () use (&$client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        })->then(null, function (\Throwable $e) {
            // Any failure along the chain means no transaction was started.
            $this->transaction = false;
            throw $e;
        });
    }
643
    
644
    /**
645
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
646
     * @return void
647
     */
648 1
    public function endTransaction(): void {
        // Pure bookkeeping: no statement is sent to the server from here.
        $this->transaction = false;
    }
651
    
652
    /**
653
     * Runs the given command.
654
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
655
     * or rejects with the `Throwable` of the `error` event.
656
     * When the command is done, the driver must check itself back into the client.
657
     * @param \Plasma\ClientInterface   $client
658
     * @param \Plasma\CommandInterface  $command
659
     * @return \React\Promise\PromiseInterface
660
     */
661 6
    public function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject(new \Plasma\Exception('Connection is going away'));
        }

        $resolver = function (callable $resolve, callable $reject) use ($client, $command) {
            // Hands the driver back to the client unless a transaction is running.
            $release = function () use ($client) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
            };

            $command->once('end', function ($value = null) use ($release, $resolve) {
                $release();
                $resolve($value);
            });

            $command->once('error', function (\Throwable $error) use ($release, $reject) {
                $release();
                $reject($error);
            });

            // Listeners are attached before the command is queued.
            $this->executeCommand($command);
        };

        return new \React\Promise\Promise($resolver);
    }
686
    
687
    /**
688
     * Runs the given SQL querybuilder.
689
     * The driver CAN throw an exception if the given querybuilder is not supported.
690
     * An example would be a SQL querybuilder and a Cassandra driver.
691
     * @param \Plasma\ClientInterface           $client
692
     * @param \Plasma\SQLQuerybuilderInterface  $query
693
     * @return \React\Promise\PromiseInterface
694
     * @throws \Plasma\Exception
695
     */
696 3
    public function runQuery(\Plasma\ClientInterface $client, \Plasma\QuerybuilderInterface $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject(new \Plasma\Exception('Connection is going away'));
        }

        // Only SQL querybuilders are accepted by this driver.
        $isSql = ($query instanceof \Plasma\SQLQuerybuilderInterface);
        if(!$isSql) {
            throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
        }

        // The builder supplies the SQL text first, then the bound parameters.
        $statement = $query->getQuery();
        $bindings = $query->getParameters();

        return $this->execute($client, $statement, $bindings);
    }
710
    
711
    /**
712
     * Executes a command.
713
     * @param \Plasma\CommandInterface  $command
714
     * @return void
715
     * @internal
716
     */
717 60
    public function executeCommand(\Plasma\CommandInterface $command): void {
        // Every command goes through the backlog first.
        $this->queue[] = $command;

        // Without a parser, or while another command is running, the command stays queued.
        $canDispatch = ($this->parser && $this->busy === static::STATE_IDLE);
        if(!$canDispatch) {
            return;
        }

        $this->parser->invokeCommand($this->getNextCommand());
    }
724
    
725
    /**
726
     * Get the handshake message, or null if none received yet.
727
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
728
     */
729 59
    public function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        // Without a parser there cannot be a handshake message.
        return ($this->parser ? $this->parser->getHandshakeMessage() : null);
    }
736
    
737
    /**
738
     * Get the next command, or null.
739
     * @return \Plasma\CommandInterface|null
740
     * @internal
741
     */
742 59
    public function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            // An empty backlog is what a pending shutdown is waiting for.
            if($this->goingAway) {
                $this->goingAway->resolve();
            }

            return null;
        }

        // A command is still running; nothing may be dequeued yet.
        if($this->busy === static::STATE_BUSY) {
            return null;
        }

        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);

        if(!$next->waitForCompletion()) {
            // Fire-and-forget command: schedule the follow-up right away.
            $this->endCommand();

            return $next;
        }

        // Block the driver until the command reports 'end' or 'error'.
        $this->busy = static::STATE_BUSY;

        $release = function () {
            $this->busy = static::STATE_IDLE;

            $this->endCommand();
        };

        $next->once('error', $release);
        $next->once('end', $release);

        return $next;
    }
776
    
777
    /**
778
     * Finishes up a command.
779
     * @return void
780
     */
781 59
    protected function endCommand() {
        // The follow-up is deferred to a future tick of the event loop.
        $this->loop->futureTick(function () {
            $drained = ($this->goingAway && \count($this->queue) === 0);

            if($drained) {
                // Shutdown requested and nothing left to run.
                return $this->goingAway->resolve();
            }

            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
790
    
791
    /**
     * Starts the handshake process.
     *
     * Registers two `message` listeners on the parser:
     *  - a one-shot listener that reacts to the server's handshake message by negotiating
     *    the client flags (database, compression, auth plugin, TLS) and then sending the
     *    handshake response, optionally after upgrading the connection to TLS,
     *  - a permanent listener that marks the connection as OK on an OK response and relays
     *    every message through the `eventRelay` event.
     * @param array                    $credentials  Extracted into local variables; `db` is read here, the array is passed on to createHandshakeResponse().
     * @param \React\Promise\Deferred  $deferred     Rejected when TLS is required but not offered, or when enabling TLS fails; otherwise handed to createHandshakeResponse().
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
                // The handshake message is handled once, so detach this listener.
                $this->parser->removeListener('message', $listener);
                
                $this->connectionState = static::CONNECTION_SETENV;
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
                
                \extract($credentials);
                
                if($db !== '') {
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
                }
                
                // No charset configured: adopt the one announced by the server, falling back to latin1.
                if($this->charset === null) {
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
                }
                
                // Compression needs server support, the zlib extension and the option being enabled.
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
                    $this->parser->enableCompression();
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
                }
                
                // Check if we support auth plugins
                // Integer keys are matched as bit masks against the server capabilities,
                // all other keys are compared with the auth plugin name sent by the server.
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                $plugin = null;
                
                foreach($plugins as $key => $plug) {
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    } elseif($key === $message->authPluginName) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    }
                }
                
                $remote = \parse_url($this->connection->getRemoteAddress());
                
                // TLS is considered for every remote host except 127.0.0.1, unless tls.forceLocal is set.
                if($remote !== false && ($remote['host'] !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
                        
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
                        
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
                            
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                            }, function (\Throwable $error) use (&$deferred) {
                                // Reject with the actual TLS error (a variable-variable here would pass null instead).
                                $deferred->reject($error);
                                $this->connection->close();
                            });
                        });
                        
                        return $this->parser->invokeCommand($ssl);
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                        // TLS is mandatory by configuration but the server does not offer it.
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                        $this->connection->close();
                        return;
                    }
                }
                
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
            }
        };
        
        $this->parser->on('message', $listener);
        
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }
            
            $this->emit('eventRelay', array('message', $message));
        });
    }
877
    
878
    /**
     * Switches the current connection over to TLS.
     *
     * Every entry of the `tls.context` option is applied to the connection's stream
     * as an `ssl` context option before encryption is enabled. If enabling fails,
     * the connection is closed and the returned promise rejects with a \RuntimeException
     * carrying the original message and code.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $option => $setting) {
            \stream_context_set_option($this->connection->stream, 'ssl', $option, $setting);
        }
        
        $onFailure = function (\Throwable $error) {
            $this->connection->close();
            
            $reason = 'Connection failed during TLS handshake: '.$error->getMessage();
            throw new \RuntimeException($reason, $error->getCode());
        };
        
        $enabling = $this->encryption->enable($this->connection);
        
        return $enabling->then(null, $onFailure);
    }
893
    
894
    /**
     * Sends the auth command.
     *
     * Builds the handshake response command from the credentials (`user`, `password` and `db`
     * are extracted from the array) and hands it to the parser. The deferred is resolved on a
     * future loop tick once the command ends, and rejected (closing the connection) when it errors.
     * When an auth plugin is in use, an additional `message` listener handles auth switch requests
     * and auth more data packets until an OK response arrives.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        \extract($credentials);
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        $auth->once('end', function () use (&$deferred) {
            $this->loop->futureTick(function () use (&$deferred) {
                $deferred->resolve();
            });
        });
        
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
                // Static on purpose: the plugin instance created for an auth switch request
                // must still be available when a later auth more data packet arrives.
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
                static $plugin;
                
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    // No registered plugin matches the requested method: fail the connect attempt and,
                    // like every other authentication failure path in this method, close the connection.
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    $this->connection->close();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($plugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $plugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // Authentication is done, this listener is no longer needed.
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
966
    
967
    /**
     * Maps a DB charset name to an mbstring encoding name.
     *
     * Any charset starting with `utf8` maps to `UTF-8`. Otherwise the first entry of
     * mb_list_encodings() that starts with the charset (case-insensitive) is used,
     * with `UTF-8` as fallback when none matches.
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        $fallback = 'UTF-8';
        
        if(\strncmp($charset, 'utf8', 4) === 0) {
            return $fallback;
        }
        
        foreach(\mb_list_encodings() as $encoding) {
            // Only a match at the very beginning of the encoding name counts.
            if(\stripos($encoding, $charset) === 0) {
                return $encoding;
            }
        }
        
        return $fallback;
    }
987
    
988
    /**
     * Checks the given driver options against the expected types.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        $connectorClass = \React\Socket\ConnectorInterface::class;
        $maxPacketSize = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE;
        
        $rules = array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'compression.enable' => 'boolean',
            'connector' => 'class:'.$connectorClass.'=object',
            'packet.maxAllowedSize' => 'integer|min:0|max:'.$maxPacketSize,
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        // NOTE(review): the meaning of the third argument (true) is defined by the Validator library - confirm there.
        $validator = \CharlotteDunois\Validation\Validator::make($options, $rules, true);
        $validator->throw(\InvalidArgumentException::class);
    }
1006
}
1007