GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Failed Conditions
Push — master ( 888a68...c84b74 )
by Charlotte
05:53
created

Driver::connect()   C

Complexity

Conditions 11
Paths 6

Size

Total Lines 86
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 37
CRAP Score 13.5027

Importance

Changes 0
Metric Value
cc 11
eloc 52
nc 6
nop 1
dl 0
loc 86
ccs 37
cts 51
cp 0.7255
crap 13.5027
rs 6.9006
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'tls.context' => array(),
29
        'tls.force' => true,
30
        'tls.forceLocal' => false
31
    );
32
    
33
    /**
34
     * @var \React\Socket\ConnectorInterface
35
     */
36
    protected $connector;
37
    
38
    /**
39
     * Internal class is intentional used, as there's no other way currently.
40
     * @var \React\Socket\StreamEncryption
41
     * @see https://github.com/reactphp/socket/issues/180
42
     */
43
    protected $encryption;
44
    
45
    /**
46
     * @var \React\Socket\Connection
47
     */
48
    protected $connection;
49
    
50
    /**
51
     * @var int
52
     */
53
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
54
    
55
    /**
56
     * @var \Plasma\Drivers\MySQL\ProtocolParser
57
     */
58
    protected $parser;
59
    
60
    /**
61
     * @var array
62
     */
63
    protected $queue;
64
    
65
    /**
66
     * @var int
67
     */
68
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
69
    
70
    /**
71
     * @var bool
72
     */
73
    protected $transaction = false;
74
    
75
    /**
76
     * @var \React\Promise\Deferred
77
     */
78
    protected $goingAway;
79
    
80
    /**
0 ignored issues
show
Coding Style introduced by
Parameter $loop should have a doc-comment as per coding-style.
Loading history...
Coding Style introduced by
Parameter $options should have a doc-comment as per coding-style.
Loading history...
81
     * Constructor.
82
     */
83 21
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
84 21
        $this->validateOptions($options);
85
        
86 21
        $this->loop = $loop;
87 21
        $this->options = \array_merge($this->options, $options);
88
        
89 21
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
90 21
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
91 21
        $this->queue = array();
92 21
    }
93
    
94
    /**
95
     * Returns the event loop.
96
     * @var \React\EventLoop\LoopInterface
97
     */
0 ignored issues
show
Coding Style introduced by
Missing @return tag in function comment
Loading history...
98 1
    function getLoop(): \React\EventLoop\LoopInterface {
99 1
        return $this->loop;
100
    }
101
    
102
    /**
103
     * Retrieves the current connection state.
104
     * @return int
105
     */
106 9
    function getConnectionState(): int {
107 9
        return $this->connectionState;
108
    }
109
    
110
    /**
111
     * Retrieves the current busy state.
112
     * @return int
113
     */
114 1
    function getBusyState(): int {
115 1
        return $this->busy;
116
    }
117
    
118
    /**
119
     * Get the length of the driver backlog queue.
120
     * @return int
121
     */
122 1
    function getBacklogLength(): int {
123 1
        return \count($this->queue);
124
    }
125
    
126
    /**
0 ignored issues
show
Coding Style introduced by
Parameter $uri should have a doc-comment as per coding-style.
Loading history...
127
     * Connects to the given URI.
128
     * @return \React\Promise\PromiseInterface
129
     */
130 16
    function connect(string $uri): \React\Promise\PromiseInterface {
131 16
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
132
            return \React\Promise\resolve((new \Plasma\Exception('Connection is going away or unusable')));
133
        }
134
        
135 16
        $uri = 'mysql://'.\ltrim($uri, 'mysql://');
136
        
137 16
        $parts = \parse_url($uri);
138 16
        if(!isset($parts['host'])) {
139
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
140
        }
141
        
142 16
        $host = $parts['host'].':'.($parts['port'] ?? 3306);
143 16
        $this->connectionState = static::CONNECTION_STARTED;
144 16
        $resolved = false;
145
        
146 16
        if(!empty($parts['query'])) {
147 1
            \parse_str($parts['query'], $query);
148 1
            $charset = $query['charset'] ?? null;
149 1
            $collate = $query['collate'] ?? null;
150
            
151 1
            unset($query);
152
        } else {
153 15
            $charset = null;
154 15
            $collate = null;
155
        }
156
        
157
        $connect =  $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
158
            // See description of property encryption
159 16
            if(!($connection instanceof \React\Socket\Connection)) {
160
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
161
            }
162
            
163 16
            $this->busy = static::STATE_BUSY;
