Issues (590)

src/Sharding/Query/ShardingKeyValueQuery.php (7 issues)

1
<?php
2
3
namespace Bdf\Prime\Sharding\Query;
4
5
use Bdf\Prime\Connection\ConnectionInterface;
6
use Bdf\Prime\Connection\Result\ArrayResultSet;
7
use Bdf\Prime\Connection\Result\ResultSetInterface;
8
use Bdf\Prime\Exception\ShardingException;
9
use Bdf\Prime\Query\AbstractReadCommand;
10
use Bdf\Prime\Query\Compiler\Preprocessor\DefaultPreprocessor;
11
use Bdf\Prime\Query\Compiler\Preprocessor\PreprocessorInterface;
12
use Bdf\Prime\Query\Contract\Query\KeyValueQueryInterface;
13
use Bdf\Prime\Query\Contract\ReadOperation;
14
use Bdf\Prime\Query\Contract\WriteOperation;
15
use Bdf\Prime\Query\Extension\CachableTrait;
16
use Bdf\Prime\Query\Extension\ExecutableTrait;
17
use Bdf\Prime\Sharding\Extension\ShardPicker;
18
use Bdf\Prime\Sharding\ShardingConnection;
19
20
/**
 * Handle simple key/value queries on a sharding connection.
 *
 * If the distribution key is found in the filters, only the query of the matching shard is used.
 * Otherwise, every shard is queried and the per-shard results are combined.
 *
 * @property ShardingConnection $connection
 *
 * @template R as object|array
 *
 * @implements KeyValueQueryInterface<ShardingConnection, R>
 * @extends AbstractReadCommand<ShardingConnection, R>
 */
class ShardingKeyValueQuery extends AbstractReadCommand implements KeyValueQueryInterface
{
    use CachableTrait;
    use ExecutableTrait;
    use ShardPicker;

    /**
     * Queries already created for each shard, indexed by shard id
     *
     * @var KeyValueQueryInterface[]
     */
    private $queries = [];


    /**
     * ShardingKeyValueQuery constructor.
     *
     * @param ShardingConnection $connection
     * @param PreprocessorInterface|null $preprocessor Falls back to a DefaultPreprocessor when null
     */
    public function __construct(ShardingConnection $connection, ?PreprocessorInterface $preprocessor = null)
    {
        parent::__construct($connection, $preprocessor ?? new DefaultPreprocessor());

        $this->statements = [
            'where'     => [],
            'table'     => null,
            'columns'   => [],
            'aggregate' => null,
            'limit'     => null,
            'values'    => [
                'data'  => [],
                'types' => [],
            ],
        ];
    }

