1 | <?php |
||||
2 | |||||
3 | namespace Bdf\Prime\Sharding; |
||||
4 | |||||
5 | use Bdf\Prime\Connection\ConnectionInterface; |
||||
6 | use Bdf\Prime\Connection\SimpleConnection; |
||||
7 | use Bdf\Prime\Connection\SubConnectionManagerInterface; |
||||
8 | use Bdf\Prime\Exception\ShardingException; |
||||
9 | use Bdf\Prime\Query\Compiler\Preprocessor\PreprocessorInterface; |
||||
10 | use Bdf\Prime\Query\Contract\Query\InsertQueryInterface; |
||||
11 | use Bdf\Prime\Query\Contract\Query\KeyValueQueryInterface; |
||||
12 | use Bdf\Prime\Query\Factory\DefaultQueryFactory; |
||||
13 | use Bdf\Prime\Sharding\Query\ShardingInsertQuery; |
||||
14 | use Bdf\Prime\Sharding\Query\ShardingKeyValueQuery; |
||||
15 | use Doctrine\Common\EventManager; |
||||
16 | use Doctrine\DBAL\Cache\QueryCacheProfile; |
||||
17 | use Doctrine\DBAL\Configuration; |
||||
18 | use Doctrine\DBAL\Driver; |
||||
19 | use Doctrine\DBAL\Result; |
||||
20 | use LogicException; |
||||
21 | |||||
/**
 * ShardingConnection
 *
 * The sharding connection is a global connection (that can be a shard server) and a collection of connection wrappers for shards.
 *
 * Those methods will be used by the global connection:
 * ShardingConnection#prepare               Will connect on global but should execute on shard if one is selected
 * ShardingConnection#quote
 * ShardingConnection#errorCode
 * ShardingConnection#errorInfo
 * ShardingConnection#ping
 *
 * Fall back to the global connection if no shard has been selected:
 * ShardingConnection#lastInsertId
 * ShardingConnection#getWrappedConnection
 *
 * Be aware!!
 * This connection does not manage the distribution key. If a SQL INSERT is executed without shard selection,
 * the SQL will be executed on each shard.
 *
 * The shard to use can be guessed automatically if the query builder is used.
 *
 * The method MultiStatement#fetchColumn changes the interface (it will return an array of results).
 * All aggregate functions executed on each shard will return a collection of results.
 * The merge should be done outside this class.
 *
 * @example:
 * // returns the count result of the first shard
 * $connection->from('test')->count();
 *
 * // returns an array containing all count results of each shard
 * $connection->query('select count(*) from test')->fetchColumn();
 *
 * @package Bdf\Prime\Sharding
 */
class ShardingConnection extends SimpleConnection implements SubConnectionManagerInterface
{
    /**
     * All shard connections, indexed by shard id
     *
     * @var SimpleConnection[]
     */
    private $connections = [];

    /**
     * The shard choser
     *
     * @var ShardChoserInterface
     */
    private $shardChoser;

    /**
     * The id of current shard. Null means all shards
     *
     * @var string|null
     */
    private $currentShardId;

    /**
     * The distribution key
     *
     * @var string
     */
    private $distributionKey;

