GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Push — master ( 655fbe...ab123d )
by Charlotte
02:23
created

Driver::startHandshake()   D

Complexity

Conditions 18
Paths 1

Size

Total Lines 78
Code Lines 47

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 37.4121

Importance

Changes 0
Metric Value
cc 18
eloc 47
nc 1
nop 2
dl 0
loc 78
ccs 28
cts 46
cp 0.6087
crap 37.4121
rs 4.8666
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
        // Character set (and optional collation) sent with SET NAMES once connected.
        'characters.set' => 'utf8mb4',
        'characters.collate' => null,
        'compression.enable' => false,
        'tls.context' => array(),
        'tls.force' => true,
        'tls.forceLocal' => false
    );
35
    
36
    /**
37
     * @var string[]
38
     */
39
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
40
    
41
    /**
42
     * @var \React\Socket\ConnectorInterface
43
     */
44
    protected $connector;
45
    
46
    /**
47
     * An internal class is intentionally used, as there is currently no other way.
48
     * @var \React\Socket\StreamEncryption
49
     * @see https://github.com/reactphp/socket/issues/180
50
     */
51
    protected $encryption;
52
    
53
    /**
54
     * @var \React\Promise\Promise|null
55
     */
56
    protected $connectPromise;
57
    
58
    /**
59
     * @var \React\Socket\Connection
60
     */
61
    protected $connection;
62
    
63
    /**
64
     * @var int
65
     */
66
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
67
    
68
    /**
69
     * @var \Plasma\Drivers\MySQL\ProtocolParser
70
     */
71
    protected $parser;
72
    
73
    /**
74
     * @var array
75
     */
76
    protected $queue;
77
    
78
    /**
79
     * @var int
80
     */
81
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
82
    
83
    /**
84
     * @var bool
85
     */
86
    protected $transaction = false;
87
    
88
    /**
89
     * @var \React\Promise\Deferred
90
     */
91
    protected $goingAway;
92
    
93
    /**
94
     * @var string|null
95
     */
96
    protected $charset;
97
    
98
    /**
99
     * Constructor.
100
     * @param \React\EventLoop\LoopInterface  $loop
101
     * @param array                           $options
102
     */
103 90
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        // Validation happens first, before any state is assigned.
        // NOTE(review): validateOptions() is defined outside this view - presumably it throws on bad input.
        $this->validateOptions($options);
        
        $this->loop = $loop;
        $this->queue = array();
        
        // Caller supplied options override the defaults declared on the property.
        $this->options = \array_merge($this->options, $options);
        
        // Use the injected connector if there is one, otherwise a default ReactPHP connector.
        $connector = ($options['connector'] ?? null);
        if($connector === null) {
            $connector = new \React\Socket\Connector($loop);
        }
        
        $this->connector = $connector;
        $this->encryption = new \React\Socket\StreamEncryption($loop, false);
    }
113
    
114
    /**
115
     * Destructor.
116
     * @return void
117
     * @internal
118
     */
119 27
    function __destruct() {
        // Kick off the graceful shutdown; the promise returned by close() is not awaited here.
        $this->close();
    }
122
    
123
    /**
124
     * Returns the event loop.
125
     * @return \React\EventLoop\LoopInterface
126
     */
127 60
    function getLoop(): \React\EventLoop\LoopInterface {
        // The loop instance handed to the constructor.
        $loop = $this->loop;
        return $loop;
    }
130
    
131
    /**
132
     * Retrieves the current connection state.
133
     * @return int
134
     */
135 11
    function getConnectionState(): int {
        // One of the DriverInterface::CONNECTION_* values tracked by this driver.
        $state = $this->connectionState;
        return $state;
    }
138
    
139
    /**
140
     * Retrieves the current busy state.
141
     * @return int
142
     */
143 1
    function getBusyState(): int {
        // One of the DriverInterface::STATE_* values tracked by this driver.
        $state = $this->busy;
        return $state;
    }
