GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( f5146c...c37035 )
by Charlotte
02:33
created

Driver::connect()   C

Complexity

Conditions 12
Paths 6

Size

Total Lines 83
Code Lines 51

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 45
CRAP Score 12.2341

Importance

Changes 0
Metric Value
cc 12
eloc 51
nc 6
nop 1
dl 0
loc 83
ccs 45
cts 51
cp 0.8824
crap 12.2341
rs 6.9666
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'tls.context' => array(),
31
        'tls.force' => true,
32
        'tls.forceLocal' => false
33
    );
34
    
35
    /**
36
     * @var \React\Socket\ConnectorInterface
37
     */
38
    protected $connector;
39
    
40
    /**
41
     * Internal class is intentional used, as there's no other way currently.
42
     * @var \React\Socket\StreamEncryption
43
     * @see https://github.com/reactphp/socket/issues/180
44
     */
45
    protected $encryption;
46
    
47
    /**
48
     * @var \React\Promise\Promise|null
49
     */
50
    protected $connectPromise;
51
    
52
    /**
53
     * @var \React\Socket\Connection
54
     */
55
    protected $connection;
56
    
57
    /**
58
     * @var int
59
     */
60
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
61
    
62
    /**
63
     * @var \Plasma\Drivers\MySQL\ProtocolParser
64
     */
65
    protected $parser;
66
    
67
    /**
68
     * @var array
69
     */
70
    protected $queue;
71
    
72
    /**
73
     * @var int
74
     */
75
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
76
    
77
    /**
78
     * @var bool
79
     */
80
    protected $transaction = false;
81
    
82
    /**
83
     * @var \React\Promise\Deferred
84
     */
85
    protected $goingAway;
86
    
87
    /**
88
     * @var string|null
89
     */
90
    protected $charset;
91
    
92
    /**
93
     * Constructor.
94
     * @param \React\EventLoop\LoopInterface  $loop
95
     * @param array                           $options
96
     */
97 79
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
98 79
        $this->validateOptions($options);
99
        
100 79
        $this->loop = $loop;
101 79
        $this->options = \array_merge($this->options, $options);
102
        
103 79
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
104 79
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
105 79
        $this->queue = array();
106 79
    }
107
    
108
    /**
109
     * Returns the event loop.
110
     * @return \React\EventLoop\LoopInterface
111
     */
112 54
    function getLoop(): \React\EventLoop\LoopInterface {
113 54
        return $this->loop;
114
    }
115
    
116
    /**
117
     * Retrieves the current connection state.
118
     * @return int
119
     */
120 6
    function getConnectionState(): int {
121 6
        return $this->connectionState;
122
    }
123
    
124
    /**
125
     * Retrieves the current busy state.
126
     * @return int
127
     */
128 1
    function getBusyState(): int {
129 1
        return $this->busy;
130
    }
131
    
132
    /**
133
     * Get the length of the driver backlog queue.
134
     * @return int
135
     */
136 1
    function getBacklogLength(): int {
137 1
        return \count($this->queue);
138
    }
139
    
140
    /**
141
     * Connects to the given URI.
142
     * @param string  $uri
143
     * @return \React\Promise\PromiseInterface
144
     */
145 58
    function connect(string $uri): \React\Promise\PromiseInterface {
146 58
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
147 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
148 57
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
149 1
            return \React\Promise\resolve();
150 57
        } elseif($this->connectPromise !== null) {
151 1
            return $this->connectPromise;
152
        }
153
        
154 57
        $uri = 'mysql://'.\ltrim($uri, 'mysql://');
155
        
156 57
        $parts = \parse_url($uri);
157 57
        if(!isset($parts['host'])) {
158 1
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
159
        }
160
        
161 56
        $host = $parts['host'].':'.($parts['port'] ?? 3306);
162 56
        $this->connectionState = static::CONNECTION_STARTED;
163 56
        $resolved = false;
164
        
165
        $this->connectPromise =  $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
166
            // See description of property encryption
167 56
            if(!($connection instanceof \React\Socket\Connection)) {
168
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
169
            }
170
            
171 56
            $this->busy = static::STATE_BUSY;
172 56
            $this->connectionState = static::CONNECTION_MADE;
