GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Branch master (65ffdb)
by Charlotte
02:16
created

Driver::prepare()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4.0961

Importance

Changes 0
Metric Value
cc 4
eloc 10
nc 4
nop 2
dl 0
loc 17
ccs 9
cts 11
cp 0.8182
crap 4.0961
rs 9.9332
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Driver MySQL component
4
 * Copyright 2018 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/driver-mysql/blob/master/LICENSE
8
*/
9
10
namespace Plasma\Drivers\MySQL;
11
12
/**
13
 * The MySQL Driver.
14
 * @internal
15
 */
16
class Driver implements \Plasma\DriverInterface {
17
    use \Evenement\EventEmitterTrait;
18
    
19
    /**
20
     * @var \React\EventLoop\LoopInterface
21
     */
22
    protected $loop;
23
    
24
    /**
25
     * @var array
26
     */
27
    protected $options = array(
28
        'characters.set' => 'utf8mb4',
29
        'characters.collate' => null,
30
        'tls.context' => array(),
31
        'tls.force' => true,
32
        'tls.forceLocal' => false
33
    );
34
    
35
    /**
36
     * @var string[]
37
     */
38
    protected $allowedSchemes = array('mysql', 'tcp', 'unix');
39
    
40
    /**
41
     * @var \React\Socket\ConnectorInterface
42
     */
43
    protected $connector;
44
    
45
    /**
46
     * An internal class is intentionally used here, as there is currently no other way.
47
     * @var \React\Socket\StreamEncryption
48
     * @see https://github.com/reactphp/socket/issues/180
49
     */
50
    protected $encryption;
51
    
52
    /**
53
     * @var \React\Promise\Promise|null
54
     */
55
    protected $connectPromise;
56
    
57
    /**
58
     * @var \React\Socket\Connection
59
     */
60
    protected $connection;
61
    
62
    /**
63
     * @var int
64
     */
65
    protected $connectionState = \Plasma\DriverInterface::CONNECTION_CLOSED;
66
    
67
    /**
68
     * @var \Plasma\Drivers\MySQL\ProtocolParser
69
     */
70
    protected $parser;
71
    
72
    /**
73
     * @var array
74
     */
75
    protected $queue;
76
    
77
    /**
78
     * @var int
79
     */
80
    protected $busy = \Plasma\DriverInterface::STATE_IDLE;
81
    
82
    /**
83
     * @var bool
84
     */
85
    protected $transaction = false;
86
    
87
    /**
88
     * @var \React\Promise\Deferred
89
     */
90
    protected $goingAway;
91
    
92
    /**
93
     * @var string|null
94
     */
95
    protected $charset;
96
    
97
    /**
     * Constructor.
     *
     * Validates the given options, merges them over the default options and
     * sets up the socket connector, the stream encryption helper and an empty command queue.
     * A custom connector can be supplied through the `connector` option.
     * @param \React\EventLoop\LoopInterface  $loop
     * @param array                           $options
     */
    function __construct(\React\EventLoop\LoopInterface $loop, array $options) {
        $this->validateOptions($options);
        
        $this->loop = $loop;
        $this->queue = array();
        $this->options = \array_merge($this->options, $options);
        
        // Prefer a user supplied connector, otherwise fall back to the default react connector.
        if(isset($options['connector'])) {
            $this->connector = $options['connector'];
        } else {
            $this->connector = new \React\Socket\Connector($loop);
        }
        
        $this->encryption = new \React\Socket\StreamEncryption($loop, false);
    }
112
    
113
    /**
     * Returns the event loop this driver was constructed with.
     * @return \React\EventLoop\LoopInterface
     */
    function getLoop(): \React\EventLoop\LoopInterface {
        return $this->loop;
    }
120
    
121
    /**
     * Retrieves the current connection state (one of the `CONNECTION_*` constants).
     * @return int
     */
    function getConnectionState(): int {
        return $this->connectionState;
    }
128
    
129
    /**
     * Retrieves the current busy state (one of the `STATE_*` constants).
     * @return int
     */
    function getBusyState(): int {
        return $this->busy;
    }
136
    
137
    /**
     * Get the number of commands currently waiting in the driver backlog queue.
     * @return int
     */
    function getBacklogLength(): int {
        $pending = \count($this->queue);
        
        return $pending;
    }
144
    
145
    /**
     * Connects to the given URI.
     *
     * Rejects if the driver is going away or unusable, resolves immediately if the connection
     * state is already `CONNECTION_OK` and returns the pending promise if a connect attempt is in progress.
     * An URI without `://` gets `tcp://` prepended, the scheme `mysql` is treated as `tcp`.
     * Missing URI parts default to port `3306`, user `root`, an empty password and no database.
     * If the option `characters.set` is set, a `SET NAMES` query is issued after the handshake.
     * @param string  $uri
     * @return \React\Promise\PromiseInterface
     */
    function connect(string $uri): \React\Promise\PromiseInterface {
        if($this->goingAway || $this->connectionState === \Plasma\DriverInterface::CONNECTION_UNUSABLE) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState === \Plasma\DriverInterface::CONNECTION_OK) {
            return \React\Promise\resolve();
        }
        
        if($this->connectPromise !== null) {
            return $this->connectPromise;
        }
        
        if(\strpos($uri, '://') === false) {
            $uri = 'tcp://'.$uri;
        }
        
        $components = \parse_url($uri);
        $isValid = isset($components['scheme'], $components['host']) && \in_array($components['scheme'], $this->allowedSchemes);
        
        if(!$isValid) {
            return \React\Promise\reject((new \InvalidArgumentException('Invalid connect uri given')));
        }
        
        if($components['scheme'] === 'mysql') {
            $components['scheme'] = 'tcp';
        }
        
        // Unix sockets do not carry a port, everything else defaults to the MySQL port.
        $target = $components['scheme'].'://'.$components['host'];
        if($components['scheme'] !== 'unix') {
            $target .= ':'.($components['port'] ?? 3306);
        }
        
        $this->connectionState = static::CONNECTION_STARTED;
        
        // Shared by reference with the closures below: flips to true once the handshake has succeeded.
        $handshakeDone = false;
        
        $this->connectPromise = $this->connector->connect($target)->then(function (\React\Socket\ConnectionInterface $connection) use ($components, &$handshakeDone) {
            // See description of property encryption
            if(!($connection instanceof \React\Socket\Connection)) {
                throw new \LogicException('Custom connection class is NOT supported yet (encryption limitation)');
            }
            
            $this->busy = static::STATE_BUSY;
            $this->connectionState = static::CONNECTION_MADE;
            $this->connection = $connection;
            
            $this->connection->on('error', function (\Throwable $error) {
                $this->emit('error', array($error));
            });
            
            $this->connection->on('close', function () {
                $this->connection = null;
                $this->connectionState = static::CONNECTION_UNUSABLE;
                
                $this->emit('close');
            });
            
            $handshake = new \React\Promise\Deferred();
            $this->parser = new \Plasma\Drivers\MySQL\ProtocolParser($this, $this->connection);
            
            // Before the handshake is done parser errors fail the connect promise, afterwards they are emitted.
            $this->parser->on('error', function (\Throwable $error) use ($handshake, &$handshakeDone) {
                if(!$handshakeDone) {
                    $handshake->reject($error);
                    return;
                }
                
                $this->emit('error', array($error));
            });
            
            $credentials = array(
                'user' => ($components['user'] ?? 'root'),
                'password' => ($components['pass'] ?? ''),
                'db' => (!empty($components['path']) ? \ltrim($components['path'], '/') : '')
            );
            
            $this->startHandshake($credentials, $handshake);
            
            return $handshake->promise()->then(function () use (&$handshakeDone) {
                $this->busy = static::STATE_IDLE;
                $handshakeDone = true;
                
                // Commands may have been queued while connecting, start working them off.
                if(\count($this->queue) > 0) {
                    $this->parser->invokeCommand($this->getNextCommand());
                }
            });
        });
        
        if($this->options['characters.set']) {
            $this->charset = $this->options['characters.set'];
            
            $this->connectPromise = $this->connectPromise->then(function () {
                $sql = 'SET NAMES "'.$this->options['characters.set'].'"';
                if($this->options['characters.collate']) {
                    $sql .= ' COLLATE "'.$this->options['characters.collate'].'"';
                }
                
                $setNames = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $sql);
                $this->executeCommand($setNames);
                
                return $setNames->getPromise();
            });
        }
        
        return $this->connectPromise;
    }
240
    
241
    /**
     * Pauses the underlying stream I/O consumption.
     * If consumption is already paused, this will do nothing.
     * Without a connection, or while the driver is going away, nothing is paused.
     * @return bool  Whether the operation was successful.
     */
    function pauseStreamConsumption(): bool {
        if($this->connection !== null && !$this->goingAway) {
            $this->connection->pause();
            return true;
        }
        
        return false;
    }
254
    
255
    /**
     * Resumes the underlying stream I/O consumption.
     * If consumption is not paused, this will do nothing.
     * Without a connection, or while the driver is going away, nothing is resumed.
     * @return bool  Whether the operation was successful.
     */
    function resumeStreamConsumption(): bool {
        if($this->connection !== null && !$this->goingAway) {
            $this->connection->resume();
            return true;
        }
        
        return false;
    }
268
    
269
    /**
     * Closes all connections gracefully after processing all outstanding requests.
     *
     * Marks the connection as unusable right away. If the connection state was `CONNECTION_OK`,
     * a quit command is sent once the queue is drained and the returned promise resolves
     * when the socket emits `close`. Repeated calls return the same going-away promise.
     * @return \React\Promise\PromiseInterface
     */
    function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway->promise();
        }
        
        $previousState = $this->connectionState;
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        $this->goingAway = new \React\Promise\Deferred();
        
        // Nothing outstanding, no need to wait for the queue to drain.
        if(\count($this->queue) === 0) {
            $this->goingAway->resolve();
        }
        
        return $this->goingAway->promise()->then(function () use ($previousState) {
            if($previousState === static::CONNECTION_OK) {
                $closed = new \React\Promise\Deferred();
                $quitCommand = new \Plasma\Drivers\MySQL\Commands\QuitCommand();
                
                $this->connection->once('close', function () use ($closed) {
                    $closed->resolve();
                });
                
                // Close the socket as soon as the quit command has ended.
                $quitCommand->once('end', function () {
                    $this->connection->close();
                });
                
                $this->parser->invokeCommand($quitCommand);
                
                return $closed->promise();
            }
        });
    }
309
    
310
    /**
     * Forcefully closes the connection, without waiting for any outstanding requests.
     * Every queued command gets an `error` event with a "Connection is going away" exception.
     * Does nothing if the driver is already going away.
     * @return void
     */
    function quit(): void {
        if($this->goingAway) {
            return;
        }
        
        $wasConnected = ($this->connectionState === static::CONNECTION_OK);
        $this->connectionState = \Plasma\DriverInterface::CONNECTION_UNUSABLE;
        
        $this->goingAway = new \React\Promise\Deferred();
        $this->goingAway->resolve();
        
        // Drain the backlog and fail every command that never got to run.
        /** @var \Plasma\Drivers\MySQL\Commands\CommandInterface  $queued */
        while($queued = \array_shift($this->queue)) {
            $queued->emit('error', array((new \Plasma\Exception('Connection is going away'))));
        }
        
        if(!$wasConnected) {
            return;
        }
        
        $this->parser->invokeCommand((new \Plasma\Drivers\MySQL\Commands\QuitCommand()));
        $this->connection->close();
    }
337
    
338
    /**
     * Whether this driver is currently flagged as being in a transaction.
     * @return bool
     */
    function isInTransaction(): bool {
        return $this->transaction;
    }
345
    
346
    /**
     * Executes a plain query. Resolves with a `QueryResultInterface` instance.
     *
     * If a connect attempt is still pending, the query is retried once the connect promise resolves.
     * Outside of a transaction the driver checks itself back into the client when the command ends.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is no connection and no connect attempt pending.
     * @see \Plasma\QueryResultInterface
     */
    function query(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->query($client, $query);
            });
        }
        
        $queryCommand = new \Plasma\Drivers\MySQL\Commands\QueryCommand($this, $query);
        $this->executeCommand($queryCommand);
        
        // Inside a transaction the connection stays checked out.
        if(!$this->transaction) {
            $queryCommand->once('end', function () use ($client) {
                $client->checkinConnection($this);
            });
        }
        
        return $queryCommand->getPromise();
    }
379
    
380
    /**
     * Prepares a query. Resolves with a `StatementInterface` instance.
     *
     * If a connect attempt is still pending, preparing is retried once the connect promise resolves.
     * The client is handed over to the prepare command.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is no connection and no connect attempt pending.
     * @see \Plasma\StatementInterface
     */
    function prepare(\Plasma\ClientInterface $client, string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise === null) {
                throw new \Plasma\Exception('Unable to continue without connection');
            }
            
            return $this->connectPromise->then(function () use ($client, $query) {
                return $this->prepare($client, $query);
            });
        }
        
        $prepareCommand = new \Plasma\Drivers\MySQL\Commands\StatementPrepareCommand($client, $this, $query);
        $this->executeCommand($prepareCommand);
        
        return $prepareCommand->getPromise();
    }
407
    
408
    /**
     * Prepares and executes a query. Resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     *
     * For streaming results the statement is closed in the background and close failures are emitted
     * as `error` event on the driver. For any other result the promise resolves after the statement was closed.
     * If executing fails, the statement is closed and the promise rejects with the execution error.
     * @param \Plasma\ClientInterface  $client
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if there is no connection and no connect attempt pending.
     * @see \Plasma\StatementInterface
     */
    function execute(\Plasma\ClientInterface $client, string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        } elseif($this->connectionState !== \Plasma\DriverInterface::CONNECTION_OK) {
            if($this->connectPromise !== null) {
                return $this->connectPromise->then(function () use (&$client, &$query, $params) {
                    return $this->execute($client, $query, $params);
                });
            }
            
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        return $this->prepare($client, $query)->then(function (\Plasma\StatementInterface $statement) use ($params) {
            return $statement->execute($params)->then(function (\Plasma\QueryResultInterface $result) use (&$statement) {
                if($result instanceof \Plasma\StreamQueryResultInterface) {
                    // The rejection handler has to be attached to the promise returned by close().
                    // Passing it as an argument to close() would silently drop it.
                    $statement->close()->then(null, function (\Throwable $error) {
                        $this->emit('error', array($error));
                    });
                    
                    return $result;
                }
                
                return $statement->close()->then(function () use ($result) {
                    return $result;
                });
            }, function (\Throwable $error) use (&$statement) {
                // Clean up the statement, then surface the original execution error.
                return $statement->close()->then(function () use ($error) {
                    throw $error;
                });
            });
        });
    }
452
    
453
    /**
     * Quotes the string for use in the query.
     *
     * The escaping strategy depends on the status flags of the last OK message (or the handshake message,
     * if there is no OK message yet): with `SERVER_STATUS_NO_BACKSLASH_ESCAPES` set, quotes are used
     * for escaping, otherwise backslashes are used.
     * @param string  $str
     * @param int     $type  For types, see the constants.
     * @return string
     * @throws \LogicException  Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception  Thrown if there is no parser yet or no handshake was received.
     */
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->parser === null) {
            throw new \Plasma\Exception('Unable to continue without connection');
        }
        
        $statusMessage = $this->parser->getLastOkMessage();
        if($statusMessage === null) {
            $statusMessage = $this->parser->getHandshakeMessage();
        }
        
        if($statusMessage === null) {
            throw new \Plasma\Exception('Unable to quote without a previous handshake');
        }
        
        // Only the part in front of the first underscore is passed on as charset name.
        $underscore = \strpos($this->charset, '_');
        if($underscore === false) {
            $underscore = \strlen($this->charset);
        }
        
        $realCharset = $this->getRealCharset(\substr($this->charset, 0, $underscore));
        $noBackslashEscapes = ($statusMessage->statusFlags & \Plasma\Drivers\MySQL\StatusFlags::SERVER_STATUS_NO_BACKSLASH_ESCAPES) !== 0;
        
        if($noBackslashEscapes) {
            return $this->escapeUsingQuotes($realCharset, $str, $type);
        }
        
        return $this->escapeUsingBackslashes($realCharset, $str, $type);
    }
485
    
486
    /**
     * Escapes using quotes.
     *
     * Identifiers are wrapped in backticks, with embedded backticks doubled.
     * Values are wrapped in double quotes, with embedded double quotes doubled.
     * Backslashes are left untouched: `quote()` selects this method when the server reports
     * `NO_BACKSLASH_ESCAPES`, where a backslash is an ordinary character and doubling it would alter the value.
     * @param string  $realCharset  Currently unused.
     * @param string  $str
     * @param int     $type
     * @return string
     */
    function escapeUsingQuotes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // Only the enclosing quote character needs escaping in this mode.
        return '"'.\str_replace('"', '""', $str).'"';
    }
510
    
511
    /**
     * Escapes using backslashes.
     *
     * Identifiers are wrapped in backticks, with embedded backticks doubled. Backslashes are not an
     * escape character inside quoted identifiers, so a backtick must never be escaped as "\`".
     * Values are wrapped in double quotes, with backslashes and double quotes escaped by a backslash.
     * @param string  $realCharset  Currently unused.
     * @param string  $str
     * @param int     $type
     * @return string
     */
    function escapeUsingBackslashes(string $realCharset, string $str, int $type): string {
        if($type === \Plasma\DriverInterface::QUOTE_TYPE_IDENTIFIER) {
            return '`'.\str_replace('`', '``', $str).'`';
        }
        
        // The backslash has to be replaced first, otherwise the escaping of the quotes would get escaped again.
        $escapeChars = array(
            '\\',
            '"',
        );
        
        $escapeReplace = array(
            '\\\\',
            '\"',
        );
        
        return '"'.\str_replace($escapeChars, $escapeReplace, $str).'"';
    }
535
    
536
    /**
     * Begins a transaction. Resolves with a `TransactionInterface` instance.
     *
     * Checks out a connection until the transaction gets committed or rolled back.
     * It must be noted that the user is responsible for finishing the transaction. The client WILL NOT automatically
     * check the connection back into the pool, as long as the transaction is not finished.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     *
     * The isolation level is set for the session first, then `START TRANSACTION` is issued.
     * If starting the transaction fails in any way, the transaction flag of the driver is reset.
     * @param \Plasma\ClientInterface  $client
     * @param int                      $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception  Thrown if already in a transaction, the isolation level is invalid or there is no connection.
     * @see \Plasma\TransactionInterface
     */
    function beginTransaction(\Plasma\ClientInterface $client, int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        if($this->transaction) {
            throw new \Plasma\Exception('Driver is already in transaction');
        }
        
        switch ($isolation) {
            case \Plasma\TransactionInterface::ISOLATION_UNCOMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_COMMITTED:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED';
            break;
            case \Plasma\TransactionInterface::ISOLATION_REPEATABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ';
            break;
            case \Plasma\TransactionInterface::ISOLATION_SERIALIZABLE:
                $query = 'SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE';
            break;
            default:
                throw new \Plasma\Exception('Invalid isolation level given');
        }
        
        // Set the flag before querying, so the connection does not get checked back into the client.
        $this->transaction = true;
        
        try {
            $promise = $this->query($client, $query);
        } catch (\Throwable $e) {
            // query() may throw synchronously (no connection), do not leave the driver flagged as in transaction.
            $this->transaction = false;
            throw $e;
        }
        
        return $promise->then(function () use (&$client) {
            return $this->query($client, 'START TRANSACTION');
        })->then(function () use (&$client, $isolation) {
            return (new \Plasma\Transaction($client, $this, $isolation));
        })->then(null, function (\Throwable $e) {
            $this->transaction = false;
            throw $e;
        });
    }
590
    
591
    /**
     * Informationally closes a transaction by resetting the transaction flag of the driver.
     * This method is used by `Transaction` to inform the driver of the end of the transaction.
     * @return void
     */
    function endTransaction(): void {
        $this->transaction = false;
    }
598
    
599
    /**
     * Runs the given command.
     * Returns a Promise, which resolves with the `end` event argument (defaults to `null`),
     * or rejects with the `Throwable` of the `error` event.
     * Outside of a transaction the driver checks itself back into the client on either event.
     * @param \Plasma\ClientInterface   $client
     * @param \Plasma\CommandInterface  $command
     * @return \React\Promise\PromiseInterface
     */
    function runCommand(\Plasma\ClientInterface $client, \Plasma\CommandInterface $command) {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Connection is going away')));
        }
        
        return (new \React\Promise\Promise(function (callable $resolve, callable $reject) use ($client, $command) {
            // Hands the connection back to the client, unless a transaction keeps it checked out.
            $checkin = function () use ($client) {
                if(!$this->transaction) {
                    $client->checkinConnection($this);
                }
            };
            
            $command->once('end', function ($value = null) use ($checkin, $resolve) {
                $checkin();
                $resolve($value);
            });
            
            $command->once('error', function (\Throwable $error) use ($checkin, $reject) {
                $checkin();
                $reject($error);
            });
            
            $this->executeCommand($command);
        }));
    }
633
    
634
    /**
     * Executes a command.
     * The command is appended to the queue. If a parser exists and the driver is idle,
     * the next queued command gets invoked right away.
     * @param \Plasma\CommandInterface  $command
     * @return void
     * @internal
     */
    function executeCommand(\Plasma\CommandInterface $command): void {
        $this->queue[] = $command;
        
        if(!$this->parser || $this->busy !== static::STATE_IDLE) {
            return;
        }
        
        $this->parser->invokeCommand($this->getNextCommand());
    }
649
    
650
    /**
     * Get the handshake message from the parser, or null if there is no parser or no handshake received yet.
     * @return \Plasma\Drivers\MySQL\Messages\HandshakeMessage|null
     */
    function getHandshake(): ?\Plasma\Drivers\MySQL\Messages\HandshakeMessage {
        if(!$this->parser) {
            return null;
        }
        
        return $this->parser->getHandshakeMessage();
    }
661
    
662
    /**
     * Get the next command, or null.
     *
     * Returns null if the queue is empty (resolving the going-away deferred, if any) or if the driver is busy.
     * A command which waits for completion marks the driver busy until it emits `error` or `end`.
     * In every case the end of the command is scheduled through `endCommand()`.
     * @return \Plasma\CommandInterface|null
     * @internal
     */
    function getNextCommand(): ?\Plasma\CommandInterface {
        if(\count($this->queue) === 0) {
            if($this->goingAway) {
                $this->goingAway->resolve();
            }
            
            return null;
        }
        
        if($this->busy === static::STATE_BUSY) {
            return null;
        }
        
        /** @var \Plasma\CommandInterface  $next */
        $next = \array_shift($this->queue);
        
        if(!$next->waitForCompletion()) {
            $this->endCommand();
            return $next;
        }
        
        $this->busy = static::STATE_BUSY;
        
        // Success and failure are handled the same way: free the driver and move on.
        $release = function () {
            $this->busy = static::STATE_IDLE;
            $this->endCommand();
        };
        
        $next->once('error', $release);
        $next->once('end', $release);
        
        return $next;
    }
705
    
706
    /**
     * Finishes up a command.
     * On the next loop tick either the going-away deferred gets resolved (going away with an empty queue),
     * or the next queued command is handed to the parser.
     * @return void
     */
    protected function endCommand() {
        $this->loop->futureTick(function () {
            $drained = ($this->goingAway && \count($this->queue) === 0);
            
            if($drained) {
                $this->goingAway->resolve();
                return;
            }
            
            $this->parser->invokeCommand($this->getNextCommand());
        });
    }
719
    
720
    /**
721
     * Starts the handshake process.
722
     * @param array                    $credentials
723
     * @param \React\Promise\Deferred  $deferred
724
     * @return void
725
     */
726 56
    protected function startHandshake(array $credentials, \React\Promise\Deferred $deferred) {
727
        $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($credentials, &$deferred, &$listener) {
728 56
            if($message instanceof \Plasma\Drivers\MySQL\Messages\HandshakeMessage) {
729 56
                $this->parser->removeListener('message', $listener);
730
                
731 56
                $this->connectionState = static::CONNECTION_SETENV;
732 56
                $clientFlags = \Plasma\Drivers\MySQL\ProtocolParser::CLIENT_CAPABILITIES;
733
                
734 56
                \extract($credentials);
735
                
736 56
                if($db !== '') {
737 43
                    $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_CONNECT_WITH_DB;
738
                }
739
                
740 56
                if($this->charset === null) {
741 2
                    $this->charset = \Plasma\Drivers\MySQL\CharacterSetFlags::CHARSET_MAP[$message->characterSet] ?? 'latin1';
742
                }
743
                
744
                // Check if we support auth plugins
745 56
                $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
746 56
                $plugin = null;
747
                
748 56
                foreach($plugins as $key => $plug) {
749 56
                    if(\is_int($key) && ($message->capability & $key) !== 0) {
750 56
                        $plugin = $plug;
751 56
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
752 56
                        break;
753
                    } elseif($key === $message->authPluginName) {
754
                        $plugin = $plug;
755
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_PLUGIN_AUTH;
756
                        break;
757
                    }
758
                }
759
                
760 56
                $remote = \parse_url($this->connection->getRemoteAddress())['host'];
761
                
762 56
                if($remote !== '127.0.0.1' || $this->options['tls.forceLocal']) {
763
                    if(($message->capability & \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL) !== 0) { // If SSL supported, connect through SSL
764
                        $clientFlags |= \Plasma\Drivers\MySQL\CapabilityFlags::CLIENT_SSL;
765
                        
766
                        $ssl = new \Plasma\Drivers\MySQL\Commands\SSLRequestCommand($message, $clientFlags);
767
                        
768
                        $ssl->once('end', function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
769
                            $this->connectionState = static::CONNECTION_SSL_STARTUP;
770
                            
771
                            $this->enableTLS()->then(function () use ($credentials, $clientFlags, $plugin, &$deferred, &$message) {
772
                                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
773
                            }, function (\Throwable $error) use (&$deferred) {
774
                                $deferred->reject($$error);
775
                                $this->connection->close();
776
                            });
777
                        });
778
                        
779
                        return $this->parser->invokeCommand($ssl);
780
                    } elseif($this->options['tls.force'] || $this->options['tls.forceLocal']) {
781
                        $deferred->reject((new \Plasma\Exception('TLS is not supported by the server')));
782
                        $this->connection->close();
783
                        return;
784
                    }
785
                }
786
                
787 56
                $this->createHandshakeResponse($message, $credentials, $clientFlags, $plugin, $deferred);
788
            }
789 56
        };
790
        
791 56
        $this->parser->on('message', $listener);
792
        
793
        $this->parser->on('message', function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) {
794 56
            if($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
795 55
                $this->connectionState = static::CONNECTION_OK;
796
            }
797
            
798 56
            $this->emit('eventRelay', array('message', $message));
799 56
        });
800 56
    }
801
    
802
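    /**
     * Enables TLS on the connection, after applying the configured `tls.context` options to the stream.
     * @return \React\Promise\PromiseInterface  Rejects with a \RuntimeException if the TLS handshake fails.
     */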
806
    protected function enableTLS(): \React\Promise\PromiseInterface {
        // Set the required SSL/TLS context options on the underlying stream
        // before the encryption is enabled.
        foreach($this->options['tls.context'] as $name => $value) {
            \stream_context_set_option($this->connection->stream, 'ssl', $name, $value);
        }
        
        return $this->encryption->enable($this->connection)->then(null, function (\Throwable $error) {
            // The connection is unusable once the TLS handshake has failed.
            $this->connection->close();
            
            // Chain the original error as "previous" so its type and stack trace are not lost.
            // The code is cast because Throwable::getCode() is not guaranteed to return an integer.
            throw new \RuntimeException(
                'Connection failed during TLS handshake: '.$error->getMessage(),
                (int) $error->getCode(),
                $error
            );
        });
    }
817
    
818
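    /**
     * Sends the handshake response (auth) command and settles the deferred with the outcome.
     * @param \Plasma\Drivers\MySQL\Messages\HandshakeMessage  $message      The handshake message the response answers.
     * @param array                                            $credentials  Must contain the keys `user`, `password` and `db`.
     * @param int                                              $clientFlags  The client capability flags to send.
     * @param string|null                                      $plugin       The selected auth plugin, or null if none.
     * @param \React\Promise\Deferred                          $deferred     Resolved once the auth command ends, rejected on error.
     * @return void
     */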
827 56
    protected function createHandshakeResponse(
        \Plasma\Drivers\MySQL\Messages\HandshakeMessage $message, array $credentials, int $clientFlags, ?string $plugin, \React\Promise\Deferred $deferred
    ) {
        // Read the credentials explicitly instead of using extract(): extract() overwrites
        // existing variables, so a stray key such as "message" or "deferred" would clobber
        // this method's own parameters.
        $user = $credentials['user'];
        $password = $credentials['password'];
        $db = $credentials['db'];
        
        $auth = new \Plasma\Drivers\MySQL\Commands\HandshakeResponseCommand($this->parser, $message, $clientFlags, $plugin, $user, $password, $db);
        
        // Objects are passed by handle, so the deferred is captured by value.
        $auth->once('end', function () use ($deferred) {
            // Resolve on the next tick of the event loop.
            $this->loop->futureTick(function () use ($deferred) {
                $deferred->resolve();
            });
        });
        
        $auth->once('error', function (\Throwable $error) use ($deferred) {
            $deferred->reject($error);
            $this->connection->close();
        });
        
        if($plugin) {
            // The auth plugin instance created by an auth switch request. It is kept across
            // invocations of the listener, as subsequent "auth more data" packets are handed to it.
            /** @var \Plasma\Drivers\MySQL\AuthPlugins\AuthPluginInterface|null  $authPlugin */
            $authPlugin = null;
            
            // $listener is captured by reference so the closure can unregister itself.
            $listener = function (\Plasma\Drivers\MySQL\Messages\MessageInterface $message) use ($password, $deferred, &$authPlugin, &$listener) {
                if($message instanceof \Plasma\Drivers\MySQL\Messages\AuthSwitchRequestMessage) {
                    $name = $message->authPluginName;
                    
                    if($name !== null) {
                        $plugins = \Plasma\Drivers\MySQL\DriverFactory::getAuthPlugins();
                        foreach($plugins as $key => $plug) {
                            if($key === $name) {
                                $authPlugin = new $plug($this->parser, $this->parser->getHandshakeMessage());
                                
                                $command = new \Plasma\Drivers\MySQL\Commands\AuthSwitchResponseCommand($message, $authPlugin, $password);
                                return $this->parser->invokeCommand($command);
                            }
                        }
                    }
                    
                    // No registered plugin matches the requested method: authentication can not
                    // continue, so close the connection like every other failure path does.
                    $deferred->reject((new \Plasma\Exception('Requested authentication method '.($name ? '"'.$name.'" ' : '').'is not supported')));
                    return $this->connection->close();
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\AuthMoreDataMessage) {
                    if($authPlugin === null) {
                        $deferred->reject((new \Plasma\Exception('No auth plugin is in use, but we received auth more data packet')));
                        return $this->connection->close();
                    }
                    
                    try {
                        $command = $authPlugin->receiveMoreData($message);
                        return $this->parser->invokeCommand($command);
                    } catch (\Plasma\Exception $e) {
                        $deferred->reject($e);
                        $this->connection->close();
                    }
                } elseif($message instanceof \Plasma\Drivers\MySQL\Messages\OkResponseMessage) {
                    // Authentication is done, the listener is not needed any longer.
                    $this->parser->removeListener('message', $listener);
                }
            };
            
            $this->parser->on('message', $listener);
        }
        
        $this->parser->invokeCommand($auth);
        $this->connectionState = static::CONNECTION_AWAITING_RESPONSE;
    }
890
    
891
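    /**
     * Maps a database charset name to the name of the matching mbstring encoding.
     * Names starting with `utf8` and names without a matching encoding yield `UTF-8`.
     * @param string  $charset
     * @return string
     */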
896 4
    protected function getRealCharset(string $charset): string {
        // An empty name can not identify an encoding. Guard explicitly: since PHP 8 stripos()
        // accepts an empty needle and reports a match at offset 0, which would otherwise
        // select the first entry of mb_list_encodings() instead of the fallback.
        if($charset === '') {
            return 'UTF-8';
        }
        
        // Any name starting with "utf8" (e.g. utf8mb4) maps to UTF-8.
        if(\strncmp($charset, 'utf8', 4) === 0) {
            return 'UTF-8';
        }
        
        // Pick the first mbstring encoding whose name starts with the charset name (case-insensitive).
        foreach(\mb_list_encodings() as $encoding) {
            if(\stripos($encoding, $charset) === 0) {
                return $encoding;
            }
        }
        
        // Nothing matched, fall back to UTF-8.
        return 'UTF-8';
    }
911
    
912
    /**
913
     * Validates the given options.
914
     * @param array  $options
915
     * @return void
916
     * @throws \InvalidArgumentException
917
     */
918 80
    protected function validateOptions(array $options) {
        // The validation rule for each recognised option key.
        $rules = array(
            'characters.set' => 'string',
            'characters.collate' => 'string',
            'connector' => 'class:\React\Socket\ConnectorInterface=object',
            'tls.context' => 'array',
            'tls.force' => 'boolean',
            'tls.forceLocal' => 'boolean'
        );
        
        // Build the validator and let it raise the failure as \InvalidArgumentException.
        \CharlotteDunois\Validation\Validator::make($options, $rules)
            ->throw(\InvalidArgumentException::class);
    }
930
}
931