146
    
147
    /**
148
     * Get the length of the driver backlog queue.
149
     * @return int
150
     */
151 1
    function getBacklogLength(): int {
        // Number of commands still waiting in the queue.
        $length = \count($this->queue);
        return $length;
    }
154
    
155
    /**
156
     * Connects to the given URI.
157
     * @param string  $uri
158
     * @return \React\Promise\PromiseInterface
159
     * @throws \InvalidArgumentException
160
     */
161 68
    function connect(string $uri): \React\Promise\PromiseInterface {
        // Short-circuit depending on where the driver is in its lifecycle.
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            // An attempt is already in flight - share it.
            return $this->connectPromise;
        }
        
        $pos = \strpos($uri, '://');
        
        if($pos === false) {
            // No scheme given: default to TCP.
            $uri = 'tcp://'.$uri;
        } elseif(\substr($uri, 0, $pos) === 'unix') {
            $defaultSocket = '/tmp/mysql.sock';
            
            $apos = \strpos($uri, '@');
            $spos = \strrpos($uri, '/');
            
            if($apos === false) {
                throw new \InvalidArgumentException('Connecting without any username is not a valid operation');
            }
            
            // Cut the socket path out of the host position and substitute "localhost" so that
            // parse_url() receives a regular looking URI; the path is put back further down.
            $socket = \substr($uri, ($apos + 1), ($spos - ($apos + 1)));
            $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
            
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
                $socket = $defaultSocket;
            }
        }
        
        $parts = \parse_url($uri);
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes, true)) {
            throw new \InvalidArgumentException('Invalid connect uri given');
        }
        
        if(isset($socket)) {
            $parts['host'] = $socket;
        }
        
        // "mysql" is only an alias for a plain TCP connection.
        if($parts['scheme'] === 'mysql') {
            $parts['scheme'] = 'tcp';
        }
        
        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
        $this->connectionState = static::CONNECTION_STARTED;
        $resolved = false;
        
        $promise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            $this->connectPromise = null;
            
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }
            
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;
            
            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });
            
            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;
                
                $this->emit('close');
            });
            
            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
            
            // Until the handshake has completed, parser errors fail the connect attempt;
            // afterwards they are surfaced as driver "error" events.
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });
            
            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = \ltrim(($parts['path'] ?? ''), '/');
            
            $credentials = \compact('user', 'password', 'db');
            
            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;
                
                // Commands may have been queued while the handshake was running.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });
        
        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            $promise = $promise->then(function () {
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
                
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);
                
                return $cmd->getPromise();
            });
        }
        
        // A failed attempt must not poison the driver: without this handler the rejected promise
        // stays in $connectPromise and the state stays at CONNECTION_STARTED, so every later
        // connect() call would just get the same rejection back.
        $failed = false;
        $promise = $promise->then(null, function (\Throwable $error) use (&$promise, &$failed) {
            $failed = true;
            
            // Only forget the promise if it still belongs to this attempt.
            if($this->connectPromise === $promise) {
                $this->connectPromise = null;
            }
            
            // Reset the state only if no socket was ever established and no shutdown is in progress.
            if($this->connection === null && $this->connectionState !== static::CONNECTION_UNUSABLE) {
                $this->connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
            }
            
            throw $error;
        });
        
        // $failed is already true here if the connector rejected synchronously.
        $this->connectPromise = ($failed ? null : $promise);
        
        return $promise;
    }
275
    
276
    /**
277
     * Pauses the underlying stream I/O consumption.
278
     * If consumption is already paused, this will do nothing.
279
     * @return bool  Whether the operation was successful.
280
     */
281 1
    function pauseStreamConsumption(): bool {
        // Pausing is only possible with a connection object and while not shutting down.
        $usable = ($this->connection !== null && !$this->goingAway);
        
        if($usable) {
            $this->connection->pause();
        }
        
        return $usable;
    }
289
    
