GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — master ( e440bf...9648ec )
by Charlotte
02:09
created

Driver::execute()   A

Complexity

Conditions 5
Paths 4

Size

Total Lines 29
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 5.2

Importance

Changes 0
Metric Value
cc 5
eloc 18
nc 4
nop 3
dl 0
loc 29
ccs 12
cts 15
cp 0.8
crap 5.2
rs 9.3554
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => !true,
31
        'localInFile.enable' => false,
32
        'tls.context' => array(),
33
        'tls.force' => true,
34
        'tls.forceLocal' => false
35
    );
36
    
37
    /**
38
     * @var string[]
39
     */
40
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
41
    
42
    /**
43
     * @var \React\Socket\ConnectorInterface
44
     */
45
    protected $connector;
46
    
47
    /**
48
     * Internal class is intentional used, as there's no other way currently.
49
     * @var \React\Socket\StreamEncryption
50
     * @see https://github.com/reactphp/socket/issues/180
51
     */
52
    protected $encryption;
53
    
54
    /**
55
     * @var \React\Promise\Promise|null
56
     */
57
    protected $connectPromise;
58
    
59
    /**
60
     * @var \React\Socket\Connection
61
     */
62
    protected $connection;
63
    
64
    /**
65
     * @var int
66
     */
67
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
68
    
69
    /**
70
     * @var \Plasma\Drivers\MySQL\ProtocolParser
71
     */
72
    protected $parser;
73
    
74
    /**
75
     * @var array
76
     */
77
    protected $queue;
78
    
79
    /**
80
     * @var int
81
     */
82
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
83
    
84
    /**
85
     * @var bool
86
     */
87
    protected $transaction = false;
88
    
89
    /**
90
     * @var \React\Promise\Deferred
91
     */
92
    protected $goingAway;
93
    
94
    /**
95
     * @var string|null
96
     */
97
    protected $charset;
98
    
99
    /**
100
     * @var bool|null
101
     */
102
    protected $cursorSupported;
103
    
104
    /**
105
     * Constructor.
106
     * @param \React\EventLoop\LoopInterface  $loop
107
     * @param array                           $options
108
     */
109 93
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
110 93
        $this->validateOptions($options);
111
        
112 93
        $this->loop = $loop;
113 93
        $this->options = \array_merge($this->options, $options);
114
        
115 93
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
116 93
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
117 93
        $this->queue = array();
118 93
    }
119
    
120
    /**
121
     * Destructor.
122
     * @return void
123
     * @internal
124
     */
125 27
    function __destruct() {
126 27
        $this->close();
127 27
    }
128
    
129
    /**
130
     * Returns the event loop.
131
     * @return \React\EventLoop\LoopInterface
132
     */
133 61
    function getLoop(): \React\EventLoop\LoopInterface {
134 61
        return $this->loop;
135
    }
136
    
137
    /**
138
     * Retrieves the current connection state.
139
     * @return int
140
     */
141 12
    function getConnectionState(): int {
142 12
        return $this->connectionState;
143
    }
144
    
145
    /**
146
     * Retrieves the current busy state.
147
     * @return int
148
     */
149 1
    function getBusyState(): int {
150 1
        return $this->busy;
151
    }
152
    
153
    /**
154
     * Get the length of the driver backlog queue.
155
     * @return int
156
     */
157 1
    function getBacklogLength(): int {
158 1
        return \count($this->queue);
159
    }
160
    
161
    /**
162
     * Connects to the given URI.
163
     * @param string  $uri
164
     * @return \React\Promise\PromiseInterface
165
     * @throws \InvalidArgumentException
166
     */
167 69
    function connect(string $uri): \React\Promise\PromiseInterface {
168 69
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
169 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
170 68
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
171 1
            return \React\Promise\resolve();
172 68
        } elseif($this->connectPromise !== null) {
173 1
            return $this->connectPromise;
174
        }
175
        
176 68
        $pos = \strpos($uri, '://');
177
        
178 68
        if($pos === false) {
179 2
            $uri = 'tcp://'.$uri;
180 66
        } elseif(\substr($uri, 0, $pos) === 'unix') {
181 2
            $defaultSocket = '/tmp/mysql.sock';
182
            
183 2
            $apos = \strpos($uri, '@');
184 2
            $spos = \strrpos($uri, '/');
185
            
186 2
            if($apos === false) {
187 1
                throw new \InvalidArgumentException('Connecting without any username is not a valid operation');
188
            }
189
            
190 1
            $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
191 1
            $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
192
            
193 1
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
194
                $socket = $defaultSocket;
195
            }