173 56
            $this->connection = $connection;
174
            
175
            $this->connection->on('error', function (\Throwable $error) {
176
                $this->emit('error', array($error));
177 56
            });
178
            
179
            $this->connection->on('close', function () {
180 3
                $this->connection = null;
181 3
                $this->connectionState = static::CONNECTION_UNUSABLE;
182
                
183 3
                $this->emit('close');
184 56
            });
185
            
186 56
            $deferred = new \React\Promise\Deferred();
187 56
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
188
            
189
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
190
                if($resolved) {
191
                    $this->emit('error', array($error));
192
                } else {
193
                    $deferred->reject($error);
194
                }
195 56
            });
196
            
197 56
            $user = ($parts['user'] ?? 'root');
198 56
            $password = ($parts['pass'] ?? '');
199 56
            $db = (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '');
200
            
201 56
            $credentials = \compact('user', 'password', 'db');
202
            
203 56
            $this->startHandshake($credentials, $deferred);
204
            return $deferred->promise()->then(function () use (&$resolved) {
205 55
                $this->busy = static::STATE_IDLE;
206 55
                $resolved = true;
207
                
208 55
                if(\count($this->queue) > 0) {
209
                    $this->parser->invokeCommand($this->getNextCommand());
210
                }
211 56
            });
212 56
        });
213
        
214 56
        if($this->options['characters.set']) {
215 54
            $this->charset = $this->options['characters.set'];
216
            $this->connectPromise = $this->connectPromise->then(function () {
217 53
                $query = 'SET NAMES "'.$this->options['characters.set'].'"'
218 53
                    .($this->options['characters.collate'] ? ' COLLATE "'.$this->options['characters.collate'].'"' : '');
219
                
220 53
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
221 53
                $this->executeCommand($cmd);
222
                
223 53
                return $cmd->getPromise();
224 54
            });
225
        }
226
        
227 56
        return $this->connectPromise;
228
    }
229
    
230
    /**
231
     * Pauses the underlying stream I/O consumption.
232
     * If consumption is already paused, this will do nothing.
233
     * @return bool  Whether the operation was successful.
234
     */
235 1
    function pauseStreamConsumption(): bool {
236 1
        if($this->connection === null || $this->goingAway) {
237 1
            return false;
238
        }
239
        
240
        $this->connection->pause();
241
        return true;
242
    }
243
    
244
    /**
245
     * Resumes the underlying stream I/O consumption.
246
     * If consumption is not paused, this will do nothing.
247
     * @return bool  Whether the operation was successful.
248
     */
249 1
    function resumeStreamConsumption(): bool {
250 1
        if($this->connection === null || $this->goingAway) {
251 1
            return false;
252
        }
253
        
254
        $this->connection->resume();
255
        return true;
256
    }
257
    
258
    /**
259
     * Closes all connections gracefully after processing all outstanding requests.
260
     * @return \React\Promise\PromiseInterface
261
     */
262 2
    function close(): \React\Promise\PromiseInterface {
263 2
        if($this->goingAway) {
264 2
            return $this->goingAway->promise();
265
        }
266
        
267 1
        $state = $this->connectionState;
268 1
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
269
        
270 1
        $this->goingAway = new \React\Promise\Deferred();
271
        
272 1
        if(\count($this->queue) === 0) {
273
            $this->goingAway->resolve();
274
        }
275
        
276
        return $this->goingAway->promise()->then(function () use ($state) {
277 1
            if($state !== static::CONNECTION_OK) {
278
                return;
279
            }
280
            
281 1
            $deferred = new \React\Promise\Deferred();
282
            
283 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
284
            
285
            $this->connection->once('close', function () use (&$deferred) {
286 1
                $deferred->resolve();
287 1
            });
288
            
289
            $quit->once('end', function () {
290
                $this->connection->close();
291 1
            });
292
            
293 1
            $this->parser->invokeCommand($quit);
294
            
295 1
            return $deferred->promise();
296 1
        });
297
    }
298
    
299
    /**
300
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
301
     * @return void
302
     */
