GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — master ( 9c8b5e...1077ea )
by Charlotte
03:13
created

Driver::runCommand()   A

Complexity

Conditions 4
Paths 2

Size

Total Lines 23
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 4.0092

Importance

Changes 0
Metric Value
cc 4
eloc 12
nc 2
nop 2
dl 0
loc 23
ccs 11
cts 12
cp 0.9167
crap 4.0092
rs 9.8666
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'tls.context' => array(),
29
        'tls.force' => true,
30
        'tls.forceLocal' => false
31
    );
32
    
33
    /**
34
     * @var \React\Socket\ConnectorInterface
35
     */
36
    protected $connector;
37
    
38
    /**
39
     * Internal class is intentional used, as there's no other way currently.
40
     * @var \React\Socket\StreamEncryption
41
     * @see https://github.com/reactphp/socket/issues/180
42
     */
43
    protected $encryption;
44
    
45
    /**
46
     * @var \React\Socket\Connection
47
     */
48
    protected $connection;
49
    
50
    /**
51
     * @var int
52
     */
53
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
54
    
55
    /**
56
     * @var \Plasma\Drivers\MySQL\ProtocolParser
57
     */
58
    protected $parser;
59
    
60
    /**
61
     * @var array
62
     */
63
    protected $queue;
64
    
65
    /**
66
     * @var int
67
     */
68
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
69
    
70
    /**
71
     * @var bool
72
     */
73
    protected $transaction = false;
74
    
75
    /**
76
     * @var \React\Promise\Deferred
77
     */
78
    protected $goingAway;
79
    
80
    /**
0 ignored issues
show
Coding Style introduced by
Parameter $loop should have a doc-comment as per coding-style.
Loading history...
Coding Style introduced by
Parameter $options should have a doc-comment as per coding-style.
Loading history...
81
     * Constructor.
82
     */
83 18
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
84 18
        $this->validateOptions($options);
85
        
86 18
        $this->loop = $loop;
87 18
        $this->options = \array_merge($this->options, $options);
88
        
89 18
        $this->connector = ($options['connector'] ?? (new \React\Socket\Connector($loop)));
90 18
        $this->encryption = new \React\Socket\StreamEncryption($this->loop, false);
91 18
        $this->queue = array();
92 18
    }
93
    
94
    /**
95
     * Returns the event loop.
96
     * @var \React\EventLoop\LoopInterface
97
     */
0 ignored issues
show
Coding Style introduced by
Missing @return tag in function comment
Loading history...
98 3
    function getLoop(): \React\EventLoop\LoopInterface {
99 3
        return $this->loop;
100
    }
101
    
102
    /**
103
     * Retrieves the current connection state.
104
     * @return int
105
     */
106 8
    function getConnectionState(): int {
107 8
        return $this->connectionState;
108
    }
109
    
110
    /**
111
     * Retrieves the current busy state.
112
     * @return int
113
     */
114 1
    function getBusyState(): int {
115 1
        return $this->busy;
116
    }
117
    
118
    /**
119
     * Get the length of the driver backlog queue.
120
     * @return int
121
     */
122 1
    function getBacklogLength(): int {
123 1
        return \count($this->queue);
124
    }
125
    
126
    /**
0 ignored issues
show
Coding Style introduced by
Parameter $uri should have a doc-comment as per coding-style.
Loading history...
127
     * Connects to the given URI.
128
     * @return \React\Promise\PromiseInterface
129
     */
130 13
    function connect(string $uri): \React\Promise\PromiseInterface {
131 13
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
132
            return \React\Promise\resolve((new \Plasma\Exception('Connection is going away or unusable')));
133
        }
134
        
135 13
        $uri = 'mysql://'.\ltrim($uri, 'mysql://');
136
        
137 13
        $parts = \parse_url($uri);
138 13
        if(!isset($parts['host'])) {
139
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
140
        }
141
        
142 13
        $host = $parts['host'].':'.($parts['port'] ?? 3306);