196
        }
197
        
198 67
        $parts = \parse_url($uri);
199 67
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes)) {
200 2
            throw new \InvalidArgumentException('Invalid connect uri given');
201
        }
202
        
203 65
        if(isset($socket)) {
204 1
            $parts['host'] = $socket;
205
        }
206
        
207 65
        if($parts['scheme'] === 'mysql') {
208 3
            $parts['scheme'] = 'tcp';
209
        }
210
        
211 65
        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
212 65
        $this->connectionState = static::CONNECTION_STARTED;
213 65
        $resolved = false;
214
        
215
        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
216 63
            $this->connectPromise = null;
217
            
218
            // See description of property encryption
219 63
            if(!($connection instanceof \React\Socket\Connection)) {
220
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
221
            }
222
            
223 63
            $this->busy = static::STATE_BUSY;
224 63
            $this->connectionState = static::CONNECTION_MADE;
225 63
            $this->connection = $connection;
226
            
227
            $this->connection->on('error', function (\Throwable $error) {
228
                $this->emit('error', array($error));
229 63
            });
230
            
231
            $this->connection->on('close', function () {
232 3
                $this->connection = null;
233 3
                $this->connectionState = static::CONNECTION_UNUSABLE;
234
                
235 3
                $this->emit('close');
236 63
            });
237
            
238 63
            $deferred = new \React\Promise\Deferred();
239 63
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
240
            
241
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
242
                if($resolved) {
243
                    $this->emit('error', array($error));
244
                } else {
245
                    $deferred->reject($error);
246
                }
247 63
            });
248
            
249 63
            $user = ($parts['user'] ?? 'root');
250 63
            $password = ($parts['pass'] ?? '');
251 63
            $db = \ltrim(($parts['path'] ?? ''), '/');
252
            
253 63
            $credentials = \compact('user', 'password', 'db');
254
            
255 63
            $this->startHandshake($credentials, $deferred);
256
            return $deferred->promise()->then(function () use (&$resolved) {
257 62
                $this->busy = static::STATE_IDLE;
258 62
                $resolved = true;
259
                
260 62
                if(\count($this->queue) > 0) {
261
                    $this->parser->invokeCommand($this->getNextCommand());
262
                }
263 63
            });
264 65
        });
265
        
266 65
        if($this->options['characters.set']) {
267 63
            $this->charset = $this->options['characters.set'];
268
            $this->connectPromise = $this->connectPromise->then(function () {
269 60
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
270 60
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
271
                
272 60
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
273 60
                $this->executeCommand($cmd);
274
                
275 60
                return $cmd->getPromise();
276 63
            });
277
        }
278
        
279 65
        return $this->connectPromise;
280
    }
281
    
282
    /**
283
     * Pauses the underlying stream I/O consumption.
284
     * If consumption is already paused, this will do nothing.
285
     * @return bool  Whether the operation was successful.
286
     */
287 1
    function pauseStreamConsumption(): bool {
288 1
        if($this->connection === null || $this->goingAway) {
289 1
            return false;
290
        }
291
        
292
        $this->connection->pause();
293
        return true;
294
    }
295
    
296
    /**
297
     * Resumes the underlying stream I/O consumption.
298
     * If consumption is not paused, this will do nothing.
299
     * @return bool  Whether the operation was successful.
300
     */
301 1
    function resumeStreamConsumption(): bool {
302 1
        if($this->connection === null || $this->goingAway) {
303 1
            return false;
304
        }
305
        
306
        $this->connection->resume();
307
        return true;
308
    }
309
    
310
    /**
311
     * Closes all connections gracefully after processing all outstanding requests.
312
     * @return \React\Promise\PromiseInterface
313
     */
