GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Failed Conditions
Push — master ( d4925e...b99c96 )
by Charlotte
08:36
created

Driver::connect()   D

Complexity

Conditions 23
Paths 105

Size

Total Lines 113
Code Lines 70

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 54
CRAP Score 24.1355

Importance

Changes 0
Metric Value
cc 23
eloc 70
nc 105
nop 1
dl 0
loc 113
ccs 54
cts 62
cp 0.871
crap 24.1355
rs 4.125
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'compression.enable' => !true,
31
        'tls.context' => array(),
32
        'tls.force' => true,
33
        'tls.forceLocal' => false
34
    );
35
    
36
    /**
37
     * @var string[]
38
     */
39
    protected $allowedSchemes = array('mysql', 'tcp', 'tls', 'unix');
40
    
41
    /**
42
     * @var \React\Socket\ConnectorInterface
43
     */
44
    protected $connector;
45
    
46
    /**
47
     * Internal class is intentional used, as there's no other way currently.
48
     * @var \React\Socket\StreamEncryption
49
     * @see https://github.com/reactphp/socket/issues/180
50
     */
51
    protected $encryption;
52
    
53
    /**
54
     * @var \React\Promise\Promise|null
55
     */
56
    protected $connectPromise;
57
    
58
    /**
59
     * @var \React\Socket\Connection
60
     */
61
    protected $connection;
62
    
63
    /**
64
     * @var int
65
     */
66
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
67
    
68
    /**
69
     * @var \Plasma\Drivers\MySQL\ProtocolParser
70
     */
71
    protected $parser;
72
    
73
    /**
74
     * @var array
75
     */
76
    protected $queue;
77
    
78
    /**
79
     * @var int
80
     */
81
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
82
    
83
    /**
84
     * @var bool
85
     */
86
    protected $transaction = false;
87
    
88
    /**
89
     * @var \React\Promise\Deferred
90
     */
91
    protected $goingAway;
92
    
93
    /**
94
     * @var string|null
95
     */
96
    protected $charset;
97
    
98
    /**
99
     * Constructor.
100
     * @param \React\EventLoop\LoopInterface  $loop
101
     * @param array                           $options
102
     */
103 83
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
104 83
        $this->validateOptions($options);
105
        
106 83
        $this->loop = $loop;
107 83
        $this->options = \array_merge($this->options, $options);
108
        
109 83
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
110 83
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
111 83
        $this->queue = array();
112 83
    }
113
    
114
    /**
115
     * Destructor.
116
     * @return void
117
     * @internal
118
     */
119 23
    function __destruct() {
120 23
        $this->close();
121 23
    }
122
    
123
    /**
124
     * Returns the event loop.
125
     * @return \React\EventLoop\LoopInterface
126
     */
127 57
    function getLoop(): \React\EventLoop\LoopInterface {
128 57
        return $this->loop;
129
    }
130
    
131
    /**
132
     * Retrieves the current connection state.
133
     * @return int
134
     */
135 9
    function getConnectionState(): int {
136 9
        return $this->connectionState;
137
    }
138
    
139
    /**
140
     * Retrieves the current busy state.
141
     * @return int
142
     */
143 1
    function getBusyState(): int {
144 1
        return $this->busy;
145
    }
146
    
147
    /**
148
     * Get the length of the driver backlog queue.
149
     * @return int
150
     */
151 1
    function getBacklogLength(): int {
152 1
        return \count($this->queue);
153
    }
154
    
155
    /**
156
     * Connects to the given URI.
157
     * @param string  $uri
158
     * @return \React\Promise\PromiseInterface
159
     */
160 62
    function connect(string $uri): \React\Promise\PromiseInterface {
161 62
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
162 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
163 61
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
164 1
            return \React\Promise\resolve();
165 61
        } elseif($this->connectPromise !== null) {
166 1
            return $this->connectPromise;
167
        }
168
        
169 61
        $pos = \strpos($uri, '://');
170 2
        $unix = false;
0 ignored issues
show
Unused Code introduced by
The assignment to $unix is dead and can be removed.
Loading history...
171
        
