Issues (590)

src/Sharding/ShardingConnection.php (3 issues)

<?php

namespace Bdf\Prime\Sharding;

use Bdf\Prime\Connection\ConnectionInterface;
use Bdf\Prime\Connection\SimpleConnection;
use Bdf\Prime\Connection\SubConnectionManagerInterface;
use Bdf\Prime\Exception\ShardingException;
use Bdf\Prime\Query\Compiler\Preprocessor\PreprocessorInterface;
use Bdf\Prime\Query\Contract\Query\InsertQueryInterface;
use Bdf\Prime\Query\Contract\Query\KeyValueQueryInterface;
use Bdf\Prime\Query\Factory\DefaultQueryFactory;
use Bdf\Prime\Sharding\Query\ShardingInsertQuery;
use Bdf\Prime\Sharding\Query\ShardingKeyValueQuery;
use Doctrine\Common\EventManager;
use Doctrine\DBAL\Cache\QueryCacheProfile;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Driver;
use Doctrine\DBAL\Result;
use LogicException;

/**
 * ShardingConnection
 *
 * The sharding connection is a global connection (which can itself be a shard server) and a collection of connection wrappers for the shards.
 *
 * These methods are handled by the global connection:
 *    ShardingConnection#prepare    Will connect on global but should execute on shard if one is selected
 *    ShardingConnection#quote
 *    ShardingConnection#errorCode
 *    ShardingConnection#errorInfo
 *    ShardingConnection#ping
 *
 * These methods connect to the global connection if no shard has been selected:
 *    ShardingConnection#lastInsertId
 *    ShardingConnection#getWrappedConnection
 *
 * Be aware!
 * This connection does not manage the distribution key. If a SQL INSERT is executed without selecting a shard,
 * the SQL will be executed on every shard.
 *
 * The shard to use can be guessed automatically if the query builder is used.
 *
 * The method MultiStatement#fetchColumn changes the interface (it returns an array of results).
 * Every aggregate function executed on each shard returns a collection of results.
 * The merge should be done outside this class.
 *
 * @example:
 * // returns the count result of the first shard
 * $connection->from('test')->count();
 *
 * // returns an array containing the count result of every shard
 * $connection->query('select count(*) from test')->fetchColumn();
 *
 * @package Bdf\Prime\Sharding
 */
class ShardingConnection extends SimpleConnection implements SubConnectionManagerInterface
{
    /**
     * All shard connections
     *
     * @var SimpleConnection[]
     */
    private $connections = [];

    /**
     * The shard choser
     *
     * @var ShardChoserInterface
     */
    private $shardChoser;

    /**
     * The id of the current shard. Null means all shards
     *
     * @var string|null
     */
    private $currentShardId;

    /**
     * The distribution key
     *
     * @var string
     */
    private $distributionKey;

    /**
     * Initializes a new instance of the Connection class.
     *
     * Example of a shard connections configuration:
     *
     * @example
     *
     * $conn = DriverManager::getConnection([
     *    'driver'   => 'pdo_mysql',
     *    'user'     => 'user',
     *    'password' => 'password',
     *    'host'     => '127.0.0.1',
     *    'dbname'   => 'basename',
     *    'distributionKey' => 'id',
     *    'shards' => [
     *      '{shardId}' => [
     *        'user'     => 'shard1',
     *        'host'     => '...',
     *      ]
     *    ]
     * ]);
     *
     * @param array                              $params       The connection parameters.
     * @param \Doctrine\DBAL\Driver              $driver       The driver to use.
     * @param \Doctrine\DBAL\Configuration|null  $config       The configuration, optional.
     * @param \Doctrine\Common\EventManager|null $eventManager The event manager, optional.
     */
    public function __construct(array $params, Driver $driver, Configuration $config = null, EventManager $eventManager = null)
    {
        if (!isset($params['shard_connections'])) {
            throw new LogicException('Sharding connection needs "shard_connections" configuration in parameters');
        }
        if (!isset($params['distributionKey'])) {
            throw new LogicException('Sharding connection needs distribution key in parameters');
        }

        $this->distributionKey = $params['distributionKey'];
        $this->shardChoser = $params['shardChoser'] ?? new ModuloChoser();
        $this->connections = $params['shard_connections'];

        parent::__construct($params, $driver, $config, $eventManager);

        /** @var DefaultQueryFactory $queryFactory */
        $queryFactory = $this->factory();

        /** @psalm-suppress InvalidArgument */
        $queryFactory->alias(InsertQueryInterface::class, ShardingInsertQuery::class);
        /** @psalm-suppress InvalidArgument */
        $queryFactory->alias(KeyValueQueryInterface::class, ShardingKeyValueQuery::class);
    }

    /**
     * {@inheritdoc}
     */
    public function getDatabase(): ?string
    {
        return '';
    }

    /**
     * Get the shard ids
     *
     * @return array
     */
    public function getShardIds()
    {
        return array_keys($this->connections);
    }

    /**
     * Get the shard choser
     *
     * @return ShardChoserInterface
     */
    public function getShardChoser()
    {
        return $this->shardChoser;
    }

    /**
     * Get the distribution key
     *
     * @return string
     */
    public function getDistributionKey()
    {
        return $this->distributionKey;
    }

    /**
     * Get the id of the current shard, or null if no shard is selected
     *
     * @return string|null
     */
    public function getCurrentShardId()
    {
        return $this->currentShardId;
    }

    /**
     * Select a shard to use.
     *
     * @param mixed $distributionValue
     *
     * @return $this
     */
    public function pickShard($distributionValue = null)
    {
        $this->useShard(
            $distributionValue !== null
                ? $this->shardChoser->pick($distributionValue, $this)
                : null
        );

        return $this;
    }

    /**
     * Use a shard
     *
     * @param string|null $shardId
     *
     * @return $this
     *
     * @throws ShardingException   If the shard id is not known
     */
    public function useShard(?string $shardId = null)
    {
        if ($shardId !== null && !isset($this->connections[$shardId])) {
            throw ShardingException::unknown($shardId);
        }

        $this->currentShardId = $shardId;

        return $this;
    }

    /**
     * Check whether the connection is using a shard
     *
     * @return bool
     */
    public function isUsingShard()
    {
        return $this->currentShardId !== null;
    }

    /**
     * Get a shard connection by its id
     * Returns all connections if the id is null
     *
     * @param string|null $shardId
     *
     * @return SimpleConnection[]|SimpleConnection
     *
     * @psalm-param S $shardId
     * @psalm-return (S is null ? SimpleConnection[] : SimpleConnection)
     * @template S as null|array-key
     *
     * @throws ShardingException   If the shard id is not known
     */
    public function getShardConnection($shardId = null)
    {
        if ($shardId === null) {
            return $this->connections;
        }

        if (!isset($this->connections[$shardId])) {
            throw ShardingException::unknown($shardId);
        }

        return $this->connections[$shardId];
    }

    /**
     * {@inheritdoc}
     */
    public function getConnection(string $name): ConnectionInterface
    {
        return $this->getShardConnection($name);
    }

    /**
     * Get the selected shards
     *
     * @return SimpleConnection[]
     */
    protected function getSelectedShards()
    {
        if ($this->isUsingShard()) {
            return [$this->connections[$this->currentShardId]];
        }

        return $this->connections;
    }

    /**
     * Get the selected shard
     *
     * @return SimpleConnection
     */
    protected function getSelectedShard()
    {
        return $this->connections[$this->currentShardId];
    }

    /**
     * {@inheritdoc}
     */
    public function close(): void
    {
        parent::close();

        $this->currentShardId = null;

        foreach ($this->connections as $shard) {
            $shard->close();
        }
    }

    /**
     * {@inheritdoc}
     */
    public function builder(PreprocessorInterface $preprocessor = null): ShardingQuery
    {
        // Scrutinizer issue (Bug, Best Practice): this expression returns the type
        // Bdf\Prime\Query\CommandInterface, which includes types incompatible with
        // the type-hinted return Bdf\Prime\Sharding\ShardingQuery.
        return $this->factory()->make(ShardingQuery::class, $preprocessor);
    }

    /**
     * {@inheritdoc}
     */
    public function executeQuery(string $sql, array $params = [], $types = [], QueryCacheProfile $qcp = null): Result
    {
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->executeQuery($sql, $params, $types, $qcp);
        }

        $result = new MultiResult();

        foreach ($this->getSelectedShards() as $shard) {
            $result->add($shard->executeQuery($sql, $params, $types, $qcp));
        }

        /** @psalm-suppress InternalMethod */
        return new Result($result, $this);
    }

    /**
     * {@inheritdoc}
     */
    public function executeStatement($sql, array $params = [], array $types = []): int
    {
        $result = 0;

        foreach ($this->getSelectedShards() as $shard) {
            $result += $shard->executeStatement($sql, $params, $types);
        }

        return $result;
    }

    /**
     * {@inheritdoc}
     */
    public function beginTransaction(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->beginTransaction()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     */
    public function commit(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->commit()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     */
    public function rollBack(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->rollBack()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     */
    public function createSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->createSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function releaseSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->releaseSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function rollbackSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->rollbackSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function lastInsertId($name = null)
    {
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->lastInsertId($name);
        }

        // TODO: should an exception be thrown?
        return parent::lastInsertId($name);
    }

    /**
     * {@inheritdoc}
     */
    public function getWrappedConnection()
    {
        // Scrutinizer issue (Deprecated Code), reported for both calls in this method:
        // Doctrine\DBAL\Connection::getWrappedConnection() has been deprecated:
        // use getNativeConnection() to access the native connection.
        // If this is a false positive, a call can be marked with the
        // "@scrutinizer ignore-deprecated" annotation.
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->getWrappedConnection();
        }

        // TODO: should an exception be thrown?
        return parent::getWrappedConnection();
    }
}
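
The routing rules of the class (run on one shard when a shard is selected, otherwise fan out to every shard and let the caller merge the results) can be illustrated with a small standalone sketch. It does not use the Prime or Doctrine classes: FakeShard and FakeShardRouter are hypothetical stand-ins, and the modulo pick only assumes that the default chooser behaves roughly like this.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for one shard connection: stores rows in memory.
final class FakeShard
{
    /** @var array<int, array<string, mixed>> */
    private array $rows = [];

    /** Stores the row and returns the number of affected rows. */
    public function insert(array $row): int
    {
        $this->rows[] = $row;

        return 1;
    }

    public function count(): int
    {
        return count($this->rows);
    }
}

// Hypothetical router mimicking the shard selection logic shown above.
final class FakeShardRouter
{
    /** @var array<string, FakeShard> */
    private array $shards;

    /** Null means "all shards". */
    private ?string $currentShardId = null;

    /** @param array<string, FakeShard> $shards */
    public function __construct(array $shards)
    {
        $this->shards = $shards;
    }

    /** Modulo-based pick (an assumption about how a modulo chooser works). */
    public function pickShard(?int $distributionValue = null): self
    {
        if ($distributionValue === null) {
            $this->currentShardId = null;

            return $this;
        }

        $ids = array_keys($this->shards);
        $this->currentShardId = (string) $ids[abs($distributionValue) % count($ids)];

        return $this;
    }

    /** @return FakeShard[] */
    private function selectedShards(): array
    {
        return $this->currentShardId === null
            ? array_values($this->shards)
            : [$this->shards[$this->currentShardId]];
    }

    /** Like executeStatement(): run on each selected shard, sum affected rows. */
    public function insert(array $row): int
    {
        $affected = 0;

        foreach ($this->selectedShards() as $shard) {
            $affected += $shard->insert($row);
        }

        return $affected;
    }

    /** @return int[] One count per selected shard; merging is left to the caller. */
    public function counts(): array
    {
        return array_map(
            static fn (FakeShard $shard): int => $shard->count(),
            $this->selectedShards()
        );
    }
}

$router = new FakeShardRouter(['shard1' => new FakeShard(), 'shard2' => new FakeShard()]);

$routed = $router->pickShard(3)->insert(['id' => 3]);   // 3 % 2 = 1, so only shard2 is written
$broadcast = $router->pickShard()->insert(['id' => 4]); // no shard selected: written to every shard
$counts = $router->counts();                            // one count per shard
$total = array_sum($counts);                            // the merge happens outside the router
```

The broadcast insert is the behaviour the class docblock warns about: without a selected shard the same row ends up on every shard, so callers that rely on the distribution key should pick a shard first.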