143 13
        $this->connectionState = static::CONNECTION_STARTED;
144 13
        $resolved = false;
145
        
146
        return $this->connector->connect($host)->then(function (\React\Socket\ConnectionInterface $connection) use ($parts, &$resolved) {
147
            // See description of property encryption
148 13
            if(!($connection instanceof \React\Socket\Connection)) {
149
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
150
            }
151
            
152 13
            $this->busy = static::STATE_BUSY;
153 13
            $this->connectionState = static::CONNECTION_MADE;
154 13
            $this->connection = $connection;
155
            
156
            $this->connection->on('error', function (\Throwable $error) {
157
                $this->emit('error', array($error));
158 13
            });
159
            
160
            $this->connection->on('close', function () {
161 5
                $this->connection = null;
162 5
                $this->connectionState = static::CONNECTION_UNUSABLE;
163
                
164 5
                $this->emit('close');
165 13
            });
166
            
167 13
            $deferred = new \React\Promise\Deferred();
168 13
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
169
            
170
            $this->parser->on('error', function (\Throwable $error) use (&$deferred, &$resolved) {
171
                if($resolved) {
172
                    $this->emit('error', array($error));
173
                } else {
174
                    $deferred->reject($error);
175
                }
176 13
            });
177
            
178 13
            $user = ($parts['user'] ?? 'root');
179 13
            $password = ($parts['pass'] ?? '');
180 13
            $db = (!empty($parts['path']) ? \ltrim($parts['path'], '/') : '');
181
            
182 13
            $credentials = \compact('user', 'password', 'db');
183
            
184 13
            $this->startHandshake($credentials, $deferred);
185
            return $deferred->promise()->then(function () use (&$resolved) {
186 10
                $this->busy = static::STATE_IDLE;
187 10
                $resolved = true;
188
                
189 10
                if(\count($this->queue) > 0) {
190
                    $this->parser->invokeCommand($this->getNextCommand());
191
                }
192 13
            });
193 13
        });
194
    }
195
    
196
    /**
197
     * Pauses the underlying stream I/O consumption.
198
     * If consumption is already paused, this will do nothing.
199
     * @return bool  Whether the operation was successful.
200
     */
201
    function pauseStreamConsumption(): bool {
202
        if($this->goingAway) {
203
            return false;
204
        }
205
        
206
        $this->connection->pause();
207
        return true;
208
    }
209
    
210
    /**
211
     * Resumes the underlying stream I/O consumption.
212
     * If consumption is not paused, this will do nothing.
213
     * @return bool  Whether the operation was successful.
214
     */
215
    function resumeStreamConsumption(): bool {
216
        if($this->goingAway) {
217
            return false;
218
        }
219
        
220
        $this->connection->resume();
221
        return true;
222
    }
223
    
224
    /**
225
     * Closes all connections gracefully after processing all outstanding requests.
226
     * @return \React\Promise\PromiseInterface
227
     */
228 1
    function close(): \React\Promise\PromiseInterface {
229 1
        if($this->goingAway) {
230
            return $this->goingAway->promise();
231
        }
232
        
233 1
        $state = $this->connectionState;
234 1
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
235
        
236 1
        $this->goingAway = new \React\Promise\Deferred();
237
        
238 1
        if(\count($this->queue) === 0) {
239
            $this->goingAway->resolve();
240
        }
241
        
242
        return $this->goingAway->promise()->then(function () use ($state) {
243 1
            if($state !== static::CONNECTION_OK) {
244
                return;
245
            }
246
            
247 1
            $deferred = new \React\Promise\Deferred();
248
            
249 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
250
            
251
            $this->connection->once('close', function () use (&$deferred) {
252 1
                $deferred->resolve();
253 1
            });
254
            
255
            $quit->once('end', function () {
256
                $this->connection->close();
257 1
            });
258
            
259 1
            $this->parser->invokeCommand($quit);
260
            
261 1
            return $deferred->promise();
262 1
        });
263
    }
