1 | <?php |
||||
2 | |||||
3 | namespace Bdf\Prime\Sharding\Query; |
||||
4 | |||||
5 | use Bdf\Prime\Connection\ConnectionInterface; |
||||
6 | use Bdf\Prime\Connection\Result\ArrayResultSet; |
||||
7 | use Bdf\Prime\Connection\Result\ResultSetInterface; |
||||
8 | use Bdf\Prime\Exception\ShardingException; |
||||
9 | use Bdf\Prime\Query\AbstractReadCommand; |
||||
10 | use Bdf\Prime\Query\Compiler\Preprocessor\DefaultPreprocessor; |
||||
11 | use Bdf\Prime\Query\Compiler\Preprocessor\PreprocessorInterface; |
||||
12 | use Bdf\Prime\Query\Contract\Query\KeyValueQueryInterface; |
||||
13 | use Bdf\Prime\Query\Contract\ReadOperation; |
||||
14 | use Bdf\Prime\Query\Contract\WriteOperation; |
||||
15 | use Bdf\Prime\Query\Extension\CachableTrait; |
||||
16 | use Bdf\Prime\Query\Extension\ExecutableTrait; |
||||
17 | use Bdf\Prime\Sharding\Extension\ShardPicker; |
||||
18 | use Bdf\Prime\Sharding\ShardingConnection; |
||||
19 | |||||
/**
 * Simple key/value query for a sharding connection.
 *
 * The query only records its statements (table, columns, filters, values, limit)
 * and replays them on one key/value query per targeted shard:
 * - when a shard id is set on this query, only that shard is used
 * - else, when the distribution key is found in the filters, the shard chooser picks the shard
 * - in other cases, all shards of the connection are queried
 *
 * @property ShardingConnection $connection
 *
 * @template R as object|array
 *
 * @implements KeyValueQueryInterface<ShardingConnection, R>
 * @extends AbstractReadCommand<ShardingConnection, R>
 */
class ShardingKeyValueQuery extends AbstractReadCommand implements KeyValueQueryInterface
{
    use CachableTrait;
    use ExecutableTrait;
    use ShardPicker;

    /**
     * Queries already created, indexed by shard id
     *
     * @var KeyValueQueryInterface[]
     */
    private $queries = [];


    /**
     * ShardingKeyValueQuery constructor.
     *
     * @param ShardingConnection $connection
     * @param PreprocessorInterface|null $preprocessor The preprocessor to use. A DefaultPreprocessor is created if null
     */
    public function __construct(ShardingConnection $connection, ?PreprocessorInterface $preprocessor = null)
    {
        parent::__construct($connection, $preprocessor ?? new DefaultPreprocessor());

        $this->statements = [
            'where'     => [],
            'table'     => null,
            'columns'   => [],
            'aggregate' => null,
            'limit'     => null,
            'values'    => [
                'data'  => [],
                'types' => [],
            ],
        ];
    }

