Test Failed
Branch master (b4dd81)
by Charlotte
04:21
created

Driver   F

Complexity

Total Complexity 143

Size/Duplication

Total Lines 1039
Duplicated Lines 0 %

Test Coverage

Coverage 75.9%

Importance

Changes 0
Metric  Value
eloc    425
dl      0
loc     1039
ccs     315
cts     415
cp      0.759
rs      2
c       0
b       0
f       0
wmc     143

32 Methods

Rating  Name                       Duplication  Size  Complexity
A       __destruct()               0            2     1
A       getBusyState()             0            2     1
A       getBacklogLength()         0            2     1
A       __construct()              0            9     1
A       getConnectionState()       0            2     1
A       getLoop()                  0            2     1
D       connect()                  0            113   22
A       isInTransaction()          0            2     1
A       validateOptions()          0            11    1
A       escapeUsingQuotes()        0            16    2
A       quit()                     0            25    5
A       escapeUsingBackslashes()   0            16    2
A       query()                    0            23    5
B       supportsCursors()          0            20    7
B       createHandshakeResponse()  0            62    11
A       getHandshake()             0            6     2
A       executeCommand()           0            5     3
A       getOptions()               0            2     1
A       endCommand()               0            7     3
B       beginTransaction()         0            44    8
A       runQuery()                 0            13    3
A       enableTLS()                0            9     2
D       startHandshake()           0            78    18
A       endTransaction()           0            2     1
B       close()                    0            48    8
A       quote()                    0            23    6
A       prepare()                  0            17    4
A       getRealCharset()           0            14    4
A       runCommand()               0            23    4
A       createReadCursor()         0            17    4
A       execute()                  0            29    5
A       getNextCommand()           0            33    5

How to fix: Complexity

Complex Class

Complex classes like Driver often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find one is to look for fields and methods that share a prefix or suffix; in Driver, for example, escapeUsingQuotes() and escapeUsingBackslashes() share the escape prefix.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.

While breaking up the class, it is also a good idea to analyze how other classes use Driver and, based on those observations, apply Extract Interface.
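
As a rough illustration of Extract Class combined with Extract Interface, the two escaping methods of Driver could move into a small collaborator. This is only a sketch under stated assumptions: QuoterInterface, MySQLQuoter and the $noBackslashEscapes flag are hypothetical names that do not exist in Plasma, and the replacement rules simply mirror the escapeUsingQuotes() and escapeUsingBackslashes() methods of the listing.

```php
<?php
declare(strict_types=1);

// Hypothetical names: QuoterInterface and MySQLQuoter are NOT part of Plasma.
// They only illustrate how the escape* methods could be extracted from Driver.

interface QuoterInterface {
    public function quoteValue(string $str): string;
    public function quoteIdentifier(string $str): string;
}

final class MySQLQuoter implements QuoterInterface {
    /**
     * Assumed to reflect the server's NO_BACKSLASH_ESCAPES status flag.
     * @var bool
     */
    private $noBackslashEscapes;

    public function __construct(bool $noBackslashEscapes) {
        $this->noBackslashEscapes = $noBackslashEscapes;
    }

    public function quoteValue(string $str): string {
        if($this->noBackslashEscapes) {
            // Same replacement table as escapeUsingQuotes() in the listing.
            return '"'.\str_replace(array('"', '\\'), array('""', '\\\\'), $str).'"';
        }

        // Same replacement table as escapeUsingBackslashes(): backslashes first,
        // so the backslash added in front of a quote is not doubled afterwards.
        return '"'.\str_replace(array('\\', '"'), array('\\\\', '\\"'), $str).'"';
    }

    public function quoteIdentifier(string $str): string {
        $replacement = ($this->noBackslashEscapes ? '``' : '\\`');
        return '`'.\str_replace('`', $replacement, $str).'`';
    }
}
```

With such a collaborator, Driver::quote() would only need to choose the mode from the last OK or handshake message and delegate to the quoter, and callers could depend on the interface instead of the concrete driver.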

<?php
/**
 * Plasma Driver MySQL component
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
 *
 * Website: https://github.com/PlasmaPHP
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
*/

namespace Plasma\Drivers\MySQL;

/**
 * The MySQL Driver.
 * @internal
 */
class Driver implements \Plasma\DriverInterface {
    use \Evenement\EventEmitterTrait;

    /**
     * @var \React\EventLoop\LoopInterface
     */
    protected $loop;

    /**
     * @var array
     */
    protected $options = array(
        'characters.set' => 'utf8mb4',
        'characters.collate' => null,
        'compression.enable' => false,
        'localInFile.enable' => false,
        'tls.context' => array(),
        'tls.force' => true,
        'tls.forceLocal' => false
    );
    
    /**
     * @var string[]
     */
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');

    /**
     * @var \React\Socket\ConnectorInterface
     */
    protected $connector;

    /**
     * Internal class is intentionally used, as there's no other way currently.
     * @var \React\Socket\StreamEncryption
     * @see https://github.com/reactphp/socket/issues/180
     */
    protected $encryption;

    /**
     * @var \React\Promise\Promise|null
     */
    protected $connectPromise;

    /**
     * @var \React\Socket\Connection
     */
    protected $connection;

    /**
     * @var int
     */
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;

    /**
     * @var \Plasma\Drivers\MySQL\ProtocolParser
     */
    protected $parser;

    /**
     * @var array
     */
    protected $queue;

    /**
     * @var int
     */
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;

    /**
     * @var bool
     */
    protected $transaction = false;

    /**
     * @var \React\Promise\Deferred
     */
    protected $goingAway;

    /**
     * @var string|null
     */
    protected $charset;

    /**
     * @var bool|null
     */
    protected $cursorSupported;
    
    /**
     * Constructor.
     * @param \React\EventLoop\LoopInterface  $loop
     * @param array                           $options
     */
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        $this->validateOptions($options);

        $this->loop = $loop;
        $this->options = \array_merge($this->options, $options);

        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
        $this->queue = array();
    }

    /**
     * Destructor.
     * @return void
     * @internal
     */
    function __destruct() {
        $this->close();
    }
    
    /**
     * Returns the event loop.
     * @return \React\EventLoop\LoopInterface
     */
    function getLoop(): \React\EventLoop\LoopInterface {
        return $this->loop;
    }

    /**
     * Retrieves the current connection state.
     * @return int
     */
    function getConnectionState(): int {
        return $this->connectionState;
    }

    /**
     * Retrieves the current busy state.
     * @return int
     */
    function getBusyState(): int {
        return $this->busy;
    }

    /**
     * Get the length of the driver backlog queue.
     * @return int
     */
    function getBacklogLength(): int {
        return \count($this->queue);
    }
    
    /**
     * Connects to the given URI.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface
     * @throws \InvalidArgumentException
     */
    function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            return $this->connectPromise;
        }

        $pos = \strpos($uri, '://');

        if($pos === false) {
            $uri = 'tcp://'.$uri;
        } elseif(\substr($uri, 0, $pos) === 'unix') {
            $defaultSocket = '/tmp/mysql.sock';

            $apos = \strpos($uri, '@');
            $spos = \strrpos($uri, '/');

            if($apos === false) {
                throw new \InvalidArgumentException('Connecting without any username is not a valid operation');
            }

            $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
            $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);

            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
                $socket = $defaultSocket;
            }
        }

        $parts = \parse_url($uri);
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes)) {
            throw new \InvalidArgumentException('Invalid connect uri given');
        }

        if(isset($socket)) {
            $parts['host'] = $socket;
        }

        if($parts['scheme'] === 'mysql') {
            $parts['scheme'] = 'tcp';
        }

        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
        $this->connectionState = static::CONNECTION_STARTED;
        $resolved = false;
        
        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            $this->connectPromise = null;

            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }

            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;

            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });

            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;

                $this->emit('close');
            });

            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);

            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });

            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = \ltrim(($parts['path'] ?? ''), '/');

            $credentials = \compact('user', 'password', 'db');

            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;

                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });

        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            $this->connectPromise = $this->connectPromise->then(function () {
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');

                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);

                return $cmd->getPromise();
            });
        }

        return $this->connectPromise;
    }
    
    /**
     * Closes all connections gracefully after processing all outstanding requests.
     * @return \React\Promise\PromiseInterface
     */
    function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }

        $state = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        // Connection is still pending
        if($this->connectPromise !== null) {
            $this->connectPromise->cancel();

            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
            while($command = \array_shift($this->queue)) {
                $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
            }
        }

        $this->goingAway = new \React\Promise\Deferred();

        if(\count($this->queue) === 0 || $state < \Plasma\DriverInterface::CONNECTION_OK) {
            $this->queue = array();
            $this->goingAway->resolve();
        }

        return $this->goingAway->promise()->then(function () use ($state) {
            if($state !== static::CONNECTION_OK) {
                return;
            } elseif(!$this->connection->isWritable()) {
                $this->connection->close();
                return;
            }

            $deferred = new \React\Promise\Deferred();

            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();

            $this->connection->once('close', function () use (&$deferred) {
                $deferred->resolve();
            });

            $quit->once('end', function () {
                $this->connection->close();
            });

            $this->parser->invokeCommand($quit);

            return $deferred->promise();
        });
    }
    
    /**
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
     * @return void
     */
    function quit(): void {
        if($this->goingAway === null) {
            $this->goingAway = new \React\Promise\Deferred();
        }

        $state = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
        while($command = \array_shift($this->queue)) {
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }

        if($this->connectPromise !== null) {
            $this->connectPromise->cancel();
        }

        if($state === static::CONNECTION_OK) {
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
            $this->parser->invokeCommand($quit);

            $this->connection->close();
        }

        $this->goingAway->resolve();
    }

    /**
     * Whether this driver is currently in a transaction.
     * @return bool
     */
    function isInTransaction(): bool {
        return $this->transaction;
    }
    
    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\QueryResultInterface
     */
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use (&$client, &$query) {
                    return $this->query($client, $query);
                });
            }

            throw new \Plasma\Exception('Unable to continue without connection');
        }

        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($command);

        if(!$this->transaction) {
            $command->once('end', function () use (&$client) {
                $client->checkinConnection($this);
            });
        }

        return $command->getPromise();
    }
    
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use (&$client, &$query) {
                    return $this->prepare($client, $query);
                });
            }

            throw new \Plasma\Exception('Unable to continue without connection');
        }

        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($command);

        return $command->getPromise();
    }
    
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
                    return $this->execute($client, $query, $params);
                });
            }

            throw new \Plasma\Exception('Unable to continue without connection');
        }

        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });

                    return $result;
                }

                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use (&$statement) {
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
    
    /**
     * Quotes the string for use in the query.
     * @param string  $str
     * @param int     $type  For types, see the constants.
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception
     */
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }

        $message = $this->parser->getLastOkMessage();
        if($message === null) {
            $message = $this->parser->getHandshakeMessage();

            if($message === null) {
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
            }
        }

        $pos = \strpos($this->charset, '_');
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
        $realCharset = $this->getRealCharset($dbCharset);

        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
            return $this->escapeUsingQuotes($realCharset, $str, $type);
        }

        return $this->escapeUsingBackslashes($realCharset, $str, $type);
    }
    
    /**
     * Escapes using quotes.
     * @param string  $realCharset
     * @param string  $str
     * @param int     $type
     * @return string
     */
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }

        $escapeChars = array(
            '"',
            '\\',
        );

        $escapeReplace = array(
            '""',
            '\\\\',
        );

        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }

    /**
     * Escapes using backslashes.
     * @param string  $realCharset
     * @param string  $str
     * @param int     $type
     * @return string
     */
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '\\`', $str).'`';
        }

        $escapeChars = array(
            '\\',
            '"',
        );

        $escapeReplace = array(
            '\\\\',
            '\"',
        );

        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }
    
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\TransactionInterface
     */
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }

        $this->transaction = true;

        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_NO_CHANGE:
                return $this->query($client, 'START TRANSACTION')->then(function () use (&$client, $isolation) {
                    return (new \Plasma\Transaction($client, $this, $isolation));
                })->then(null, function (\Throwable $e) {
                    $this->transaction = false;
                    throw $e;
                });
            break;
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
            break;
        }

        return $this->query($client, $query)->then(function () use (&$client, $isolation) {
            return $this->query($client, 'START TRANSACTION')->then(function () use (&$client, $isolation) {
                return (new \Plasma\Transaction($client, $this, $isolation));
            });
        })->then(null, function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        });
    }

    /**
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
     * @return void
     */
    function endTransaction(): void {
        $this->transaction = false;
    }
    
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * When the command is done, the driver must check itself back into the client.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface
     */
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }

                $resolve($value);
            });

            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }

                $reject($error);
            });

            $this->executeCommand($command);
        }));
    }
    
    /**
     * Runs the given SQL querybuilder.
     * The driver CAN throw an exception if the given querybuilder is not supported.
     * An example would be a SQL querybuilder and a Cassandra driver.
     * @param \Plasma\ClientInterface           $client
     * @param \Plasma\SQLQueryBuilderInterface  $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     */
    function runQuery(\Plasma\ClientInterface $client, \Plasma\QueryBuilderInterface $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if(!($query instanceof \Plasma\SQLQueryBuilderInterface)) {
            throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
        }

        $sql = $query->getQuery();
        $params = $query->getParameters();

        return $this->execute($client, $sql, $params);
    }
    
    /**
     * Creates a new cursor to seek through SELECT query results. Resolves with a `CursorInterface` instance.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \LogicException  Thrown if the driver or DBMS does not support cursors.
     * @throws \Plasma\Exception
     */
    function createReadCursor(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif(!$this->supportsCursors()) {
            throw new \LogicException('Used DBMS version does not support cursors');
        }

        return $this->prepare($client, $query)->then(function (\Plasma\Drivers\MySQL\Statement $statement) use ($params) {
            if(!$this->supportsCursors()) {
                $statement->close()->then(null, function () {
                    $this->close();
                });

                throw new \LogicException('Used DBMS version does not support cursors');
            }

            return $statement->createReadCursor($params);
        });
    }
    
    /**
     * Executes a command.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;

        if($this->parser && $this->busy === static::STATE_IDLE) {
            $this->parser->invokeCommand($this->getNextCommand());
        }
    }

    /**
     * Get the handshake message, or null if none received yet.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        if($this->parser) {
            return $this->parser->getHandshakeMessage();
        }

        return null;
    }
    
751
    /**
752
     * Get the next command, or null.
753
     * @return \Plasma\CommandInterface|null
754
     * @internal
755
     */
756 61
    function getNextCommand(): ?\Plasma\CommandInterface {
757 61
        if(\count($this->queue) === 0) {
758 60
            if($this->goingAway) {
759
                $this->goingAway->resolve();
760
            }
761
            
762 60
            return null;
763 61
        } elseif($this->busy === static::STATE_BUSY) {
764
            return null;
765
        }
766
        
767
        /** @var \Plasma\CommandInterface  $command */
768 61
        $command = \array_shift($this->queue);
769
        
770 61
        if($command->waitForCompletion()) {
771 61
            $this->busy = static::STATE_BUSY;
772
            
773
            $command->once('error', function () {
774 3
                $this->busy = static::STATE_IDLE;
775
                
776 3
                $this->endCommand();
777 61
            });
778
            
779
            $command->once('end', function () {
780 60
                $this->busy = static::STATE_IDLE;
781
                
782 60
                $this->endCommand();
783 61
            });
784
        } else {
785
            $this->endCommand();
786
        }
787
        
788 61
        return $command;
789
    }
790
    
791
    /**
792
     * Get the driver options.
793
     * @return array
794
     * @internal
795
     */
796
    function getOptions(): array {
797
        return $this->options;
798
    }
799
    
800
    /**
801
     * Whether the DBMS supports cursors.
802
     * @return bool
803
     * @internal
804
     */
805 1
    function supportsCursors(): bool {
806 1
        if($this->getHandshake() === null) {
807
            return true; // Let's be optimistic
808 1
        } elseif($this->cursorSupported !== null) {
809 1
            return $this->cursorSupported;
810
        }
811
        
812 1
        $version = $this->getHandshake()->serverVersion;
813 1
        $mariaDB = (int) (\stripos($version, 'MariaDB') !== false);
814 1
        $version = \explode('-', $version)[$mariaDB];
815
    
816 1
        $this->cursorSupported = (
817 1
            (($this->getHandshake()->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PS_MULTI_RESULTS) > 0) &&
818
            (
819 1
                ($mariaDB && \version_compare($version, '10.3', '>=')) ||
820 1
                (!$mariaDB && \version_compare($version, '5.7', '>='))
821
            )
822
        );
823
        
824 1
        return $this->cursorSupported;
825
    }
826
    
827
    /**
828
     * Finishes up a command.
829
     * @return void
830
     */
831 61
    protected function endCommand() {
832
        $this->loop->futureTick(function () {
833 60
            if($this->goingAway && \count($this->queue) === 0) {
834 1
                return $this->goingAway->resolve();
835
            }
836
            
837 60
            $this->parser->invokeCommand($this->getNextCommand());
838 61
        });
839 61
    }
840
    
841
    /**
842
     * Starts the handshake process.
843
     * @param array                    $credentials
844
     * @param \React\Promise\Deferred  $deferred
845
     * @return void
846
     */
847 63
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
848
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
849 63
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
850 63
                $this->parser->removeListener('message', $listener);
851
                
852 63
                $this->connectionState = static::CONNECTION_SETENV;
853 63
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
854
                
855 63
                \extract($credentials);
856
                
857 63
                if($db !== '') {
858 47
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
859
                }
860
                
861 63
                if($this->charset === null) {
862 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
863
                }
864
                
865 63
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
866 2
                    $this->parser->enableCompression();
867 2
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
868
                }
869
                
870
                // Check if we support auth plugins
871 63
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
872 63
                $plugin = null;
873
                
874 63
                foreach($plugins as $key => $plug) {
875 63
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
876 63
                        $plugin = $plug;
877 63
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
878 63
                        break;
879
                    } elseif($key === $message->authPluginName) {
880
                        $plugin = $plug;
881
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
882
                        break;
883
                    }
884
                }
885
                
886 63
                $remote = \parse_url($this->connection->getRemoteAddress());
887
                
888 63
                if($remote !== false && (($remote['host'] ?? null) !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
889
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
890
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
891
                        
892
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
893
                        
894
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
895
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
896
                            
897
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
898
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
899
                            }, function (\Throwable $error) use (&$deferred) {
900
                                $deferred->reject($error);
901
                                $this->connection->close();
902
                            });
903
                        });
904
                        
905
                        return $this->parser->invokeCommand($ssl);
906
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
907
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
908
                        $this->connection->close();
909
                        return;
910
                    }
911
                }
912
                
913 63
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
914
            }
915 63
        };
916
        
917 63
        $this->parser->on('message', $listener);
918
        
919
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
920 63
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
921 62
                $this->connectionState = static::CONNECTION_OK;
922
            }
923
            
924 63
            $this->emit('eventRelay', array('message', $message));
925 63
        });
926 63
    }
927
    
928
    /**
929
     * Enables TLS on the connection.
930
     * @return \React\Promise\PromiseInterface
931
     */
932
    protected function enableTLS(): \React\Promise\PromiseInterface {
933
        // Set required SSL/TLS context options
934
        foreach($this->options['tls.context'] as $name => $value) {
935
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
936
        }
937
        
938
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
939
            $this->connection->close();
940
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), (int) $error->getCode(), $error);
941
        });
942
    }
943
    
944
    /**
945
     * Sends the auth command.
946
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
947
     * @param array                                            $credentials
948
     * @param int                                              $clientFlags
949
     * @param string|null                                      $plugin
950
     * @param \React\Promise\Deferred                          $deferred
951
     * @return void
952
     */
953 63
    protected function createHandshakeResponse(
954
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
955
    ) {
956 63
        \extract($credentials);
957
        
958 63
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
959
        
960
        $auth->once('end', function () use (&$deferred) {
961
            $this->loop->futureTick(function () use (&$deferred) {
962 62
                $deferred->resolve();
963 62
            });
964 63
        });
965
        
966
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
967 1
            $deferred->reject($error);
968 1
            $this->connection->close();
969 63
        });
970
        
971 63
        if($plugin) {
972
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
973
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
974 63
                static $plugin;
975
                
976 63
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
977
                    $name = $message->authPluginName;
978
                    
979
                    if($name !== null) {
980
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
981
                        foreach($plugins as $key => $plug) {
982
                            if($key === $name) {
983
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
984
                                
985
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
986
                                return $this->parser->invokeCommand($command);
987
                            }
988
                        }
989
                    }
990
                    
991
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
992 63
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
993
                    if($plugin === null) {
994
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received an auth more data packet')));
995
                        return $this->connection->close();
996
                    }
997
                    
998
                    try {
999
                        $command = $plugin->receiveMoreData($message);
1000
                        return $this->parser->invokeCommand($command);
1001
                    } catch (\Plasma\Exception $e) {
1002
                        $deferred->reject($e);
1003
                        $this->connection->close();
1004
                    }
1005 63
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
1006 62
                    $this->parser->removeListener('message', $listener);
1007
                }
1008 63
            };
1009
            
1010 63
            $this->parser->on('message', $listener);
1011
        }
1012
        
1013 63
        $this->parser->invokeCommand($auth);
1014 63
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
1015 63
    }
1016
    
1017
    /**
1018
     * Get the real charset from the DB charset.
1019
     * @param string  $charset
1020
     * @return string
1021
     */
1022 4
    protected function getRealCharset(string $charset): string {
1023 4
        if(\substr($charset, 0, 4) === 'utf8') {
1024 2
            return 'UTF-8';
1025
        }
1026
        
1027 2
        $charsets = \mb_list_encodings();
1028
        
1029 2
        foreach($charsets as $set) {
1030 2
            if(\stripos($set, $charset) === 0) {
1031 2
                return $set;
1032
            }
1033
        }
1034
        
1035 2
        return 'UTF-8';
1036
    }
1037
    
1038
    /**
1039
     * Validates the given options.
1040
     * @param array  $options
1041
     * @return void
1042
     * @throws \InvalidArgumentException
1043
     */
1044 92
    protected function validateOptions(array $options) {
1045 92
        \CharlotteDunois\Validation\Validator::make($options, array(
1046 92
            'characters.set' => 'string',
1047
            'characters.collate' => 'string',
1048
            'compression.enable' => 'boolean',
1049
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
1050
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
1051
            'tls.context' => 'array',
1052
            'tls.force' => 'boolean',
1053
            'tls.forceLocal' => 'boolean'
1054 92
        ), true)->throw(\InvalidArgumentException::class);
1055 92
    }
1056
}
1057
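
The version check in `supportsCursors()` takes the second dash-separated segment of the server version string whenever the string mentions MariaDB. A minimal standalone sketch of that version comparison follows; it leaves out the capability-flag check. The function names `parseServerVersion` and `versionSupportsCursors` are hypothetical and not part of the driver. The sketch assumes that some MariaDB servers prefix their version with a `5.5.5-` compatibility marker while others report the version directly, and it handles both shapes.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the driver): splits a server version
 * string into a MariaDB flag and a version number usable with version_compare().
 */
function parseServerVersion(string $serverVersion): array {
    $isMariaDB = \stripos($serverVersion, 'MariaDB') !== false;
    $parts = \explode('-', $serverVersion);

    // Assumption: a leading "5.5.5" segment followed by at least two more
    // segments is a compatibility prefix, so the real version comes second.
    if ($isMariaDB && \count($parts) > 2 && $parts[0] === '5.5.5') {
        $version = $parts[1];
    } else {
        $version = $parts[0];
    }

    return array('mariaDB' => $isMariaDB, 'version' => $version);
}

/**
 * Hypothetical helper: applies the same minimum versions as the listing
 * above (MariaDB 10.3, MySQL 5.7), without the capability-flag check.
 */
function versionSupportsCursors(string $serverVersion): bool {
    $info = parseServerVersion($serverVersion);
    $minimum = $info['mariaDB'] ? '10.3' : '5.7';

    return \version_compare($info['version'], $minimum, '>=');
}
```

With this split, a version string without the prefix no longer yields the literal segment `MariaDB` as the version to compare.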