GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Pull Request — master (#25)
by Charlotte
02:17
created

Driver::beginTransaction()   B

Complexity

Conditions 8
Paths 8

Size

Total Lines 44
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 17.8388

Importance

Changes 0
Metric Value
cc 8
eloc 34
nc 8
nop 2
dl 0
loc 44
ccs 13
cts 28
cp 0.4643
crap 17.8388
rs 8.1315
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => !true,
31
        'localInFile.enable' => false,
32
        'tls.context' => array(),
33
        'tls.force' => true,
34
        'tls.forceLocal' => false
35
    );
36
    
37
    /**
38
     * @var string[]
39
     */
40
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
41
    
42
    /**
43
     * @var \React\Socket\ConnectorInterface
44
     */
45
    protected $connector;
46
    
47
    /**
48
     * Internal class is intentional used, as there's no other way currently.
49
     * @var \React\Socket\StreamEncryption
50
     * @see https://github.com/reactphp/socket/issues/180
51
     */
52
    protected $encryption;
53
    
54
    /**
55
     * @var \React\Promise\Promise|null
56
     */
57
    protected $connectPromise;
58
    
59
    /**
60
     * @var \React\Socket\Connection
61
     */
62
    protected $connection;
63
    
64
    /**
65
     * @var int
66
     */
67
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
68
    
69
    /**
70
     * @var \Plasma\Drivers\MySQL\ProtocolParser
71
     */
72
    protected $parser;
73
    
74
    /**
75
     * @var array
76
     */
77
    protected $queue;
78
    
79
    /**
80
     * @var int
81
     */
82
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
83
    
84
    /**
85
     * @var bool
86
     */
87
    protected $transaction = false;
88
    
89
    /**
90
     * @var \React\Promise\Deferred
91
     */
92
    protected $goingAway;
93
    
94
    /**
95
     * @var string|null
96
     */
97
    protected $charset;
98
    
99
    /**
100
     * Constructor.
101
     * @param \React\EventLoop\LoopInterface  $loop
102
     * @param array                           $options
103
     */
104 93
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
105 93
        $this->validateOptions($options);
106
        
107 93
        $this->loop = $loop;
108 93
        $this->options = \array_merge($this->options, $options);
109
        
110 93
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
111 93
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
112 93
        $this->queue = array();
113 93
    }
114
    
115
    /**
116
     * Destructor.
117
     * @return void
118
     * @internal
119
     */
120 27
    function __destruct() {
121 27
        $this->close();
122 27
    }
123
    
124
    /**
125
     * Returns the event loop.
126
     * @return \React\EventLoop\LoopInterface
127
     */
128 61
    function getLoop(): \React\EventLoop\LoopInterface {
129 61
        return $this->loop;
130
    }
131
    
132
    /**
133
     * Retrieves the current connection state.
134
     * @return int
135
     */
136 12
    function getConnectionState(): int {
137 12
        return $this->connectionState;
138
    }
139
    
140
    /**
141
     * Retrieves the current busy state.
142
     * @return int
143
     */
144 1
    function getBusyState(): int {
145 1
        return $this->busy;
146
    }
147
    
148
    /**
149
     * Get the length of the driver backlog queue.
150
     * @return int
151
     */
152 1
    function getBacklogLength(): int {
153 1
        return \count($this->queue);
154
    }
155
    
156
    /**
157
     * Connects to the given URI.
158
     * @param string  $uri
159
     * @return \React\Promise\PromiseInterface
160
     * @throws \InvalidArgumentException
161
     */
162 69
    function connect(string $uri): \React\Promise\PromiseInterface {
163 69
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
164 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
165 68
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
166 1
            return \React\Promise\resolve();
167 68
        } elseif($this->connectPromise !== null) {
168 1
            return $this->connectPromise;
169
        }
170
        
171 68
        $pos = \strpos($uri, '://');
172
        
173 68
        if($pos === false) {
174 2
            $uri = 'tcp://'.$uri;
175 66
        } elseif(\substr($uri, 0, $pos) === 'unix') {
176 2
            $defaultSocket = '/tmp/mysql.sock';
177
            
178 2
            $apos = \strpos($uri, '@');
179 2
            $spos = \strrpos($uri, '/');
180
            
181 2
            if($apos === false) {
182 1
                throw new \InvalidArgumentException('Connecting without any username is not a valid operation');
183
            }
184
            
185 1
            $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
186 1
            $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
187
            
188 1
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
189
                $socket = $defaultSocket;
190
            }