172
        if($pos === false) {
173 61
            $uri = 'tcp://'.$uri;
174 61
        } elseif(\substr($uri, 0, $pos) === 'unix') {
175 2
            $defaultSocket = '/tmp/mysql.sock';
176
            
177
            $apos = \strpos($uri, '@');
178 59
            $spos = \strrpos($uri, '/');
179 1
            
180
            if($apos !== false) {
181
                $socket = \substr($uri, ($apos + 1), $spos);
182 59
                $uri = \substr($uri, 0, ($apos + 1)).'localhost'.\substr($uri, $spos);
183 59
            } else {
184 59
                $socket = \substr($uri, ($pos + 3), $spos);
185
                $uri = 'unix://localhost'.\substr($uri, $spos);
186
            }
187
            
188 59
            if($socket === 'localhost' || $socket === '127.0.0.1' || $socket === '::1') {
189
                $socket = $defaultSocket;
190
            }
191
        }
192 59
        
193 59
        $parts = \parse_url($uri);
194 59
        if(!isset($parts['scheme']) || !isset($parts['host']) || !\in_array($parts['scheme'], $this->allowedSchemes)) {
195
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
196
        }
197
        
198 59
        if(isset($socket)) {
199
            $parts['host'] = $socket;
200
        }
201 3
        
202 3
        if($parts['scheme'] === 'mysql') {
203
            $parts['scheme'] = 'tcp';
204 3
        }
205 59
        
206
        $host = $parts['scheme'].'://'.$parts['host'].($parts['scheme'] !== 'unix' ? ':'.($parts['port'] ?? 3306) : '');
207 59
        $this->connectionState = static::CONNECTION_STARTED;
208 59
        $resolved = false;
209
        
210
        $this->connectPromise =  $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
211
            // See description of property encryption
212
            if(!($connection instanceof \React\Socket\Connection)) {
213
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
214
            }
215
            
216 59
            $this->busy = static::STATE_BUSY;
217
            $this->connectionState = static::CONNECTION_MADE;
218 59
            $this->connection = $connection;
219 59
            
220 59
            $this->connection->on('error', function (\Throwable $error) {
221
                $this->emit('error', array($error));
222 59
            });
223
            
224 59
            $this->connection->on('close', function () {
225
                $this->connection = null;
226 58
                $this->connectionState = static::CONNECTION_UNUSABLE;
227 58
                
228
                $this->emit('close');
229 58
            });
230
            
231
            $deferred = new \React\Promise\Deferred();
232 59
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
233 59
            
234
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
235 59
                if($resolved) {
236 57
                    $this->emit('error', array($error));
237
                } else {
238 56
                    $deferred->reject($error);
239 56
                }
240
            });
241 56
            
242 56
            $user = ($parts['user'] ?? 'root');
243
            $password = ($parts['pass'] ?? '');
244 56
            $db = (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '');
245 57
            
246
            $credentials = \compact('user', 'password', 'db');
247
            
248 59
            $this->startHandshake($credentials, $deferred);
249
            return $deferred->promise()->then(function () use (&$resolved) {
250
                $this->busy = static::STATE_IDLE;
251
                $resolved = true;
252
                
253
                if(\count($this->queue) > 0) {
254
                    $this->parser->invokeCommand($this->getNextCommand());
255
                }
256 1
            });
257 1
        });
258 1
        
259
        if($this->options['characters.set']) {
260
            $this->charset = $this->options['characters.set'];
261
            $this->connectPromise = $this->connectPromise->then(function () {
262
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
263
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
264
                
265
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
266
                $this->executeCommand($cmd);
267
                
268
                return $cmd->getPromise();
269
            });
270 1
        }
271 1
        
272 1
        return $this->connectPromise;
273
    }
274
    
275
    /**
276
     * Pauses the underlying stream I/O consumption.
277
     * If consumption is already paused, this will do nothing.
278
     * @return bool  Whether the operation was successful.
279
     */
280
    function pauseStreamConsumption(): bool {
281
        if($this->connection === null || $this->goingAway) {
282
            return false;
283 24
        }
284 24
        
285 10
        $this->connection->pause();
286
        return true;
287
    }
288 15
    
289 15
    /**
290
     * Resumes the underlying stream I/O consumption.
291 15
     * If consumption is not paused, this will do nothing.
292
     * @return bool  Whether the operation was successful.
293 15
     */
294 14
    function resumeStreamConsumption(): bool {
295
        if($this->connection === null || $this->goingAway) {
296
            return false;
297
        }
298 15
        
299 14
        $this->connection->resume();
300
        return true;
301
    }
302 1
    
303
    /**
304 1
     * Closes all connections gracefully after processing all outstanding requests.
305
     * @return \React\Promise\PromiseInterface
306
     */