290
    /**
291
     * Resumes the underlying stream I/O consumption.
292
     * If consumption is not paused, this will do nothing.
293
     * @return bool  Whether the operation was successful.
294
     */
295 1
    function resumeStreamConsumption(): bool {
        // Resuming is only possible with a connection object and while not shutting down.
        $usable = ($this->connection !== null && !$this->goingAway);
        
        if($usable) {
            $this->connection->resume();
        }
        
        return $usable;
    }
303
    
304
    /**
305
     * Closes all connections gracefully after processing all outstanding requests.
306
     * @return \React\Promise\PromiseInterface
307
     */
308 28
    function close(): \React\Promise\PromiseInterface {
        // A shutdown is already under way - hand out the same promise.
        if($this->goingAway) {
            return $this->goingAway->promise();
        }
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        // A connect attempt is still pending: cancel it and fail everything that was queued.
        if($this->connectPromise !== null) {
            $this->connectPromise->cancel();
            
            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $pending */
            while($pending = \array_shift($this->queue)) {
                $pending->emit('error', array((new \Plasma\Exception('Connection is going away'))));
            }
        }
        
        $this->goingAway = new \React\Promise\Deferred();
        
        // With an empty queue, or a connection that never reached CONNECTION_OK, there is nothing
        // to wait for. Otherwise the deferred is resolved once the queue has drained.
        $queueIsEmpty = (\count($this->queue) === 0);
        if($queueIsEmpty || $previousState < \Plasma\DriverInterface::CONNECTION_OK) {
            $this->queue = array();
            $this->goingAway->resolve();
        }
        
        return $this->goingAway->promise()->then(function () use ($previousState) {
            if($previousState !== static::CONNECTION_OK) {
                return;
            }
            
            // A quit command can not be written any more, so just close the socket.
            if(!$this->connection->isWritable()) {
                $this->connection->close();
                return;
            }
            
            $closed = new \React\Promise\Deferred();
            $this->connection->once('close', function () use ($closed) {
                $closed->resolve();
            });
            
            // Send a quit command and close the socket once that command has ended.
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
            $quit->once('end', function () {
                $this->connection->close();
            });
            
            $this->parser->invokeCommand($quit);
            
            return $closed->promise();
        });
    }
358
    
359
    /**
360
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
361
     * @return void
362
     */
363 12
    function quit(): void {
        // Reuse the deferred of a shutdown that is already in progress, otherwise create one.
        $this->goingAway = ($this->goingAway ?? new \React\Promise\Deferred());
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        // Fail every command that is still waiting in the queue.
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $pending */
        while($pending = \array_shift($this->queue)) {
            $pending->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }
        
        if($this->connectPromise !== null) {
            $this->connectPromise->cancel();
        }
        
        // On an established connection, hand a quit command to the parser and close the socket right away.
        if($previousState === static::CONNECTION_OK) {
            $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
            $this->connection->close();
        }
        
        $this->goingAway->resolve();
    }
389
    
390
    /**
391
     * Whether this driver is currently in a transaction.
392
     * @return bool
393
     */
394 2
    function isInTransaction(): bool {
        // Flag maintained by beginTransaction() and endTransaction().
        $inTransaction = $this->transaction;
        return $inTransaction;
    }
397
    
398
    /**
399
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
400
     * When the command is done, the driver must check itself back into the client.
401
     * @param \Plasma\ClientInterface  $client
402
     * @param string                   $query
403
     * @return \React\Promise\PromiseInterface
404
     * @throws \Plasma\Exception
405
     * @see \Plasma\QueryResultInterface
406
     */
407 46
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Still connecting: retry the call once the pending connect promise has resolved.
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->query($client, $query);
            });
        }
        
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($command);
        
        // Outside of a transaction the driver checks itself back into the client when the command ends.
        if(!$this->transaction) {
            $command->once('end', function () use ($client) {
                $client->checkinConnection($this);
            });
        }
        
        return $command->getPromise();
    }