191
        }
192
        
193 67
        $parts = \parse_url($uri);
194 67
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes)) {
195 2
            throw new \InvalidArgumentException('Invalid connect uri given');
196
        }
197
        
198 65
        if(isset($socket)) {
199 1
            $parts['host'] = $socket;
200
        }
201
        
202 65
        if($parts['scheme'] === 'mysql') {
203 3
            $parts['scheme'] = 'tcp';
204
        }
205
        
206 65
        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
207 65
        $this->connectionState = static::CONNECTION_STARTED;
208 65
        $resolved = false;
209
        
210
        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
211 63
            $this->connectPromise = null;
212
            
213
            // See description of property encryption
214 63
            if(!($connection instanceof \React\Socket\Connection)) {
215
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
216
            }
217
            
218 63
            $this->busy = static::STATE_BUSY;
219 63
            $this->connectionState = static::CONNECTION_MADE;
220 63
            $this->connection = $connection;
221
            
222
            $this->connection->on('error', function (\Throwable $error) {
223
                $this->emit('error', array($error));
224 63
            });
225
            
226
            $this->connection->on('close', function () {
227 3
                $this->connection = null;
228 3
                $this->connectionState = static::CONNECTION_UNUSABLE;
229
                
230 3
                $this->emit('close');
231 63
            });
232
            
233 63
            $deferred = new \React\Promise\Deferred();
234 63
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
235
            
236
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
237
                if($resolved) {
238
                    $this->emit('error', array($error));
239
                } else {
240
                    $deferred->reject($error);
241
                }
242 63
            });
243
            
244 63
            $user = ($parts['user'] ?? 'root');
245 63
            $password = ($parts['pass'] ?? '');
246 63
            $db = \ltrim(($parts['path'] ?? ''), '/');
247
            
248 63
            $credentials = \compact('user', 'password', 'db');
249
            
250 63
            $this->startHandshake($credentials, $deferred);
251
            return $deferred->promise()->then(function () use (&$resolved) {
252 62
                $this->busy = static::STATE_IDLE;
253 62
                $resolved = true;
254
                
255 62
                if(\count($this->queue) > 0) {
256
                    $this->parser->invokeCommand($this->getNextCommand());
257
                }
258 63
            });
259 65
        });
260
        
261 65
        if($this->options['characters.set']) {
262 63
            $this->charset = $this->options['characters.set'];
263
            $this->connectPromise = $this->connectPromise->then(function () {
264 60
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
265 60
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
266
                
267 60
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
268 60
                $this->executeCommand($cmd);
269
                
270 60
                return $cmd->getPromise();
271 63
            });
272
        }
273
        
274 65
        return $this->connectPromise;
275
    }
276
    
277
    /**
278
     * Pauses the underlying stream I/O consumption.
279
     * If consumption is already paused, this will do nothing.
280
     * @return bool  Whether the operation was successful.
281
     */
282 1
    function pauseStreamConsumption(): bool {
283 1
        if($this->connection === null || $this->goingAway) {
284 1
            return false;
285
        }
286
        
287
        $this->connection->pause();
288
        return true;
289
    }
290
    
291
    /**
292
     * Resumes the underlying stream I/O consumption.
293
     * If consumption is not paused, this will do nothing.
294
     * @return bool  Whether the operation was successful.
295
     */
296 1
    function resumeStreamConsumption(): bool {
297 1
        if($this->connection === null || $this->goingAway) {
298 1
            return false;
299
        }
300
        
301
        $this->connection->resume();
302
        return true;
303
    }
304
    
305
    /**
306
     * Closes all connections gracefully after processing all outstanding requests.
307
     * @return \React\Promise\PromiseInterface
308
     */