164 16
            $this->connectionState = static::CONNECTION_MADE;
165 16
            $this->connection = $connection;
166
            
167
            $this->connection->on('error', function (\Throwable $error) {
168
                $this->emit('error', array($error));
169 16
            });
170
            
171
            $this->connection->on('close', function () {
172 2
                $this->connection = null;
173 2
                $this->connectionState = static::CONNECTION_UNUSABLE;
174
                
175 2
                $this->emit('close');
176 16
            });
177
            
178 16
            $deferred = new \React\Promise\Deferred();
179 16
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
180
            
181
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
182
                if($resolved) {
183
                    $this->emit('error', array($error));
184
                } else {
185
                    $deferred->reject($error);
186
                }
187 16
            });
188
            
189 16
            $user = ($parts['user'] ?? 'root');
190 16
            $password = ($parts['pass'] ?? '');
191 16
            $db = (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '');
192
            
193 16
            $credentials = \compact('user', 'password', 'db');
194
            
195 16
            $this->startHandshake($credentials, $deferred);
196
            return $deferred->promise()->then(function () use (&$resolved) {
197
                $this->busy = static::STATE_IDLE;
198
                $resolved = true;
199
                
200
                if(\count($this->queue) > 0) {
201
                    $this->parser->invokeCommand($this->getNextCommand());
202
                }
203 16
            });
204 16
        });
205
        
206 16
        if($charset) {
207
            $connect = $connect->then(function () use ($charset, $collate) {
208
                $query = 'SET NAMES "'.$charset.'"'.($collate ? ' COLLATE "'.$collate.'"' : '');
209
                
210
                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
211
                $this->executeCommand($cmd);
212 1
            });
213
        }
214
        
215 16
        return $connect;
216
    }
217
    
218
    /**
219
     * Pauses the underlying stream I/O consumption.
220
     * If consumption is already paused, this will do nothing.
221
     * @return bool  Whether the operation was successful.
222
     */
223
    function pauseStreamConsumption(): bool {
224
        if($this->goingAway) {
225
            return false;
226
        }
227
        
228
        $this->connection->pause();
229
        return true;
230
    }
231
    
232
    /**
233
     * Resumes the underlying stream I/O consumption.
234
     * If consumption is not paused, this will do nothing.
235
     * @return bool  Whether the operation was successful.
236
     */
237
    function resumeStreamConsumption(): bool {
238
        if($this->goingAway) {
239
            return false;
240
        }
241
        
242
        $this->connection->resume();
243
        return true;
244
    }
245
    
246
    /**
247
     * Closes all connections gracefully after processing all outstanding requests.
248
     * @return \React\Promise\PromiseInterface
249
     */
250
    function close(): \React\Promise\PromiseInterface {
251
        if($this->goingAway) {
252
            return $this->goingAway->promise();
253
        }
254
        
255
        $state = $this->connectionState;
256
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
257
        
258
        $this->goingAway = new \React\Promise\Deferred();
259
        
260
        if(\count($this->queue) === 0) {
261
            $this->goingAway->resolve();
262
        }
263
        
264
        return $this->goingAway->promise()->then(function () use ($state) {
265
            if($state !== static::CONNECTION_OK) {
266
                return;
267
            }
268
            
269
            $deferred = new \React\Promise\Deferred();
270
            
271
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
272
            
273
            $this->connection->once('close', function () use (&$deferred) {
274
                $deferred->resolve();
275
            });
276
            
277
            $quit->once('end', function () {
278
                $this->connection->close();
279
            });
280
            
281
            $this->parser->invokeCommand($quit);
282
            
283
            return $deferred->promise();
284
        });
285
    }
286
    
287
    /**
288
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
289
     * @return void
290
     */
291
    function quit(): void {
292
        if($this->goingAway) {
293
            return;
294
        }
295
        
296
        $state = $this->connectionState;
297
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
298
        
299
        $this->goingAway = new \React\Promise\Deferred();
300
        $this->goingAway->resolve();
301
        
302
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
303
        while($command = \array_shift($this->queue)) {
304
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
305
        }
306
        
307
        if($state === static::CONNECTION_OK) {
308
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
309
            $this->parser->invokeCommand($quit);
310
            
311
            $this->connection->close();
312
        }
313
    }
314
    
315
    /**
316
     * Whether this driver is currently in a transaction.
317
     * @return bool
318
     */
319
    function isInTransaction(): bool {
320
        return $this->transaction;
321
    }
322
    
