GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( bf6976...0e8096 )
by Charlotte
03:40
created

Driver::prepare()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0185

Importance

Changes 0
Metric Value
cc 2
eloc 5
nc 2
nop 2
dl 0
loc 9
ccs 5
cts 6
cp 0.8333
crap 2.0185
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'tls.context' => array(),
29
        'tls.force' => true,
30
        'tls.forceLocal' => false
31
    );
32
    
33
    /**
34
     * @var \React\Socket\ConnectorInterface
35
     */
36
    protected $connector;
37
    
38
    /**
39
     * An internal class is intentionally used here, as there is currently no other way.
40
     * @var \React\Socket\StreamEncryption
41
     * @see https://github.com/reactphp/socket/issues/180
42
     */
43
    protected $encryption;
44
    
45
    /**
46
     * @var \React\Promise\Promise|null
47
     */
48
    protected $connectPromise;
49
    
50
    /**
51
     * @var \React\Socket\Connection
52
     */
53
    protected $connection;
54
    
55
    /**
56
     * @var int
57
     */
58
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
59
    
60
    /**
61
     * @var \Plasma\Drivers\MySQL\ProtocolParser
62
     */
63
    protected $parser;
64
    
65
    /**
66
     * @var array
67
     */
68
    protected $queue;
69
    
70
    /**
71
     * @var int
72
     */
73
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
74
    
75
    /**
76
     * @var bool
77
     */
78
    protected $transaction = false;
79
    
80
    /**
81
     * @var \React\Promise\Deferred
82
     */
83
    protected $goingAway;
84
    
85
    /**
     * Constructor.
     *
     * Validates the given options, merges them over the default options and
     * sets up the connector, the stream encryption helper and an empty queue.
     * @param \React\EventLoop\LoopInterface  $loop
     * @param array                           $options
     * @throws \InvalidArgumentException  Thrown if the options fail validation.
     */
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        $this->validateOptions($options);

        $this->loop = $loop;
        $this->queue = array();
        $this->options = \array_merge($this->options, $options);

        // A user supplied connector takes precedence over the default one.
        if(isset($options['connector'])) {
            $this->connector = $options['connector'];
        } else {
            $this->connector = new \React\Socket\Connector($loop);
        }

        $this->encryption = new \React\Socket\StreamEncryption($loop, false);
    }
100
    
101
    /**
     * Returns the event loop this driver was constructed with.
     * @return \React\EventLoop\LoopInterface
     */
    function getLoop(): \React\EventLoop\LoopInterface {
        return $this->loop;
    }
108
    
109
    /**
     * Retrieves the current connection state.
     * @return int  One of the `\Plasma\DriverInterface::CONNECTION_*` constants.
     */
    function getConnectionState(): int {
        return $this->connectionState;
    }
116
    
117
    /**
     * Retrieves the current busy state.
     * @return int  One of the `\Plasma\DriverInterface::STATE_*` constants.
     */
    function getBusyState(): int {
        return $this->busy;
    }
124
    
125
    /**
     * Get the length of the driver backlog queue,
     * that is the number of commands which are queued but not yet invoked.
     * @return int
     */
    function getBacklogLength(): int {
        return \count($this->queue);
    }
132
    
133
    /**
     * Connects to the given URI.
     *
     * The `mysql://` scheme is optional and gets prepended when missing.
     * Username (default `root`), password (default empty), port (default 3306) and
     * the database (URI path) are taken from the URI. The query string parameters
     * `charset` and `collate` queue a `SET NAMES` query once the handshake succeeded.
     *
     * Rejects if the driver is going away or the connection is unusable, resolves
     * immediately if the connection is already established and returns the pending
     * promise if a connection attempt is already in progress.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface
     */
    function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away or unusable')));
        } elseif($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        } elseif($this->connectPromise !== null) {
            return $this->connectPromise;
        }

        // Only prepend the scheme if it is missing. ltrim() must not be used to strip
        // the scheme: its second argument is a character mask, so it would also eat
        // leading host characters (e.g. "localhost" would become "ocalhost").
        if(\strpos($uri, 'mysql://') !== 0) {
            $uri = 'mysql://'.$uri;
        }

        $parts = \parse_url($uri);
        if(!isset($parts['host'])) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }

        $host = $parts['host'].':'.($parts['port'] ?? 3306);
        $this->connectionState = static::CONNECTION_STARTED;

        // Tracks whether the handshake already succeeded, so parser errors
        // are either used to reject the connect promise or emitted as event.
        $resolved = false;

        if(!empty($parts['query'])) {
            \parse_str($parts['query'], $query);
            $charset = $query['charset'] ?? null;
            $collate = $query['collate'] ?? null;

            unset($query);
        } else {
            $charset = null;
            $collate = null;
        }

        $this->connectPromise = $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }

            // Block the queue until the handshake is done.
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;

            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });

            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;

                $this->emit('close');
            });

            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);

            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });

            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '');

            $credentials = \compact('user', 'password', 'db');

            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;

                // Commands may have been queued while the handshake was in progress.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });

        if($charset) {
            $this->connectPromise = $this->connectPromise->then(function () use ($charset, $collate) {
                $query = 'SET NAMES "'.$charset.'"'.($collate ? ' COLLATE "'.$collate.'"' : '');

                $cmd = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
                $this->executeCommand($cmd);
            });
        }

        return $this->connectPromise;
    }