431
    
432
    /**
433
     * Prepares a query. Resolves with a `StatementInterface` instance.
434
     * When the command is done, the driver must check itself back into the client.
435
     * @param \Plasma\ClientInterface  $client
436
     * @param string                   $query
437
     * @return \React\Promise\PromiseInterface
438
     * @throws \Plasma\Exception
439
     * @see \Plasma\StatementInterface
440
     */
441 38
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Still connecting: retry the call once the pending connect promise has resolved.
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->prepare($client, $query);
            });
        }
        
        // The client is handed to the command itself; no check-in listener is registered here.
        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($command);
        
        return $command->getPromise();
    }
459
    
460
    /**
461
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
462
     * This is equivalent to prepare -> execute -> close.
463
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
464
     * @param \Plasma\ClientInterface  $client
465
     * @param string                   $query
466
     * @param array                    $params
467
     * @return \React\Promise\PromiseInterface
468
     * @throws \Plasma\Exception
469
     * @see \Plasma\StatementInterface
470
     */
471 37
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            // Still connecting: retry the call once the pending connect promise has resolved.
            return $this->connectPromise->then(function () use ($client, $query, $params) {
                return $this->execute($client, $query, $params);
            });
        }
        
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            $onResult = function (\Plasma\QueryResultInterface $result) use ($statement) {
                // A streaming result is returned immediately; the statement is closed on the side
                // and a failure to close is reported as a driver "error" event.
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });
                    
                    return $result;
                }
                
                // Otherwise the result is only handed out after the statement has been closed.
                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            };
            
            $onFailure = function (\Throwable $error) use ($statement) {
                // Close the statement first, then re-throw the execution error.
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            };
            
            return $statement->execute($params)->then($onResult, $onFailure);
        });
    }
504
    
505
    /**
506
     * Quotes the string for use in the query.
507
     * @param string  $str
508
     * @param int     $type  For types, see the constants.
509
     * @return string
510
     * @throws \LogicException  Thrown if the driver does not support quoting.
511
     * @throws \Plasma\Exception
512
     */
513 5
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        // The server status flags come from the last OK message, or from the handshake
        // if no OK message is available.
        $message = ($this->parser->getLastOkMessage() ?? $this->parser->getHandshakeMessage());
        if($message === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }
        
        // $this->charset is only assigned by connect() when the "characters.set" option is truthy,
        // so it can still be null here. Normalise it instead of passing null to string functions.
        $charset = (string) $this->charset;
        
        // Keep only the part before the first underscore (e.g. "utf8mb4" out of "utf8mb4_unicode_ci").
        $dbCharset = \explode('_', $charset, 2)[0];
        $realCharset = $this->getRealCharset($dbCharset);
        
        // With NO_BACKSLASH_ESCAPES reported by the server, quotes are escaped by doubling instead.
        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
            return $this->escapeUsingQuotes($realCharset, $str, $type);
        }
        
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
    }
537
    
538
    /**
539
     * Escapes using quotes.
540
     * @param string  $realCharset
541
     * @param string  $str
542
     * @param int     $type
543
     * @return string
544
     */
545 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        // $realCharset is accepted for signature parity with escapeUsingBackslashes() but is not used.
        
        // Identifiers are wrapped in backticks; an embedded backtick is escaped by doubling it.
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // quote() only takes this path when the server reports NO_BACKSLASH_ESCAPES. In that mode
        // a backslash is an ordinary character, so it must be passed through unchanged - doubling
        // it would store two backslashes. Only the enclosing quote character needs escaping.
        return '"'.\str_replace('"', '""', $str).'"';
    }
562
    
563
    /**
564
     * Escapes using backslashes.
565
     * @param string  $realCharset
566
     * @param string  $str
567
     * @param int     $type
568
     * @return string
569
     */