323
    /**
324
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
325
     * When the command is done, the driver must check itself back into the client.
326
     * @param \Plasma\ClientInterface  $client
327
     * @param string                   $query
328
     * @return \React\Promise\PromiseInterface
329
     * @throws \Plasma\Exception
330
     * @see \Plasma\QueryResultInterface
331
     */
332
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
333
        if($this->goingAway) {
334
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
335
        }
336
        
337
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
338
        $this->executeCommand($command);
339
        
340
        if(!$this->transaction) {
341
            $command->once('end', function () use (&$client) {
342
                $client->checkinConnection($this);
343
            });
344
        }
345
        
346
        return $command->getPromise();
347
    }
348
    
349
    /**
350
     * Prepares a query. Resolves with a `StatementInterface` instance.
351
     * When the command is done, the driver must check itself back into the client.
352
     * @param \Plasma\ClientInterface  $client
353
     * @param string                   $query
354
     * @return \React\Promise\PromiseInterface
355
     * @throws \Plasma\Exception
356
     * @see \Plasma\StatementInterface
357
     */
358
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
359
        if($this->goingAway) {
360
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
361
        }
362
        
363
        $command = new \Plasma\Drivers\MySQL\Commands\PrepareCommand($client, $this, $query);
364
        $this->executeCommand($command);
365
        
366
        return $command->getPromise();
367
    }
368
    
369
    /**
370
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
371
     * This is equivalent to prepare -> execute -> close.
372
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
373
     * @param \Plasma\ClientInterface  $client
374
     * @param string                   $query
375
     * @param array                    $params
376
     * @return \React\Promise\PromiseInterface
377
     * @throws \Plasma\Exception
378
     * @see \Plasma\StatementInterface
379
     */
380
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
381
        if($this->goingAway) {
382
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
383
        }
384
        
385
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
386
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
387
                if($result instanceof \Plasma\StreamQueryResultInterface) {
388
                    $statement->close(null, function (\Throwable $error) {
389
                        $this->emit('error', array($error));
390
                    });
391
                    
392
                    return $result;
393
                }
394
                
395
                return $statement->close()->then(function () use ($result) {
396
                    return $result;
397
                });
398
            }, function (\Throwable $error) use (&$statement) {
399
                return $statement->close()->then(function () use ($error) {
400
                    throw $error;
401
                });
402
            });
403
        });
404
    }
405
    
406
    /**
407
     * Quotes the string for use in the query.
408
     * @param string  $str
409
     * @return string
410
     * @throws \LogicException  Thrown if the driver does not support quoting.
411
     * @throws \Plasma\Exception
412
     */
413
    function quote(string $str): string { // TODO
0 ignored issues
show
Coding Style introduced by
Opening brace must be the last content on the line
Loading history...
414
        throw new \LogicException('Not implemented yet');
415
    }
416
    
417
    /**
418
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
419
     *
420
     * Checks out a connection until the transaction gets committed or rolled back.
421
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
422
     * check the connection back into the pool, as long as the transaction is not finished.
423
     *
424
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
425
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
426
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
427
     * @param \Plasma\ClientInterface  $client
428
     * @param int                      $isolation  See the `TransactionInterface` constants.
429
     * @return \React\Promise\PromiseInterface
430
     * @throws \Plasma\Exception
431
     * @see \Plasma\TransactionInterface
432
     */
433
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
434
        if($this->goingAway) {
435
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
436
        }
437
        
438
        if($this->transaction) {
439
            throw new \Plasma\Exception('Driver is already in transaction');
440
        }
441
        
442
        switch ($isolation) {
443
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
444
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
445
            break;
1 ignored issue
show
Coding Style introduced by
Case breaking statement indented incorrectly; expected 16 spaces, found 12
Loading history...
446
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
447
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
448
            break;
1 ignored issue
show
Coding Style introduced by
Case breaking statement indented incorrectly; expected 16 spaces, found 12
Loading history...
449
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
450
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
451
            break;
1 ignored issue
show
Coding Style introduced by
Case breaking statement indented incorrectly; expected 16 spaces, found 12
Loading history...
452
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
453
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
454
            break;
1 ignored issue
show
Coding Style introduced by
Case breaking statement indented incorrectly; expected 16 spaces, found 12
Loading history...
455
            default:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
456
                throw new \Plasma\Exception('Invalid isolation level given');
457
            break;
458
        }
459
        
460
        $this->transaction = true;
461
        