264
    
265
    /**
266
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
267
     * @return void
268
     */
269 1
    function quit(): void {
270 1
        if($this->goingAway) {
271
            return;
272
        }
273
        
274 1
        $state = $this->connectionState;
275 1
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
276
        
277 1
        $this->goingAway = new \React\Promise\Deferred();
278 1
        $this->goingAway->resolve();
279
        
280
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $command */
281 1
        while($command = \array_shift($this->queue)) {
282 1
            $command->emit('error', array((new \Plasma\Exception('Connection is going away'))));
283
        }
284
        
285 1
        if($state === static::CONNECTION_OK) {
286 1
            $quit = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
287 1
            $this->parser->invokeCommand($quit);
288
            
289 1
            $this->connection->close();
290
        }
291 1
    }
292
    
293
    /**
294
     * Whether this driver is currently in a transaction.
295
     * @return bool
296
     */
297 1
    function isInTransaction(): bool {
298 1
        return $this->transaction;
299
    }
300
    
301
    /**
302
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
303
     * When the command is done, the driver must check itself back into the client.
304
     * @param \Plasma\ClientInterface  $client
305
     * @param string                   $query
306
     * @return \React\Promise\PromiseInterface
307
     * @throws \Plasma\Exception
308
     * @see \Plasma\QueryResultInterface
309
     */
310 2
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
311 2
        if($this->goingAway) {
312
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
313
        }
314
        
315 2
        $command = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
316 2
        $this->executeCommand($command);
317
        
318 2
        if(!$this->transaction) {
319
            $command->once('end', function () use (&$client) {
320 1
                $client->checkinConnection($this);
321 1
            });
322
        }
323
        
324 2
        return $command->getPromise();
325
    }
326
    
327
    /**
328
     * Prepares a query. Resolves with a `StatementInterface` instance.
329
     * When the command is done, the driver must check itself back into the client.
330
     * @param \Plasma\ClientInterface  $client
331
     * @param string                   $query
332
     * @return \React\Promise\PromiseInterface
333
     * @throws \Plasma\Exception
334
     * @see \Plasma\StatementInterface
335
     */
336 1
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
337 1
        if($this->goingAway) {
338
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
339
        }
340
        
341 1
        $command = new \Plasma\Drivers\MySQL\Commands\PrepareCommand($client, $this, $query);
342 1
        $this->executeCommand($command);
343
        
344 1
        return $command->getPromise();
345
    }
346
    
347
    /**
348
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
349
     * This is equivalent to prepare -> execute -> close.
350
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
351
     * @param \Plasma\ClientInterface  $client
352
     * @param string                   $query
353
     * @param array                    $params
354
     * @return \React\Promise\PromiseInterface
355
     * @throws \Plasma\Exception
356
     * @see \Plasma\StatementInterface
357
     */
358
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
359
        if($this->goingAway) {
360
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
361
        }
362
        
363
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
364
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
365
                return $statement->close()->then(function () use ($result) {
366
                    return $result;
367
                });
368
            }, function (\Throwable $error) use (&$statement) {
369
                return $statement->close()->then(function () use ($error) {
370
                    throw $error;
371
                });
372
            });
373
        })->always(function () use (&$client) {
0 ignored issues
show
Bug introduced by
The method always() does not exist on React\Promise\PromiseInterface. It seems like you code against a sub-type of said class. However, the method does not exist in React\Promise\SimpleFulfilledTestPromise or React\Promise\SimpleRejectedTestPromise or React\Promise\CancellablePromiseInterface. Are you sure you never get one of those? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

373
        })->/** @scrutinizer ignore-call */ always(function () use (&$client) {
Loading history...
374
            if(!$this->transaction) {
375
                $client->checkinConnection($this);
376
            }
377
        });
378
    }
379
    
380
    /**
381
     * Quotes the string for use in the query.
382
     * @param string  $str
383
     * @return string
384
     * @throws \LogicException  Thrown if the driver does not support quoting.
385
     * @throws \Plasma\Exception
386
     */