570 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        // $realCharset is currently not used by this implementation.
        
        // Inside a backtick quoted identifier MySQL gives the backslash no special meaning, so
        // "\`" would end the identifier early. The only valid escape is doubling the backtick.
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // Values are wrapped in double quotes. Order matters: backslashes are escaped first, so
        // the backslashes added for the quotes are not escaped a second time.
        $escapeChars = array(
            '\\',
            '"',
        );
        
        $escapeReplace = array(
            '\\\\',
            '\"',
        );
        
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }
587
    
588
    /**
589
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
590
     *
591
     * Checks out a connection until the transaction gets committed or rolled back.
592
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
593
     * check the connection back into the pool, as long as the transaction is not finished.
594
     *
595
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
596
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
597
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
598
     * @param \Plasma\ClientInterface  $client
599
     * @param int                      $isolation  See the `TransactionInterface` constants.
600
     * @return \React\Promise\PromiseInterface
601
     * @throws \Plasma\Exception
602
     * @see \Plasma\TransactionInterface
603
     */
604 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }
        
        // Map the isolation level to the statement that has to run before START TRANSACTION.
        // ISOLATION_NO_CHANGE needs no such statement.
        switch($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_NO_CHANGE:
                $isolationQuery = null;
            break;
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $isolationQuery = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $isolationQuery = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $isolationQuery = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $isolationQuery = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }
        
        // The flag has to be set before the first query() call: query() checks the driver back
        // into the client on "end" unless a transaction is active.
        $this->transaction = true;
        
        try {
            if($isolationQuery === null) {
                $started = $this->query($client, 'START TRANSACTION');
            } else {
                $started = $this->query($client, $isolationQuery)->then(function () use ($client) {
                    return $this->query($client, 'START TRANSACTION');
                });
            }
        } catch (\Throwable $e) {
            // query() can throw synchronously (no connection); do not leave the flag stuck.
            $this->transaction = false;
            throw $e;
        }
        
        return $started->then(function () use ($client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        })->then(null, function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        });
    }
650
    
651
    /**
652
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
653
     * @return void
654
     */
655
    function endTransaction(): void {
        // Only clears the local flag; no command is sent from here.
        $inTransaction = false;
        $this->transaction = $inTransaction;
    }
658
    
659
    /**
660
     * Runs the given command.
661
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
662
     * or rejects with the `Throwable` of the `error` event.
663
     * When the command is done, the driver must check itself back into the client.
664
     * @param \Plasma\ClientInterface   $client
665
     * @param \Plasma\CommandInterface  $command
666
     * @return \React\Promise\PromiseInterface
667
     */
668 6
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use ($client, $command) {
            // Hands the driver back to the client, unless a transaction is active at that moment.
            $release = function () use ($client) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
            };
            
            $command->once('end', function ($value = null) use ($release, $resolve) {
                $release();
                $resolve($value);
            });
            
            $command->once('error', function (\Throwable $error) use ($release, $reject) {
                $release();
                $reject($error);
            });
            
            // Listeners are attached before the command is queued.
            $this->executeCommand($command);
        }));
    }
693
    
694
    /**
695
     * Runs the given SQL querybuilder.
696
     * The driver CAN throw an exception if the given querybuilder is not supported.
697
     * An example would be a SQL querybuilder and a Cassandra driver.
698
     * @param \Plasma\ClientInterface           $client
699
     * @param \Plasma\SQLQueryBuilderInterface  $query
700
     * @return \React\Promise\PromiseInterface
701
     * @throws \Plasma\Exception
702
     */
703 2
    function runQuery(\Plasma\ClientInterface $client, \Plasma\QueryBuilderInterface $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        // Only SQL querybuilders are accepted by this driver.
        $isSql = ($query instanceof \Plasma\SQLQueryBuilderInterface);
        if(!$isSql) {
            throw new \Plasma\Exception('Given querybuilder must be a SQL querybuilder');
        }
        
        // The built statement and its parameters are run through prepare -> execute -> close.
        return $this->execute($client, $query->getQuery(), $query->getParameters());
    }