314 28
    function close(): \React\Promise\PromiseInterface {
315 28
        if($this->goingAway) {
316 13
            return $this->goingAway->promise();
317
        }
318
        
319 17
        $state = $this->connectionState;
320 17
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
321
        
322
        // Connection is still pending
323 17
        if($this->connectPromise !== null) {
324 1
            $this->connectPromise->cancel();
325
            
326
            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
327 1
            while($command = \array_shift($this->queue)) {
328
                $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
329
            }
330
        }
331
        
332 17
        $this->goingAway = new \React\Promise\Deferred();
333
        
334 17
        if(\count($this->queue) === 0 || $state < \Plasma\DriverInterface::CONNECTION_OK) {
335 16
            $this->queue = array();
336 16
            $this->goingAway->resolve();
337
        }
338
        
339
        return $this->goingAway->promise()->then(function () use ($state) {
340 17
            if($state !== static::CONNECTION_OK) {
341 16
                return;
342 1
            } elseif(!$this->connection->isWritable()) {
343
                $this->connection->close();
344
                return;
345
            }
346
            
347 1
            $deferred = new \React\Promise\Deferred();
348
            
349 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
350
            
351
            $this->connection->once('close', function () use (&$deferred) {
352 1
                $deferred->resolve();
353 1
            });
354
            
355
            $quit->once('end', function () {
356
                $this->connection->close();
357 1
            });
358
            
359 1
            $this->parser->invokeCommand($quit);
360
            
361 1
            return $deferred->promise();
362 17
        });
363
    }
364
    
365
    /**
366
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
367
     * @return void
368
     */
369 12
    function quit(): void {
370 12
        if($this->goingAway === null) {
371 12
            $this->goingAway = new \React\Promise\Deferred();
372
        }
373
        
374 12
        $state = $this->connectionState;
375 12
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
376
        
377
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
378 12
        while($command = \array_shift($this->queue)) {
379 1
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
380
        }
381
        
382 12
        if($this->connectPromise !== null) {
383 1
            $this->connectPromise->cancel();
384
        }
385
        
386 12
        if($state === static::CONNECTION_OK) {
387 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
388 1
            $this->parser->invokeCommand($quit);
389
            
390 1
            $this->connection->close();
391
        }
392
        
393 12
        $this->goingAway->resolve();
394 12
    }
395
    
396
    /**
397
     * Whether this driver is currently in a transaction.
398
     * @return bool
399
     */
400 2
    function isInTransaction(): bool {
401 2
        return $this->transaction;
402
    }
403
    
404
    /**
405
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
406
     * When the command is done, the driver must check itself back into the client.
407
     * @param \Plasma\ClientInterface  $client
408
     * @param string                   $query
409
     * @return \React\Promise\PromiseInterface
410
     * @throws \Plasma\Exception
411
     * @see \Plasma\QueryResultInterface
412
     */
413 48
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
414 48
        if($this->goingAway) {
415 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
416 47
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
417 1
            if($this->connectPromise !== null) {
418
                return $this->connectPromise->then(function () use (&$client, &$query) {
419
                    return $this->query($client, $query);
420
                });
421
            }
422
            
423 1
            throw new \Plasma\Exception('Unable to continue without connection');
424
        }
425
        
426 46
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
427 46
        $this->executeCommand($command);
428
        
429 46
        if(!$this->transaction) {
430
            $command->once('end', function () use (&$client) {
431 44
                $client->checkinConnection($this);
432 44
            });
433
        }
434
        
435 46
        return $command->getPromise();
436
    }
437
    
438
    /**
439
     * Prepares a query. Resolves with a `StatementInterface` instance.
440
     * When the command is done, the driver must check itself back into the client.
441
     * @param \Plasma\ClientInterface  $client
442
     * @param string                   $query
443
     * @return \React\Promise\PromiseInterface
444
     * @throws \Plasma\Exception
445
     * @see \Plasma\StatementInterface
446
     */
447 40
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
448 40
        if($this->goingAway) {
449 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
450 39
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
451 1
            if($this->connectPromise !== null) {
452
                return $this->connectPromise->then(function () use (&$client, &$query) {
453
                    return $this->prepare($client, $query);
454
                });
455
            }
456
            
457 1
            throw new \Plasma\Exception('Unable to continue without connection');
458
        }
459
        
460 38
        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
461 38
        $this->executeCommand($command);
462
        
463 38
        return $command->getPromise();
464
    }
465
    
466
    /**
467
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
468
     * This is equivalent to prepare -> execute -> close.
469
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
470
     * @param \Plasma\ClientInterface  $client
471
     * @param string                   $query
472
     * @param array                    $params
473
     * @return \React\Promise\PromiseInterface
474
     * @throws \Plasma\Exception
475
     * @see \Plasma\StatementInterface
476
     */
477 38
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
478 38
        if($this->goingAway) {
479 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
480 37
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
481 1
            if($this->connectPromise !== null) {
482
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
483
                    return $this->execute($client, $query, $params);
484
                });