229
    
230
    /**
     * Pauses the underlying stream I/O consumption.
     * If consumption is already paused, this will do nothing.
     *
     * Returns false if the driver is going away or if there is no open
     * connection (not connected yet, or the connection was already closed).
     * @return bool  Whether the operation was successful.
     */
    function pauseStreamConsumption(): bool {
        // The connection is null before connect() succeeded and after the close event.
        if($this->goingAway || $this->connection === null) {
            return false;
        }

        $this->connection->pause();
        return true;
    }
243
    
244
    /**
     * Resumes the underlying stream I/O consumption.
     * If consumption is not paused, this will do nothing.
     *
     * Returns false if the driver is going away or if there is no open
     * connection (not connected yet, or the connection was already closed).
     * @return bool  Whether the operation was successful.
     */
    function resumeStreamConsumption(): bool {
        // The connection is null before connect() succeeded and after the close event.
        if($this->goingAway || $this->connection === null) {
            return false;
        }

        $this->connection->resume();
        return true;
    }
257
    
258
    /**
     * Closes all connections gracefully after processing all outstanding requests.
     *
     * The connection is marked as unusable right away. Once the queue is drained,
     * a quit command is sent (only if the connection was fully established) and
     * the returned promise resolves when the connection emitted its close event.
     * Calling this method again returns the promise of the pending close.
     * @return \React\Promise\PromiseInterface
     */
    function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }

        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        $this->goingAway = new \React\Promise\Deferred();

        // Nothing is outstanding, so we can proceed immediately.
        if(\count($this->queue) === 0) {
            $this->goingAway->resolve();
        }

        return $this->goingAway->promise()->then(function () use ($previousState) {
            // Without an established connection there is nobody to say goodbye to.
            if($previousState !== static::CONNECTION_OK) {
                return;
            }

            $closed = new \React\Promise\Deferred();
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();

            $this->connection->once('close', function () use ($closed) {
                $closed->resolve();
            });

            $quit->once('end', function () {
                $this->connection->close();
            });

            $this->parser->invokeCommand($quit);

            return $closed->promise();
        });
    }
298
    
299
    /**
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
     *
     * Every queued command receives an `error` event. If the connection was fully
     * established, a quit command is sent before the connection gets closed.
     * Does nothing if the driver is already going away.
     * @return void
     */
    function quit(): void {
        if($this->goingAway) {
            return;
        }

        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;

        $this->goingAway = new \React\Promise\Deferred();
        $this->goingAway->resolve();

        // Fail everything which is still waiting in the queue.
        while(\count($this->queue) > 0) {
            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
            $command = \array_shift($this->queue);
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }

        if($previousState !== static::CONNECTION_OK) {
            return;
        }

        $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
        $this->connection->close();
    }
326
    
327
    /**
     * Whether this driver is currently in a transaction.
     * The flag is set by `beginTransaction` and cleared by `endTransaction`.
     * @return bool
     */
    function isInTransaction(): bool {
        return $this->transaction;
    }
334
    
335
    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     *
     * The check-in happens on the `end` event of the command and is skipped
     * while the driver is in a transaction.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\QueryResultInterface
     */
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($command);

        // Inside a transaction the connection stays checked out.
        // NOTE(review): unlike runCommand, no check-in is registered for the error event - confirm this is intended.
        if($this->transaction === false) {
            $command->once('end', function () use ($client) {
                $client->checkinConnection($this);
            });
        }

        return $command->getPromise();
    }
360
    