387
    function quote(string $str): string { // TODO
0 ignored issues
show
Coding Style introduced by
Opening brace must be the last content on the line
Loading history...
388
        throw new \LogicException('Not implemented yet');
389
    }
390
    
391
    /**
392
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
393
     *
394
     * Checks out a connection until the transaction gets committed or rolled back.
395
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
396
     * check the connection back into the pool, as long as the transaction is not finished.
397
     *
398
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
399
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
400
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
401
     * @param \Plasma\ClientInterface  $client
402
     * @param int                      $isolation  See the `TransactionInterface` constants.
403
     * @return \React\Promise\PromiseInterface
404
     * @throws \Plasma\Exception
405
     * @see \Plasma\TransactionInterface
406
     */
407 1
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
408 1
        if($this->goingAway) {
409
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
410
        }
411
        
412 1
        if($this->transaction) {
413
            throw new \Plasma\Exception('Driver is already in transaction');
414
        }
415
        
416
        switch ($isolation) {
417 1
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
418
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
419
            break;
1 ignored issue
show
Coding Style introduced by
Case breaking statement indented incorrectly; expected 16 spaces, found 12
Loading history...
420 1
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
421 1
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
422 1
            break;
1 ignored issue
show
Coding Style introduced by
Case breaking statement indented incorrectly; expected 16 spaces, found 12
Loading history...
423
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
424
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
425
            break;
1 ignored issue
show
Coding Style introduced by
Case breaking statement indented incorrectly; expected 16 spaces, found 12
Loading history...
426
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
427
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
428
            break;
1 ignored issue
show
Coding Style introduced by
Case breaking statement indented incorrectly; expected 16 spaces, found 12
Loading history...
429
            default:
1 ignored issue
show
Coding Style introduced by
Line indented incorrectly; expected 8 spaces, found 12
Loading history...
430
                throw new \Plasma\Exception('Invalid isolation level given');
431
            break;
432
        }
433
        
434 1
        $this->transaction = true;
435
        
436
        return $this->query($client, $query)->then(function () use (&$client) {
437 1
            return $this->query($client, 'START TRANSACTION');
438
        })->then(function () use (&$client, $isolation) {
439 1
            return (new \Plasma\Transaction($client, $this, $isolation));
440
        })->otherwise(function (\Throwable $e) {
0 ignored issues
show
Bug introduced by
The method otherwise() does not exist on React\Promise\PromiseInterface. It seems like you code against a sub-type of said class. However, the method does not exist in React\Promise\SimpleFulfilledTestPromise or React\Promise\SimpleRejectedTestPromise or React\Promise\CancellablePromiseInterface. Are you sure you never get one of those? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

440
        })->/** @scrutinizer ignore-call */ otherwise(function (\Throwable $e) {
Loading history...
441
            $this->transaction = false;
442
            throw $e;
443 1
        });
444
    }
445
    
446
    /**
447
     * Informationally closes a transaction. This method is used by `Transaction` to inform the driver of the end of the transaction.
448
     * @return void
449
     */
450 1
    function endTransaction(): void {
451 1
        $this->transaction = false;
452 1
    }
453
    
454
    /**
455
     * Runs the given command.
456
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null),
457
     * or rejects with the `Throwable` of the `error` event.
458
     * When the command is done, the driver must check itself back into the client.
459
     * @param \Plasma\ClientInterface   $client
460
     * @param \Plasma\CommandInterface  $command
461
     * @return \React\Promise\PromiseInterface
462
     */
463 4
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
464 4
        if($this->goingAway) {
465
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
466
        }
467
        
468
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use (&$client, &$command) {
469
            $command->once('end', function ($value = null) use (&$client, &$resolve) {
470 2
                if(!$this->transaction) {
471 2
                    $client->checkinConnection($this);
472
                }
473
                
474 2
                $resolve($value);
475 4
            });
476
            
477
            $command->once('error', function (\Throwable $error) use (&$client, &$reject) {
478 2
                if(!$this->transaction) {
479 2
                    $client->checkinConnection($this);
480
                }
481
                
482 2
                $reject($error);
483 4
            });
484
            
485 4
            $this->executeCommand($command);
486 4
        }));
