Passed
Push — trunk ( 05c644...7dd39b )
by Christian
10:24 queued 14s
created

AdminSearchRegistry::__construct()   B

Complexity

Conditions 7
Paths 10

Size

Total Lines 23
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 7
nc 10
nop 8
dl 0
loc 23
rs 8.8333
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Admin;
4
5
use Doctrine\DBAL\ArrayParameterType;
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\Exception;
8
use OpenSearch\Client;
9
use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
10
use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
11
use Shopware\Core\Framework\Event\ProgressFinishedEvent;
12
use Shopware\Core\Framework\Event\ProgressStartedEvent;
13
use Shopware\Core\Framework\Log\Package;
14
use Shopware\Core\Framework\Uuid\Uuid;
15
use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
16
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
17
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
18
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
19
use Symfony\Component\Messenger\MessageBusInterface;
20
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
21
22
/**
23
 * @internal
24
 *
25
 * @final
26
 */
27
#[Package('system-settings')]
28
#[AsMessageHandler(handles: AdminSearchIndexingMessage::class)]
29
class AdminSearchRegistry implements EventSubscriberInterface
30
{
31
    /**
32
     * @var array<string, mixed>
33
     */
34
    private readonly array $indexer;
35
36
    /**
37
     * @var array<string, mixed>
38
     */
39
    private readonly array $config;
40
41
    /**
42
     * @param array<AbstractAdminIndexer>|\Traversable<AbstractAdminIndexer> $indexer
43
     * @param array<string, mixed> $config
44
     * @param array<string, mixed> $mapping
45
     */
46
    public function __construct(
47
        $indexer,
48
        private readonly Connection $connection,
49
        private readonly MessageBusInterface $queue,
50
        private readonly EventDispatcherInterface $dispatcher,
51
        private readonly Client $client,
52
        private readonly AdminElasticsearchHelper $adminEsHelper,
53
        array $config,
54
        private readonly array $mapping
55
    ) {
56
        $this->indexer = ($indexer instanceof \Traversable) ? iterator_to_array($indexer) : $indexer;
0 ignored issues
show
Bug introduced by
The property indexer is declared read-only in Shopware\Elasticsearch\Admin\AdminSearchRegistry.
Loading history...
57
58
        if (isset($config['settings']['index'])) {
59
            if (\array_key_exists('number_of_shards', $config['settings']['index']) && $config['settings']['index']['number_of_shards'] === null) {
60
                unset($config['settings']['index']['number_of_shards']);
61
            }
62
63
            if (\array_key_exists('number_of_replicas', $config['settings']['index']) && $config['settings']['index']['number_of_replicas'] === null) {
64
                unset($config['settings']['index']['number_of_replicas']);
65
            }
66
        }
67
68
        $this->config = $config;
0 ignored issues
show
Bug introduced by
The property config is declared read-only in Shopware\Elasticsearch\Admin\AdminSearchRegistry.
Loading history...
69
    }
70
71
    public function __invoke(AdminSearchIndexingMessage $message): void
72
    {
73
        $indexer = $this->getIndexer($message->getEntity());
74
75
        $documents = $indexer->fetch($message->getIds());
76
77
        $this->push($indexer, $message->getIndices(), $documents, $message->getIds());
78
    }
79
80
    public static function getSubscribedEvents(): array
81
    {
82
        return [
83
            EntityWrittenContainerEvent::class => [
84
                ['refresh', -1000],
85
            ],
86
        ];
87
    }
88
89
    public function iterate(AdminIndexingBehavior $indexingBehavior): void
90
    {
91
        if (!$this->adminEsHelper->getEnabled()) {
92
            return;
93
        }
94
95
        /** @var array<string> $entities */
96
        $entities = array_keys($this->indexer);
97
98
        if ($indexingBehavior->getOnlyEntities()) {
99
            $entities = array_intersect($entities, $indexingBehavior->getOnlyEntities());
100
        } elseif ($indexingBehavior->getSkipEntities()) {
101
            $entities = array_diff($entities, $indexingBehavior->getSkipEntities());
102
        }
103
104
        $indices = $this->createIndices($entities);
105
106
        foreach ($entities as $entityName) {
107
            $indexer = $this->getIndexer($entityName);
108
            $iterator = $indexer->getIterator();
109
110
            $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));
111
112
            while ($ids = $iterator->fetch()) {
113
                // we provide no queue when the data is sent by the admin
114
                if ($indexingBehavior->getNoQueue()) {
115
                    $this->__invoke(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids));
116
                } else {
117
                    $this->queue->dispatch(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids));
118
                }