717
    
718
    /**
719
     * Executes a command.
720
     * @param \Plasma\CommandInterface  $command
721
     * @return void
722
     * @internal
723
     */
724 60
    function executeCommand(\Plasma\CommandInterface $command): void {
        // Every command goes through the queue first.
        $this->queue[] = $command;
        
        // Without a parser, or while another command is being processed, it stays queued.
        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }
        
        $this->parser->invokeCommand($this->getNextCommand());
    }
731
    
732
    /**
733
     * Get the handshake message, or null if none received yet.
734
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
735
     */
736 59
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        // No parser exists before a connection has been made.
        if(!$this->parser) {
            return null;
        }
        
        return $this->parser->getHandshakeMessage();
    }
743
    
744
    /**
745
     * Get the next command, or null.
746
     * @return \Plasma\CommandInterface|null
747
     * @internal
748
     */
749 59
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            // The queue has drained: a pending graceful shutdown may now proceed.
            if($this->goingAway) {
                $this->goingAway->resolve();
            }
            
            return null;
        }
        
        // A command is still being processed; nothing is handed out until it has finished.
        if($this->busy === static::STATE_BUSY) {
            return null;
        }
        
        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);
        
        // Commands that do not wait for completion let the queue advance immediately.
        if(!$next->waitForCompletion()) {
            $this->endCommand();
            return $next;
        }
        
        $this->busy = static::STATE_BUSY;
        
        // Whether the command fails or ends, the driver becomes idle and the queue advances.
        $finish = function () {
            $this->busy = static::STATE_IDLE;
            $this->endCommand();
        };
        
        $next->once('error', $finish);
        $next->once('end', $finish);
        
        return $next;
    }
783
    
784
    /**
     * Finishes up a command by scheduling the follow-up work on the next tick of the loop:
     * either the pending "going away" deferred gets resolved (queue is empty),
     * or the next queued command is passed to the parser.
     * @return void
     */
    protected function endCommand() {
        $advance = function () {
            $shuttingDown = (bool) $this->goingAway;
            
            if($shuttingDown && \count($this->queue) === 0) {
                return $this->goingAway->resolve();
            }
            
            $this->parser->invokeCommand($this->getNextCommand());
        };
        
        $this->loop->futureTick($advance);
    }
797
    