485
            }
486
            
487 1
            throw new \Plasma\Exception('Unable to continue without connection');
488
        }
489
        
490
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
491
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
492 35
                if($result instanceof \Plasma\StreamQueryResultInterface) {
493
                    $statement->close()->then(null, function (\Throwable $error) {
494
                        $this->emit('error', array($error));
495 35
                    });
496
                    
497 35
                    return $result;
498
                }
499
                
500
                return $statement->close()->then(function () use ($result) {
501 33
                    return $result;
502 33
                });
503
            }, function (\Throwable $error) use (&$statement) {
504
                return $statement->close()->then(function () use ($error) {
505 1
                    throw $error;
506 1
                });
507 36
            });
508 36
        });
509
    }
510
    
511
    /**
512
     * Quotes the string for use in the query.
513
     * @param string  $str
514
     * @param int     $type  For types, see the constants.
515
     * @return string
516
     * @throws \LogicException  Thrown if the driver does not support quoting.
517
     * @throws \Plasma\Exception
518
     */
519 5
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
520 5
        if($this->parser === null) {
521 1
            throw new \Plasma\Exception('Unable to continue without connection');
522
        }
523
        
524 4
        $message = $this->parser->getLastOkMessage();
525 4
        if($message === null) {
526
            $message = $this->parser->getHandshakeMessage();
527
            
528
            if($message === null) {
529
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
530
            }
531
        }
532
        
533 4
        $pos = \strpos($this->charset, '_');
534 4
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
535 4
        $realCharset = $this->getRealCharset($dbCharset);
536
        
537 4
        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
538
            return $this->escapeUsingQuotes($realCharset, $str, $type);
539
        }
540
        
541 4
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
542
    }
543
    
544
    /**
545
     * Escapes using quotes.
546
     * @param string  $realCharset
547
     * @param string  $str
548
     * @param int     $type
549
     * @return string
550
     */
551 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
552 2
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
553 1
            return '`'.\str_replace('`', '``', $str).'`';
554
        }
555
        
556
        $escapeChars = array(
557 1
            '"',
558
            '\\',
559
        );
560
        
561
        $escapeReplace = array(
562 1
            '""',
563
            '\\\\',
564
        );
565
        
566 1
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
567
    }
568
    
569
    /**
570
     * Escapes using backslashes.
571
     * @param string  $realCharset
572
     * @param string  $str
573
     * @param int     $type
574
     * @return string
575
     */
576 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
577 6
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
578 3
            return '`'.\str_replace('`', '\\`', $str).'`';
579
        }
580
        
581
        $escapeChars = array(
582 3
            '\\',
583
            '"',
584
        );
585
        
586
        $escapeReplace = array(
587 3
            '\\\\',
588
            '\"',
589
        );
590
        
591 3
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
592
    }
593
    
594
    /**
595
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
596
     *
597
     * Checks out a connection until the transaction gets committed or rolled back.
598
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
599
     * check the connection back into the pool, as long as the transaction is not finished.
600
     *
601
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
602
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
603
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
604
     * @param \Plasma\ClientInterface  $client
605
     * @param int                      $isolation  See the `TransactionInterface` constants.
606
     * @return \React\Promise\PromiseInterface
607
     * @throws \Plasma\Exception
608
     * @see \Plasma\TransactionInterface
609
     */
610 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
611 3
        if($this->goingAway) {
612 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
613
        }
614
        
615 2
        if($this->transaction) {
616 1
            throw new \Plasma\Exception('Driver is already in transaction');
617
        }
618
        
619 2
        $this->transaction = true;
620
        
621
        switch ($isolation) {
622 2
            case \Plasma\TransactionInterface::ISOLATION_NO_CHANGE:
623
                return $this->query($client, 'START TRANSACTION')->then(function () use (&$client, $isolation) {
624
                    return (new \Plasma\Transaction($client, $this, $isolation));
625
                })->then(null, function (\Throwable $e) {
626
                    $this->transaction = false;
627
                    throw $e;
628
                });
629
            break;
630 2
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
631
                $query = 'SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
632
            break;
633 2
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
634 2
                $query = 'SET TRANSACTION ISOLATION LEVEL READ COMMITTED';
635 2
            break;
636
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
637
                $query = 'SET TRANSACTION ISOLATION LEVEL REPEATABLE READ';
638
            break;
639
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
640
                $query = 'SET TRANSACTION ISOLATION LEVEL SERIALIZABLE';
641
            break;
642
            default:
643
                throw new \Plasma\Exception('Invalid isolation level given');
644
            break;
645
        }
