GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Failed Conditions
Push — master ( f2da15...888a68 )
by Charlotte
05:28
created

Driver::getNextCommand()   B

Complexity

Conditions 8
Paths 5

Size

Total Lines 37
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 72

Importance

Changes 0
Metric Value
cc 8
eloc 21
nc 5
nop 0
dl 0
loc 37
ccs 0
cts 21
cp 0
crap 72
rs 8.4444
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'tls.context' => array(),
29
        'tls.force' => true,
30
        'tls.forceLocal' => false
31
    );
32
    
33
    /**
34
     * @var \React\Socket\ConnectorInterface
35
     */
36
    protected $connector;
37
    
38
    /**
39
     * An internal class is intentionally used here, as there is currently no other way.
40
     * @var \React\Socket\StreamEncryption
41
     * @see https://github.com/reactphp/socket/issues/180
42
     */
43
    protected $encryption;
44
    
45
    /**
46
     * @var \React\Socket\Connection
47
     */
48
    protected $connection;
49
    
50
    /**
51
     * @var int
52
     */
53
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
54
    
55
    /**
56
     * @var \Plasma\Drivers\MySQL\ProtocolParser
57
     */
58
    protected $parser;
59
    
60
    /**
61
     * @var array
62
     */
63
    protected $queue;
64
    
65
    /**
66
     * @var int
67
     */
68
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
69
    
70
    /**
71
     * @var bool
72
     */
73
    protected $transaction = false;
74
    
75
    /**
76
     * @var \React\Promise\Deferred
77
     */
78
    protected $goingAway;
79
    
80
    /**
     * Constructor.
     *
     * Validates the given options, merges them over the default options and prepares
     * the connector, the stream encryption helper and an empty command queue.
     * @param \React\EventLoop\LoopInterface  $loop     The event loop the driver runs on.
     * @param array                           $options  Driver options (`connector`, `tls.context`, `tls.force`, `tls.forceLocal`).
     * @throws \InvalidArgumentException  If the options do not pass validation.
     */
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        $this->validateOptions($options);
        
        $this->loop = $loop;
        $this->options = \array_merge($this->options, $options);
        
        // Use the user supplied connector if there is one, otherwise fall back to the default one
        if(isset($options['connector'])) {
            $this->connector = $options['connector'];
        } else {
            $this->connector = new \React\Socket\Connector($loop);
        }
        
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
        $this->queue = array();
    }
93
    
94
    /**
     * Returns the event loop this driver was constructed with.
     * @return \React\EventLoop\LoopInterface
     */
    function getLoop(): \React\EventLoop\LoopInterface {
        $loop = $this->loop;
        return $loop;
    }
101
    
102
    /**
     * Returns the connection state the driver is currently in.
     * @return int  One of the `\Plasma\DriverInterface::CONNECTION_*` values stored by this driver.
     */
    function getConnectionState(): int {
        $state = $this->connectionState;
        return $state;
    }
109
    
110
    /**
     * Returns the busy state the driver is currently in.
     * @return int  One of the `\Plasma\DriverInterface::STATE_*` values stored by this driver.
     */
    function getBusyState(): int {
        $state = $this->busy;
        return $state;
    }
117
    
118
    /**
     * Returns the number of commands currently waiting in the driver's queue.
     * @return int
     */
    function getBacklogLength(): int {
        $length = \count($this->queue);
        return $length;
    }
125
    
126
    /**
     * Connects to the given URI.
     *
     * The URI may be given with or without the `mysql://` scheme. Host is required,
     * port defaults to 3306, user defaults to `root`, password and database default to an empty string.
     * The returned promise resolves once the handshake has completed, and rejects if the
     * driver is going away or unusable, the URI has no host, or the handshake fails.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface
     */
    function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            // This has to be a rejection - resolving with the exception would signal a successful connect
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away or unusable')));
        }
        
        // Only prepend the scheme if it is missing. ltrim() must not be used to strip it,
        // as it takes a character mask and would also eat leading characters of the host or user (e.g. "localhost")
        if(\strpos($uri, 'mysql://') !== 0) {
            $uri = 'mysql://'.$uri;
        }
        
        $parts = \parse_url($uri);
        if(!isset($parts['host'])) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }
        
        $host = $parts['host'].':'.($parts['port'] ?? 3306);
        $this->connectionState = static::CONNECTION_STARTED;
        
        // Flipped to true once the handshake succeeded; decides where parser errors get routed to
        $resolved = false;
        
        return $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }
            
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;
            
            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });
            
            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;
                
                $this->emit('close');
            });
            
            $deferred = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
            
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
                // Before the handshake is done, errors fail the connect promise; afterwards they are emitted
                if($resolved) {
                    $this->emit('error', array($error));
                } else {
                    $deferred->reject($error);
                }
            });
            
            $user = ($parts['user'] ?? 'root');
            $password = ($parts['pass'] ?? '');
            $db = (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '');
            
            $credentials = \compact('user', 'password', 'db');
            
            $this->startHandshake($credentials, $deferred);
            return $deferred->promise()->then(function () use (&$resolved) {
                $this->busy = static::STATE_IDLE;
                $resolved = true;
                
                // Start working on commands which were queued while connecting
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });
    }