798
    /**
     * Starts the handshake process.
     *
     * Registers two `message` listeners on the parser. The first one waits for the server's
     * handshake message, removes itself, negotiates the client capability flags (database,
     * compression, auth plugin, TLS) and sends the handshake response. If the remote host is not
     * `127.0.0.1` (or `tls.forceLocal` is set) and the server supports SSL, a SSL request is sent and
     * TLS gets enabled before the handshake response is sent. The second listener stays registered,
     * sets the connection state to OK on an OK response and relays every message as `eventRelay` event.
     * @param array                    $credentials  Must contain the `db` entry; passed on to `createHandshakeResponse()`.
     * @param \React\Promise\Deferred  $deferred     Gets rejected if TLS is required but unsupported, or if enabling TLS fails.
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if(!($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage)) {
                return;
            }
            
            // The handshake message is handled exactly once
            $this->parser->removeListener('message', $listener);
            
            $this->connectionState = static::CONNECTION_SETENV;
            $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
            
            // Read the single needed entry directly instead of extract()ing the whole array into scope
            $db = $credentials['db'];
            
            if($db !== '') {
                $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
            }
            
            // No charset configured: adopt the server's one, falling back to latin1 for unknown IDs
            if($this->charset === null) {
                $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
            }
            
            if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
                $this->parser->enableCompression();
                $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
            }
            
            // Check if we support auth plugins
            $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
            $plugin = null;
            
            foreach($plugins as $key => $plug) {
                // Integer keys are matched against the server capabilities, other keys against the announced plugin name
                if((\is_int($key) && ($message->capability & $key) !== 0) || $key === $message->authPluginName) {
                    $plugin = $plug;
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                    break;
                }
            }
            
            $remote = \parse_url($this->connection->getRemoteAddress());
            
            // parse_url() does not always yield a host component, a missing host is treated as non-local
            if($remote !== false && (($remote['host'] ?? null) !== '127.0.0.1' || $this->options['tls.forceLocal'])) {
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
                    
                    $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
                    
                    $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                        $this->connectionState = static::CONNECTION_SSL_STARTUP;
                        
                        $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                        }, function (\Throwable $error) use (&$deferred) {
                            // Reject with the error itself (previously a variable-variable lookup "$$error")
                            $deferred->reject($error);
                            $this->connection->close();
                        });
                    });
                    
                    return $this->parser->invokeCommand($ssl);
                } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                    $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                    $this->connection->close();
                    return;
                }
            }
            
            $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
        };
        
        $this->parser->on('message', $listener);
        
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }
            
            $this->emit('eventRelay', array('message', $message));
        });
    }
884
    
885
    /**
     * Enables TLS on the connection.
     *
     * Applies every entry of the `tls.context` option as `ssl` stream context option to the
     * connection's stream and then enables encryption on the connection. If enabling fails,
     * the connection gets closed and the returned promise rejects with a `\RuntimeException`,
     * which carries the original error as previous exception.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();
            
            // Keep the original failure attached, and make sure the code is an integer as the constructor requires
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), (int) $error->getCode(), $error);
        });
    }
900
    
901
    /**
     * Sends the auth command.
     *
     * Creates the handshake response command and invokes it on the parser. The deferred gets resolved
     * (on the next tick) once the command ends and rejected if the command errors, in which case the
     * connection gets closed. If an auth plugin is given, a `message` listener is registered, which
     * answers auth switch requests and auth more data packets until an OK response arrives.
     * Every failure in that listener rejects the deferred and closes the connection.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  Must contain the entries `user`, `password` and `db`.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Read the needed entries explicitly - extract() would let any other entry overwrite the parameters
        $user = $credentials['user'];
        $password = $credentials['password'];
        $db = $credentials['db'];
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        $auth->once('end', function () use (&$deferred) {
            $this->loop->futureTick(function () use (&$deferred) {
                $deferred->resolve();
            });
        });
        
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
                // Plugin instance created on an auth switch request, kept across invocations of this listener
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $activePlugin */
                static $activePlugin;
                
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $activePlugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $activePlugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    
                    // The authentication can not continue; close the connection like every other failure path does
                    return $this->connection->close();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($activePlugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $activePlugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // Authentication is done, the listener is not needed any longer
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
973
    
974
    /**
     * Get the real charset from the DB charset.
     *
     * Every charset starting with `utf8` maps to `UTF-8`. Any other charset maps to the first
     * encoding reported by `mb_list_encodings()` whose name starts with the given charset
     * (case-insensitive). If there is no such encoding, or the charset is empty, `UTF-8` is returned.
     * @param string  $charset
     * @return string
     */
    protected function getRealCharset(string $charset): string {
        // An empty needle would match every encoding on PHP 8 (and raise a warning on PHP 7),
        // so an empty charset goes straight to the fallback
        if($charset === '') {
            return 'UTF-8';
        }
        
        if(\substr($charset, 0, 4) === 'utf8') {
            return 'UTF-8';
        }
        
        $charsets = \mb_list_encodings();
        
        foreach($charsets as $set) {
            if(\stripos($set, $charset) === 0) {
                return $set;
            }
        }
        
        return 'UTF-8';
    }
994
    
995
    /**
     * Checks the given options against the validation rules of the driver options.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        // One rule per known option key
        $rules = array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'compression.enable' => 'boolean',
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        $validator = \CharlotteDunois\Validation\Validator::make($options, $rules, true);
        $validator->throw(\InvalidArgumentException::class);
    }
1013
}
1014