    /**
     * Initializes a new instance of the Connection class.
     *
     * Parameters read by this constructor:
     *  - 'shard_connections' (required): the shard connections, indexed by shard id
     *  - 'distributionKey'   (required): the distribution key
     *  - 'shardChoser'       (optional): the shard choser, a ModuloChoser is used by default
     *
     * Here's a shard connections configuration
     *
     * NOTE(review): the example below declares the shards under a 'shards' key whereas this constructor
     * requires 'shard_connections'. Presumably the connection factory builds the latter from the former
     * before instantiating this class - to be confirmed.
     *
     * @example
     *
     * $conn = DriverManager::getConnection([
     *     'driver' => 'pdo_mysql',
     *     'user' => 'user',
     *     'password' => 'password',
     *     'host' => '127.0.0.1',
     *     'dbname' => 'basename',
     *     'distributionKey' => 'id',
     *     'shards' => [
     *         '{shardId}' => [
     *             'user' => 'shard1',
     *             'host' => '...',
     *         ]
     *     ]
     * ]);
     *
     * @param array $params The connection parameters.
     * @param \Doctrine\DBAL\Driver $driver The driver to use.
     * @param \Doctrine\DBAL\Configuration|null $config The configuration, optional.
     * @param \Doctrine\Common\EventManager|null $eventManager The event manager, optional.
     *
     * @throws LogicException If 'shard_connections' or 'distributionKey' is missing from the parameters
     */
    public function __construct(array $params, Driver $driver, ?Configuration $config = null, ?EventManager $eventManager = null)
    {
        // Validate the sharding specific parameters before touching the parent connection
        if (!isset($params['shard_connections'])) {
            throw new LogicException('Sharding connection needs "shard_connections" configuration in parameters');
        }
        if (!isset($params['distributionKey'])) {
            throw new LogicException('Sharding connection needs distribution key in parameters');
        }

        $this->distributionKey = $params['distributionKey'];
        $this->shardChoser = $params['shardChoser'] ?? new ModuloChoser();
        $this->connections = $params['shard_connections'];

        parent::__construct($params, $driver, $config, $eventManager);

        /** @var DefaultQueryFactory $queryFactory */
        $queryFactory = $this->factory();

        // Register the sharding implementations for insert and key/value queries
        /** @psalm-suppress InvalidArgument */
        $queryFactory->alias(InsertQueryInterface::class, ShardingInsertQuery::class);
        /** @psalm-suppress InvalidArgument */
        $queryFactory->alias(KeyValueQueryInterface::class, ShardingKeyValueQuery::class);
    }

    /**
     * {@inheritdoc}
     *
     * Always returns an empty string on the sharding connection.
     */
    public function getDatabase(): ?string
    {
        return '';
    }

    /**
     * Get the shard ids
     *
     * @return array
     */
    public function getShardIds()
    {
        return array_keys($this->connections);
    }

    /**
     * Get the shard choser
     *
     * @return ShardChoserInterface
     */
    public function getShardChoser()
    {
        return $this->shardChoser;
    }

    /**
     * Get the distribution key
     *
     * @return string
     */
    public function getDistributionKey()
    {
        return $this->distributionKey;
    }

    /**
     * Get the current shard
     *
     * @return string|null The selected shard id, or null if no shard is selected
     */
    public function getCurrentShardId()
    {
        return $this->currentShardId;
    }

    /**
     * Select a shard to use.
     *
     * The shard id is resolved by the shard choser from the distribution value.
     * A null value unselects the current shard.
     *
     * @param mixed $distributionValue
     *
     * @return $this
     */
    public function pickShard($distributionValue = null)
    {
        $this->useShard(
            $distributionValue !== null
                ? $this->shardChoser->pick($distributionValue, $this)
                : null
        );

        return $this;
    }

    /**
     * Use a shard
     *
     * @param string|null $shardId The shard id, or null to unselect the current shard
     *
     * @return $this
     *
     * @throws ShardingException If the shard id is not known
     */
    public function useShard(?string $shardId = null)
    {
        if ($shardId !== null && !isset($this->connections[$shardId])) {
            throw ShardingException::unknown($shardId);
        }

        $this->currentShardId = $shardId;

        return $this;
    }

    /**
     * Check whether the connection is using a shard
     *
     * @return boolean
     */
    public function isUsingShard()
    {
        return $this->currentShardId !== null;
    }

    /**
     * Get a shard connection by its id
     * Returns all connections if id is null
     *
     * @param string|null $shardId
     *
     * @return SimpleConnection[]|SimpleConnection
     *
     * @psalm-param S $shardId
     * @psalm-return (S is null ? SimpleConnection[] : SimpleConnection)
     * @template S as null|array-key
     *
     * @throws ShardingException If the shard id is not known
     */
    public function getShardConnection($shardId = null)
    {
        if ($shardId === null) {
            return $this->connections;
        }

        if (!isset($this->connections[$shardId])) {
            throw ShardingException::unknown($shardId);
        }

        return $this->connections[$shardId];
    }