361
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     * When the command is done, the driver must check itself back into the client.
     *
     * The client is handed over to the prepare command, this method itself
     * does not check the driver back in.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            $error = new \Plasma\Exception('Connection is going away');
            return \React\Promise\reject($error);
        }

        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);

        return $prepareCommand->getPromise();
    }
380
    
381
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     *
     * For streaming results the statement is closed in the background and the result
     * is returned immediately; errors of that close are emitted as `error` event.
     * For all other results the promise resolves after the statement was closed.
     * If the execution fails, the statement is closed and the promise rejects with the execution error.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    // The rejection handler has to be attached to the promise returned by close().
                    // Passing it as argument to close() would silently drop it.
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });

                    return $result;
                }

                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use (&$statement) {
                // Release the statement, then propagate the original error.
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
417
    
418
    /**
     * Quotes the string for use in the query.
     * This is not implemented yet and always throws.
     * @param string  $str
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception
     */
    function quote(string $str): string {
        // TODO: implement quoting
        throw new \LogicException('Not implemented yet');
    }
428
    
429
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     *
     * The session isolation level is set first, then the transaction is started.
     * If any of the two queries fails, the driver leaves the transaction state again.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if already in a transaction or if the isolation level is invalid.
     * @see \Plasma\TransactionInterface
     */
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }

        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $level = 'READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $level = 'READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $level = 'REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $level = 'SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }

        $isolationQuery = 'SET SESSION TRANSACTION ISOLATION LEVEL '.$level;

        // Set the flag before querying, query() only checks the
        // connection back in when the driver is not in a transaction.
        $this->transaction = true;

        $started = $this->query($client, $isolationQuery)->then(function () use ($client) {
            return $this->query($client, 'START TRANSACTION');
        });

        $transaction = $started->then(function () use ($client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        });

        return $transaction->then(null, function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        });
    }
483
    
484
    /**
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
     * It only resets the transaction flag of the driver, no query is sent.
     * @return void
     */
    function endTransaction(): void {
        $this->transaction = false;
    }
491
    
492
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * When the command is done, the driver must check itself back into the client.
     *
     * The check-in happens on both outcomes and is skipped while the driver is in a transaction.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface
     */
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }

        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use ($client, $command) {
            // Hands the driver back to the client, unless a transaction holds it.
            $checkin = function () use ($client) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
            };

            $command->once('end', function ($value = null) use ($checkin, $resolve) {
                $checkin();
                $resolve($value);
            });

            $command->once('error', function (\Throwable $error) use ($checkin, $reject) {
                $checkin();
                $reject($error);
            });

            $this->executeCommand($command);
        }));
    }
526
    
527
    /**
     * Executes a command.
     *
     * The command is appended to the queue. If a parser exists and the
     * driver is idle, the next queued command is invoked right away.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;

        // Not connected yet or another command is in progress, the command stays queued.
        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }

        $this->parser->invokeCommand($this->getNextCommand());
    }
542
    
543
    /**
     * Get the handshake message, or null if none received yet.
     * Without a parser (no connection was made yet) this is always null.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        if(!$this->parser) {
            return null;
        }

        return $this->parser->getHandshakeMessage();
    }
554
    
555
    /**
     * Get the next command, or null.
     *
     * Returns null if the queue is empty (resolving the pending graceful close, if any)
     * or if the driver is busy. Otherwise the first command is removed from the queue.
     * If the command waits for completion, the driver is marked as busy until the command
     * emits `end` or `error`. In any case the next command gets scheduled afterwards.
     * @return \Plasma\CommandInterface|null
     * @internal
     */
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            if($this->goingAway) {
                $this->goingAway->resolve();
            }

            return null;
        }

        if($this->busy === static::STATE_BUSY) {
            return null;
        }

        /** @var \Plasma\CommandInterface  $command */
        $command = \array_shift($this->queue);

        if(!$command->waitForCompletion()) {
            $this->endCommand();
            return $command;
        }

        $this->busy = static::STATE_BUSY;

        // Both outcomes release the driver and schedule the next command.
        $release = function () {
            $this->busy = static::STATE_IDLE;
            $this->endCommand();
        };

        $command->once('error', $release);
        $command->once('end', $release);

        return $command;
    }
598
    