462
        return $this->query($client, $query)->then(function () use (&$client) {
463
            return $this->query($client, 'START TRANSACTION');
464
        })->then(function () use (&$client, $isolation) {
465
            return (new \Plasma\Transaction($client, $this, $isolation));
466
        })->otherwise(function (\Throwable $e) {
0 ignored issues
show
Bug introduced by
The method otherwise() does not exist on React\Promise\PromiseInterface. It seems like you code against a sub-type of said class. However, the method does not exist in React\Promise\SimpleFulfilledTestPromise or React\Promise\SimpleRejectedTestPromise or React\Promise\CancellablePromiseInterface. Are you sure you never get one of those? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

466
        })->/** @scrutinizer ignore-call */ otherwise(function (\Throwable $e) {
Loading history...
467
            $this->transaction = false;
468
            throw $e;
469
        });
470
    }
471
    
472
    /**
473
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
474
     * @return void
475
     */
476
    function endTransaction(): void {
477
        $this->transaction = false;
478
    }
479
    
480
    /**
481
     * Runs the given command.
482
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
483
     * or rejects with the `Throwable` of the `error` event.
484
     * When the command is done, the driver must check itself back into the client.
485
     * @param \Plasma\ClientInterface   $client
486
     * @param \Plasma\CommandInterface  $command
487
     * @return \React\Promise\PromiseInterface
488
     */
489 1
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
490 1
        if($this->goingAway) {
491
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
492
        }
493
        
494
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
495
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
496
                if(!$this->transaction) {
497
                    $client->checkinConnection($this);
498
                }
499
                
500
                $resolve($value);
501 1
            });
502
            
503
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
504
                if(!$this->transaction) {
505
                    $client->checkinConnection($this);
506
                }
507
                
508
                $reject($error);
509 1
            });
510
            
511 1
            $this->executeCommand($command);
512 1
        }));
513
    }
514
    
515
    /**
516
     * Executes a command.
517
     * @param \Plasma\CommandInterface  $command
518
     * @return void
519
     * @internal
520
     */
521 1
    function executeCommand(\Plasma\CommandInterface $command): void {
522 1
        $this->queue[] = $command;
523 1
        \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' added to queue') || true));
524
        
525 1
        if($this->parser && $this->busy === static::STATE_IDLE) {
526
            \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' invoked into parser') || true));
527
            $this->parser->invokeCommand($this->getNextCommand());
528
        }
529 1
    }
530
    
531
    /**
532
     * Get the handshake message, or null if none received yet.
533
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
534
     */
535
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
536
        if($this->parser) {
537
            return $this->parser->getHandshakeMessage();
538
        }
539
        
540
        return null;
541
    }
542
    
543
    /**
544
     * Get the next command, or null.
545
     * @return \Plasma\CommandInterface|null
546
     * @internal
547
     */
548
    function getNextCommand(): ?\Plasma\CommandInterface {
549
        if(\count($this->queue) === 0) {
550
            if($this->goingAway) {
551
                $this->goingAway->resolve();
552
            }
553
            
554
            return null;
555
        } elseif($this->busy === static::STATE_BUSY) {
556
            return null;
557
        }
558
        
559
        /** @var \Plasma\CommandInterface  $command */
560
        $command =  \array_shift($this->queue);
561
        
562
        \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Unshifted command '.get_class($command)) || true));
563
        
564
        if($command->waitForCompletion()) {
565
            $this->busy = static::STATE_BUSY;
566
            
567
            $command->once('error', function () use (&$command) {
568
                \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' errored') || true));
569
                $this->busy = static::STATE_IDLE;
570
                
571
                $this->endCommand();
572
            });
573
            
574
            $command->once('end', function () use (&$command) {
575
                \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' ended') || true));
576
                $this->busy = static::STATE_IDLE;
577
                
578
                $this->endCommand();
579
            });
580
        } else {
581
            $this->endCommand();
582
        }
583
        
584
        return $command;
585
    }
586
    
587
    /**
588
     * Finishes up a command.
589
     * @return void
590
     */
591
    protected function endCommand() {
592
        $this->loop->futureTick(function () {
593
            if($this->goingAway && \count($this->queue) === 0) {
594
                return $this->goingAway->resolve();
595
            }
596
            
597
            $this->parser->invokeCommand($this->getNextCommand());
598
        });
599
    }
600
    
601
    /**
602
     * Starts the handshake process.
603
     * @param array                    $credentials
604
     * @param \React\Promise\Deferred  $deferred
605
     * @return void
606
     */
607 16
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
608
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
609 16
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
610 16
                $this->parser->removeListener('message', $listener);