307 1
    function close(): \React\Promise\PromiseInterface {
308 1
        if($this->goingAway) {
309
            return $this->goingAway->promise();
310
        }
311
        
312 1
        $state = $this->connectionState;
313
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
314 1
        
315
        $this->goingAway = new \React\Promise\Deferred();
316 1
        
317 15
        if(\count($this->queue) === 0) {
318
            $this->goingAway->resolve();
319
        }
320
        
321
        return $this->goingAway->promise()->then(function () use ($state) {
322
            if($state !== static::CONNECTION_OK) {
323
                return;
324 10
            }
325 10
            
326 1
            $deferred = new \React\Promise\Deferred();
327
            
328
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
329 10
            
330 10
            $this->connection->once('close', function () use (&$deferred) {
331
                $deferred->resolve();
332 10
            });
333 10
            
334
            $quit->once('end', function () {
335
                $this->connection->close();
336 10
            });
337 1
            
338
            $this->parser->invokeCommand($quit);
339
            
340 10
            return $deferred->promise();
341 1
        });
342 1
    }
343
    
344 1
    /**
345
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
346 10
     * @return void
347
     */
348
    function quit(): void {
349
        if($this->goingAway) {
350
            return;
351
        }
352 2
        
353 2
        $state = $this->connectionState;
354
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
355
        
356
        $this->goingAway = new \React\Promise\Deferred();
357
        $this->goingAway->resolve();
358
        
359
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
360
        while($command = \array_shift($this->queue)) {
361
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
362
        }
363
        
364
        if($state === static::CONNECTION_OK) {
365 48
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
366 48
            $this->parser->invokeCommand($quit);
367 1
            
368 47
            $this->connection->close();
369 1
        }
370
    }
371
    
372
    /**
373
     * Whether this driver is currently in a transaction.
374
     * @return bool
375 1
     */
376
    function isInTransaction(): bool {
377
        return $this->transaction;
378 46
    }
379 46
    
380
    /**
381 46
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
382
     * When the command is done, the driver must check itself back into the client.
383 44
     * @param \Plasma\ClientInterface  $client
384 44
     * @param string                   $query
385
     * @return \React\Promise\PromiseInterface
386
     * @throws \Plasma\Exception
387 46
     * @see \Plasma\QueryResultInterface
388
     */
389
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
390
        if($this->goingAway) {
391
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
392
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
393
            if($this->connectPromise !== null) {
394
                return $this->connectPromise->then(function () use (&$client, &$query) {
395
                    return $this->query($client, $query);
396
                });
397
            }
398
            
399 38
            throw new \Plasma\Exception('Unable to continue without connection');
400 38
        }
401 1
        
402 37
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
403 1
        $this->executeCommand($command);
404
        
405
        if(!$this->transaction) {
406
            $command->once('end', function () use (&$client) {
407
                $client->checkinConnection($this);
408
            });
409 1
        }
410
        
411
        return $command->getPromise();
412 36
    }
413 36
    
414
    /**
415 36
     * Prepares a query. Resolves with a `StatementInterface` instance.
416
     * When the command is done, the driver must check itself back into the client.
417
     * @param \Plasma\ClientInterface  $client
418
     * @param string                   $query
419
     * @return \React\Promise\PromiseInterface
420
     * @throws \Plasma\Exception
421
     * @see \Plasma\StatementInterface
422
     */
423
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
424
        if($this->goingAway) {
425
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
426
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
427
            if($this->connectPromise !== null) {
428
                return $this->connectPromise->then(function () use (&$client, &$query) {
429 37
                    return $this->prepare($client, $query);
430 37
                });
431 1
            }
432 36
            
433 1
            throw new \Plasma\Exception('Unable to continue without connection');
434
        }
435
        
436
        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
437
        $this->executeCommand($command);
438
        
439 1
        return $command->getPromise();
440
    }
441
    
442
    /**
443
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
444 34
     * This is equivalent to prepare -> execute -> close.
445
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
446
     * @param \Plasma\ClientInterface  $client
447 34
     * @param string                   $query
448
     * @param array                    $params
449 34
     * @return \React\Promise\PromiseInterface
450
     * @throws \Plasma\Exception
451
     * @see \Plasma\StatementInterface
452
     */
453 33
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
454 33
        if($this->goingAway) {
455
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
456
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
457
            if($this->connectPromise !== null) {
458
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
459 35
                    return $this->execute($client, $query, $params);
460 35
                });
461
            }
462
            
463
            throw new \Plasma\Exception('Unable to continue without connection');
464
        }
465
        