646
        
647
        return $this->query($client, $query)->then(function () use (&$client, $isolation) {
648
            return $this->query($client, 'START TRANSACTION')->then(function () use (&$client, $isolation) {
649 2
                return (new \Plasma\Transaction($client, $this, $isolation));
650 2
            });
651
        })->then(null, function (\Throwable $e) {
652
            $this->transaction = false;
653
            throw $e;
654 2
        });
655
    }
656
    
657
    /**
658
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
659
     * @return void
660
     */
661 1
    function endTransaction(): void {
662 1
        $this->transaction = false;
663 1
    }
664
    
665
    /**
666
     * Runs the given command.
667
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
668
     * or rejects with the `Throwable` of the `error` event.
669
     * When the command is done, the driver must check itself back into the client.
670
     * @param \Plasma\ClientInterface   $client
671
     * @param \Plasma\CommandInterface  $command
672
     * @return \React\Promise\PromiseInterface
673
     */
674 6
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
675 6
        if($this->goingAway) {
676 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
677
        }
678
        
679
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
680
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
681 3
                if(!$this->transaction) {
682 3
                    $client->checkinConnection($this);
683
                }
684
                
685 3
                $resolve($value);
686 5
            });
687
            
688
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
689 2
                if(!$this->transaction) {
690 2
                    $client->checkinConnection($this);
691
                }
692
                
693 2
                $reject($error);
694 5
            });
695
            
696 5
            $this->executeCommand($command);
697 5
        }));
698
    }
699
    
700
    /**
701
     * Runs the given SQL querybuilder.
702
     * The driver CAN throw an exception if the given querybuilder is not supported.
703
     * An example would be a SQL querybuilder and a Cassandra driver.
704
     * @param \Plasma\ClientInterface           $client
705
     * @param \Plasma\SQLQueryBuilderInterface  $query
706
     * @return \React\Promise\PromiseInterface
707
     * @throws \Plasma\Exception
708
     */
709 3
    function runQuery(\Plasma\ClientInterface $client, \Plasma\QueryBuilderInterface $query): \React\Promise\PromiseInterface {
710 3
        if($this->goingAway) {
711 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
712
        }
713
        
714 2
        if(!($query instanceof \Plasma\SQLQueryBuilderInterface)) {
715 1
            throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
716
        }
717
        
718 1
        $sql = $query->getQuery();
719 1
        $params = $query->getParameters();
720
        
721 1
        return $this->execute($client, $sql, $params);
722
    }
723
    
724
    /**
725
     * Creates a new cursor to seek through SELECT query results. Resolves with a `CursorInterface` instance.
726
     * @param \Plasma\ClientInterface  $client
727
     * @param string                   $query
728
     * @param array                    $params
729
     * @return \React\Promise\PromiseInterface
730
     * @throws \LogicException  Thrown if the driver or DBMS does not support cursors.
731
     * @throws \Plasma\Exception
732
     */
733 1
    function createReadCursor(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
734 1
        if($this->goingAway) {
735
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
736 1
        } elseif(!$this->supportsCursors()) {
737
            throw new \LogicException('Used DBMS version does not support cursors');
738
        }
739
        
740
        return $this->prepare($client, $query)->then(function (\Plasma\Drivers\MySQL\Statement $statement) use ($params) {
741 1
            if(!$this->supportsCursors()) {
742
                $statement->close()->then(null, function () {
743
                    $this->close();
744
                });
745
                
746
                throw new \LogicException('Used DBMS version does not support cursors');
747
            }
748
            
749 1
            return $statement->createReadCursor($params);
750 1
        });
751
    }
752
    
753
    /**
754
     * Executes a command.
755
     * @param \Plasma\CommandInterface  $command
756
     * @return void
757
     * @internal
758
     */
759 62
    function executeCommand(\Plasma\CommandInterface $command): void {
760 62
        $this->queue[] = $command;
761
        
762 62
        if($this->parser && $this->busy === static::STATE_IDLE) {
763 61
            $this->parser->invokeCommand($this->getNextCommand());
764
        }
765 62
    }
766
    
767
    /**
768
     * Get the handshake message, or null if none received yet.
769
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
770
     */
