Passed
Pull Request — master (#29)
by Sébastien
08:57
created

ShardingConnection::__construct()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 20
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3.0416

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 20
ccs 10
cts 12
cp 0.8333
rs 9.9
c 0
b 0
f 0
cc 3
nc 3
nop 4
crap 3.0416
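
The CRAP score in the table can be reproduced from the other metrics. The following is a minimal sketch, assuming the commonly used formula CRAP = cc^2 * (1 - coverage)^3 + cc, and reading `ccs` and `cts` as covered and total statements (which matches `cp` = 0.8333). The helper name `crapScore` is hypothetical and not part of the report.

```php
<?php

// Hypothetical helper: computes a CRAP score from the cyclomatic complexity
// and a coverage ratio in [0, 1], using CRAP = cc^2 * (1 - cov)^3 + cc.
function crapScore(int $complexity, float $coverage): float
{
    return ($complexity ** 2) * ((1.0 - $coverage) ** 3) + $complexity;
}

// Values taken from the metric table: cc = 3, ccs = 10, cts = 12.
$score = crapScore(3, 10 / 12);

printf("%.7f\n", $score);
```

With these inputs the result is 3 + 9/216, about 3.0416667, which is consistent with the reported 3.0416 if the page truncates to four decimals.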
<?php

namespace Bdf\Prime\Sharding;

use Bdf\Prime\Connection\SimpleConnection;
use Bdf\Prime\Connection\SubConnectionManagerInterface;
use Bdf\Prime\Exception\ShardingException;
use Bdf\Prime\Query\Compiler\Preprocessor\PreprocessorInterface;
use Bdf\Prime\Query\Contract\Query\InsertQueryInterface;
use Bdf\Prime\Query\Contract\Query\KeyValueQueryInterface;
use Bdf\Prime\Query\Factory\DefaultQueryFactory;
use Bdf\Prime\Sharding\Query\ShardingInsertQuery;
use Bdf\Prime\Sharding\Query\ShardingKeyValueQuery;
use Doctrine\Common\EventManager;
use Doctrine\DBAL\Cache\QueryCacheProfile;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Driver;
use LogicException;

/**
 * ShardingConnection
 *
 * The sharding connection is a global connection (which can itself be a shard server) and a collection of connection wrappers for the shards.
 *
 * These methods are handled by the global connection:
 *    ShardingConnection#prepare    Connects to the global connection, but should execute on the shard if one is selected
 *    ShardingConnection#quote
 *    ShardingConnection#errorCode
 *    ShardingConnection#errorInfo
 *    ShardingConnection#ping
 *
 * These methods fall back to the global connection if no shard has been selected:
 *    ShardingConnection#lastInsertId
 *    ShardingConnection#getWrappedConnection
 *
 * Be aware!
 * This connection does not manage the distribution key. If a SQL INSERT is executed without selecting a shard,
 * the SQL will be executed on every shard.
 *
 * The shard to use can be guessed automatically if the query builder is used.
 *
 * The method MultiStatement#fetchColumn changes the interface (it returns an array of results).
 * Every aggregate function executed on each shard returns a collection of results.
 * The merge should be done outside this class.
 *
 * @example:
 * // returns the count result of the first shard
 * $connection->from('test')->count();
 *
 * // returns an array containing the count results of all shards
 * $connection->query('select count(*) from test')->fetchColumn();
 *
 * @package Bdf\Prime\Sharding
 */
class ShardingConnection extends SimpleConnection implements SubConnectionManagerInterface
{
    /**
     * All shard connections
     *
     * @var SimpleConnection[]
     */
    private $connections = [];

    /**
     * The shard chooser
     *
     * @var ShardChoserInterface
     */
    private $shardChoser;

    /**
     * The id of the current shard. Null means all shards
     *
     * @var string|null
     */
    private $currentShardId;

    /**
     * The distribution key
     *
     * @var string
     */
    private $distributionKey;

    /**
     * Initializes a new instance of the Connection class.
     *
     * Here is an example shard connections configuration.
     *
     * @example
     *
     * $conn = DriverManager::getConnection([
     *    'driver'   => 'pdo_mysql',
     *    'user'     => 'user',
     *    'password' => 'password',
     *    'host'     => '127.0.0.1',
     *    'dbname'   => 'basename',
     *    'distributionKey' => 'id',
     *    'shards' => [
     *      '{shardId}' => [
     *        'user'     => 'shard1',
     *        'host'     => '...',
     *      ]
     *    ]
     * ]);
     *
     * @param array                              $params       The connection parameters.
     * @param \Doctrine\DBAL\Driver              $driver       The driver to use.
     * @param \Doctrine\DBAL\Configuration|null  $config       The configuration, optional.
     * @param \Doctrine\Common\EventManager|null $eventManager The event manager, optional.
     *
     * @throws LogicException If "shard_connections" or "distributionKey" is missing from the parameters.
     */
    public function __construct(array $params, Driver $driver, Configuration $config = null, EventManager $eventManager = null)
    {
        if (!isset($params['shard_connections'])) {
            throw new LogicException('Sharding connection needs "shard_connections" configuration in parameters');
        }

        if (!isset($params['distributionKey'])) {
            throw new LogicException('Sharding connection needs distribution key in parameters');
        }

        $this->distributionKey = $params['distributionKey'];
        $this->shardChoser = $params['shardChoser'] ?? new ModuloChoser();
        $this->connections = $params['shard_connections'];

        parent::__construct($params, $driver, $config, $eventManager);

        /** @var DefaultQueryFactory $queryFactory */
        $queryFactory = $this->factory();

        $queryFactory->alias(InsertQueryInterface::class, ShardingInsertQuery::class);
        $queryFactory->alias(KeyValueQueryInterface::class, ShardingKeyValueQuery::class);
    }

    /**
     * Get the shard ids
     *
     * @return array
     */
    public function getShardIds()
    {
        return array_keys($this->connections);
    }

    /**
     * Get the shard chooser
     *
     * @return ShardChoserInterface
     */
    public function getShardChoser()
    {
        return $this->shardChoser;
    }

    /**
     * Get the distribution key
     *
     * @return string
     */
    public function getDistributionKey()
    {
        return $this->distributionKey;
    }

    /**
     * Get the id of the current shard
     *
     * @return string|null Null when no shard is selected
     */
    public function getCurrentShardId()
    {
        return $this->currentShardId;
    }

    /**
     * Select a shard to use.
     *
     * @param mixed $distributionValue The distribution value, or null to use all shards
     *
     * @return $this
     */
    public function pickShard($distributionValue = null)
    {
        $this->useShard(
            $distributionValue !== null
                ? $this->shardChoser->pick($distributionValue, $this)
                : null
        );

        return $this;
    }

    /**
     * Use a shard
     *
     * @param string|null $shardId The shard id, or null to use all shards
     *
     * @return $this
     *
     * @throws ShardingException If the shard id is not known.
     */
    public function useShard($shardId = null)
    {
        if ($shardId !== null && !isset($this->connections[$shardId])) {
            throw ShardingException::unknown($shardId);
        }

        $this->currentShardId = $shardId;

        return $this;
    }

    /**
     * Check whether the connection is using a shard
     *
     * @return bool
     */
    public function isUsingShard()
    {
        return $this->currentShardId !== null;
    }

    /**
     * Get a shard connection by its id.
     * Returns all connections if the id is null.
     *
     * @param null|string|int $shardId
     *
     * @return SimpleConnection[]|SimpleConnection
     *
     * @psalm-param S $shardId
     * @psalm-return (S is null ? SimpleConnection[] : SimpleConnection)
     * @template S as null|array-key
     *
     * @throws ShardingException If the shard id is not known.
     */
    public function getShardConnection($shardId = null)
    {
        if ($shardId === null) {
            return $this->connections;
        }

        if (!isset($this->connections[$shardId])) {
            throw ShardingException::unknown($shardId);
        }

        return $this->connections[$shardId];
    }

    /**
     * {@inheritdoc}
     */
    public function getConnection($name)
    {
        return $this->getShardConnection($name);
    }

    /**
     * Get the selected shards
     *
     * @return SimpleConnection[]
     */
    protected function getSelectedShards()
    {
        if ($this->isUsingShard()) {
            return [$this->connections[$this->currentShardId]];
        }

        return $this->connections;
    }

    /**
     * Get the selected shard
     *
     * @return SimpleConnection
     */
    protected function getSelectedShard()
    {
        return $this->connections[$this->currentShardId];
    }

    /**
     * {@inheritdoc}
     */
    public function close()
    {
        parent::close();

        $this->currentShardId = null;

        foreach ($this->connections as $shard) {
            $shard->close();
        }
    }

    /**
     * {@inheritdoc}
     */
    public function builder(PreprocessorInterface $preprocessor = null)
    {
        return $this->factory()->make(ShardingQuery::class, $preprocessor);
    }

    /**
     * {@inheritdoc}
     */
    public function executeQuery($sql, array $params = [], $types = [], QueryCacheProfile $qcp = null)
    {
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->executeQuery($sql, $params, $types, $qcp);
        }

        $stmt = new MultiStatement();

        foreach ($this->getSelectedShards() as $shard) {
            $stmt->add($shard->executeQuery($sql, $params, $types, $qcp));
        }

        return $stmt;
    }

    /**
     * {@inheritdoc}
     */
    public function executeUpdate($sql, array $params = [], array $types = [])
    {
        $result = 0;

        foreach ($this->getSelectedShards() as $shard) {
            $result += $shard->executeUpdate($sql, $params, $types);
        }

        return $result;
    }

    /**
     * {@inheritdoc}
     *
     * @psalm-suppress InvalidReturnType
     */
    public function query()
    {
        $args = func_get_args();

        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->query(...$args);
        }

        $stmt = new MultiStatement();

        foreach ($this->getSelectedShards() as $shard) {
            $stmt->add($shard->query(...$args));
        }

        // Reported by the inspection: MultiStatement is incompatible with the
        // Doctrine\DBAL\Driver\Statement return type mandated by
        // Doctrine\DBAL\Driver\Connection::query(), hence the suppression below.
        /** @psalm-suppress InvalidReturnStatement */
        return $stmt;
    }

    /**
     * {@inheritdoc}
     */
    public function exec($statement)
    {
        $result = 0;

        foreach ($this->getSelectedShards() as $shard) {
            $result += $shard->exec($statement);
        }

        return $result;
    }

    /**
     * {@inheritdoc}
     */
    public function beginTransaction(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->beginTransaction()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     */
    public function commit(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->commit()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     */
    public function rollBack(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->rollBack()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     */
    public function createSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->createSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function releaseSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->releaseSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function rollbackSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->rollbackSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function lastInsertId($seqName = null)
    {
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->lastInsertId($seqName);
        }

        // TODO should we throw an exception?
        return parent::lastInsertId($seqName);
    }

    /**
     * {@inheritdoc}
     */
    public function getWrappedConnection()
    {
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->getWrappedConnection();
        }

        // TODO should we throw an exception?
        return parent::getWrappedConnection();
    }
}
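
The class comment states that an aggregate run without a selected shard returns one result per shard, and that merging is left to the caller. A minimal sketch of such a merge for COUNT(*), assuming the per-shard values arrive as integers or numeric strings. The helper `mergeShardCounts` and the sample values are hypothetical and not part of Bdf\Prime.

```php
<?php

/**
 * Hypothetical helper (not part of Bdf\Prime): merges per-shard COUNT(*) results.
 *
 * @param array<int|string> $perShardCounts One count per shard, as ints or numeric strings
 */
function mergeShardCounts(array $perShardCounts): int
{
    // Normalise every value to int, then add them up
    return array_sum(array_map('intval', $perShardCounts));
}

// Stand-in for what $connection->query('select count(*) from test')->fetchColumn()
// is described as returning when no shard is selected; the values are made up.
$perShardCounts = ['12', '7', '30'];

$total = mergeShardCounts($perShardCounts);

echo $total, "\n";
```

Summing only works for additive aggregates such as COUNT or SUM. An average, for example, would need the per-shard sums and counts rather than the per-shard averages.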
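
pickShard() delegates the choice of shard to a ShardChoserInterface, which defaults to ModuloChoser. The source of ModuloChoser is not shown on this page, so the following is only a sketch of how a modulo-based choice could work, assuming integer distribution values and a list of shard ids such as the one getShardIds() returns. The function `pickShardByModulo` is hypothetical.

```php
<?php

/**
 * Hypothetical modulo chooser, NOT the actual Bdf\Prime ModuloChoser:
 * maps an integer distribution value onto one of the known shard ids.
 *
 * @param int      $distributionValue
 * @param string[] $shardIds          Non-empty list of shard ids
 */
function pickShardByModulo(int $distributionValue, array $shardIds): string
{
    if ($shardIds === []) {
        throw new InvalidArgumentException('At least one shard id is required');
    }

    $shardIds = array_values($shardIds);
    $count = count($shardIds);

    // PHP's % keeps the sign of the dividend, so shift negative remainders into range
    $index = (($distributionValue % $count) + $count) % $count;

    return $shardIds[$index];
}

$shardIds = ['shard1', 'shard2', 'shard3'];

echo pickShardByModulo(7, $shardIds), "\n";
```

A modulo scheme keeps the mapping stable only while the number of shards stays fixed. Adding a shard changes the target of most existing keys.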