487
    }
488
    
489
    /**
490
     * Executes a command.
491
     * @param \Plasma\CommandInterface  $command
492
     * @return void
493
     * @internal
494
     */
495 7
    function executeCommand(\Plasma\CommandInterface $command): void {
496 7
        $this->queue[] = $command;
497 7
        \Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' added to queue');
498
        
499 7
        if($this->parser && $this->busy === static::STATE_IDLE) {
500 6
            \Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' invoked into parser');
501 6
            $this->parser->invokeCommand($this->getNextCommand());
502
        }
503 7
    }
504
    
505
    /**
506
     * Get the handshake message, or null if none received yet.
507
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
508
     */
509
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
510
        if($this->parser) {
511
            return $this->parser->getHandshakeMessage();
512
        }
513
        
514
        return null;
515
    }
516
    
517
    /**
518
     * Get the next command, or null.
519
     * @return \Plasma\CommandInterface|null
520
     * @internal
521
     */
522 6
    function getNextCommand(): ?\Plasma\CommandInterface {
523 6
        if(\count($this->queue) === 0) {
524 3
            if($this->goingAway) {
525
                $this->goingAway->resolve();
526
            }
527
            
528 3
            return null;
529 6
        } elseif($this->busy === static::STATE_BUSY) {
530
            return null;
531
        }
532
        
533
        /** @var \Plasma\CommandInterface  $command */
534 6
        $command =  \array_shift($this->queue);
535
        
536 6
        \Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Unshifted command '.get_class($command));
537
        
538 6
        if($command->waitForCompletion()) {
539 6
            $this->busy = static::STATE_BUSY;
540
            
541
            $command->once('error', function () use (&$command) {
542 2
                \Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' errored');
543 2
                $this->busy = static::STATE_IDLE;
544
                
545 2
                $this->endCommand();
546 6
            });
547
            
548
            $command->once('end', function () use (&$command) {
549 4
                \Plasma\Drivers\MySQL\Messages\MessageUtility::debug('Command '.get_class($command).' ended');
550 4
                $this->busy = static::STATE_IDLE;
551
                
552 4
                $this->endCommand();
553 6
            });
554
        } else {
555
            $this->endCommand();
556
        }
557
        
558 6
        return $command;
559
    }
560
    
561
    /**
562
     * Finishes up a command.
563
     * @return void
564
     */
565 5
    protected function endCommand() {
566
        $this->loop->futureTick(function () {
567 4
            if($this->goingAway && \count($this->queue) === 0) {
568 1
                return $this->goingAway->resolve();
569
            }
570
            
571 4
            $this->parser->invokeCommand($this->getNextCommand());
572 5
        });
573 5
    }
574
    
575
    /**
576
     * Starts the handshake process.
577
     * @param array                    $credentials
578
     * @param \React\Promise\Deferred  $deferred
579
     * @return void
580
     */
581 13
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
582
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
583 13
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
584 13
                $this->parser->removeListener('message', $listener);
585
                
586 13
                $this->connectionState = static::CONNECTION_SETENV;
587 13
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
588
                
589 13
                \extract($credentials);
590
                
591 13
                if($db !== '') {
592 1
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
593
                }
594
                
595
                // Check if we support auth plugins
596 13
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
597 13
                $plugin = null;
598
                
599 13
                foreach($plugins as $key => $plug) {
600 13
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
601
                        $plugin = $plug;
602
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
603
                        break;
604 13
                    } elseif($key === $message->authPluginName) {
605 13
                        $plugin = $plug;
606 13
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
607 13
                        break;
608
                    }
609
                }
610
                
611 13
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
612
                
613 13
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
614 2
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
615
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
616
                        
617
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
618
                        
