Completed
Push — master ( f4b4f6...d42adb )
by Charlotte
02:25
created

Driver   F

Complexity

Total Complexity 136

Size/Duplication

Total Lines 988
Duplicated Lines 0 %

Test Coverage

Coverage 78.43%

Importance

Changes 0
Metric Value
eloc 402
dl 0
loc 988
ccs 309
cts 394
cp 0.7843
rs 2
c 0
b 0
f 0
wmc 136

31 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 9 1
A __destruct() 0 2 1
A getBusyState() 0 2 1
A getBacklogLength() 0 2 1
A getConnectionState() 0 2 1
A getLoop() 0 2 1
A isInTransaction() 0 2 1
A validateOptions() 0 11 1
A escapeUsingQuotes() 0 16 2
A quit() 0 25 5
A escapeUsingBackslashes() 0 16 2
A query() 0 23 5
B createHandshakeResponse() 0 62 11
A getHandshake() 0 6 2
A executeCommand() 0 5 3
A execute() 0 29 5
A endCommand() 0 7 3
A resumeStreamConsumption() 0 7 3
B beginTransaction() 0 36 7
A runQuery() 0 13 3
A enableTLS() 0 9 2
A pauseStreamConsumption() 0 7 3
D startHandshake() 0 78 18
A endTransaction() 0 2 1
B close() 0 48 8
A quote() 0 23 6
A prepare() 0 17 4
A getRealCharset() 0 14 4
D connect() 0 113 22
A runCommand() 0 23 4
A getNextCommand() 0 33 5

How to fix   Complexity   

Complex Class

Complex classes like Driver often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share a prefix or suffix (for example, escapeUsingQuotes() and escapeUsingBackslashes()).

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
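As a rough illustration of Extract Class, the sketch below moves the two escape*() methods from the listing into their own class. The class name `Escaper`, its constructor flag, and the `TYPE_*` constants are hypothetical; the constants merely stand in for `\Plasma\DriverInterface::QUOTE_TYPE_VALUE` and `QUOTE_TYPE_IDENTIFIER`. The escaping rules are copied from the listing and are not a claim about how the project itself would refactor.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical class extracted from Driver: it owns only the escaping logic.
 * TYPE_* are stand-ins for \Plasma\DriverInterface::QUOTE_TYPE_*.
 */
final class Escaper {
    const TYPE_VALUE = 0;
    const TYPE_IDENTIFIER = 1;

    /** @var bool  True when the server reports NO_BACKSLASH_ESCAPES. */
    private $noBackslashEscapes;

    function __construct(bool $noBackslashEscapes) {
        $this->noBackslashEscapes = $noBackslashEscapes;
    }

    /** Picks the escaping strategy, as Driver::quote() does in the listing. */
    function quote(string $str, int $type = self::TYPE_VALUE): string {
        return ($this->noBackslashEscapes
            ? $this->escapeUsingQuotes($str, $type)
            : $this->escapeUsingBackslashes($str, $type));
    }

    private function escapeUsingQuotes(string $str, int $type): string {
        if($type === self::TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }

        // Double the quote character, then double backslashes.
        return '"'.\str_replace(array('"', '\\'), array('""', '\\\\'), $str).'"';
    }

    private function escapeUsingBackslashes(string $str, int $type): string {
        if($type === self::TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '\\`', $str).'`';
        }

        // Backslashes are handled first so the ones added for quotes are not doubled.
        return '"'.\str_replace(array('\\', '"'), array('\\\\', '\\"'), $str).'"';
    }
}
```

With such a class in place, Driver::quote() would only need to read the status flags, construct or reuse an `Escaper`, and delegate to it, which removes two methods and their branches from Driver.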

While breaking up the class, it is a good idea to analyze how other classes use Driver, and based on these observations, apply Extract Interface, too.
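A minimal sketch of Extract Interface follows, assuming that some collaborators only care about the command backlog. The names `CommandQueueInterface`, `InMemoryQueue`, `enqueue()` and `describeBacklog()` are hypothetical; only `getBacklogLength()` mirrors a method that appears in the listing.

```php
<?php
declare(strict_types=1);

/** Hypothetical narrow interface: only what a backlog-aware caller needs. */
interface CommandQueueInterface {
    function enqueue(callable $command): void;
    function getBacklogLength(): int;
}

/** Stand-in implementation; in the real code Driver would implement the interface. */
final class InMemoryQueue implements CommandQueueInterface {
    /** @var callable[] */
    private $queue = array();

    function enqueue(callable $command): void {
        $this->queue[] = $command;
    }

    function getBacklogLength(): int {
        return \count($this->queue);
    }
}

/** A collaborator that type-hints the interface instead of the full driver. */
function describeBacklog(CommandQueueInterface $queue): string {
    return 'backlog: '.$queue->getBacklogLength();
}
```

Callers written against the narrow interface keep working when the implementation behind it is split further, and they can be tested with a small stand-in instead of a connected driver.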

<?php
/**
 * Plasma Driver MySQL component
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
 *
 * Website: https://github.com/PlasmaPHP
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
*/

namespace Plasma\Drivers\MySQL;

/**
 * The MySQL Driver.
 * @internal
 */
class Driver implements \Plasma\DriverInterface {
    use \Evenement\EventEmitterTrait;

    /**
     * @var \React\EventLoop\LoopInterface
     */
    protected $loop;

    /**
     * @var array
     */
    protected $options = array(
        'characters.set' => 'utf8mb4',
        'characters.collate' => null,
        'compression.enable' => !true,
        'tls.context' => array(),
        'tls.force' => true,
        'tls.forceLocal' => false
    );
    
    /**
     * @var string[]
     */
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');

    /**
     * @var \React\Socket\ConnectorInterface
     */
    protected $connector;

    /**
     * Internal class is intentionally used, as there's no other way currently.
     * @var \React\Socket\StreamEncryption
     * @see https://github.com/reactphp/socket/issues/180
     */
    protected $encryption;

    /**
     * @var \React\Promise\Promise|null
     */
    protected $connectPromise;

    /**
     * @var \React\Socket\Connection
     */
    protected $connection;

    /**
     * @var int
     */
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;

    /**
     * @var \Plasma\Drivers\MySQL\ProtocolParser
     */
    protected $parser;

    /**
     * @var array
     */
    protected $queue;

    /**
     * @var int
     */
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;

    /**
     * @var bool
     */
    protected $transaction = false;

    /**
     * @var \React\Promise\Deferred
     */
    protected $goingAway;

    /**
     * @var string|null
     */
    protected $charset;
    
    /**
     * Constructor.
     * @param \React\EventLoop\LoopInterface  $loop
     * @param array                           $options
     */
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        $this->validateOptions($options);

        $this->loop = $loop;
        $this->options = \array_merge($this->options, $options);

        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
        $this->queue = array();
    }
    
    /**
     * Destructor.
     * @return void
     * @internal
     */
    function __destruct() {
        $this->close();
    }

    /**
     * Returns the event loop.
     * @return \React\EventLoop\LoopInterface
     */
    function getLoop(): \React\EventLoop\LoopInterface {
        return $this->loop;
    }

    /**
     * Retrieves the current connection state.
     * @return int
     */
    function getConnectionState(): int {
        return $this->connectionState;
    }

    /**
     * Retrieves the current busy state.
     * @return int
     */
    function getBusyState(): int {
        return $this->busy;
    }

    /**
     * Get the length of the driver backlog queue.
     * @return int
     */
    function getBacklogLength(): int {
        return \count($this->queue);
    }
    
    /**
     * Connects to the given URI.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface
     * @throws \InvalidArgumentException
     */
    function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            return $this->connectPromise;
        }

        $pos = \strpos($uri, '://');

        if($pos === false) {
            $uri = 'tcp://'.$uri;
        } elseif(\substr($uri, 0, $pos) === 'unix') {
            $defaultSocket = '/tmp/mysql.sock';

            $apos = \strpos($uri, '@');
            $spos = \strrpos($uri, '/');

            if($apos === false) {
                throw new \InvalidArgumentException('Connecting without any username is not a valid operation');
            }

            $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
            $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);

            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
                $socket = $defaultSocket;
            }
        }

        $parts = \parse_url($uri);
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes)) {
            throw new \InvalidArgumentException('Invalid connect uri given');
        }

        if(isset($socket)) {
            $parts['host'] = $socket;
        }

        if($parts['scheme'] === 'mysql') {
            $parts['scheme'] = 'tcp';
        }

        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
        $this->connectionState = static::CONNECTION_STARTED;
        $resolved = false;

        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            $this->connectPromise = null;

            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }

            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;

            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });

            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;

                $this->emit('close');
            });

            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);

            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });

            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = \ltrim(($parts['path'] ?? ''), '/');

            $credentials = \compact('user', 'password', 'db');

            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;

                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });

        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            $this->connectPromise = $this->connectPromise->then(function () {
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');

                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);

                return $cmd->getPromise();
            });
        }

        return $this->connectPromise;
    }
    
    /**
     * Pauses the underlying stream I/O consumption.
     * If consumption is already paused, this will do nothing.
     * @return bool  Whether the operation was successful.
     */
    function pauseStreamConsumption(): bool {
        if($this->connection === null || $this->goingAway) {
            return false;
        }

        $this->connection->pause();
        return true;
    }

    /**
     * Resumes the underlying stream I/O consumption.
     * If consumption is not paused, this will do nothing.
     * @return bool  Whether the operation was successful.
     */
    function resumeStreamConsumption(): bool {
        if($this->connection === null || $this->goingAway) {
            return false;
        }

        $this->connection->resume();
        return true;
    }
    
    /**
     * Closes all connections gracefully after processing all outstanding requests.
     * @return \React\Promise\PromiseInterface
     */
    function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }

        $state = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        // Connection is still pending
        if($this->connectPromise !== null) {
            $this->connectPromise->cancel();

            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
            while($command = \array_shift($this->queue)) {
                $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
            }
        }

        $this->goingAway = new \React\Promise\Deferred();

        if(\count($this->queue) === 0 || $state < \Plasma\DriverInterface::CONNECTION_OK) {
            $this->queue = array();
            $this->goingAway->resolve();
        }

        return $this->goingAway->promise()->then(function () use ($state) {
            if($state !== static::CONNECTION_OK) {
                return;
            } elseif(!$this->connection->isWritable()) {
                $this->connection->close();
                return;
            }

            $deferred = new \React\Promise\Deferred();

            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();

            $this->connection->once('close', function () use (&$deferred) {
                $deferred->resolve();
            });

            $quit->once('end', function () {
                $this->connection->close();
            });

            $this->parser->invokeCommand($quit);

            return $deferred->promise();
        });
    }
    
    /**
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
     * @return void
     */
    function quit(): void {
        if($this->goingAway === null) {
            $this->goingAway = new \React\Promise\Deferred();
        }

        $state = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
        while($command = \array_shift($this->queue)) {
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }

        if($this->connectPromise !== null) {
            $this->connectPromise->cancel();
        }

        if($state === static::CONNECTION_OK) {
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
            $this->parser->invokeCommand($quit);

            $this->connection->close();
        }

        $this->goingAway->resolve();
    }
    
    /**
     * Whether this driver is currently in a transaction.
     * @return bool
     */
    function isInTransaction(): bool {
        return $this->transaction;
    }

    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\QueryResultInterface
     */
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use (&$client, &$query) {
                    return $this->query($client, $query);
                });
            }

            throw new \Plasma\Exception('Unable to continue without connection');
        }

        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($command);

        if(!$this->transaction) {
            $command->once('end', function () use (&$client) {
                $client->checkinConnection($this);
            });
        }

        return $command->getPromise();
    }
    
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use (&$client, &$query) {
                    return $this->prepare($client, $query);
                });
            }

            throw new \Plasma\Exception('Unable to continue without connection');
        }

        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($command);

        return $command->getPromise();
    }
    
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
                    return $this->execute($client, $query, $params);
                });
            }

            throw new \Plasma\Exception('Unable to continue without connection');
        }

        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });

                    return $result;
                }

                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use (&$statement) {
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
    
    /**
     * Quotes the string for use in the query.
     * @param string  $str
     * @param int     $type  For types, see the constants.
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception
     */
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }

        $message = $this->parser->getLastOkMessage();
        if($message === null) {
            $message = $this->parser->getHandshakeMessage();

            if($message === null) {
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
            }
        }

        $pos = \strpos($this->charset, '_');
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
        $realCharset = $this->getRealCharset($dbCharset);

        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
            return $this->escapeUsingQuotes($realCharset, $str, $type);
        }

        return $this->escapeUsingBackslashes($realCharset, $str, $type);
    }
    
    /**
     * Escapes using quotes.
     * @param string  $realCharset
     * @param string  $str
     * @param int     $type
     * @return string
     */
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }

        $escapeChars = array(
            '"',
            '\\',
        );

        $escapeReplace = array(
            '""',
            '\\\\',
        );

        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }

    /**
     * Escapes using backslashes.
     * @param string  $realCharset
     * @param string  $str
     * @param int     $type
     * @return string
     */
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '\\`', $str).'`';
        }

        $escapeChars = array(
            '\\',
            '"',
        );

        $escapeReplace = array(
            '\\\\',
            '\"',
        );

        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }
    
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\TransactionInterface
     */
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }

        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
            break;
        }

        $this->transaction = true;

        return $this->query($client, $query)->then(function () use (&$client) {
            return $this->query($client, 'START TRANSACTION');
        })->then(function () use (&$client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        })->then(null, function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        });
    }

    /**
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
     * @return void
     */
    function endTransaction(): void {
        $this->transaction = false;
    }
    
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * When the command is done, the driver must check itself back into the client.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface
     */
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }

                $resolve($value);
            });

            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }

                $reject($error);
            });

            $this->executeCommand($command);
        }));
    }
    
    /**
     * Runs the given SQL querybuilder.
     * The driver CAN throw an exception if the given querybuilder is not supported.
     * An example would be a SQL querybuilder and a Cassandra driver.
     * @param \Plasma\ClientInterface        $client
     * @param \Plasma\QuerybuilderInterface  $query  Must be a `\Plasma\SQLQuerybuilderInterface` instance.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     */
    function runQuery(\Plasma\ClientInterface $client, \Plasma\QuerybuilderInterface $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if(!($query instanceof \Plasma\SQLQuerybuilderInterface)) {
            throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
        }

        $sql = $query->getQuery();
        $params = $query->getParameters();

        return $this->execute($client, $sql, $params);
    }
    
    /**
     * Executes a command.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;

        if($this->parser && $this->busy === static::STATE_IDLE) {
            $this->parser->invokeCommand($this->getNextCommand());
        }
    }

    /**
     * Get the handshake message, or null if none received yet.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        if($this->parser) {
            return $this->parser->getHandshakeMessage();
        }

        return null;
    }
    
    /**
     * Get the next command, or null.
     * @return \Plasma\CommandInterface|null
     * @internal
     */
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            if($this->goingAway) {
                $this->goingAway->resolve();
            }

            return null;
        } elseif($this->busy === static::STATE_BUSY) {
            return null;
        }

        /** @var \Plasma\CommandInterface  $command */
        $command = \array_shift($this->queue);

        if($command->waitForCompletion()) {
            $this->busy = static::STATE_BUSY;

            $command->once('error', function () use (&$command) {
                $this->busy = static::STATE_IDLE;

                $this->endCommand();
            });

            $command->once('end', function () use (&$command) {
                $this->busy = static::STATE_IDLE;

                $this->endCommand();
            });
        } else {
            $this->endCommand();
        }

        return $command;
    }
    
    /**
     * Finishes up a command.
     * @return void
     */
    protected function endCommand() {
        $this->loop->futureTick(function () {
            if($this->goingAway && \count($this->queue) === 0) {
                return $this->goingAway->resolve();
            }

            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
    
    /**
     * Starts the handshake process.
     * @param array                    $credentials
     * @param \React\Promise\Deferred  $deferred
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
                $this->parser->removeListener('message', $listener);

                $this->connectionState = static::CONNECTION_SETENV;
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;

                \extract($credentials);

                if($db !== '') {
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
                }

                if($this->charset === null) {
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
                }

                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
                    $this->parser->enableCompression();
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
                }

                // Check if we support auth plugins
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                $plugin = null;

                foreach($plugins as $key => $plug) {
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    } elseif($key === $message->authPluginName) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    }
                }
                
                $remote = \parse_url($this->connection->getRemoteAddress());

                if($remote !== false && ($remote['host'] !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;

                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);

                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;

                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                            }, function (\Throwable $error) use (&$deferred) {
                                // Reject with the throwable itself ($$error would be a variable variable lookup)
                                $deferred->reject($error);
                                $this->connection->close();
                            });
                        });

                        return $this->parser->invokeCommand($ssl);
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                        $this->connection->close();
                        return;
                    }
                }
                
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
            }
        };

        $this->parser->on('message', $listener);

        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }

            $this->emit('eventRelay', array('message', $message));
        });
    }
    
    /**
     * Enables TLS on the connection.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }

        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
        });
    }
    
    /**
     * Sends the auth command.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        \extract($credentials);

        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);

        $auth->once('end', function () use (&$deferred) {
            $this->loop->futureTick(function () use (&$deferred) {
                $deferred->resolve();
            });
        });

        $auth->once('error', function (\Throwable $error) use (&$deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
                static $plugin;

                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;

                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());

                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }

                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($plugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }

                    try {
                        $command = $plugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    $this->parser->removeListener('message', $listener);
                }
            };

            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
    
    /**
     * Get the real charset from the DB charset.
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        if(\substr($charset, 0, 4) === 'utf8') {
            return 'UTF-8';
        }

        $charsets = \mb_list_encodings();

        foreach($charsets as $set) {
            if(\stripos($set, $charset) === 0) {
                return $set;
            }
        }

        return 'UTF-8';
    }
    
    /**
     * Validates the given options.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        \CharlotteDunois\Validation\Validator::make($options, array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'compression.enable' => 'boolean',
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        ), true)->throw(\InvalidArgumentException::class);
    }
}