466
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
467
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
468
                if($result instanceof \Plasma\StreamQueryResultInterface) {
469
                    $statement->close()->then(null, function (\Throwable $error) {
470
                        $this->emit('error', array($error));
471 5
                    });
472 5
                    
473 1
                    return $result;
474
                }
475
                
476 4
                return $statement->close()->then(function () use ($result) {
477 4
                    return $result;
478
                });
479
            }, function (\Throwable $error) use (&$statement) {
480
                return $statement->close()->then(function () use ($error) {
481
                    throw $error;
482
                });
483
            });
484
        });
485 4
    }
486 4
    
487 4
    /**
488
     * Quotes the string for use in the query.
489 4
     * @param string  $str
490
     * @param int     $type  For types, see the constants.
491
     * @return string
492
     * @throws \LogicException  Thrown if the driver does not support quoting.
493 4
     * @throws \Plasma\Exception
494
     */
495
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
496
        if($this->parser === null) {
497
            throw new \Plasma\Exception('Unable to continue without connection');
498
        }
499
        
500
        $message = $this->parser->getLastOkMessage();
501
        if($message === null) {
502
            $message = $this->parser->getHandshakeMessage();
503 2
            
504 2
            if($message === null) {
505 1
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
506
            }
507
        }
508
        
509 1
        $pos = \strpos($this->charset, '_');
510
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
511
        $realCharset = $this->getRealCharset($dbCharset);
512
        
513
        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
514 1
            return $this->escapeUsingQuotes($realCharset, $str, $type);
515
        }
516
        
517
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
518 1
    }
519
    
520
    /**
521
     * Escapes using quotes.
522
     * @param string  $realCharset
523
     * @param string  $str
524
     * @param int     $type
525
     * @return string
526
     */
527
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
528 6
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
529 6
            return '`'.\str_replace('`', '``', $str).'`';
530 3
        }
531
        
532
        $escapeChars = array(
533
            '"',
534 3
            '\\',
535
        );
536
        
537
        $escapeReplace = array(
538
            '""',
539 3
            '\\\\',
540
        );
541
        
542
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
543 3
    }
544
    
545
    /**
546
     * Escapes using backslashes.
547
     * @param string  $realCharset
548
     * @param string  $str
549
     * @param int     $type
550
     * @return string
551
     */
552
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
553
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
554
            return '`'.\str_replace('`', '\\`', $str).'`';
555
        }
556
        
557
        $escapeChars = array(
558
            '\\',
559
            '"',
560
        );
561
        
562 3
        $escapeReplace = array(
563 3
            '\\\\',
564 1
            '\"',
565
        );
566
        
567 2
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
568 1
    }
569
    
570
    /**
571
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
572 2
     *
573
     * Checks out a connection until the transaction gets committed or rolled back.
574
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
575 2
     * check the connection back into the pool, as long as the transaction is not finished.
576 2
     *
577 2
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
578
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
579
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
580
     * @param \Plasma\ClientInterface  $client
581
     * @param int                      $isolation  See the `TransactionInterface` constants.
582
     * @return \React\Promise\PromiseInterface
583
     * @throws \Plasma\Exception
584
     * @see \Plasma\TransactionInterface
585
     */
586
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
587
        if($this->goingAway) {
588
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
589 2
        }
590
        
591
        if($this->transaction) {
592 2
            throw new \Plasma\Exception('Driver is already in transaction');
593
        }
594 2
        
595
        switch ($isolation) {
596
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
597
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
598 2
            break;
599
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
600
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
601
            break;
602
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
603
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
604
            break;
605 1
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
606 1
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
607 1
            break;
608
            default:
609
                throw new \Plasma\Exception('Invalid isolation level given');
610
            break;
611
        }
612
        
613
        $this->transaction = true;
614
        
615
        return $this->query($client, $query)->then(function () use (&$client) {
616
            return $this->query($client, 'START TRANSACTION');
617
        })->then(function () use (&$client, $isolation) {
618 6
            return (new \Plasma\Transaction($client, $this, $isolation));
619 6
        })->then(null, function (\Throwable $e) {
620 1
            $this->transaction = false;
621
            throw $e;
622
        });
623
    }
624
    
625 3
    /**
626 3
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
627
     * @return void
628
     */
629 3
    function endTransaction(): void {
630 5
        $this->transaction = false;
631
    }
632
    