303 10
    function quit(): void {
304 10
        if($this->goingAway) {
305 1
            return;
306
        }
307
        
308 10
        $state = $this->connectionState;
309 10
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
310
        
311 10
        $this->goingAway = new \React\Promise\Deferred();
312 10
        $this->goingAway->resolve();
313
        
314
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
315 10
        while($command = \array_shift($this->queue)) {
316 1
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
317
        }
318
        
319 10
        if($state === static::CONNECTION_OK) {
320 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
321 1
            $this->parser->invokeCommand($quit);
322
            
323 1
            $this->connection->close();
324
        }
325 10
    }
326
    
327
    /**
328
     * Whether this driver is currently in a transaction.
329
     * @return bool
330
     */
331 2
    function isInTransaction(): bool {
332 2
        return $this->transaction;
333
    }
334
    
335
    /**
336
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
337
     * When the command is done, the driver must check itself back into the client.
338
     * @param \Plasma\ClientInterface  $client
339
     * @param string                   $query
340
     * @return \React\Promise\PromiseInterface
341
     * @throws \Plasma\Exception
342
     * @see \Plasma\QueryResultInterface
343
     */
344 45
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
345 45
        if($this->goingAway) {
346 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
347 44
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
348 1
            if($this->connectPromise !== null) {
349
                return $this->connectPromise->then(function () use (&$client, &$query) {
350
                    return $this->query($client, $query);
351
                });
352
            }
353
            
354 1
            throw new \Plasma\Exception('Unable to continue without connection');
355
        }
356
        
357 43
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
358 43
        $this->executeCommand($command);
359
        
360 43
        if(!$this->transaction) {
361
            $command->once('end', function () use (&$client) {
362 41
                $client->checkinConnection($this);
363 41
            });
364
        }
365
        
366 43
        return $command->getPromise();
367
    }
368
    
369
    /**
370
     * Prepares a query. Resolves with a `StatementInterface` instance.
371
     * When the command is done, the driver must check itself back into the client.
372
     * @param \Plasma\ClientInterface  $client
373
     * @param string                   $query
374
     * @return \React\Promise\PromiseInterface
375
     * @throws \Plasma\Exception
376
     * @see \Plasma\StatementInterface
377
     */
378 38
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
379 38
        if($this->goingAway) {
380 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
381 37
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
382 1
            if($this->connectPromise !== null) {
383
                return $this->connectPromise->then(function () use (&$client, &$query) {
384
                    return $this->prepare($client, $query);
385
                });
386
            }
387
            
388 1
            throw new \Plasma\Exception('Unable to continue without connection');
389
        }
390
        
391 36
        $command = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
392 36
        $this->executeCommand($command);
393
        
394 36
        return $command->getPromise();
395
    }
396
    
397
    /**
398
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
399
     * This is equivalent to prepare -> execute -> close.
400
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
401
     * @param \Plasma\ClientInterface  $client
402
     * @param string                   $query
403
     * @param array                    $params
404
     * @return \React\Promise\PromiseInterface
405
     * @throws \Plasma\Exception
406
     * @see \Plasma\StatementInterface
407
     */
408 37
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
409 37
        if($this->goingAway) {
410 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
411 36
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
412 1
            if($this->connectPromise !== null) {
413
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
414
                    return $this->execute($client, $query, $params);
415
                });
416
            }
417
            
418 1
            throw new \Plasma\Exception('Unable to continue without connection');
419
        }
420
        
421
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
422
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
423 34
                if($result instanceof \Plasma\StreamQueryResultInterface) {
424
                    $statement->close(null, function (\Throwable $error) {
425
                        $this->emit('error', array($error));
426 34
                    });
427
                    
428 34
                    return $result;
429
                }
430
                
431
                return $statement->close()->then(function () use ($result) {
432 33
                    return $result;
433 33
                });
434
            }, function (\Throwable $error) use (&$statement) {
435
                return $statement->close()->then(function () use ($error) {
436
                    throw $error;
437
                });
438 35
            });
439 35
        });
440
    }
441
    
442
    /**
443
     * Quotes the string for use in the query.
444
     * @param string  $str
445
     * @param int     $type  For types, see the constants.
446
     * @return string
447
     * @throws \LogicException  Thrown if the driver does not support quoting.
448
     * @throws \Plasma\Exception
449
     */