309 28
    function close(): \React\Promise\PromiseInterface {
310 28
        if($this->goingAway) {
311 13
            return $this->goingAway->promise();
312
        }
313
        
314 17
        $state = $this->connectionState;
315 17
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
316
        
317
        // Connection is still pending
318 17
        if($this->connectPromise !== null) {
319 1
            $this->connectPromise->cancel();
320
            
321
            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
322 1
            while($command = \array_shift($this->queue)) {
323
                $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
324
            }
325
        }
326
        
327 17
        $this->goingAway = new \React\Promise\Deferred();
328
        
329 17
        if(\count($this->queue) === 0 || $state < \Plasma\DriverInterface::CONNECTION_OK) {
330 16
            $this->queue = array();
331 16
            $this->goingAway->resolve();
332
        }
333
        
334
        return $this->goingAway->promise()->then(function () use ($state) {
335 17
            if($state !== static::CONNECTION_OK) {
336 16
                return;
337 1
            } elseif(!$this->connection->isWritable()) {
338
                $this->connection->close();
339
                return;
340
            }
341
            
342 1
            $deferred = new \React\Promise\Deferred();
343
            
344 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
345
            
346
            $this->connection->once('close', function () use (&$deferred) {
347 1
                $deferred->resolve();
348 1
            });
349
            
350
            $quit->once('end', function () {
351
                $this->connection->close();
352 1
            });
353
            
354 1
            $this->parser->invokeCommand($quit);
355
            
356 1
            return $deferred->promise();
357 17
        });
358
    }
359
    
360
    /**
361
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
362
     * @return void
363
     */
364 12
    function quit(): void {
365 12
        if($this->goingAway === null) {
366 12
            $this->goingAway = new \React\Promise\Deferred();
367
        }
368
        
369 12
        $state = $this->connectionState;
370 12
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
371
        
372
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
373 12
        while($command = \array_shift($this->queue)) {
374 1
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
375
        }
376
        
377 12
        if($this->connectPromise !== null) {
378 1
            $this->connectPromise->cancel();
379
        }
380
        
381 12
        if($state === static::CONNECTION_OK) {
382 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
383 1
            $this->parser->invokeCommand($quit);
384
            
385 1
            $this->connection->close();
386
        }
387
        
388 12
        $this->goingAway->resolve();
389 12
    }
390
    
391
    /**
392
     * Whether this driver is currently in a transaction.
393
     * @return bool
394
     */
395 2
    function isInTransaction(): bool {
396 2
        return $this->transaction;
397
    }
398
    
399
    /**
400
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
401
     * When the command is done, the driver must check itself back into the client.
402
     * @param \Plasma\ClientInterface  $client
403
     * @param string                   $query
404
     * @return \React\Promise\PromiseInterface
405
     * @throws \Plasma\Exception
406
     * @see \Plasma\QueryResultInterface
407
     */
408 48
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
409 48
        if($this->goingAway) {
410 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
411 47
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
412 1
            if($this->connectPromise !== null) {
413
                return $this->connectPromise->then(function () use (&$client, &$query) {
414
                    return $this->query($client, $query);
415
                });
416
            }
417
            
418 1
            throw new \Plasma\Exception('Unable to continue without connection');
419
        }
420
        
421 46
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
422 46
        $this->executeCommand($command);
423
        
424 46
        if(!$this->transaction) {
425
            $command->once('end', function () use (&$client) {
426 44
                $client->checkinConnection($this);
427 44
            });
428
        }
429
        
430 46
        return $command->getPromise();
431
    }
432
    
433
    /**
434
     * Prepares a query. Resolves with a `StatementInterface` instance.
435
     * When the command is done, the driver must check itself back into the client.
436
     * @param \Plasma\ClientInterface  $client
437
     * @param string                   $query
438
     * @return \React\Promise\PromiseInterface
439
     * @throws \Plasma\Exception
440
     * @see \Plasma\StatementInterface
441
     */
442 40
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
443 40
        if($this->goingAway) {
444 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
445 39
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
446 1
            if($this->connectPromise !== null) {
447
                return $this->connectPromise->then(function () use (&$client, &$query) {
448
                    return $this->prepare($client, $query);
449
                });
450
            }
451
            
452 1
            throw new \Plasma\Exception('Unable to continue without connection');
453
        }
454
        
455 38
        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
456 38
        $this->executeCommand($command);
457
        
458 38
        return $command->getPromise();
459
    }
460
    
461
    /**
462
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
463
     * This is equivalent to prepare -> execute -> close.
464
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
465
     * @param \Plasma\ClientInterface  $client
466
     * @param string                   $query
467
     * @param array                    $params
468
     * @return \React\Promise\PromiseInterface
469
     * @throws \Plasma\Exception
470
     * @see \Plasma\StatementInterface
471
     */