611
                
612 16
                $this->connectionState = static::CONNECTION_SETENV;
613 16
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
614
                
615 16
                \extract($credentials);
616
                
617 16
                if($db !== '') {
618 4
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
619
                }
620
                
621
                // Check if we support auth plugins
622 16
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
623 16
                $plugin = null;
624
                
625 16
                foreach($plugins as $key => $plug) {
626 16
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
627 16
                        $plugin = $plug;
628 16
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
629 16
                        break;
630
                    } elseif($key === $message->authPluginName) {
631
                        $plugin = $plug;
632
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
633
                        break;
634
                    }
635
                }
636
                
637 16
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
638
                
639 16
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
640 2
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
641 2
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
642
                        
643 2
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
644
                        
645
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
646 2
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
647
                            
648
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
649
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
650
                            }, function (\Throwable $error) use (&$deferred) {
651 1
                                $deferred->reject($$error);
652
                                $this->connection->close();
653 2
                            });
654 2
                        });
655
                        
656 2
                        return $this->parser->invokeCommand($ssl);
657
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
658
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
659
                        $this->connection->close();
660
                        return;
661
                    }
662
                }
663
                
664 14
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
665
            }
666 16
        };
667
        
668 16
        $this->parser->on('message', $listener);
669
        
670
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
671 16
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
672
                $this->connectionState = static::CONNECTION_OK;
673
            }
674
            
675 16
            $this->emit('eventRelay', array('message', $message));
676 16
        });
677 16
    }
678
    
679
    /**
680
     * Enables TLS on the connection.
681
     * @return \React\Promise\PromiseInterface
682
     */
683 2
    protected function enableTLS(): \React\Promise\PromiseInterface {
684
        // Set required SSL/TLS context options
685 2
        foreach($this->options['tls.context'] as $name => $value) {
686
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
687
        }
688
        
689
        return $this->encryption->enable($this->connection)->otherwise(function (\Throwable $error) {
690 1
            $this->connection->close();
691 1
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
692 2
        });
693
    }
694
    
695
    /**
696
     * Sends the auth command.
697
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
698
     * @param array                                            $credentials
699
     * @param int                                              $clientFlags
700
     * @param string|null                                      $plugin
701
     * @param \React\Promise\Deferred                          $deferred
702
     * @return void
703
     */
704 14
    protected function createHandshakeResponse(
705
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
706
    ) {
707 14
        \extract($credentials);
708
        
709 14
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
710
        
711
        $auth->once('end', function () use (&$deferred) {
712
            $deferred->resolve();
713 14
        });
714
        
715
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
716 1
            $deferred->reject($error);
717 1
            $this->connection->close();
718 14
        });
719
        
720 14
        if($plugin) {
721
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
722
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
723 1
                static $plugin;
724
                
725 1
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
726
                    $name = $message->authPluginName;
727
                    
728
                    if($name !== null) {
729
                        foreach($plugins as $key => $plug) {
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $plugins does not exist. Did you maybe mean $plugin?
Loading history...
730
                            if($key === $name) {
731
                                $plugin = new $plug($this->parser, $message);
732
                                
733
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
734
                                return $this->parser->invokeCommand($command);
735
                            }
736
                        }
737
                    }
738
                    
739
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
740 1
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
741
                    if($plugin === null) {
742
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
743
                        return $this->connection->close();
744
                    }
745
                    
746
                    try {
747
                        $command = $plugin->receiveMoreData($message);
748
                        return $this->parser->invokeCommand($command);
749
                    } catch (\Plasma\Exception $e) {
750
                        $deferred->reject($e);
751
                        $this->connection->close();
752
                    }
753 1
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
754
                    $this->parser->removeListener('message', $listener);
755
                }
756 14
            };
757
            
758 14
            $this->parser->on('message', $listener);
759
        }
760
        
761 14
        $this->parser->invokeCommand($auth);
762 14
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
763 14
    }
764
    
765
    /**
766
     * Validates the given options.
767
     * @param array  $options
768
     * @return void
769
     * @throws \InvalidArgumentException
770
     */
771 21
    protected function validateOptions(array $options) {
772 21
        $validator = \CharlotteDunois\Validation\Validator::make($options, array(
773 21
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
774
            'tls.context' => 'array',
775
            'tls.force' => 'boolean',
776
            'tls.forceLocal' => 'boolean'
777
        ));
778
        
779 21
        $validator->throw(\InvalidArgumentException::class);
780 21
    }
781
}
782