GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Pull Request — master (#12)
by Charlotte
02:02
created

Client::createNewConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 33
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 1.0039

Importance

Changes 0
Metric Value
cc 1
eloc 19
nc 1
nop 0
dl 0
loc 33
ccs 16
cts 19
cp 0.8421
crap 1.0039
rs 9.6333
c 0
b 0
f 0
1
<?php
2
/**
3
 * Plasma Core component
4
 * Copyright 2018-2019 PlasmaPHP, All Rights Reserved
5
 *
6
 * Website: https://github.com/PlasmaPHP
7
 * License: https://github.com/PlasmaPHP/core/blob/master/LICENSE
8
*/
9
10
namespace Plasma;
11
12
/**
13
 * The plasma client, responsible for pooling and connections.
14
 */
15
class Client implements ClientInterface {
16
    use \Evenement\EventEmitterTrait;
17
    
18
    /**
19
     * @var \Plasma\DriverFactoryInterface
20
     */
21
    protected $factory;
22
    
23
    /**
24
     * @var string
25
     */
26
    protected $uri;
27
    
28
    /**
29
     * @var array
30
     */
31
    protected $options = array(
32
        'connections.max' => 5,
33
        'connections.lazy' => false
34
    );
35
    
36
    /**
37
     * @var \React\Promise\PromiseInterface
38
     */
39
    protected $goingAway;
40
    
41
    /**
42
     * @var \CharlotteDunois\Collect\Set
43
     */
44
    protected $connections;
45
    
46
    /**
47
     * @var \CharlotteDunois\Collect\Set
48
     */
49
    protected $busyConnections;
50
    
51
    /**
     * Builds the client around a driver factory and a connection URI.
     *
     * Recognised options:
     * ```
     * array(
     *     'connections.max' => int, (upper bound for the amount of opened connections, defaults to 5)
     *     'connections.lazy' => bool, (if true, the first connection is only established on the first request, defaults to false)
     * )
     * ```
     *
     * @param \Plasma\DriverFactoryInterface  $factory
     * @param string                          $uri
     * @param array                           $options
     * @throws \InvalidArgumentException  Thrown for invalid options. The driver may also throw this exception for invalid arguments (connect uri), possibly later when connecting lazy.
     */
    function __construct(\Plasma\DriverFactoryInterface $factory, string $uri, array $options = array()) {
        $this->validateOptions($options);
        
        $this->connections = new \CharlotteDunois\Collect\Set();
        $this->busyConnections = new \CharlotteDunois\Collect\Set();
        
        $this->factory = $factory;
        $this->uri = $uri;
        $this->options = \array_merge($this->options, $options);
        
        // Lazy clients open their first connection on demand
        if($this->options['connections.lazy']) {
            return;
        }
        
        $initial = $this->createNewConnection();
        $isConnected = ($initial->getConnectionState() === \Plasma\DriverInterface::CONNECTION_OK);
        
        // A connection which is not connected yet is tracked in the busy set
        if(!$isConnected) {
            $this->busyConnections->add($initial);
        }
    }
85
    
86
    /**
     * Static factory, forwards all arguments to the constructor of the called class.
     * @param \Plasma\DriverFactoryInterface  $factory
     * @param string                          $uri
     * @param array                           $options
     * @return \Plasma\ClientInterface
     * @throws \Throwable  The client implementation may throw any exception during this operation.
     * @see Client::__construct()
     */
    static function create(\Plasma\DriverFactoryInterface $factory, string $uri, array $options = array()): \Plasma\ClientInterface {
        // Late static binding, so subclasses get an instance of themselves
        $client = new static($factory, $uri, $options);
        
        return $client;
    }
98
    
99
    /**
     * Get the total amount of connections (idle and busy ones).
     * @return int
     */
    function getConnectionCount(): int {
        $idle = $this->connections->count();
        $busy = $this->busyConnections->count();
        
        return ($idle + $busy);
    }
106
    
107
    /**
     * Hands a connection back to the pool of available connections.
     * Nothing happens if the connection is unusable or the client is closing.
     * @param \Plasma\DriverInterface  $driver
     * @return void
     */
    function checkinConnection(\Plasma\DriverInterface $driver): void {
        $unusable = ($driver->getConnectionState() === \Plasma\DriverInterface::CONNECTION_UNUSABLE);
        if($unusable || $this->goingAway) {
            return;
        }
        
        // Move the driver from the busy set to the available set
        $this->connections->add($driver);
        $this->busyConnections->delete($driver);
    }
118
    
119
    /**
     * Starts a transaction. The promise resolves with a `Transaction` instance.
     *
     * A connection is checked out for as long as the transaction is neither committed nor rolled back.
     * If the transaction goes out of scope and thus gets deallocated, the `Transaction` must check
     * the connection back into the client.
     *
     * Some databases, including MySQL, automatically issue an implicit COMMIT when a database definition language (DDL)
     * statement such as DROP TABLE or CREATE TABLE is issued within a transaction.
     * The implicit COMMIT will prevent you from rolling back any other changes within the transaction boundary.
     * @param int  $isolation  See the `TransactionInterface` constants.
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\Transaction
     */
    function beginTransaction(int $isolation = \Plasma\TransactionInterface::ISOLATION_COMMITTED): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Client is closing all connections')));
        }
        
        $driver = $this->getOptimalConnection();
        
        // On failure the connection is checked back in and the error is passed on
        $onRejected = function (\Throwable $reason) use ($driver) {
            $this->checkinConnection($driver);
            throw $reason;
        };
        
        return $driver->beginTransaction($this, $isolation)->then(null, $onRejected);
    }