450 5
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
451 5
        if($this->parser === null) {
452 1
            throw new \Plasma\Exception('Unable to continue without connection');
453
        }
454
        
455 4
        $message = $this->parser->getLastOkMessage();
456 4
        if($message === null) {
457
            $message = $this->parser->getHandshakeMessage();
458
            
459
            if($message === null) {
460
                throw new \Plasma\Exception('Unable to quote without a previous handshake');
461
            }
462
        }
463
        
464 4
        $pos = \strpos($this->charset, '_');
465 4
        $dbCharset = \substr($this->charset, 0, ($pos !== false ? $pos : \strlen($this->charset)));
466 4
        $realCharset = $this->getRealCharset($dbCharset);
467
        
468 4
        if(($message->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0) {
469
            return $this->escapeUsingQuotes($realCharset, $str, $type);
470
        }
471
        
472 4
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
473
    }
474
    
475
    /**
476
     * Escapes using quotes.
477
     * @param string  $realCharset
478
     * @param string  $str
479
     * @param int     $type
480
     * @return string
481
     */
482 2
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
483 2
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
484 1
            return '`'.\str_replace('`', '``', $str).'`';
485
        }
486
        
487
        $escapeChars = array(
488 1
            '"',
489
            '\\',
490
        );
491
        
492
        $escapeReplace = array(
493 1
            '""',
494
            '\\\\',
495
        );
496
        
497 1
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
498
    }
499
    
500
    /**
501
     * Escapes using backslashes.
502
     * @param string  $realCharset
503
     * @param string  $str
504
     * @param int     $type
505
     * @return string
506
     */
507 6
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
508 6
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
509 3
            return '`'.\str_replace('`', '\\`', $str).'`';
510
        }
511
        
512
        $escapeChars = array(
513 3
            '\\',
514
            '"',
515
        );
516
        
517
        $escapeReplace = array(
518 3
            '\\\\',
519
            '\"',
520
        );
521
        
522 3
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
523
    }
524
    
525
    /**
526
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
527
     *
528
     * Checks out a connection until the transaction gets committed or rolled back.
529
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
530
     * check the connection back into the pool, as long as the transaction is not finished.
531
     *
532
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
533
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
534
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
535
     * @param \Plasma\ClientInterface  $client
536
     * @param int                      $isolation  See the `TransactionInterface` constants.
537
     * @return \React\Promise\PromiseInterface
538
     * @throws \Plasma\Exception
539
     * @see \Plasma\TransactionInterface
540
     */
541 3
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
542 3
        if($this->goingAway) {
543 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
544
        }
545
        
546 2
        if($this->transaction) {
547 1
            throw new \Plasma\Exception('Driver is already in transaction');
548
        }
549
        
550
        switch ($isolation) {
551 2
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
552
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
553
            break;
554 2
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
555 2
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
556 2
            break;
557
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
558
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
559
            break;
560
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
561
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
562
            break;
563
            default:
564
                throw new \Plasma\Exception('Invalid isolation level given');
565
            break;
566
        }
567
        
568 2
        $this->transaction = true;
569
        
570
        return $this->query($client, $query)->then(function () use (&$client) {
571 2
            return $this->query($client, 'START TRANSACTION');
572
        })->then(function () use (&$client, $isolation) {
573 2
            return (new \Plasma\Transaction($client, $this, $isolation));
574
        })->then(null, function (\Throwable $e) {
575
            $this->transaction = false;
576
            throw $e;
577 2
        });
578
    }
579
    
580
    /**
581
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
582
     * @return void
583
     */
584 1
    function endTransaction(): void {
585 1
        $this->transaction = false;
586 1
    }
587
    
588
    /**
589
     * Runs the given command.
590
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
591
     * or rejects with the `Throwable` of the `error` event.
592
     * When the command is done, the driver must check itself back into the client.
593
     * @param \Plasma\ClientInterface   $client
594
     * @param \Plasma\CommandInterface  $command
595
     * @return \React\Promise\PromiseInterface
596
     */
597 6
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
598 6
        if($this->goingAway) {
599 1
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
600
        }
601
        
602
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
603
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
604 3
                if(!$this->transaction) {
605 3
                    $client->checkinConnection($this);
606
                }