    /**
     * {@inheritdoc}
     *
     * @throws ShardingException If the shard id is not known
     */
    public function getConnection(string $name): ConnectionInterface
    {
        return $this->getShardConnection($name);
    }

    /**
     * Get the selected shards
     *
     * Returns only the current shard if one is selected, all shards otherwise.
     *
     * @return SimpleConnection[]
     */
    protected function getSelectedShards()
    {
        if ($this->isUsingShard()) {
            return [$this->connections[$this->currentShardId]];
        }

        return $this->connections;
    }

    /**
     * Get the selected shard
     *
     * A shard must be selected before calling this method: check {@see isUsingShard()} first.
     *
     * @return SimpleConnection
     */
    protected function getSelectedShard()
    {
        return $this->connections[$this->currentShardId];
    }

    /**
     * {@inheritdoc}
     *
     * Also unselects the current shard and closes every shard connection.
     */
    public function close(): void
    {
        parent::close();

        $this->currentShardId = null;

        foreach ($this->connections as $shard) {
            $shard->close();
        }
    }

    /**
     * {@inheritdoc}
     */
    public function builder(?PreprocessorInterface $preprocessor = null): ShardingQuery
    {
        return $this->factory()->make(ShardingQuery::class, $preprocessor);
    }

    /**
     * {@inheritdoc}
     *
     * If a shard is selected, the query is delegated to this shard.
     * Otherwise the query is executed on each shard and the results are collected into a MultiResult.
     */
    public function executeQuery(string $sql, array $params = [], $types = [], ?QueryCacheProfile $qcp = null): Result
    {
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->executeQuery($sql, $params, $types, $qcp);
        }

        $result = new MultiResult();

        foreach ($this->getSelectedShards() as $shard) {
            $result->add($shard->executeQuery($sql, $params, $types, $qcp));
        }

        /** @psalm-suppress InternalMethod */
        return new Result($result, $this);
    }

    /**
     * {@inheritdoc}
     *
     * The statement is executed on each selected shard.
     * The returned value is the sum of the results of all those shards.
     */
    public function executeStatement($sql, array $params = [], array $types = []): int
    {
        $result = 0;

        foreach ($this->getSelectedShards() as $shard) {
            $result += $shard->executeStatement($sql, $params, $types);
        }

        return $result;
    }

    /**
     * {@inheritdoc}
     *
     * The transaction is started on each selected shard, even if one of them reports a failure.
     * Returns false if at least one shard returns false.
     */
    public function beginTransaction(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->beginTransaction()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     *
     * The commit is performed on each selected shard, even if one of them reports a failure.
     * Returns false if at least one shard returns false.
     */
    public function commit(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->commit()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     *
     * The rollback is performed on each selected shard, even if one of them reports a failure.
     * Returns false if at least one shard returns false.
     */
    public function rollBack(): bool
    {
        $success = true;

        foreach ($this->getSelectedShards() as $shard) {
            if (!$shard->rollBack()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * {@inheritdoc}
     *
     * The savepoint is created on each selected shard.
     */
    public function createSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->createSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     *
     * The savepoint is released on each selected shard.
     */
    public function releaseSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->releaseSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     *
     * The savepoint is rolled back on each selected shard.
     */
    public function rollbackSavepoint($savepoint)
    {
        foreach ($this->getSelectedShards() as $shard) {
            $shard->rollbackSavepoint($savepoint);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Uses the selected shard, or falls back to the global connection if no shard is selected.
     */
    public function lastInsertId($name = null)
    {
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->lastInsertId($name);
        }

        // TODO: should an exception be thrown instead?
        return parent::lastInsertId($name);
    }

    /**
     * {@inheritdoc}
     *
     * Uses the selected shard, or falls back to the global connection if no shard is selected.
     *
     * Note: Doctrine\DBAL\Connection::getWrappedConnection() is reported as deprecated
     * in favour of getNativeConnection().
     */
    public function getWrappedConnection()
    {
        if ($this->isUsingShard()) {
            return $this->getSelectedShard()->getWrappedConnection();
        }

        // TODO: should an exception be thrown instead?
        return parent::getWrappedConnection();
    }
}
||||
452 |