599
    /**
     * Finishes up a command.
     *
     * On the next tick of the event loop, either the pending graceful close is
     * resolved (going away and empty queue) or the next queued command is invoked.
     * @return void
     */
    protected function endCommand() {
        $this->loop->futureTick(function () {
            $drained = ($this->goingAway && \count($this->queue) === 0);

            if($drained) {
                $this->goingAway->resolve();
                return;
            }

            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
612
    
613
    /**
     * Starts the handshake process.
     *
     * Waits for the handshake message of the server, computes the client capability flags,
     * selects an auth plugin and then either upgrades the connection to TLS first
     * or sends the handshake response directly.
     * TLS is used for every remote address other than `127.0.0.1`, or always if the option `tls.forceLocal` is set.
     * If the server does not support TLS and `tls.force` or `tls.forceLocal` is set,
     * the deferred is rejected and the connection is closed.
     *
     * Additionally a permanent message listener is registered, which marks the connection
     * as OK on OK response messages and relays every message through the `eventRelay` event.
     * @param array                    $credentials  Array with the keys `user`, `password` and `db`.
     * @param \React\Promise\Deferred  $deferred     Gets settled with the outcome of the handshake.
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
                // The handshake message is only expected once.
                $this->parser->removeListener('message', $listener);

                $this->connectionState = static::CONNECTION_SETENV;
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;

                // Explicit access instead of extract(), only the database name is needed here.
                if(($credentials['db'] ?? '') !== '') {
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
                }

                // Check if we support auth plugins
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                $plugin = null;

                // Integer keys are matched against the server capabilities, all other keys against the plugin name.
                foreach($plugins as $key => $plug) {
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    } elseif($key === $message->authPluginName) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    }
                }

                // Yields null (or false) if the address is unknown or has no host, which is treated as non-local.
                $remote = \parse_url((string) $this->connection->getRemoteAddress(), \PHP_URL_HOST);

                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;

                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);

                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;

                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                            }, function (\Throwable $error) use (&$deferred) {
                                // Reject with the actual error (not with a variable variable).
                                $deferred->reject($error);
                                $this->connection->close();
                            });
                        });

                        return $this->parser->invokeCommand($ssl);
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                        $this->connection->close();
                        return;
                    }
                }

                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
            }
        };

        $this->parser->on('message', $listener);

        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }

            $this->emit('eventRelay', array('message', $message));
        });
    }
690
    
691
    /**
     * Enables TLS on the connection.
     *
     * Applies every entry of the `tls.context` option as `ssl` stream context option
     * and enables encryption on the connection. If that fails, the connection is closed
     * and the promise rejects with a `RuntimeException`, which carries the original
     * error as previous exception.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }

        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            $this->connection->close();

            // Exception codes are not guaranteed to be integers, RuntimeException requires one.
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), (int) $error->getCode(), $error);
        });
    }
706
    
707
    /**
     * Sends the auth command.
     *
     * The deferred is resolved when the handshake response command ends and is rejected
     * (closing the connection) when it errors. If an auth plugin is given, a message listener
     * handles auth switch requests and auth more data packets until an OK response arrives.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  Array with the keys `user`, `password` and `db`.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Imports $user, $password and $db into the local scope.
        \extract($credentials);

        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);

        $auth->once('end', function () use ($deferred) {
            $deferred->resolve();
        });

        $auth->once('error', function (\Throwable $error) use ($deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });

        if($plugin) {
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, $deferred, &$listener) {
                // Remembers the plugin created by an auth switch request for later auth more data packets.
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $activePlugin */
                static $activePlugin;

                if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // Authentication is done, the listener is not needed anymore.
                    $this->parser->removeListener('message', $listener);
                    return;
                }

                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;

                    if($name !== null) {
                        foreach(\Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins() as $key => $plug) {
                            if($key !== $name) {
                                continue;
                            }

                            $activePlugin = new $plug($this->parser, $this->parser->getHandshakeMessage());

                            $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $activePlugin, $password);
                            $this->parser->invokeCommand($command);
                            return;
                        }
                    }

                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    return;
                }

                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($activePlugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        $this->connection->close();
                        return;
                    }

                    try {
                        $command = $activePlugin->receiveMoreData($message);
                        $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                }
            };

            $this->parser->on('message', $listener);
        }

        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
777
    
778
    /**
     * Validates the given options.
     *
     * Checked are the options `connector` (a connector interface instance),
     * `tls.context` (array), `tls.force` and `tls.forceLocal` (both boolean).
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        $rules = array(
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );

        $validator = \CharlotteDunois\Validation\Validator::make($options, $rules);
        $validator->throw(\InvalidArgumentException::class);
    }
794
}
795