195
    
196
    /**
     * Pauses the underlying stream I/O consumption.
     * If consumption is already paused, this will do nothing.
     *
     * Returns false without touching the stream if the driver is going away
     * or if there is no connection (not connected yet, or the connection was closed).
     * @return bool  Whether the operation was successful.
     */
    function pauseStreamConsumption(): bool {
        // The connection property is null before connect() succeeded and after the close event
        if($this->goingAway || $this->connection === null) {
            return false;
        }
        
        $this->connection->pause();
        return true;
    }
209
    
210
    /**
     * Resumes the underlying stream I/O consumption.
     * If consumption is not paused, this will do nothing.
     *
     * Returns false without touching the stream if the driver is going away
     * or if there is no connection (not connected yet, or the connection was closed).
     * @return bool  Whether the operation was successful.
     */
    function resumeStreamConsumption(): bool {
        // The connection property is null before connect() succeeded and after the close event
        if($this->goingAway || $this->connection === null) {
            return false;
        }
        
        $this->connection->resume();
        return true;
    }
223
    
224
    /**
     * Closes the connection gracefully after processing all outstanding requests.
     *
     * The driver is marked as unusable immediately. Once the queue is drained, a quit command
     * is sent if the connection was in the OK state, and the promise resolves when the connection closed.
     * Calling this method again while going away returns the already pending promise.
     * @return \React\Promise\PromiseInterface
     */
    function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        $this->goingAway = new \React\Promise\Deferred();
        
        // Nothing outstanding - we can go away right now
        if(\count($this->queue) === 0) {
            $this->goingAway->resolve();
        }
        
        $sendQuit = function () use ($previousState) {
            // Without an established connection there is nobody to say goodbye to
            if($previousState !== static::CONNECTION_OK) {
                return;
            }
            
            $closed = new \React\Promise\Deferred();
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
            
            $this->connection->once('close', function () use ($closed) {
                $closed->resolve();
            });
            
            $quit->once('end', function () {
                $this->connection->close();
            });
            
            $this->parser->invokeCommand($quit);
            
            return $closed->promise();
        };
        
        return $this->goingAway->promise()->then($sendQuit);
    }
264
    
265
    /**
     * Forcefully closes the connection, without waiting for any outstanding requests.
     * Every queued command gets an `error` event with a "going away" exception.
     * Does nothing if the driver is already going away.
     * @return void
     */
    function quit(): void {
        if($this->goingAway) {
            return;
        }
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        $goingAway = new \React\Promise\Deferred();
        $this->goingAway = $goingAway;
        $goingAway->resolve();
        
        // Fail all commands which are still waiting in the queue
        while(true) {
            /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
            $command = \array_shift($this->queue);
            if(!$command) {
                break;
            }
            
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }
        
        if($previousState !== static::CONNECTION_OK) {
            return;
        }
        
        $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
        $this->connection->close();
    }
292
    
293
    /**
     * Tells whether the driver is flagged as being inside a transaction.
     * @return bool
     */
    function isInTransaction(): bool {
        $inTransaction = $this->transaction;
        return $inTransaction;
    }
300
    
301
    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     * Unless the driver is in a transaction, it checks itself back into the client
     * when the command emits `end`.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface  Rejects if the driver is going away.
     * @throws \Plasma\Exception
     * @see \Plasma\QueryResultInterface
     */
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        $queryCommand = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($queryCommand);
        
        // Inside a transaction the connection stays checked out
        if($this->transaction === false) {
            $checkin = function () use ($client) {
                $client->checkinConnection($this);
            };
            
            $queryCommand->once('end', $checkin);
        }
        
        return $queryCommand->getPromise();
    }
326
    
327
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     * The client is handed to the prepare command; this method does not check the driver back in itself.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface  Rejects if the driver is going away.
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if(!$this->goingAway) {
            $prepareCommand = new \Plasma\Drivers\MySQL\Commands\PrepareCommand($client, $this, $query);
            $this->executeCommand($prepareCommand);
            
            return $prepareCommand->getPromise();
        }
        
        return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
    }
346
    