771 62
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
772 62
        if($this->parser) {
773 62
            return $this->parser->getHandshakeMessage();
774
        }
775
        
776
        return null;
777
    }
778
    
779
    /**
780
     * Get the next command, or null.
781
     * @return \Plasma\CommandInterface|null
782
     * @internal
783
     */
784 61
    function getNextCommand(): ?\Plasma\CommandInterface {
785 61
        if(\count($this->queue) === 0) {
786 60
            if($this->goingAway) {
787
                $this->goingAway->resolve();
788
            }
789
            
790 60
            return null;
791 61
        } elseif($this->busy === static::STATE_BUSY) {
792
            return null;
793
        }
794
        
795
        /** @var \Plasma\CommandInterface  $command */
796 61
        $command =  \array_shift($this->queue);
797
        
798 61
        if($command->waitForCompletion()) {
799 61
            $this->busy = static::STATE_BUSY;
800
            
801
            $command->once('error', function () use (&$command) {
802 4
                $this->busy = static::STATE_IDLE;
803
                
804 4
                $this->endCommand();
805 61
            });
806
            
807
            $command->once('end', function () use (&$command) {
808 60
                $this->busy = static::STATE_IDLE;
809
                
810 60
                $this->endCommand();
811 61
            });
812
        } else {
813 37
            $this->endCommand();
814
        }
815
        
816 61
        return $command;
817
    }
818
    
819
    /**
820
     * Get the driver options.
821
     * @return array
822
     * @internal
823
     */
824
    function getOptions(): array {
825
        return $this->options;
826
    }
827
    
828
    /**
829
     * Whether the DBMS supports cursors.
830
     * @return bool
831
     * @internal
832
     */
833 1
    function supportsCursors(): bool {
834 1
        if($this->getHandshake() === null) {
835
            return true; // Let's be optimistic
836 1
        } elseif($this->cursorSupported !== null) {
837 1
            return $this->cursorSupported;
838
        }
839
        
840 1
        $version = $this->getHandshake()->serverVersion;
841 1
        $mariaDB = (int) (\stripos($version, 'MariaDB') !== false);
842 1
        $version = \explode('-', $version)[$mariaDB];
843
    
844 1
        $this->cursorSupported = (
845 1
            (($this->getHandshake()->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PS_MULTI_RESULTS) > 0) &&
846
            (
847 1
                ($mariaDB && \version_compare($version, '10.3', '>=')) ||
848 1
                (!$mariaDB && \version_compare($version, '5.7', '>='))
849
            )
850
        );
851
        
852 1
        return $this->cursorSupported;
853
    }
854
    
855
    /**
856
     * Finishes up a command.
857
     * @return void
858
     */
859 61
    protected function endCommand() {
860
        $this->loop->futureTick(function () {
861 60
            if($this->goingAway && \count($this->queue) === 0) {
862 1
                return $this->goingAway->resolve();
863
            }
864
            
865 60
            $this->parser->invokeCommand($this->getNextCommand());
866 61
        });
867 61
    }
868
    
869
    /**
870
     * Starts the handshake process.
871
     * @param array                    $credentials
872
     * @param \React\Promise\Deferred  $deferred
873
     * @return void
874
     */
875 63
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
876
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
877 63
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
878 63
                $this->parser->removeListener('message', $listener);
879
                
880 63
                $this->connectionState = static::CONNECTION_SETENV;
881 63
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
882
                
883 63
                \extract($credentials);
884
                
885 63
                if($db !== '') {
886 47
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
887
                }
888
                
889 63
                if($this->charset === null) {
890 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
891
                }
892
                
893 63
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
894 2
                    $this->parser->enableCompression();
895 2
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
896
                }
897
                
898
                // Check if we support auth plugins
899 63
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
900 63
                $plugin = null;
901
                
902 63
                foreach($plugins as $key => $plug) {
903 63
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
904 63
                        $plugin = $plug;
905 63
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
906 63
                        break;
907
                    } elseif($key === $message->authPluginName) {
908
                        $plugin = $plug;
909
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
910
                        break;
911
                    }
912
                }
913
                
914 63
                $remote = \parse_url($this->connection->getRemoteAddress());
915
                
916 63
                if($remote !== false && ($remote['host'] !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
917
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
918
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
919
                        
920
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
921
                        
922
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
923
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
924
                            
925
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
926
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
927
                            }, function (\Throwable $error) use (&$deferred) {
928
                                $deferred->reject($$error);
929
                                $this->connection->close();
930
                            });