472 38
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
473 38
        if($this->goingAway) {
474 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
475 37
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
476 1
            if($this->connectPromise !== null) {
477
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
478
                    return $this->execute($client, $query, $params);
479
                });
480
            }
481
            
482 1
            throw new \Plasma\Exception('Unable to continue without connection');
483
        }
484
        
485
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
486
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
487 35
                if($result instanceof \Plasma\StreamQueryResultInterface) {
488
                    $statement->close()->then(null, function (\Throwable $error) {
489
                        $this->emit('error', array($error));
490 35
                    });
491
                    
492 35
                    return $result;
493
                }
494
                
495
                return $statement->close()->then(function () use ($result) {
496 33
                    return $result;
497 33
                });
498
            }, function (\Throwable $error) use (&$statement) {
499
                return $statement->close()->then(function () use ($error) {
500 1
                    throw $error;
501 1
                });
502 36
            });
503 36
        });
504
    }
505
    
506
    /**
507
     * Quotes the string for use in the query.
508
     * @param string  $str
509
     * @param int     $type  For types, see the constants.
510
     * @return string
511
     * @throws \LogicException  Thrown if the driver does not support quoting.
512
     * @throws \Plasma\Exception
513
     */
514 5
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
515 5
        if($this->parser === null) {
516 1
            throw new \Plasma\Exception('Unable to continue without connection');
517
        }
518
        
519 4
        $message = $this->parser->getLastOkMessage();
520 4
        if($message === null) {
521
            $message = $this->parser->getHandshakeMessage();
522
            
523
            if($message === null) {
524
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
525
            }
526
        }
527
        
528 4
        $pos = \strpos($this->charset, '_');
529 4
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
530 4
        $realCharset = $this->getRealCharset($dbCharset);
531
        
532 4
        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
533
            return $this->escapeUsingQuotes($realCharset, $str, $type);
534
        }
535
        
536 4
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
537
    }
538
    
539
    /**
540
     * Escapes using quotes.
541
     * @param string  $realCharset
542
     * @param string  $str
543
     * @param int     $type
544
     * @return string
545
     */
546 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
547 2
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
548 1
            return '`'.\str_replace('`', '``', $str).'`';
549
        }
550
        
551
        $escapeChars = array(
552 1
            '"',
553
            '\\',
554
        );
555
        
556
        $escapeReplace = array(
557 1
            '""',
558
            '\\\\',
559
        );
560
        
561 1
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
562
    }
563
    
564
    /**
565
     * Escapes using backslashes.
566
     * @param string  $realCharset
567
     * @param string  $str
568
     * @param int     $type
569
     * @return string
570
     */
571 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
572 6
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
573 3
            return '`'.\str_replace('`', '\\`', $str).'`';
574
        }
575
        
576
        $escapeChars = array(
577 3
            '\\',
578
            '"',
579
        );
580
        
581
        $escapeReplace = array(
582 3
            '\\\\',
583
            '\"',
584
        );
585
        
586 3
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
587
    }
588
    
589
    /**
590
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
591
     *
592
     * Checks out a connection until the transaction gets committed or rolled back.
593
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
594
     * check the connection back into the pool, as long as the transaction is not finished.
595
     *
596
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
597
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
598
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
599
     * @param \Plasma\ClientInterface  $client
600
     * @param int                      $isolation  See the `TransactionInterface` constants.
601
     * @return \React\Promise\PromiseInterface
602
     * @throws \Plasma\Exception
603
     * @see \Plasma\TransactionInterface
604
     */
605 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
606 3
        if($this->goingAway) {
607 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
608
        }
609
        
610 2
        if($this->transaction) {
611 1
            throw new \Plasma\Exception('Driver is already in transaction');
612
        }
613
        
614 2
        $this->transaction = true;
615
        
616
        switch ($isolation) {
617 2
            case \Plasma\TransactionInterface::ISOLATION_NO_CHANGE:
618
                return $this->query($client, 'START TRANSACTION')->then(function () use (&$client, $isolation) {
619
                    return (new \Plasma\Transaction($client, $this, $isolation));
620
                })->then(null, function (\Throwable $e) {
621
                    $this->transaction = false;
622
                    throw $e;
623
                });
624
            break;
625 2
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
626
                $query = 'SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
627
            break;
628 2
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
629 2
                $query = 'SET TRANSACTION ISOLATION LEVEL READ COMMITTED';
630 2
            break;
631
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
632
                $query = 'SET TRANSACTION ISOLATION LEVEL REPEATABLE READ';
633
            break;
634
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
635
                $query = 'SET TRANSACTION ISOLATION LEVEL SERIALIZABLE';
636
            break;
637
            default:
638
                throw new \Plasma\Exception('Invalid isolation level given');
639
            break;
640
        }