    /**
     * {@inheritdoc}
     *
     * NOTE(review): the property is documented as ShardingConnection but any
     * ConnectionInterface is accepted here — confirm callers always pass a sharding connection.
     */
    public function on(ConnectionInterface $connection)
    {
        $this->connection = $connection;

        // The stored queries were created from the previous connection's shards: drop them
        $this->queries = [];

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Replace the projected columns. A scalar is wrapped into an array, and null resets to "no columns".
     */
    public function project($columns = null)
    {
        $this->statements['columns'] = (array) $columns;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Alias of project()
     */
    public function select($columns = null)
    {
        // Delegate to project(): calling select() here would recurse forever
        return $this->project($columns);
    }

    /**
     * {@inheritdoc}
     *
     * Append columns to the current projection. Like project(), a scalar column is accepted.
     */
    public function addSelect($columns)
    {
        // array_merge() only accepts arrays: normalize the argument first
        $this->statements['columns'] = array_merge($this->statements['columns'], (array) $columns);

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Only the table name is stored, the alias is not used
     */
    public function from(string $from, ?string $alias = null)
    {
        $this->statements['table'] = $from;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * With an array, the given filters are added to the stored ones, and override those with the same field.
     * With a single field, the value of this field is set (or replaced).
     */
    public function where($field, $value = null)
    {
        if (is_array($field)) {
            // Array union: keys of $field take precedence over the stored filters
            $this->statements['where'] = $field + $this->statements['where'];
        } else {
            $this->statements['where'][$field] = $value;
        }

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Replace the stored values and their types
     */
    public function values(array $values = [], array $types = [])
    {
        $this->statements['values'] = [
            'data'  => $values,
            'types' => $types,
        ];

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * @internal Used internally to optimise "first" query. The offset parameter is not used
     *
     * @return static<R>
     */
    public function limit(?int $limit, ?int $offset = null)
    {
        $this->statements['limit'] = $limit;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * The result is the sum of the counts of each targeted shard
     */
    #[ReadOperation]
    public function count(?string $column = null): int
    {
        return (int) array_sum($this->aggregate(__FUNCTION__, $column));
    }

    /**
     * {@inheritdoc}
     *
     * The result is the mean of the per-shard averages: it is not weighted by the number of rows of each shard.
     * If no shard is targeted, 0.0 is returned.
     */
    #[ReadOperation]
    public function avg(?string $column = null): float
    {
        $results = $this->aggregate(__FUNCTION__, $column);

        // Without any shard result, dividing by count($results) would raise a DivisionByZeroError
        if ($results === []) {
            return 0.0;
        }

        return (float) array_sum($results) / count($results);
    }

    /**
     * {@inheritdoc}
     *
     * The result is the lowest of the per-shard minimums
     */
    #[ReadOperation]
    public function min(?string $column = null)
    {
        return min($this->aggregate(__FUNCTION__, $column));
    }

    /**
     * {@inheritdoc}
     *
     * The result is the highest of the per-shard maximums
     */
    #[ReadOperation]
    public function max(?string $column = null)
    {
        return max($this->aggregate(__FUNCTION__, $column));
    }

    /**
     * {@inheritdoc}
     *
     * The result is the sum of the per-shard sums
     */
    #[ReadOperation]
    public function sum(?string $column = null): float
    {
        return (float) array_sum($this->aggregate(__FUNCTION__, $column));
    }

    /**
     * {@inheritdoc}
     *
     * Execute the aggregate function on each targeted shard, without combining the results
     *
     * @return array One aggregate result per targeted shard
     */
    #[ReadOperation]
    public function aggregate(string $function, ?string $column = null): array
    {
        $results = [];

        foreach ($this->selectQueries() as $query) {
            $results[] = $query->aggregate($function, $column);
        }

        return $results;
    }

    /**
     * {@inheritdoc}
     *
     * Execute the query on each targeted shard and merge the rows.
     * When a limit is set, the remaining shards are not queried once enough rows are collected.
     *
     * @todo execute cached with closure
     */
    #[ReadOperation]
    public function execute($columns = null): ResultSetInterface
    {
        $results = [];
        $limit = $this->statements['limit'];

        foreach ($this->selectQueries() as $query) {
            $results = array_merge($results, $query->execute($columns)->all());

            // Limit reached: drop the extra rows (if any) and skip the remaining shards
            if ($limit && count($results) >= $limit) {
                $results = array_slice($results, 0, $limit);
                break;
            }
        }

        return new ArrayResultSet($results);
    }

    /**
     * {@inheritdoc}
     *
     * Execute the update on each targeted shard.
     * The cache is cleared only if at least one row has been affected.
     *
     * @return int The total number of rows reported by the shard queries
     */
    #[WriteOperation]
    public function update($values = null): int
    {
        $count = 0;

        foreach ($this->selectQueries() as $query) {
            $count += $query->update($values);
        }

        if ($count > 0) {
            $this->clearCacheOnWrite();
        }

        return $count;
    }

    /**
     * {@inheritdoc}
     *
     * Execute the delete on each targeted shard.
     * The cache is cleared only if at least one row has been affected.
     *
     * @return int The total number of rows reported by the shard queries
     */
    #[WriteOperation]
    public function delete(): int
    {
        $count = 0;

        foreach ($this->selectQueries() as $query) {
            $count += $query->delete();
        }

        if ($count > 0) {
            $this->clearCacheOnWrite();
        }

        return $count;
    }

    /**
     * Select the queries to use
     *
     * The queries are lazily created and configured: a shard query is only built
     * when the caller iterates up to it.
     *
     * @return iterable<KeyValueQueryInterface>
     *
     * @throws ShardingException
     */
    private function selectQueries(): iterable
    {
        foreach ($this->getShardIds() as $shardId) {
            yield $this->getQueryByShard($shardId);
        }
    }

    /**
     * Get the targeted shard IDs
     *
     * Resolution order:
     * - the shard id set on this query, if not null
     * - the shard picked by the shard chooser from the distribution key filter, if this filter is set
     * - all the shards of the connection
     *
     * @return list<string>
     */
    private function getShardIds(): array
    {
        if ($this->shardId !== null) {
            return [$this->shardId];
        }

        $distributionKey = $this->connection->getDistributionKey();

        // isset() is false for a null filter value: in this case all shards are queried
        if (isset($this->statements['where'][$distributionKey])) {
            $shardId = $this->connection->getShardChoser()->pick(
                $this->statements['where'][$distributionKey],
                $this->connection
            );

            return [$shardId];
        }

        return $this->connection->getShardIds();
    }

    /**
     * Get and configure a query for the given shard
     *
     * The query is created once per shard, then reused. The current statements
     * are applied on it at each call, but only the non-empty ones.
     *
     * @param string $shardId The shard id
     *
     * @return KeyValueQueryInterface
     *
     * @throws ShardingException
     */
    private function getQueryByShard($shardId)
    {
        if (isset($this->queries[$shardId])) {
            $query = $this->queries[$shardId];
        } else {
            /** @var KeyValueQueryInterface $query */
            $query = $this->connection
                ->getShardConnection($shardId)
                ->make(KeyValueQueryInterface::class, $this->preprocessor());

            $query->setExtension($this->extension);

            $this->queries[$shardId] = $query;
        }

        $query->from($this->statements['table']);

        if (!empty($this->statements['limit'])) {
            /** @var KeyValueQueryInterface&\Bdf\Prime\Query\Contract\Limitable $query */
            $query->limit($this->statements['limit']);
        }

        if (!empty($this->statements['columns'])) {
            $query->project($this->statements['columns']);
        }

        if (!empty($this->statements['where'])) {
            $query->where($this->statements['where']);
        }

        if (!empty($this->statements['values']['data'])) {
            $query->values($this->statements['values']['data'], $this->statements['values']['types']);
        }

        return $query;
    }

    /**
     * {@inheritdoc}
     *
     * The namespace is in form "[connection name]:[table]"
     */
    protected function cacheNamespace(): string
    {
        return $this->connection->getName().':'.$this->statements['table'];
    }
}
||||
373 |
Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a given class or a super-class is assigned to a property that is type hinted more strictly.
Either this assignment is in error or an instanceof check should be added for that assignment.