    /**
     * {@inheritdoc}
     *
     * The per-shard queries created for the previous connection are dropped.
     *
     * NOTE(review): the signature accepts any ConnectionInterface, but the other methods of this class
     * call ShardingConnection methods (getDistributionKey(), getShardChoser(), getShardIds(), ...)
     * on the stored connection, so a ShardingConnection is expected here.
     */
    public function on(ConnectionInterface $connection)
    {
        $this->connection = $connection;
        $this->queries    = [];

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Replace the projected columns. A single column name is wrapped into an array.
     */
    public function project($columns = null)
    {
        $this->statements['columns'] = (array) $columns;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Alias of project().
     */
    public function select($columns = null)
    {
        // Delegate to project(): calling select() here would recurse forever
        return $this->project($columns);
    }

    /**
     * {@inheritdoc}
     *
     * Append columns to the current projection.
     */
    public function addSelect($columns)
    {
        // Normalise like project() does, so a single column name does not make array_merge() fail
        $this->statements['columns'] = array_merge($this->statements['columns'], (array) $columns);

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Only the table name is stored: the alias is not used by this query.
     */
    public function from(string $from, ?string $alias = null)
    {
        $this->statements['table'] = $from;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Accepts either a single field with its value, or an array of field => value filters.
     */
    public function where($field, $value = null)
    {
        if (is_array($field)) {
            // Array union: on duplicated keys, the new filters take precedence over the stored ones
            $this->statements['where'] = $field + $this->statements['where'];
        } else {
            $this->statements['where'][$field] = $value;
        }

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Replace the stored values and their types.
     */
    public function values(array $values = [], array $types = [])
    {
        $this->statements['values'] = [
            'data'  => $values,
            'types' => $types,
        ];

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * @internal Use internally for optimise "first" query. The offset parameter is not used
     *
     * @return static<R>
     */
    public function limit(?int $limit, ?int $offset = null)
    {
        $this->statements['limit'] = $limit;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * The result is the sum of the counts returned by each selected shard.
     */
    #[ReadOperation]
    public function count(?string $column = null): int
    {
        return (int) array_sum($this->aggregate(__FUNCTION__, $column));
    }

    /**
     * {@inheritdoc}
     *
     * The result is the unweighted mean of the averages returned by each selected shard.
     */
    #[ReadOperation]
    public function avg(?string $column = null): float
    {
        $results = $this->aggregate(__FUNCTION__, $column);

        return (float) array_sum($results) / count($results);
    }

    /**
     * {@inheritdoc}
     *
     * The result is the lowest of the minimums returned by each selected shard.
     */
    #[ReadOperation]
    public function min(?string $column = null)
    {
        return min($this->aggregate(__FUNCTION__, $column));
    }

    /**
     * {@inheritdoc}
     *
     * The result is the highest of the maximums returned by each selected shard.
     */
    #[ReadOperation]
    public function max(?string $column = null)
    {
        return max($this->aggregate(__FUNCTION__, $column));
    }

    /**
     * {@inheritdoc}
     *
     * The result is the sum of the sums returned by each selected shard.
     */
    #[ReadOperation]
    public function sum(?string $column = null): float
    {
        return (float) array_sum($this->aggregate(__FUNCTION__, $column));
    }

    /**
     * {@inheritdoc}
     *
     * Run the aggregate function on each selected shard, without combining the results.
     *
     * @return array One aggregate result per queried shard
     */
    #[ReadOperation]
    public function aggregate(string $function, ?string $column = null): array
    {
        $results = [];

        foreach ($this->selectQueries() as $query) {
            $results[] = $query->aggregate($function, $column);
        }

        return $results;
    }

    /**
     * {@inheritdoc}
     *
     * Execute the query on each selected shard and merge the rows into a single result set.
     * When a limit is set, the remaining shards are not queried once enough rows have been collected.
     *
     * @todo execute cached with closure
     */
    #[ReadOperation]
    public function execute($columns = null): ResultSetInterface
    {
        $results = [];
        $limit = $this->statements['limit'];

        foreach ($this->selectQueries() as $query) {
            $results = array_merge($results, $query->execute($columns)->all());

            // Limit reached (or exceeded by the last shard): drop the extra rows and stop querying
            if ($limit && count($results) >= $limit) {
                $results = array_slice($results, 0, $limit);
                break;
            }
        }

        return new ArrayResultSet($results);
    }

    /**
     * {@inheritdoc}
     *
     * Perform the update on each selected shard.
     *
     * @return int The total number of updated rows. The cache is cleared only if at least one row is updated.
     */
    #[WriteOperation]
    public function update($values = null): int
    {
        $count = 0;

        foreach ($this->selectQueries() as $query) {
            $count += $query->update($values);
        }

        if ($count > 0) {
            $this->clearCacheOnWrite();
        }

        return $count;
    }

    /**
     * {@inheritdoc}
     *
     * Perform the delete on each selected shard.
     *
     * @return int The total number of deleted rows. The cache is cleared only if at least one row is deleted.
     */
    #[WriteOperation]
    public function delete(): int
    {
        $count = 0;

        foreach ($this->selectQueries() as $query) {
            $count += $query->delete();
        }

        if ($count > 0) {
            $this->clearCacheOnWrite();
        }

        return $count;
    }

    /**
     * Select the queries to use
     *
     * The queries are generated lazily: a shard query is only created and configured
     * when the caller iterates up to it.
     *
     * @return iterable<KeyValueQueryInterface>
     *
     * @throws ShardingException
     */
    private function selectQueries(): iterable
    {
        foreach ($this->getShardIds() as $shardId) {
            yield $this->getQueryByShard($shardId);
        }
    }

    /**
     * Get the targeted shard IDs
     *
     * Resolution order:
     * - the shard explicitly selected on this query ($this->shardId), if any
     * - the shard picked from the distribution key value, if it is present in the filters
     * - all the shards of the connection
     *
     * @return list<string>
     */
    private function getShardIds(): array
    {
        if ($this->shardId !== null) {
            return [$this->shardId];
        }

        $distributionKey = $this->connection->getDistributionKey();

        // isset() is false for a null filter value: in this case all shards are queried
        if (isset($this->statements['where'][$distributionKey])) {
            $shardId = $this->connection->getShardChoser()->pick(
                $this->statements['where'][$distributionKey],
                $this->connection
            );

            return [$shardId];
        }

        return $this->connection->getShardIds();
    }

    /**
     * Get and configure a query for the given shard
     *
     * The query is created once per shard and reused on subsequent calls,
     * but the current statements are applied on every call.
     *
     * @param string $shardId The shard id
     *
     * @return KeyValueQueryInterface
     *
     * @throws ShardingException
     */
    private function getQueryByShard($shardId)
    {
        if (isset($this->queries[$shardId])) {
            $query = $this->queries[$shardId];
        } else {
            /** @var KeyValueQueryInterface $query */
            $query = $this->connection
                ->getShardConnection($shardId)
                ->make(KeyValueQueryInterface::class, $this->preprocessor());
            $this->queries[$shardId] = $query;
            $query->setExtension($this->extension);
        }

        $query->from($this->statements['table']);

        // Only non-empty statements are forwarded to the shard query
        if (!empty($this->statements['limit'])) {
            /** @var KeyValueQueryInterface&\Bdf\Prime\Query\Contract\Limitable $query */
            $query->limit($this->statements['limit']);
        }

        if (!empty($this->statements['columns'])) {
            $query->project($this->statements['columns']);
        }

        if (!empty($this->statements['where'])) {
            $query->where($this->statements['where']);
        }

        if (!empty($this->statements['values']['data'])) {
            $query->values($this->statements['values']['data'], $this->statements['values']['types']);
        }

        return $query;
    }

    /**
     * {@inheritdoc}
     *
     * The namespace is built as "<connection name>:<table>".
     */
    protected function cacheNamespace(): string
    {
        return $this->connection->getName().':'.$this->statements['table'];
    }
}
373