347
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close. The statement is closed on success
     * as well as on failure; a failure is re-thrown after closing.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     * Unless the driver is in a transaction, it checks itself back into the client afterwards.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface  Rejects if the driver is going away.
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        $executeAndClose = function (\Plasma\StatementInterface $statement) use ($params) {
            $onResult = function (\Plasma\QueryResultInterface $result) use ($statement) {
                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            };
            
            $onFailure = function (\Throwable $error) use ($statement) {
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            };
            
            return $statement->execute($params)->then($onResult, $onFailure);
        };
        
        $checkin = function () use ($client) {
            if(!$this->transaction) {
                $client->checkinConnection($this);
            }
        };
        
        return $this->prepare($client, $query)->then($executeAndClose)->always($checkin);
    }
379
    
380
    /**
     * Quotes the string for use in the query.
     * Quoting is not implemented yet, this method currently always throws a `LogicException`.
     * @param string  $str
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception
     */
    function quote(string $str): string {
        // TODO
        $notImplemented = new \LogicException('Not implemented yet');
        throw $notImplemented;
    }
390
    
391
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     *
     * The isolation level is set for the session first, then the transaction is started.
     * If either query fails, the driver's transaction flag is reset and the error is passed on.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface  Rejects if the driver is going away.
     * @throws \Plasma\Exception  If the driver is already in a transaction, or the isolation level is invalid.
     * @see \Plasma\TransactionInterface
     */
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }
        
        // Isolation level => query setting it (assumes the constants are distinct integers)
        $isolationQueries = array(
            \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED => 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED',
            \Plasma\TransactionInterface::ISOLATION_COMMITTED => 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED',
            \Plasma\TransactionInterface::ISOLATION_REPEATABLE => 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ',
            \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE => 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE'
        );
        
        if(!isset($isolationQueries[$isolation])) {
            throw new \Plasma\Exception('Invalid isolation level given');
        }
        
        // Flag is set before querying, so query() does not check the connection back in
        $this->transaction = true;
        
        $startTransaction = function () use ($client) {
            return $this->query($client, 'START TRANSACTION');
        };
        
        $createTransaction = function () use ($client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        };
        
        $rollbackFlag = function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        };
        
        return $this->query($client, $isolationQueries[$isolation])
            ->then($startTransaction)
            ->then($createTransaction)
            ->otherwise($rollbackFlag);
    }
445
    
446
    /**
     * Informationally closes a transaction by clearing the driver's transaction flag.
     * This method is used by `Transaction` to inform the driver of the end of the transaction.
     * No query is sent by this method.
     * @return void
     */
    function endTransaction(): void {
        $inTransaction = false;
        $this->transaction = $inTransaction;
    }
453
    
454
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * In both cases the driver checks itself back into the client first, unless it is in a transaction.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface  Rejects if the driver is going away.
     */
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        $resolver = function (callable $resolve, callable $reject) use ($client, $command) {
            $checkin = function () use ($client) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
            };
            
            $command->once('end', function ($value = null) use ($checkin, $resolve) {
                $checkin();
                $resolve($value);
            });
            
            $command->once('error', function (\Throwable $error) use ($checkin, $reject) {
                $checkin();
                $reject($error);
            });
            
            // Listeners are attached first, then the command gets queued
            $this->executeCommand($command);
        };
        
        return (new \React\Promise\Promise($resolver));
    }
488
    
489
    /**
     * Executes a command.
     * The command is appended to the queue. If a parser exists and the driver is idle,
     * the next queued command is handed to the parser right away.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;
        \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' added to queue') || true));
        
        // Not connected yet or still busy - the command stays in the queue for now
        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }
        
        \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' invoked into parser') || true));
        $this->parser->invokeCommand($this->getNextCommand());
    }
504
    
505
    /**
     * Get the handshake message as reported by the parser, or null if there is no parser yet.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        if(!$this->parser) {
            return null;
        }
        
        return $this->parser->getHandshakeMessage();
    }
516
    
517
    /**
     * Get the next command, or null.
     *
     * Returns null if the queue is empty (resolving the going away promise, if there is one)
     * or if the driver is busy. Otherwise the first queued command is removed from the queue and returned.
     * If the command waits for completion, the driver is marked busy until the command emits
     * `end` or `error`; in every case the follow-up work is scheduled through `endCommand()`.
     * @return \Plasma\CommandInterface|null
     * @internal
     */
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            if($this->goingAway) {
                $this->goingAway->resolve();
            }
            
            return null;
        }
        
        if($this->busy === static::STATE_BUSY) {
            return null;
        }
        
        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);
        
        \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Unshifted command '.get_class($next)) || true));
        
        // Fire-and-forget commands do not block the queue
        if(!$next->waitForCompletion()) {
            $this->endCommand();
            return $next;
        }
        
        $this->busy = static::STATE_BUSY;
        
        $onError = function () use ($next) {
            \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($next).' errored') || true));
            $this->busy = static::STATE_IDLE;
            
            $this->endCommand();
        };
        
        $onEnd = function () use ($next) {
            \assert((\Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($next).' ended') || true));
            $this->busy = static::STATE_IDLE;
            
            $this->endCommand();
        };
        
        $next->once('error', $onError);
        $next->once('end', $onEnd);
        
        return $next;
    }