619
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
620
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
621
                            
622
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
623
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
624
                            }, function (\Throwable $error) use (&$deferred) {
625
                                $deferred->reject($$error);
626
                                $this->connection->close();
627
                            });
628
                        });
629
                        
630
                        return $this->parser->invokeCommand($ssl);
631 2
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
632 2
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
633 2
                        $this->connection->close();
634 2
                        return;
635
                    }
636
                }
637
                
638 11
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
639
            }
640 13
        };
641
        
642 13
        $this->parser->on('message', $listener);
643
        
644
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
645 13
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
646 10
                $this->connectionState = static::CONNECTION_OK;
647
            }
648
            
649 13
            $this->emit('eventRelay', array('message', $message));
650 13
        });
651 13
    }
652
    
653
    /**
654
     * Enables TLS on the connection.
655
     * @return \React\Promise\PromiseInterface
656
     */
657
    protected function enableTLS(): \React\Promise\PromiseInterface {
658
        // Set required SSL/TLS context options
659
        foreach($this->options['tls.context'] as $name => $value) {
660
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
661
        }
662
        
663
        return $this->encryption->enable($this->connection)->otherwise(function (\Throwable $error) {
664
            $this->connection->close();
665
            throw new \RuntimeException('Connection failed during TLS handshake: '.$error->getMessage(), $error->getCode());
666
        });
667
    }
668
    
669
    /**
670
     * Sends the auth command.
671
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message
672
     * @param array                                            $credentials
673
     * @param int                                              $clientFlags
674
     * @param string|null                                      $plugin
675
     * @param \React\Promise\Deferred                          $deferred
676
     * @return void
677
     */
678 11
    protected function createHandshakeResponse(
679
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
680
    ) {
681 11
        \extract($credentials);
682
        
683 11
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
684
        
685
        $auth->once('end', function () use (&$deferred) {
686 10
            $deferred->resolve();
687 11
        });
688
        
689
        $auth->once('error', function (\Throwable $error) use (&$deferred) {
690 1
            $deferred->reject($error);
691 1
            $this->connection->close();
692 11
        });
693
        
694 11
        if($plugin) {
695
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, &$deferred, &$listener) {
696
                /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $plugin */
697 11
                static $plugin;
698
                
699 11
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
700
                    $name = $message->authPluginName;
701
                    
702
                    if($name !== null) {
703
                        foreach($plugins as $key => $plug) {
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $plugins does not exist. Did you maybe mean $plugin?
Loading history...
704
                            if($key === $name) {
705
                                $plugin = new $plug($this->parser, $message);
706
                                
707
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $plugin, $password);
708
                                return $this->parser->invokeCommand($command);
709
                            }
710
                        }
711
                    }
712
                    
713
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
714 11
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
715
                    if($plugin === null) {
716
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
717
                        return $this->connection->close();
718
                    }
719
                    
720
                    try {
721
                        $command = $plugin->receiveMoreData($message);
722
                        return $this->parser->invokeCommand($command);
723
                    } catch (\Plasma\Exception $e) {
724
                        $deferred->reject($e);
725
                        $this->connection->close();
726
                    }
727 11
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
728 10
                    $this->parser->removeListener('message', $listener);
729
                }
730 11
            };
731
            
732 11
            $this->parser->on('message', $listener);
733
        }
734
        
735 11
        $this->parser->invokeCommand($auth);
736 11
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
737 11
    }
738
    
739
    /**
740
     * Validates the given options.
741
     * @param array  $options
742
     * @return void
743
     * @throws \InvalidArgumentException
744
     */
745 18
    protected function validateOptions(array $options) {
746 18
        $validator = \CharlotteDunois\Validation\Validator::make($options, array(
747 18
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
748
            'tls.context' => 'array',
749
            'tls.force' => 'boolean',
750
            'tls.forceLocal' => 'boolean'
751
        ));
752
        
753 18
        $validator->throw(\InvalidArgumentException::class);
754 18
    }
755
}
756