641
        
642
        return $this->query($client, $query)->then(function () use (&$client, $isolation) {
643
            return $this->query($client, 'START TRANSACTION')->then(function () use (&$client, $isolation) {
644 2
                return (new \Plasma\Transaction($client, $this, $isolation));
645 2
            });
646
        })->then(null, function (\Throwable $e) {
647
            $this->transaction = false;
648
            throw $e;
649 2
        });
650
    }
651
    
652
    /**
653
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
654
     * @return void
655
     */
656 1
    function endTransaction(): void {
657 1
        $this->transaction = false;
658 1
    }
659
    
660
    /**
661
     * Runs the given command.
662
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
663
     * or rejects with the `Throwable` of the `error` event.
664
     * When the command is done, the driver must check itself back into the client.
665
     * @param \Plasma\ClientInterface   $client
666
     * @param \Plasma\CommandInterface  $command
667
     * @return \React\Promise\PromiseInterface
668
     */
669 6
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
670 6
        if($this->goingAway) {
671 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
672
        }
673
        
674
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
675
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
676 3
                if(!$this->transaction) {
677 3
                    $client->checkinConnection($this);
678
                }
679
                
680 3
                $resolve($value);
681 5
            });
682
            
683
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
684 2
                if(!$this->transaction) {
685 2
                    $client->checkinConnection($this);
686
                }
687
                
688 2
                $reject($error);
689 5
            });
690
            
691 5
            $this->executeCommand($command);
692 5
        }));
693
    }
694
    
695
    /**
696
     * Runs the given SQL querybuilder.
697
     * The driver CAN throw an exception if the given querybuilder is not supported.
698
     * An example would be a SQL querybuilder and a Cassandra driver.
699
     * @param \Plasma\ClientInterface           $client
700
     * @param \Plasma\SQLQueryBuilderInterface  $query
701
     * @return \React\Promise\PromiseInterface
702
     * @throws \Plasma\Exception
703
     */
704 3
    function runQuery(\Plasma\ClientInterface $client, \Plasma\QueryBuilderInterface $query): \React\Promise\PromiseInterface {
705 3
        if($this->goingAway) {
706 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
707
        }
708
        
709 2
        if(!($query instanceof \Plasma\SQLQueryBuilderInterface)) {
710 1
            throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
711
        }
712
        
713 1
        $sql = $query->getQuery();
714 1
        $params = $query->getParameters();
715
        
716 1
        return $this->execute($client, $sql, $params);
717
    }
718
    
719
    /**
720
     * Creates a new cursor to seek through SELECT query results. Resolves with a `CursorInterface` instance.
721
     * @param \Plasma\ClientInterface  $client
722
     * @param string                   $query
723
     * @param array                    $params
724
     * @return \React\Promise\PromiseInterface
725
     * @throws \LogicException  Thrown if the driver or DBMS does not support cursors.
726
     * @throws \Plasma\Exception
727
     */
728 1
    function createCursor(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
729 1
        if($this->goingAway) {
730
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
731 1
        } elseif(!$this->supportsCursors()) {
732
            throw new \LogicException('Used DBMS version does not support cursors');
733
        }
734
        
735
        return $this->prepare($client, $query)->then(function (\Plasma\Drivers\MySQL\Statement $statement) use ($params) {
736 1
            if(!$this->supportsCursors()) {
737
                $statement->close()->then(null, function () {
738
                    $this->close();
739
                });
740
                
741
                throw new \LogicException('Used DBMS version does not support cursors');
742
            }
743
            
744 1
            return $statement->createCursor($params);
745 1
        });
746
    }
747
    
748
    /**
749
     * Executes a command.
750
     * @param \Plasma\CommandInterface  $command
751
     * @return void
752
     * @internal
753
     */
754 62
    function executeCommand(\Plasma\CommandInterface $command): void {
755 62
        $this->queue[] = $command;
756
        
757 62
        if($this->parser && $this->busy === static::STATE_IDLE) {
758 61
            $this->parser->invokeCommand($this->getNextCommand());
759
        }
760 62
    }