633 2
    /**
634 2
     * Runs the given command.
635
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
636
     * or rejects with the `Throwable` of the `error` event.
637 2
     * When the command is done, the driver must check itself back into the client.
638 5
     * @param \Plasma\ClientInterface   $client
639
     * @param \Plasma\CommandInterface  $command
640 5
     * @return \React\Promise\PromiseInterface
641 5
     */
642
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
643
        if($this->goingAway) {
644
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
645
        }
646
        
647
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
648
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
649
                if(!$this->transaction) {
650 57
                    $client->checkinConnection($this);
651 57
                }
652
                
653 57
                $resolve($value);
654 56
            });
655
            
656 57
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
657
                if(!$this->transaction) {
658
                    $client->checkinConnection($this);
659
                }
660
                
661
                $reject($error);
662 56
            });
663 56
            
664 56
            $this->executeCommand($command);
665
        }));
666
    }
667
    
668
    /**
669
     * Executes a command.
670
     * @param \Plasma\CommandInterface  $command
671
     * @return void
672
     * @internal
673
     */
674
    function executeCommand(\Plasma\CommandInterface $command): void {
675 56
        $this->queue[] = $command;
676 56
        
677 56
        if($this->parser && $this->busy === static::STATE_IDLE) {
678
            $this->parser->invokeCommand($this->getNextCommand());
679
        }
680
    }
681 56
    
682 56
    /**
683
     * Get the handshake message, or null if none received yet.
684
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
685
     */
686
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
687 56
        if($this->parser) {
688
            return $this->parser->getHandshakeMessage();
689 56
        }
690 56
        
691
        return null;
692
    }
693 2
    
694
    /**
695 2
     * Get the next command, or null.
696 56
     * @return \Plasma\CommandInterface|null
697
     * @internal
698
     */
699 56
    function getNextCommand(): ?\Plasma\CommandInterface {
700
        if(\count($this->queue) === 0) {
701 56
            if($this->goingAway) {
702 56
                $this->goingAway->resolve();
703
            }
704 35
            
705
            return null;
706
        } elseif($this->busy === static::STATE_BUSY) {
707 56
            return null;
708
        }
709
        
710
        /** @var \Plasma\CommandInterface  $command */
711
        $command =  \array_shift($this->queue);
712
        
713
        if($command->waitForCompletion()) {
714 56
            $this->busy = static::STATE_BUSY;
715
            
716 56
            $command->once('error', function () use (&$command) {
717 1
                $this->busy = static::STATE_IDLE;
718
                
719
                $this->endCommand();
720 56
            });
721 56
            
722 56
            $command->once('end', function () use (&$command) {
723
                $this->busy = static::STATE_IDLE;
724
                
725
                $this->endCommand();
726
            });
727
        } else {
728
            $this->endCommand();
729
        }
730 59
        
731
        return $command;
732 59
    }
733 59
    
734
    /**
735 59
     * Finishes up a command.
736 59
     * @return void
737
     */
738 59
    protected function endCommand() {
739
        $this->loop->futureTick(function () {
740 59
            if($this->goingAway && \count($this->queue) === 0) {
741 46
                return $this->goingAway->resolve();
742
            }
743
            
744 59
            $this->parser->invokeCommand($this->getNextCommand());
745 2
        });
746
    }
747
    
748 59
    /**
749 2
     * Starts the handshake process.
750 2
     * @param array                    $credentials
751
     * @param \React\Promise\Deferred  $deferred
752
     * @return void
753
     */
754 59
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
755 59
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
756
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
757 59
                $this->parser->removeListener('message', $listener);
758 59
                
759 59
                $this->connectionState = static::CONNECTION_SETENV;
760 59
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
761 59
                
762
                \extract($credentials);
763
                
764
                if($db !== '') {
765
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
766
                }
767
                
768
                if($this->charset === null) {
769 59
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
770
                }
771 59
                
772
                if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS) !== 0 && \extension_loaded('zlib') && $this->options['compression.enable']) {
773
                    $this->parser->enableCompression();
774
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_COMPRESS;
775
                }
776
                
777
                // Check if we support auth plugins
778
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
779
                $plugin = null;
780
                
781
                foreach($plugins as $key => $plug) {
782
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
783
                        $plugin = $plug;
784
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
785
                        break;
786
                    } elseif($key === $message->authPluginName) {
787
                        $plugin = $plug;
788
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
789
                        break;
790
                    }
791
                }
792
                
793
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
794
                
795
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
796 59
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
797
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
798 59
                        
799
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
800 59
                        
801
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
802
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
803 59
                            