560
    
561
    /**
     * Finishes up a command.
     * On the next loop tick, either resolves the going away promise (if going away and the queue is empty)
     * or hands the result of `getNextCommand()` to the parser.
     * @return void
     */
    protected function endCommand() {
        $continueQueue = function () {
            $drained = ($this->goingAway && \count($this->queue) === 0);
            
            if($drained) {
                return $this->goingAway->resolve();
            }
            
            $this->parser->invokeCommand($this->getNextCommand());
        };
        
        $this->loop->futureTick($continueQueue);
    }
574
    
575
    /**
     * Starts the handshake process.
     *
     * Waits for the server's handshake message, computes the client capability flags,
     * picks an auth plugin and - for remotes other than 127.0.0.1, or when `tls.forceLocal` is set -
     * upgrades the connection to TLS if the server supports it, before sending the handshake response.
     * If TLS is required by the options but not supported by the server, the deferred gets rejected
     * and the connection closed.
     * A second, permanent listener relays all messages as `eventRelay` events and marks the
     * connection as OK when an OK response message arrives.
     * @param array                    $credentials  Array with the keys `user`, `password` and `db`.
     * @param \React\Promise\Deferred  $deferred     Gets resolved or rejected with the outcome of the handshake.
     * @return void
     */
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
                // The handshake message is only expected once
                $this->parser->removeListener('message', $listener);
                
                $this->connectionState = static::CONNECTION_SETENV;
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
                
                // Read the database explicitly instead of extract()ing the whole array into scope
                $db = ($credentials['db'] ?? '');
                
                if($db !== '') {
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
                }
                
                // Check if we support auth plugins
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                $plugin = null;
                
                foreach($plugins as $key => $plug) {
                    // Integer keys are matched against the server capabilities, other keys against the plugin name
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    } elseif($key === $message->authPluginName) {
                        $plugin = $plug;
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
                        break;
                    }
                }
                
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
                
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
                        
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
                        
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
                            
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
                            }, function (\Throwable $error) use (&$deferred) {
                                // Reject with the actual error ($$error was a variable-variable typo)
                                $deferred->reject($error);
                                $this->connection->close();
                            });
                        });
                        
                        return $this->parser->invokeCommand($ssl);
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
                        $this->connection->close();
                        return;
                    }
                }
                
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
            }
        };
        
        $this->parser->on('message', $listener);
        
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                $this->connectionState = static::CONNECTION_OK;
            }
            
            $this->emit('eventRelay', array('message', $message));
        });
    }
652
    
653
    /**
     * Enables TLS on the connection.
     *
     * Applies every `tls.context` option as `ssl` stream context option and then enables encryption.
     * If enabling fails, the connection is closed and the promise rejects with a `RuntimeException`
     * which carries the original error as previous exception.
     * @return \React\Promise\PromiseInterface
     */
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set required SSL/TLS context options
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->otherwise(function (\Throwable $error) {
            $this->connection->close();
            
            // Keep the original error in the chain; the code is cast as not every throwable uses integer codes
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), ((int) $error->getCode()), $error);
        });
    }
668
    
669
    /**
     * Sends the auth command.
     *
     * Builds the handshake response command from the credentials and invokes it. The deferred
     * gets resolved when the command ends, and rejected (closing the connection) when it errors.
     * If an auth plugin is in use, a message listener handles auth switch requests and
     * auth more data packets until an OK response message arrives.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
     * @param array                                            $credentials  Array with the keys `user`, `password` and `db`.
     * @param int                                              $clientFlags
     * @param string|null                                      $plugin
     * @param \React\Promise\Deferred                          $deferred
     * @return void
     */
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Read the credentials explicitly instead of extract()ing the whole array into scope
        $user = ($credentials['user'] ?? 'root');
        $password = ($credentials['password'] ?? '');
        $db = ($credentials['db'] ?? '');
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        $auth->once('end', function () use (&$deferred) {
            $deferred->resolve();
        });
        
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $authPlugin */
            $authPlugin = null; // Set on an auth switch request, shared across listener invocations
            
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener, &$authPlugin) {
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        // The plugin list has to be fetched here, it does not exist in the closure scope otherwise
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $authPlugin = new $plug($this->parser, $message);
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $authPlugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($authPlugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $authPlugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // Authentication is done, the listener is not needed anymore
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
738
    
739
    /**
     * Validates the given options against the rules for the known driver options
     * (`connector`, `tls.context`, `tls.force`, `tls.forceLocal`).
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        $rules = array(
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        // The validator is asked to report failures as InvalidArgumentException
        \CharlotteDunois\Validation\Validator::make($options, $rules)
            ->throw(\InvalidArgumentException::class);
    }
755
}
756