761
    
762
    /**
763
     * Get the handshake message, or null if none received yet.
764
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
765
     */
766 62
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
767 62
        if($this->parser) {
768 62
            return $this->parser->getHandshakeMessage();
769
        }
770
        
771
        return null;
772
    }
773
    
774
    /**
775
     * Get the next command, or null.
776
     * @return \Plasma\CommandInterface|null
777
     * @internal
778
     */
779 61
    function getNextCommand(): ?\Plasma\CommandInterface {
780 61
        if(\count($this->queue) === 0) {
781 60
            if($this->goingAway) {
782
                $this->goingAway->resolve();
783
            }
784
            
785 60
            return null;
786 61
        } elseif($this->busy === static::STATE_BUSY) {
787
            return null;
788
        }
789
        
790
        /** @var \Plasma\CommandInterface  $command */
791 61
        $command =  \array_shift($this->queue);
792
        
793 61
        if($command->waitForCompletion()) {
794 61
            $this->busy = static::STATE_BUSY;
795
            
796
            $command->once('error', function () use (&$command) {
797 4
                $this->busy = static::STATE_IDLE;
798
                
799 4
                $this->endCommand();
800 61
            });
801
            
802
            $command->once('end', function () use (&$command) {
803 60
                $this->busy = static::STATE_IDLE;
804
                
805 60
                $this->endCommand();
806 61
            });
807
        } else {
808 37
            $this->endCommand();
809
        }
810
        
811 61
        return $command;
812
    }
813
    
814
    /**
815
     * Get the driver options.
816
     * @return array
817
     * @internal
818
     */
819
    function getOptions(): array {
820
        return $this->options;
821
    }
822
    
823
    /**
824
     * Whether the DBMS supports cursors.
825
     * @return bool
826
     * @internal
827
     */
828 1
    function supportsCursors(): bool {
829 1
        if($this->getHandshake() === null) {
830
            return true; // Let's be optimistic
831
        }
832
        
833 1
        $version = $this->getHandshake()->serverVersion;
834 1
        $mariaDB = (\stripos($version, 'MariaDB') !== false);
835 1
        $version = \explode('-', $version)[($mariaDB ? 1 : 0)];
836
        
837
        return (
838 1
            (($this->getHandshake()->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PS_MULTI_RESULTS) > 0) &&
839
            (
840 1
                ($mariaDB && \version_compare($version, '10.3', '>=')) ||
841 1
                (!$mariaDB && \version_compare($version, '5.7', '>='))
842
            )
843
        );
844
    }
845
    
846
    /**
847
     * Finishes up a command.
848
     * @return void
849
     */
850 61
    protected function endCommand() {
851
        $this->loop->futureTick(function () {
852 60
            if($this->goingAway && \count($this->queue) === 0) {
853 1
                return $this->goingAway->resolve();
854
            }
855
            
856 60
            $this->parser->invokeCommand($this->getNextCommand());
857 61
        });
858 61
    }
859
    
860
    /**
861
     * Starts the handshake process.
862
     * @param array                    $credentials
863
     * @param \React\Promise\Deferred  $deferred
864
     * @return void
865
     */
866 63
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
867
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
868 63
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
869 63
                $this->parser->removeListener('message', $listener);
870
                
871 63
                $this->connectionState = static::CONNECTION_SETENV;
872 63
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
873
                
874 63
                \extract($credentials);
875
                
876 63
                if($db !== '') {
877 47
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
878
                }
879
                
880 63
                if($this->charset === null) {
881 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
882
                }
883
                
884 63
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
885 2
                    $this->parser->enableCompression();
886 2
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
887
                }
888
                
889
                // Check if we support auth plugins
890 63
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
891 63
                $plugin = null;
892
                
893 63
                foreach($plugins as $key => $plug) {
894 63
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
895 63
                        $plugin = $plug;
896 63
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
897 63
                        break;
898
                    } elseif($key === $message->authPluginName) {
899
                        $plugin = $plug;
900
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
901
                        break;
902
                    }
903
                }
904
                
905 63
                $remote = \parse_url($this->connection->getRemoteAddress());
906
                
907 63
                if($remote !== false && ($remote['host'] !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
908
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
909
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
910
                        
911
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
912
                        
913
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
914
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
915
                            
916
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
917
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
918
                            }, function (\Throwable $error) use (&$deferred) {
919
                                $deferred->reject($$error);
920
                                $this->connection->close();
921
                            });