607
                
608 3
                $resolve($value);
609 5
            });
610
            
611
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
612 2
                if(!$this->transaction) {
613 2
                    $client->checkinConnection($this);
614
                }
615
                
616 2
                $reject($error);
617 5
            });
618
            
619 5
            $this->executeCommand($command);
620 5
        }));
621
    }
622
    
623
    /**
624
     * Executes a command.
625
     * @param \Plasma\CommandInterface  $command
626
     * @return void
627
     * @internal
628
     */
629 54
    function executeCommand(\Plasma\CommandInterface $command): void {
630 54
        $this->queue[] = $command;
631
        //\assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' added to queue') || true));
632
        
633 54
        if($this->parser && $this->busy === static::STATE_IDLE) {
634
            //\assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' invoked into parser') || true));
635 53
            $this->parser->invokeCommand($this->getNextCommand());
636
        }
637 54
    }
638
    
639
    /**
640
     * Get the handshake message, or null if none received yet.
641
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
642
     */
643 53
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
644 53
        if($this->parser) {
645 53
            return $this->parser->getHandshakeMessage();
646
        }
647
        
648
        return null;
649
    }
650
    
651
    /**
652
     * Get the next command, or null.
653
     * @return \Plasma\CommandInterface|null
654
     * @internal
655
     */
656 53
    function getNextCommand(): ?\Plasma\CommandInterface {
657 53
        if(\count($this->queue) === 0) {
658 53
            if($this->goingAway) {
659
                $this->goingAway->resolve();
660
            }
661
            
662 53
            return null;
663 53
        } elseif($this->busy === static::STATE_BUSY) {
664
            return null;
665
        }
666
        
667
        /** @var \Plasma\CommandInterface  $command */
668 53
        $command =  \array_shift($this->queue);
669
        
670
        //\assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Unshifted command '.get_class($command)) || true));
671
        
672 53
        if($command->waitForCompletion()) {
673 53
            $this->busy = static::STATE_BUSY;
674
            
675
            $command->once('error', function () use (&$command) {
676
                //\assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' errored') || true));
677 2
                $this->busy = static::STATE_IDLE;
678
                
679 2
                $this->endCommand();
680 53
            });
681
            
682
            $command->once('end', function () use (&$command) {
683
                //\assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' ended') || true));
684 53
                $this->busy = static::STATE_IDLE;
685
                
686 53
                $this->endCommand();
687 53
            });
688
        } else {
689 35
            $this->endCommand();
690
        }
691
        
692 53
        return $command;
693
    }
694
    
695
    /**
696
     * Finishes up a command.
697
     * @return void
698
     */
699 53
    protected function endCommand() {
700
        $this->loop->futureTick(function () {
701 53
            if($this->goingAway && \count($this->queue) === 0) {
702 1
                return $this->goingAway->resolve();
703
            }
704
            
705 53
            $this->parser->invokeCommand($this->getNextCommand());
706 53
        });
707 53
    }
708
    
709
    /**
710
     * Starts the handshake process.
711
     * @param array                    $credentials
712
     * @param \React\Promise\Deferred  $deferred
713
     * @return void
714
     */
715 56
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
716
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
717 56
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
718 56
                $this->parser->removeListener('message', $listener);
719
                
720 56
                $this->connectionState = static::CONNECTION_SETENV;
721 56
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
722
                
723 56
                \extract($credentials);
724
                
725 56
                if($db !== '') {
726 43
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
727
                }
728
                
729 56
                if($this->charset === null) {
730 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
731
                }
732
                
733
                // Check if we support auth plugins
734 56
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
735 56
                $plugin = null;
736
                
737 56
                foreach($plugins as $key => $plug) {
738 56
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
739 56
                        $plugin = $plug;
740 56
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
741 56
                        break;
742
                    } elseif($key === $message->authPluginName) {
743
                        $plugin = $plug;
744
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
745
                        break;
746
                    }
747
                }
748
                
749 56
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
750
                
751 56
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
752
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
753
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
754
                        
755
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
756
                        
757
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
758
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
759
                            