145
    
146
    /**
     * Runs a plain query. The promise resolves with a `QueryResult` instance.
     * @param string  $query
     * @return \React\Promise\PromiseInterface
     * @see \Plasma\QueryResultInterface
     */
    function query(string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Client is closing all connections')));
        }
        
        $driver = $this->getOptimalConnection();
        
        // On failure the connection is checked back in and the error is passed on
        $onRejected = function (\Throwable $reason) use ($driver) {
            $this->checkinConnection($driver);
            throw $reason;
        };
        
        return $driver->query($this, $query)->then(null, $onRejected);
    }
164
    
165
    /**
     * Prepares a query. The promise resolves with a `StatementInterface` instance.
     * @param string  $query
     * @return \React\Promise\PromiseInterface
     * @see \Plasma\StatementInterface
     */
    function prepare(string $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Client is closing all connections')));
        }
        
        $driver = $this->getOptimalConnection();
        
        // On failure the connection is checked back in and the error is passed on
        $onRejected = function (\Throwable $reason) use ($driver) {
            $this->checkinConnection($driver);
            throw $reason;
        };
        
        return $driver->prepare($this, $query)->then(null, $onRejected);
    }
183
    
184
    /**
     * Prepares and executes a query in one go. The promise resolves with a `QueryResultInterface` instance.
     * This is equivalent to prepare -> execute -> close.
     * If you need to execute a query multiple times, prepare the query manually for performance reasons.
     *
     * The used connection is checked back in once the promise settles, regardless of the outcome.
     * @param string  $query
     * @param array   $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     * @see \Plasma\StatementInterface
     */
    function execute(string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Client is closing all connections')));
        }
        
        $driver = $this->getOptimalConnection();
        
        $onFulfilled = function ($result) use ($driver) {
            $this->checkinConnection($driver);
            return $result;
        };
        
        $onRejected = function (\Throwable $reason) use ($driver) {
            $this->checkinConnection($driver);
            throw $reason;
        };
        
        return $driver->execute($this, $query, $params)->then($onFulfilled, $onRejected);
    }
209
    
210
    /**
     * Quotes the string for use in the query.
     *
     * A connection is checked out for the duration of the quote call only and is
     * checked back in afterwards, no matter whether quoting succeeded or threw.
     * @param string  $str
     * @param int     $type  For types, see the driver interface constants.
     * @return string
     * @throws \LogicException    Thrown if the driver does not support quoting.
     * @throws \Plasma\Exception  Thrown if the client is closing all connections.
     */
    function quote(string $str, int $type = \Plasma\DriverInterface::QUOTE_TYPE_VALUE): string {
        if($this->goingAway) {
            throw new \Plasma\Exception('Client is closing all connections');
        }
        
        $connection = $this->getOptimalConnection();
        
        try {
            return $connection->quote($str, $type);
        } finally {
            // getOptimalConnection() moved the connection into the busy set. Unlike
            // query()/prepare(), the client is not passed to the driver here, so the
            // check-in has to happen in this method - previously it only happened on
            // failure and a successful call left the connection marked busy.
            $this->checkinConnection($connection);
        }
    }