922
                        });
923
                        
924
                        return $this->parser->invokeCommand($ssl);
925
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
926
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
927
                        $this->connection->close();
928
                        return;
929
                    }
930
                }
931
                
932 63
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
933
            }
934 63
        };
935
        
936 63
        $this->parser->on('message', $listener);
937
        
938
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
939 63
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
940 62
                $this->connectionState = static::CONNECTION_OK;
941
            }
942
            
943 63
            $this->emit('eventRelay', array('message', $message));
944 63
        });
945 63
    }
946
    
947
    /**
948
     * Enables TLS on the connection.
949
     * @return \React\Promise\PromiseInterface
950
     */
951
    protected function enableTLS(): \React\Promise\PromiseInterface {
952
        // Set required SSL/TLS context options
953
        foreach($this->options['tls.context'] as $name => $value) {
954
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
955
        }
956
        
957
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
958
            $this->connection->close();
959
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
960
        });
961
    }
962
    
963
    /**
964
     * Sends the auth command.
965
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
966
     * @param array                                            $credentials
967
     * @param int                                              $clientFlags
968
     * @param string|null                                      $plugin
969
     * @param \React\Promise\Deferred                          $deferred
970
     * @return void
971
     */
972 63
    protected function createHandshakeResponse(
973
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
974
    ) {
975 63
        \extract($credentials);
976
        
977 63
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
978
        
979
        $auth->once('end', function () use (&$deferred) {
980
            $this->loop->futureTick(function () use (&$deferred) {
981 62
                $deferred->resolve();
982 62
            });
983 63
        });
984
        
985
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
986 1
            $deferred->reject($error);
987 1
            $this->connection->close();
988 63
        });
989
        
990 63
        if($plugin) {
991
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
992
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
993 63
                static $plugin;
994
                
995 63
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
996
                    $name = $message->authPluginName;
997
                    
998
                    if($name !== null) {
999
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
1000
                        foreach($plugins as $key => $plug) {
1001
                            if($key === $name) {
1002
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
1003
                                
1004
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
1005
                                return $this->parser->invokeCommand($command);
1006
                            }
1007
                        }
1008
                    }
1009
                    
1010
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
1011 63
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
1012
                    if($plugin === null) {
1013
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
1014
                        return $this->connection->close();
1015
                    }
1016
                    
1017
                    try {
1018
                        $command = $plugin->receiveMoreData($message);
1019
                        return $this->parser->invokeCommand($command);
1020
                    } catch (\Plasma\Exception $e) {
1021
                        $deferred->reject($e);
1022
                        $this->connection->close();
1023
                    }
1024 63
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
1025 62
                    $this->parser->removeListener('message', $listener);
1026
                }
1027 63
            };
1028
            
1029 63
            $this->parser->on('message', $listener);
1030
        }
1031
        
1032 63
        $this->parser->invokeCommand($auth);
1033 63
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
1034 63
    }
1035
    
1036
    /**
1037
     * Get the real charset from the DB charset.
1038
     * @param string  $charset
1039
     * @return string
1040
     */
1041 4
    protected function getRealCharset(string $charset): string {
1042 4
        if(\substr($charset, 0, 4) === 'utf8') {
1043 2
            return 'UTF-8';
1044
        }
1045
        
1046 2
        $charsets = \mb_list_encodings();
1047
        
1048 2
        foreach($charsets as $set) {
1049 2
            if(\stripos($set, $charset) === 0) {
1050 2
                return $set;
1051
            }
1052
        }
1053
        
1054 2
        return 'UTF-8';
1055
    }
1056
    
1057
    /**
1058
     * Validates the given options.
1059
     * @param array  $options
1060
     * @return void
1061
     * @throws \InvalidArgumentException
1062
     */
1063 93
    protected function validateOptions(array $options) {
1064 93
        \CharlotteDunois\Validation\Validator::make($options, array(
1065 93
            'characters.set' => 'string',
1066
            'characters.collate' => 'string',
1067
            'compression.enable' => 'boolean',
1068
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
1069
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
1070
            'tls.context' => 'array',
1071
            'tls.force' => 'boolean',
1072
            'tls.forceLocal' => 'boolean'
1073 93
        ), true)->throw(\InvalidArgumentException::class);
1074 93
    }
1075
}
1076