119
120
                $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
121
            }
122
123
            $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
124
        }
125
126
        $this->swapAlias($indices);
127
    }
128
129
    public function refresh(EntityWrittenContainerEvent $event): void
130
    {
131
        if (!$this->adminEsHelper->getEnabled() || !$this->isIndexedEntityWritten($event)) {
132
            return;
133
        }
134
135
        if ($this->adminEsHelper->getRefreshIndices()) {
136
            $this->refreshIndices();
137
        }
138
139
        /** @var array<string, string> $indices */
140
        $indices = $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');
141
142
        if (empty($indices)) {
143
            return;
144
        }
145
146
        foreach ($this->indexer as $indexer) {
147
            $ids = $event->getPrimaryKeys($indexer->getEntity());
148
149
            if (empty($ids)) {
150
                continue;
151
            }
152
            $documents = $indexer->fetch($ids);
153
154
            $this->push($indexer, $indices, $documents, $ids);
155
        }
156
    }
157
158
    /**
159
     * @return AbstractAdminIndexer[]
160
     */
161
    public function getIndexers(): iterable
162
    {
163
        return $this->indexer;
164
    }
165
166
    public function getIndexer(string $name): AbstractAdminIndexer
167
    {
168
        $indexer = $this->indexer[$name] ?? null;
169
        if ($indexer) {
170
            return $indexer;
171
        }
172
173
        throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found', $name)]);
174
    }
175
176
    private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
177
    {
178
        foreach ($this->indexer as $indexer) {
179
            $ids = $event->getPrimaryKeys($indexer->getEntity());
180
181
            if (!empty($ids)) {
182
                return true;
183
            }
184
        }
185
186
        return false;
187
    }
188
189
    /**
190
     * @param array<string, string> $indices
191
     * @param array<string, array<string|int, string>> $data
192
     * @param array<string> $ids
193
     */
194
    private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
195
    {
196
        $alias = $this->adminEsHelper->getIndex($indexer->getName());
197
198
        if (!isset($indices[$alias])) {
199
            return;
200
        }
201
202
        $toRemove = array_filter($ids, static fn (string $id): bool => !isset($data[$id]));
203
204
        $documents = [];
205
        foreach ($data as $id => $document) {
206
            $documents[] = ['index' => ['_id' => $id]];
207
208
            $documents[] = \array_replace(
209
                ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '', 'text' => ''],
210
                $document
211
            );
212
        }
213
214
        foreach ($toRemove as $id) {
215
            $documents[] = ['delete' => ['_id' => $id]];
216
        }
217
218
        $arguments = [
219
            'index' => $indices[$alias],
220
            'body' => $documents,
221
        ];
222
223
        $result = $this->client->bulk($arguments);
224
225
        if (\is_array($result) && !empty($result['errors'])) {
226
            $errors = $this->parseErrors($result);
227
228
            throw new ElasticsearchIndexingException($errors);
229
        }
230
    }
231
232
    /**
233
     * @param array<string> $entities
234
     *
235
     * @throws Exception
236
     *
237
     * @return array<string, string>
238
     */
239
    private function createIndices(array $entities): array
240
    {
241
        $indexTasks = [];
242
        $indices = [];
243
        foreach ($entities as $entityName) {
244
            $indexer = $this->getIndexer($entityName);
245
            $alias = $this->adminEsHelper->getIndex($indexer->getName());
246
            $index = $alias . '_' . time();
247
248
            if ($this->client->indices()->exists(['index' => $index])) {
249
                continue;
250
            }
251
252
            $indices[$alias] = $index;
253
254
            $this->create($indexer, $index, $alias);
255
256
            $iterator = $indexer->getIterator();
257
            $indexTasks[] = [
258
                'id' => Uuid::randomBytes(),
259
                '`entity`' => $indexer->getEntity(),
260
                '`index`' => $index,
261
                '`alias`' => $alias,
262
                '`doc_count`' => $iterator->fetchCount(),
263
            ];
264
        }
265
266
        $this->connection->executeStatement(
267
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
268
            ['entities' => $entities],
269
            ['entities' => ArrayParameterType::STRING]
270
        );
271
272
        foreach ($indexTasks as $task) {
273
            $this->connection->insert('admin_elasticsearch_index_task', $task);
274
        }
275
276
        return $indices;
277
    }