234
    
235
    /**
     * Gracefully closes every connection, outstanding requests get processed first.
     * Calling this method again returns the promise of the first call.
     * @return \React\Promise\PromiseInterface
     */
    function close(): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return $this->goingAway;
        }
        
        $deferred = new \React\Promise\Deferred();
        $this->goingAway = $deferred->promise();
        
        $pending = array();
        
        // Available connections first, then the busy ones
        foreach(array($this->connections, $this->busyConnections) as $pool) {
            /** @var \Plasma\DriverInterface  $driver */
            foreach($pool->all() as $driver) {
                $pending[] = $driver->close();
                $pool->delete($driver);
            }
        }
        
        // Settle the returned promise once all drivers have finished closing
        \React\Promise\all($pending)->then(array($deferred, 'resolve'), array($deferred, 'reject'));
        
        return $this->goingAway;
    }
264
    
265
    /**
     * Forcefully closes the connection, without waiting for any outstanding requests. This will reject all outstanding requests.
     * Does nothing if the client is already closing.
     * @return void
     */
    function quit(): void {
        if($this->goingAway) {
            return;
        }
        
        $this->goingAway = \React\Promise\resolve();
        
        // Available connections first, then the busy ones
        foreach(array($this->connections, $this->busyConnections) as $pool) {
            /** @var \Plasma\DriverInterface  $driver */
            foreach($pool->all() as $driver) {
                $driver->quit();
                $pool->delete($driver);
            }
        }
    }
288
    
289
    /**
     * Passes the given command to a connection and runs it.
     * If the driver throws, the connection is checked back in before the exception is rethrown.
     * @param \Plasma\CommandInterface  $command
     * @return mixed  Return depends on command and driver.
     * @throws \Plasma\Exception  Thrown if the client is closing all connections.
     */
    function runCommand(\Plasma\CommandInterface $command) {
        if($this->goingAway) {
            throw new \Plasma\Exception('Client is closing all connections');
        }
        
        $driver = $this->getOptimalConnection();
        
        try {
            $result = $driver->runCommand($this, $command);
        } catch (\Throwable $failure) {
            $this->checkinConnection($driver);
            throw $failure;
        }
        
        return $result;
    }
309
    
310
    /**
     * Runs the given querybuilder on an underlying driver instance.
     * The driver CAN throw an exception if the given querybuilder is not supported.
     * An example would be a SQL querybuilder and a Cassandra driver.
     * If the driver throws, the connection is checked back in before the exception is rethrown.
     * @param \Plasma\QueryBuilderInterface  $query
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     */
    function runQuery(\Plasma\QueryBuilderInterface $query): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Client is closing all connections')));
        }
        
        $driver = $this->getOptimalConnection();
        
        try {
            $promise = $driver->runQuery($this, $query);
        } catch (\Throwable $failure) {
            $this->checkinConnection($driver);
            throw $failure;
        }
        
        return $promise;
    }
332
    
333
    /**
     * Creates a new cursor to seek through SELECT query results.
     * If the driver throws, the connection is checked back in before the exception is rethrown.
     * @param string                   $query
     * @param array                    $params
     * @return \React\Promise\PromiseInterface
     * @throws \Plasma\Exception
     */
    function createReadCursor(string $query, array $params = array()): \React\Promise\PromiseInterface {
        if($this->goingAway) {
            return \React\Promise\reject((new \Plasma\Exception('Client is closing all connections')));
        }
        
        $driver = $this->getOptimalConnection();
        
        try {
            $promise = $driver->createReadCursor($this, $query, $params);
        } catch (\Throwable $failure) {
            $this->checkinConnection($driver);
            throw $failure;
        }
        
        return $promise;
    }
354
    