804 58
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
805
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
806
                            }, function (\Throwable $error) use (&$deferred) {
807 59
                                $deferred->reject($$error);
808 59
                                $this->connection->close();
809 59
                            });
810
                        });
811
                        
812
                        return $this->parser->invokeCommand($ssl);
813
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
814
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
815
                        $this->connection->close();
816
                        return;
817
                    }
818
                }
819
                
820
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
821
            }
822
        };
823
        
824
        $this->parser->on('message', $listener);
825
        
826
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
827
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
828
                $this->connectionState = static::CONNECTION_OK;
829
            }
830
            
831
            $this->emit('eventRelay', array('message', $message));
832
        });
833
    }
834
    
835
    /**
836 59
     * Enables TLS on the connection.
837
     * @return \React\Promise\PromiseInterface
838
     */
839 59
    protected function enableTLS(): \React\Promise\PromiseInterface {
840
        // Set required SSL/TLS context options
841 59
        foreach($this->options['tls.context'] as $name => $value) {
842
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
843
        }
844
        
845 58
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
846 58
            $this->connection->close();
847 59
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
848
        });
849
    }
850 1
    
851 1
    /**
852 59
     * Sends the auth command.
853
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
854 59
     * @param array                                            $credentials
855
     * @param int                                              $clientFlags
856
     * @param string|null                                      $plugin
857 59
     * @param \React\Promise\Deferred                          $deferred
858
     * @return void
859 59
     */
860
    protected function createHandshakeResponse(
861
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
862
    ) {
863
        \extract($credentials);
864
        
865
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
866
        
867
        $auth->once('end', function () use (&$deferred) {
868
            $this->loop->futureTick(function () use (&$deferred) {
869
                $deferred->resolve();
870
            });
871
        });
872
        
873
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
874
            $deferred->reject($error);
875 59
            $this->connection->close();
876
        });
877
        
878
        if($plugin) {
879
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
880
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
881
                static $plugin;
882
                
883
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
884
                    $name = $message->authPluginName;
885
                    
886
                    if($name !== null) {
887
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
888 59
                        foreach($plugins as $key => $plug) {
889 58
                            if($key === $name) {
890
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
891 59
                                
892
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
893 59
                                return $this->parser->invokeCommand($command);
894
                            }
895
                        }
896 59
                    }
897 59
                    
898 59
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
899
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
900
                    if($plugin === null) {
901
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
902
                        return $this->connection->close();
903
                    }
904
                    
905 4
                    try {
906 4
                        $command = $plugin->receiveMoreData($message);
907 2
                        return $this->parser->invokeCommand($command);
908
                    } catch (\Plasma\Exception $e) {
909
                        $deferred->reject($e);
910 2
                        $this->connection->close();
911
                    }
912 2
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
913 2
                    $this->parser->removeListener('message', $listener);
914 2
                }
915
            };
916
            
917
            $this->parser->on('message', $listener);
918 2
        }
919
        
920
        $this->parser->invokeCommand($auth);
921
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
922
    }
923
    
924
    /**
925
     * Get the real charset from the DB charset.
926
     * @param string  $charset
927 83
     * @return string
928 83
     */
929 83
    protected function getRealCharset(string $charset): string {
930
        if(\substr($charset, 0, 4) === 'utf8') {
931
            return 'UTF-8';
932
        }
933
        
934
        $charsets = \mb_list_encodings();
935
        
936
        foreach($charsets as $set) {
937 83
            if(\stripos($set, $charset) === 0) {
938 83
                return $set;
939
            }
940
        }
941
        
942
        return 'UTF-8';
943
    }
944
    
945
    /**
946
     * Validates the given options.
947
     * @param array  $options
948
     * @return void
949
     * @throws \InvalidArgumentException
950
     */
951
    protected function validateOptions(array $options) {
952
        \CharlotteDunois\Validation\Validator::make($options, array(
953
            'characters.set' => 'string',
954
            'characters.collate' => 'string',
955
            'compression.enable' => 'boolean',
956
            'connector' => 'class:'.\React\Socket\ConnectorInterface::class.'=object',
957
            'packet.maxAllowedSize' => 'integer|min:0|max:'.\Plasma\Drivers\MySQL\ProtocolParser::CLIENT_MAX_PACKET_SIZE,
958
            'tls.context' => 'array',
959
            'tls.force' => 'boolean',
960
            'tls.forceLocal' => 'boolean'
961
        ), true)->throw(\InvalidArgumentException::class);
962
    }
963
}
964