278
279
    private function refreshIndices(): void
280
    {
281
        $entities = [];
282
        $indexTasks = [];
283
        foreach ($this->indexer as $indexer) {
284
            $alias = $this->adminEsHelper->getIndex($indexer->getName());
285
286
            if ($this->client->indices()->existsAlias(['name' => $alias])) {
287
                continue;
288
            }
289
290
            $index = $alias . '_' . time();
291
            $this->create($indexer, $index, $alias);
292
293
            $entities[] = $indexer->getEntity();
294
295
            $iterator = $indexer->getIterator();
296
            $indexTasks[] = [
297
                'id' => Uuid::randomBytes(),
298
                '`entity`' => $indexer->getEntity(),
299
                '`index`' => $index,
300
                '`alias`' => $alias,
301
                '`doc_count`' => $iterator->fetchCount(),
302
            ];
303
        }
304
305
        $this->connection->executeStatement(
306
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
307
            ['entities' => $entities],
308
            ['entities' => ArrayParameterType::STRING]
309
        );
310
311
        foreach ($indexTasks as $task) {
312
            $this->connection->insert('admin_elasticsearch_index_task', $task);
313
        }
314
    }
315
316
    private function create(AbstractAdminIndexer $indexer, string $index, string $alias): void
317
    {
318
        $mapping = $indexer->mapping([
319
            'properties' => [
320
                'id' => ['type' => 'keyword'],
321
                'textBoosted' => ['type' => 'text'],
322
                'text' => ['type' => 'text'],
323
                'entityName' => ['type' => 'keyword'],
324
                'parameters' => ['type' => 'keyword'],
325
            ],
326
        ]);
327
328
        $mapping = array_merge_recursive($mapping, $this->mapping);
329
330
        $body = array_merge(
331
            $this->config,
332
            ['mappings' => $mapping]
333
        );
334
335
        $this->client->indices()->create([
336
            'index' => $index,
337
            'body' => $body,
338
        ]);
339
340
        $this->createAliasIfNotExisting($index, $alias);
341
    }
342
343
    /**
344
     * @param array<string, array<array<string, mixed>>> $result
345
     *
346
     * @return array<array{reason: string}|string>
347
     */
348
    private function parseErrors(array $result): array
349
    {
350
        $errors = [];
351
        foreach ($result['items'] as $item) {
352
            $item = $item['index'] ?? $item['delete'];
353
354
            if (\in_array($item['status'], [200, 201], true)) {
355
                continue;
356
            }
357
358
            $errors[] = [
359
                'index' => $item['_index'],
360
                'id' => $item['_id'],
361
                'type' => $item['error']['type'] ?? $item['_type'],
362
                'reason' => $item['error']['reason'] ?? $item['result'],
363
            ];
364
        }
365
366
        return $errors;
367
    }
368
369
    private function createAliasIfNotExisting(string $index, string $alias): void
370
    {
371
        if ($this->client->indices()->existsAlias(['name' => $alias])) {
372
            return;
373
        }
374
375
        $this->putAlias($index, $alias);
376
    }
377
378
    /**
379
     * @param array<string, string> $indices
380
     */
381
    private function swapAlias(array $indices): void
382
    {
383
        foreach ($indices as $alias => $index) {
384
            if (!$this->client->indices()->existsAlias(['name' => $alias])) {
385
                $this->putAlias($index, $alias);
386
387
                return;
388
            }
389
390
            $current = $this->client->indices()->getAlias(['name' => $alias]);
391
392
            if (!isset($current[$index])) {
393
                $this->putAlias($index, $alias);
394
            }
395
396
            unset($current[$index]);
397
            $current = array_keys($current);
398
399
            foreach ($current as $value) {
400
                $this->client->indices()->delete(['index' => $value]);
401
            }
402
        }
403
    }
404
405
    private function putAlias(string $index, string $alias): void
406
    {
407
        $this->client->indices()->refresh([
408
            'index' => $index,
409
        ]);
410
        $this->client->indices()->putAlias(['index' => $index, 'name' => $alias]);
411
    }
412
}
413