760
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
761
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
762
                            }, function (\Throwable $error) use (&$deferred) {
763
                                $deferred->reject($$error);
764
                                $this->connection->close();
765
                            });
766
                        });
767
                        
768
                        return $this->parser->invokeCommand($ssl);
769
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
770
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
771
                        $this->connection->close();
772
                        return;
773
                    }
774
                }
775
                
776 56
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
777
            }
778 56
        };
779
        
780 56
        $this->parser->on('message', $listener);
781
        
782
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
783 56
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
784 55
                $this->connectionState = static::CONNECTION_OK;
785
            }
786
            
787 56
            $this->emit('eventRelay', array('message', $message));
788 56
        });
789 56
    }
790
    
791
    /**
792
     * Enables TLS on the connection.
793
     * @return \React\Promise\PromiseInterface
794
     */
795
    protected function enableTLS(): \React\Promise\PromiseInterface {
796
        // Set required SSL/TLS context options
797
        foreach($this->options['tls.context'] as $name => $value) {
798
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
799
        }
800
        
801
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
802
            $this->connection->close();
803
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
804
        });
805
    }
806
    
807
    /**
808
     * Sends the auth command.
809
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
810
     * @param array                                            $credentials
811
     * @param int                                              $clientFlags
812
     * @param string|null                                      $plugin
813
     * @param \React\Promise\Deferred                          $deferred
814
     * @return void
815
     */
816 56
    protected function createHandshakeResponse(
817
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
818
    ) {
819 56
        \extract($credentials);
820
        
821 56
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
822
        
823
        $auth->once('end', function () use (&$deferred) {
824
            $this->loop->futureTick(function () use (&$deferred) {
825 55
                $deferred->resolve();
826 55
            });
827 56
        });
828
        
829
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
830 1
            $deferred->reject($error);
831 1
            $this->connection->close();
832 56
        });
833
        
834 56
        if($plugin) {
835
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
836
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
837 56
                static $plugin;
838
                
839 56
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
840
                    $name = $message->authPluginName;
841
                    
842
                    if($name !== null) {
843
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
844
                        foreach($plugins as $key => $plug) {
845
                            if($key === $name) {
846
                                $plugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
847
                                
848
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
849
                                return $this->parser->invokeCommand($command);
850
                            }
851
                        }
852
                    }
853
                    
854
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
855 56
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
856
                    if($plugin === null) {
857
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
858
                        return $this->connection->close();
859
                    }
860
                    
861
                    try {
862
                        $command = $plugin->receiveMoreData($message);
863
                        return $this->parser->invokeCommand($command);
864
                    } catch (\Plasma\Exception $e) {
865
                        $deferred->reject($e);
866
                        $this->connection->close();
867
                    }
868 56
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
869 55
                    $this->parser->removeListener('message', $listener);
870
                }
871 56
            };
872
            
873 56
            $this->parser->on('message', $listener);
874
        }
875
        
876 56
        $this->parser->invokeCommand($auth);
877 56
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
878 56
    }
879
    
880
    /**
881
     * Get the real charset from the DB charset.
882
     * @param string  $charset
883
     * @return string
884
     */
885 4
    protected function getRealCharset(string $charset): string {
886 4
        if(\substr($charset, 0, 4) === 'utf8') {
887 2
            return 'UTF-8';
888
        }
889
        
890 2
        $charsets = \mb_list_encodings();
891
        
892 2
        foreach($charsets as $set) {
893 2
            if(\stripos($set, $charset) === 0) {
894 2
                return $set;
895
            }
896
        }
897
        
898 2
        return 'UTF-8';
899
    }
900
    
901
    /**
902
     * Validates the given options.
903
     * @param array  $options
904
     * @return void
905
     * @throws \InvalidArgumentException
906
     */
907 79
    protected function validateOptions(array $options) {
908 79
        $validator = \CharlotteDunois\Validation\Validator::make($options, array(
909 79
            'characters.set' => 'string',
910
            'characters.collate' => 'string',
911
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
912
            'tls.context' => 'array',
913
            'tls.force' => 'boolean',
914
            'tls.forceLocal' => 'boolean'
915
        ));
916
        
917 79
        $validator->throw(\InvalidArgumentException::class);
918 79
    }
919
}
920