931
                        });
932
                        
933
                        return $this->parser->invokeCommand($ssl);
934
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
935
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
936
                        $this->connection->close();
937
                        return;
938
                    }
939
                }
940
                
941 63
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
942
            }
943 63
        };
944
        
945 63
        $this->parser->on('message', $listener);
946
        
947
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
948 63
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
949 62
                $this->connectionState = static::CONNECTION_OK;
950
            }
951
            
952 63
            $this->emit('eventRelay', array('message', $message));
953 63
        });
954 63
    }
955
    
956
    /**
957
     * Enables TLS on the connection.
958
     * @return \React\Promise\PromiseInterface
959
     */
960
    protected function enableTLS(): \React\Promise\PromiseInterface {
961
        // Set required SSL/TLS context options
962
        foreach($this->options['tls.context'] as $name => $value) {
963
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
964
        }
965
        
966
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
967
            $this->connection->close();
968
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
969
        });
970
    }
971
    
972
    /**
973
     * Sends the auth command.
974
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
975
     * @param array                                            $credentials
976
     * @param int                                              $clientFlags
977
     * @param string|null                                      $plugin
978
     * @param \React\Promise\Deferred                          $deferred
979
     * @return void
980
     */
981 63
    protected function createHandshakeResponse(
982
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
983
    ) {
984 63
        \extract($credentials);
985
        
986 63
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
987
        
988
        $auth->once('end', function () use (&$deferred) {
989
            $this->loop->futureTick(function () use (&$deferred) {
990 62
                $deferred->resolve();
991 62
            });
992 63
        });
993
        
994
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
995 1
            $deferred->reject($error);
996 1
            $this->connection->close();
997 63
        });
998
        
999 63
        if($plugin) {
1000
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
1001
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
1002 63
                static $plugin;
1003
                
1004 63
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
1005
                    $name = $message->authPluginName;
1006
                    
1007
                    if($name !== null) {
1008
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
1009
                        foreach($plugins as $key => $plug) {
1010
                            if($key === $name) {
1011
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
1012
                                
1013
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
1014
                                return $this->parser->invokeCommand($command);
1015
                            }
1016
                        }
1017
                    }
1018
                    
1019
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
1020 63
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
1021
                    if($plugin === null) {
1022
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
1023
                        return $this->connection->close();
1024
                    }
1025
                    
1026
                    try {
1027
                        $command = $plugin->receiveMoreData($message);
1028
                        return $this->parser->invokeCommand($command);
1029
                    } catch (\Plasma\Exception $e) {
1030
                        $deferred->reject($e);
1031
                        $this->connection->close();
1032
                    }
1033 63
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
1034 62
                    $this->parser->removeListener('message', $listener);
1035
                }
1036 63
            };
1037
            
1038 63
            $this->parser->on('message', $listener);
1039
        }
1040
        
1041 63
        $this->parser->invokeCommand($auth);
1042 63
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
1043 63
    }
1044
    
1045
    /**
1046
     * Get the real charset from the DB charset.
1047
     * @param string  $charset
1048
     * @return string
1049
     */
1050 4
    protected function getRealCharset(string $charset): string {
1051 4
        if(\substr($charset, 0, 4) === 'utf8') {
1052 2
            return 'UTF-8';
1053
        }
1054
        
1055 2
        $charsets = \mb_list_encodings();
1056
        
1057 2
        foreach($charsets as $set) {
1058 2
            if(\stripos($set, $charset) === 0) {
1059 2
                return $set;
1060
            }
1061
        }
1062
        
1063 2
        return 'UTF-8';
1064
    }
1065
    
1066
    /**
1067
     * Validates the given options.
1068
     * @param array  $options
1069
     * @return void
1070
     * @throws \InvalidArgumentException
1071
     */
1072 93
    protected function validateOptions(array $options) {
1073 93
        \CharlotteDunois\Validation\Validator::make($options, array(
1074 93
            'characters.set' => 'string',
1075
            'characters.collate' => 'string',
1076
            'compression.enable' => 'boolean',
1077
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
1078
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
1079
            'tls.context' => 'array',
1080
            'tls.force' => 'boolean',
1081
            'tls.forceLocal' => 'boolean'
1082 93
        ), true)->throw(\InvalidArgumentException::class);
1083 93
    }
1084
}
1085