355
    /**
     * Get the optimal connection.
     *
     * Selection order:
     *  1. No available connection and fewer busy connections than `connections.max`: a new connection is created.
     *  2. The first candidate which is connected, idle and has no backlog.
     *  3. A new connection, if the total connection count is still below `connections.max`.
     *  4. Otherwise the candidate with the smaller backlog or busy state.
     *
     * Candidates are the available connections. If there are none (every connection
     * is checked out and the limit is reached), the busy connections are used as
     * candidates instead, so requests get queued on an existing connection.
     *
     * The returned connection is always put into the busy set.
     * @return \Plasma\DriverInterface
     */
    protected function getOptimalConnection(): \Plasma\DriverInterface {
        if(\count($this->connections) === 0 && \count($this->busyConnections) < $this->options['connections.max']) {
            $connection = $this->createNewConnection();
            $this->busyConnections->add($connection);
            
            return $connection;
        }
        
        // The available set can be empty here (limit reached, everything checked out).
        // Reading current() from the empty set yields no driver and the method calls
        // below would fail, so fall back to the busy connections in that case.
        $pool = (\count($this->connections) > 0 ? $this->connections : $this->busyConnections);
        
        /** @var \Plasma\DriverInterface  $connection */
        $pool->rewind();
        $connection = $pool->current();
        
        $backlog = $connection->getBacklogLength();
        $state = $connection->getBusyState();
        
        /** @var \Plasma\DriverInterface  $conn */
        foreach($pool as $conn) {
            $cbacklog = $conn->getBacklogLength();
            $cstate = $conn->getBusyState();
            
            // A connected, idle connection without backlog is taken immediately
            if($cbacklog === 0 && $conn->getConnectionState() === \Plasma\DriverInterface::CONNECTION_OK && $cstate == \Plasma\DriverInterface::STATE_IDLE) {
                $this->connections->delete($conn);
                $this->busyConnections->add($conn);
                
                return $conn;
            }
            
            // Remember the less loaded candidate
            if($backlog > $cbacklog || $state > $cstate) {
                $connection = $conn;
                $backlog = $cbacklog;
                $state = $cstate;
            }
        }
        
        // No idle candidate found, prefer a fresh connection if the limit allows it
        if($this->getConnectionCount() < $this->options['connections.max']) {
            $connection = $this->createNewConnection();
        }
        
        $this->connections->delete($connection);
        $this->busyConnections->add($connection);
        
        return $connection;
    }
402
    
403
    /**
     * Creates a driver through the factory, wires its events to the client and starts connecting.
     *
     * The driver is returned right away. Once connecting succeeds it is put into the
     * available set and `newConnection` is emitted, on failure it is removed from
     * both sets and `error` is emitted.
     * @return \Plasma\DriverInterface
     */
    protected function createNewConnection(): \Plasma\DriverInterface {
        $driver = $this->factory->createDriver();
        
        // We relay a driver's specific events forward, e.g. PostgreSQL notifications
        $relay = function (string $eventName, ...$args) use ($driver) {
            $args[] = $driver;
            $this->emit($eventName, $args);
        };
        $driver->on('eventRelay', $relay);
        
        // A closed driver is dropped from the pool
        $onClose = function () use ($driver) {
            $this->connections->delete($driver);
            $this->busyConnections->delete($driver);
            
            $this->emit('close', array($driver));
        };
        $driver->on('close', $onClose);
        
        $onError = function (\Throwable $error) use ($driver) {
            $this->emit('error', array($error, $driver));
        };
        $driver->on('error', $onError);
        
        $onConnected = function () use ($driver) {
            $this->connections->add($driver);
            $this->busyConnections->delete($driver);
            
            $this->emit('newConnection', array($driver));
        };
        
        $onConnectFailed = function (\Throwable $error) use ($driver) {
            $this->connections->delete($driver);
            $this->busyConnections->delete($driver);
            
            $this->emit('error', array($error, $driver));
        };
        
        $driver->connect($this->uri)->then($onConnected, $onConnectFailed);
        
        return $driver;
    }
441
    
442
    /**
     * Checks the given options against the rules for the known option keys.
     * @param array  $options
     * @return void
     * @throws \InvalidArgumentException
     */
    protected function validateOptions(array $options) {
        $rules = array(
            'connections.max' => 'integer|min:1',
            'connections.lazy' => 'boolean'
        );
        
        \CharlotteDunois\Validation\Validator::make($options, $rules)
            ->throw(\InvalidArgumentException::class);
    }
456
}
457