Passed
Push — trunk ( 05c644...7dd39b )
by Christian
10:24 queued 14s
created

ElasticsearchIndexer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 0

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 0
nc 1
nop 12
dl 0
loc 14
rs 10
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Framework\Indexing;
4
5
use Doctrine\DBAL\Connection;
6
use OpenSearch\Client;
7
use Psr\EventDispatcher\EventDispatcherInterface;
8
use Psr\Log\LoggerInterface;
9
use Shopware\Core\Defaults;
10
use Shopware\Core\Framework\Api\Context\SystemSource;
11
use Shopware\Core\Framework\Context;
12
use Shopware\Core\Framework\DataAbstractionLayer\Dbal\Common\IteratorFactory;
13
use Shopware\Core\Framework\DataAbstractionLayer\EntityDefinition;
14
use Shopware\Core\Framework\DataAbstractionLayer\EntityRepository;
15
use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
16
use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
17
use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\NandFilter;
18
use Shopware\Core\Framework\DataAbstractionLayer\Search\Sorting\FieldSorting;
19
use Shopware\Core\Framework\Log\Package;
20
use Shopware\Core\Framework\Uuid\Uuid;
21
use Shopware\Core\System\Language\LanguageCollection;
22
use Shopware\Core\System\Language\LanguageEntity;
0 ignored issues
show
Bug introduced by
The type Shopware\Core\System\Language\LanguageEntity was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
23
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
24
use Shopware\Elasticsearch\Framework\ElasticsearchHelper;
25
use Shopware\Elasticsearch\Framework\ElasticsearchRegistry;
26
use Shopware\Elasticsearch\Framework\Indexing\Event\ElasticsearchIndexerLanguageCriteriaEvent;
27
use Symfony\Component\Finder\Finder;
0 ignored issues
show
Bug introduced by
The type Symfony\Component\Finder\Finder was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
28
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
29
use Symfony\Component\Messenger\MessageBusInterface;
30
31
/**
32
 * @internal
33
 *
34
 * @final
35
 */
36
#[AsMessageHandler]
37
#[Package('core')]
38
class ElasticsearchIndexer
39
{
40
    /**
     * Receives the collaborators of the indexer; each argument is kept as a private
     * readonly promoted property.
     *
     * @internal
     *
     * @param EntityRepository $currencyRepository searched for the "currencies" context extension while indexing
     * @param EntityRepository $languageRepository searched for the languages that get an index
     * @param int $indexingBatchSize passed to the iterator factory as the batch size when ids are fetched
     * @param MessageBusInterface $bus receives the indexing messages created for a language index iterator message
     */
    public function __construct(
        private readonly Connection $connection,
        private readonly ElasticsearchHelper $helper,
        private readonly ElasticsearchRegistry $registry,
        private readonly IndexCreator $indexCreator,
        private readonly IteratorFactory $iteratorFactory,
        private readonly Client $client,
        private readonly LoggerInterface $logger,
        private readonly EntityRepository $currencyRepository,
        private readonly EntityRepository $languageRepository,
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly int $indexingBatchSize,
        private readonly MessageBusInterface $bus,
    ) {
    }
58
59
    /**
     * Message handler entry point.
     *
     * Does nothing when the helper reports that indexing is not allowed. Otherwise the
     * message is routed to the handler matching its concrete type.
     */
    public function __invoke(ElasticsearchIndexingMessage|ElasticsearchLanguageIndexIteratorMessage $message): void
    {
        if (!$this->helper->allowIndexing()) {
            return;
        }

        if ($message instanceof ElasticsearchLanguageIndexIteratorMessage) {
            $this->handleLanguageIndexIteratorMessage($message);
        } else {
            $this->handleIndexingMessage($message);
        }
    }
73
74
    /**
     * Produces the next indexing message for the given offset.
     *
     * Returns null when indexing is not allowed, when the offset has no current language,
     * when that language cannot be loaded, or when every definition of every language in
     * the offset has been handled. Passing null as offset runs init() first.
     *
     * @param IndexerOffset|null $offset
     */
    public function iterate($offset): ?ElasticsearchIndexingMessage
    {
        // each pass handles one language of the offset; the loop only continues after
        // the offset was advanced to the next language
        while (true) {
            if (!$this->helper->allowIndexing()) {
                return null;
            }

            $offset ??= $this->init();

            $languageId = $offset->getLanguageId();
            if ($languageId === null) {
                return null;
            }

            $language = $this->getLanguageForId($languageId);
            if ($language === null) {
                return null;
            }

            $languageContext = $this->createLanguageContext($language);

            // current language has next message?
            $next = $this->createIndexingMessage($offset, $languageContext);
            if ($next !== null) {
                return $next;
            }

            // all definitions in all languages indexed
            if (!$offset->hasNextLanguage()) {
                return null;
            }

            // all definitions are indexed in current language, start again with next language
            $offset->setNextLanguage();
            $offset->resetDefinitions();
            $offset->setLastId(null);
        }
    }
117
118
    /**
     * Indexes the given ids of one entity definition right away, without going through the bus.
     *
     * When the alias of the system language does not exist yet, init() is executed before
     * the messages are generated and handled.
     *
     * @param array<string> $ids
     */
    public function updateIds(EntityDefinition $definition, array $ids): void
    {
        if (!$this->helper->allowIndexing()) {
            return;
        }

        $systemAlias = $this->helper->getIndexName($definition, Defaults::LANGUAGE_SYSTEM);
        $aliasExists = $this->client->indices()->existsAlias(['name' => $systemAlias]);

        if (!$aliasExists) {
            $this->init();
        }

        /** @var ElasticsearchIndexingMessage $indexingMessage */
        foreach ($this->generateMessages($definition, $ids) as $indexingMessage) {
            $this->__invoke($indexingMessage);
        }
    }
140
141
    /**
     * Builds one indexing message per language returned by getLanguages().
     *
     * Every message targets the index name of the definition in that language and carries
     * no offset.
     *
     * @param array<string> $ids
     *
     * @return ElasticsearchIndexingMessage[]
     */
    private function generateMessages(EntityDefinition $definition, array $ids): array
    {
        $result = [];

        foreach ($this->getLanguages() as $language) {
            $languageContext = $this->createLanguageContext($language);

            $dto = new IndexingDto(
                $ids,
                $this->helper->getIndexName($definition, $language->getId()),
                $definition->getEntityName()
            );

            $result[] = new ElasticsearchIndexingMessage($dto, null, $languageContext);
        }

        return $result;
    }
165
166
    /**
     * Creates the indexing message for the next batch of ids in the offset's current language.
     *
     * Definitions without remaining ids are skipped by advancing the offset. Null is
     * returned once the last definition has no more ids.
     *
     * @throws \RuntimeException when the offset points to a definition the registry does not know
     */
    private function createIndexingMessage(IndexerOffset $offset, Context $context): ?ElasticsearchIndexingMessage
    {
        while (true) {
            $definition = $this->registry->get((string) $offset->getDefinition());

            if (!$definition) {
                throw new \RuntimeException(sprintf('Definition %s not found', $offset->getDefinition()));
            }

            $entityDefinition = $definition->getEntityDefinition();
            $entity = $entityDefinition->getEntityName();

            $iterator = $this->iteratorFactory->createIterator($entityDefinition, $offset->getLastId(), $this->indexingBatchSize);
            $ids = $iterator->fetch();

            if (empty($ids)) {
                // nothing left for this definition and no further definition to try
                if (!$offset->hasNextDefinition()) {
                    return null;
                }

                // increment definition offset and restart the iterator at the beginning
                $offset->setNextDefinition();
                $offset->setLastId(null);

                continue;
            }

            // current definition in current language has more ids to index:
            // increment last id with iterator offset
            $offset->setLastId($iterator->getOffset());

            $alias = $this->helper->getIndexName($entityDefinition, (string) $offset->getLanguageId());
            $index = $alias . '_' . $offset->getTimestamp();

            // return indexing message for current offset
            return new ElasticsearchIndexingMessage(new IndexingDto(array_values($ids), $index, $entity), $offset, $context);
        }
    }
205
206
    /**
     * Prepares a full indexing run and returns its starting offset.
     *
     * Clears the elasticsearch_index_task table, uploads the scripts, creates the indices
     * of every language and builds an offset over all language ids and registered
     * definitions. Indices and offset share the same timestamp.
     */
    private function init(): IndexerOffset
    {
        $this->connection->executeStatement('DELETE FROM elasticsearch_index_task');
        $this->createScripts();

        $languages = $this->getLanguages();
        $startedAt = new \DateTime();

        foreach ($languages as $language) {
            $this->createLanguageIndex($language, $startedAt);
        }

        $languageIds = array_values($languages->getIds());
        $definitions = $this->registry->getDefinitions();

        return new IndexerOffset($languageIds, $definitions, $startedAt->getTimestamp());
    }
226
227
    /**
     * Collects the failed items of a bulk response and logs the reason of each failure.
     *
     * Items with status 200 or 201 count as successful and are skipped. For the remaining
     * items the error type/reason is used when present, otherwise the item's "_type" and
     * "result" fields; "unknown" is used when neither is part of the response.
     *
     * @param array<mixed> $result
     *
     * @return array{index: string, id: string, type: string, reason: string}[]
     */
    private function parseErrors(array $result): array
    {
        $errors = [];
        foreach ($result['items'] as $item) {
            $item = $item['index'] ?? $item['delete'];

            if (\in_array($item['status'], [200, 201], true)) {
                continue;
            }

            // "_type" and "result" are not guaranteed to be part of an item, so both
            // lookups end in a string default instead of an undefined-key warning and null
            $type = $item['error']['type'] ?? $item['_type'] ?? 'unknown';
            $reason = $item['error']['reason'] ?? $item['result'] ?? 'unknown';

            $errors[] = [
                'index' => $item['_index'],
                'id' => $item['_id'],
                'type' => $type,
                'reason' => $reason,
            ];

            $this->logger->error($reason);
        }

        return $errors;
    }
254
255
    /**
     * Loads the languages that get an index, sorted by id.
     *
     * The criteria filter is a NAND over "salesChannels.id equals null" — presumably this
     * keeps languages assigned to at least one sales channel; confirm against the DAL.
     * An ElasticsearchIndexerLanguageCriteriaEvent is dispatched with the criteria before
     * the search runs.
     */
    private function getLanguages(): LanguageCollection
    {
        $defaultContext = Context::createDefaultContext();

        $languageCriteria = new Criteria();
        $salesChannelFilter = new NandFilter([new EqualsFilter('salesChannels.id', null)]);
        $languageCriteria->addFilter($salesChannelFilter);
        $languageCriteria->addSorting(new FieldSorting('id'));

        $criteriaEvent = new ElasticsearchIndexerLanguageCriteriaEvent($languageCriteria, $defaultContext);
        $this->eventDispatcher->dispatch($criteriaEvent);

        /** @var LanguageCollection $found */
        $found = $this->languageRepository->search($languageCriteria, $defaultContext)->getEntities();

        return $found;
    }
271
272
    /**
     * Builds a system-source context for the given language.
     *
     * The language chain holds the language itself, its parent id when one is set and the
     * system default language, in that order.
     */
    private function createLanguageContext(LanguageEntity $language): Context
    {
        // array_filter() removes an empty parent id but keeps the original keys (0 and 2);
        // array_values() re-indexes the chain so it is always a gap-free list
        $languageIdChain = array_values(array_filter([
            $language->getId(),
            $language->getParentId(),
            Defaults::LANGUAGE_SYSTEM,
        ]));

        return new Context(
            new SystemSource(),
            [],
            Defaults::CURRENCY,
            $languageIdChain
        );
    }
281
282
    /**
     * Loads a single language by id in the default context.
     *
     * @return LanguageEntity|null null when the search does not contain the requested id
     */
    private function getLanguageForId(string $languageId): ?LanguageEntity
    {
        $context = Context::createDefaultContext();
        $criteria = new Criteria([$languageId]);

        // unwrap the collection from the search result so the value matches the annotated
        // type, the same way getLanguages() does
        /** @var LanguageCollection $languages */
        $languages = $this->languageRepository
            ->search($criteria, $context)
            ->getEntities();

        return $languages->get($languageId);
    }
293
294
    /**
     * Uploads every *.groovy file found in the Scripts directory next to this class as a
     * stored script.
     *
     * The script id is the file name without extension; the script language sent to the
     * client is "painless".
     *
     * @throws \RuntimeException when a script file cannot be read
     */
    private function createScripts(): void
    {
        $finder = (new Finder())
            ->files()
            ->in(__DIR__ . '/Scripts')
            ->name('*.groovy');

        foreach ($finder as $file) {
            $source = file_get_contents($file->getPathname());

            // file_get_contents() signals failure with false; never send that as script source
            if ($source === false) {
                throw new \RuntimeException(sprintf('Unable to read script file %s', $file->getPathname()));
            }

            $this->client->putScript([
                'id' => pathinfo($file->getFilename(), \PATHINFO_FILENAME),
                'body' => [
                    'script' => [
                        'lang' => 'painless',
                        'source' => $source,
                    ],
                ],
            ]);
        }
    }
315
316
    /**
     * Creates a timestamped index for every registered definition in the given language.
     *
     * The index name is the alias followed by "_" and the unix timestamp. When the alias
     * already existed before the index was created, a row with the current document count
     * is added to elasticsearch_index_task.
     */
    private function createLanguageIndex(LanguageEntity $language, \DateTime $timestamp): void
    {
        $context = $this->createLanguageContext($language);

        // identical for every definition, so it is built once
        $indexSuffix = '_' . $timestamp->getTimestamp();

        foreach ($this->registry->getDefinitions() as $definition) {
            $entityDefinition = $definition->getEntityDefinition();

            $alias = $this->helper->getIndexName($entityDefinition, $language->getId());
            $index = $alias . $indexSuffix;

            // read before createIndex() is called
            $hasAlias = $this->indexCreator->aliasExists($alias);

            $this->indexCreator->createIndex($definition, $index, $alias, $context);

            // We don't need an index task, when it's the first indexing. This will allow alias swapping to nothing
            if (!$hasAlias) {
                continue;
            }

            // the iterator is only needed for the document count of the task row
            $iterator = $this->iteratorFactory->createIterator($entityDefinition);

            $this->connection->insert('elasticsearch_index_task', [
                'id' => Uuid::randomBytes(),
                '`entity`' => $entityDefinition->getEntityName(),
                '`index`' => $index,
                '`alias`' => $alias,
                '`doc_count`' => $iterator->fetchCount(),
            ]);
        }
    }
343
344
    /**
     * Indexes the ids of one indexing message into its target index.
     *
     * The doc_count of the matching index task is reduced by the number of ids first. If
     * the index does not exist nothing else happens. Ids for which the definition returns
     * data are sent as "index" operations, all other ids as "delete" operations.
     *
     * @throws \RuntimeException when the entity has no registered elasticsearch definition
     * @throws ElasticsearchIndexingException when the bulk response reports errors
     */
    private function handleIndexingMessage(ElasticsearchIndexingMessage $message): void
    {
        $task = $message->getData();

        $ids = $task->getIds();

        $index = $task->getIndex();

        $this->connection->executeStatement('UPDATE elasticsearch_index_task SET `doc_count` = `doc_count` - :idCount WHERE `index` = :index', [
            'idCount' => \count($ids),
            'index' => $index,
        ]);

        if (!$this->client->indices()->exists(['index' => $index])) {
            return;
        }

        $entity = $task->getEntity();

        $definition = $this->registry->get($entity);

        // validate the definition before any further repository access is made
        if (!$definition) {
            throw new \RuntimeException(sprintf('Entity %s has no registered elasticsearch definition', $entity));
        }

        $context = $message->getContext();

        $context->addExtension('currencies', $this->currencyRepository->search(new Criteria(), Context::createDefaultContext()));

        $data = $definition->fetch(Uuid::fromHexToBytesList($ids), $context);

        // ids without fetched data are removed from the index
        $toRemove = array_filter($ids, static fn (string $id): bool => !isset($data[$id]));

        $documents = [];
        foreach ($data as $id => $document) {
            $documents[] = ['index' => ['_id' => $id]];
            $documents[] = $document;
        }

        foreach ($toRemove as $id) {
            $documents[] = ['delete' => ['_id' => $id]];
        }

        // a bulk request needs at least one operation, so there is nothing to send
        if ($documents === []) {
            return;
        }

        $result = $this->client->bulk([
            'index' => $index,
            'body' => $documents,
        ]);

        if (\is_array($result) && !empty($result['errors'])) {
            throw new ElasticsearchIndexingException($this->parseErrors($result));
        }
    }
400
401
    /**
     * Creates the indices of a single language and dispatches its indexing messages.
     *
     * Returns without doing anything when the language of the message cannot be found.
     * Otherwise iterate() is called until it returns null and every message it produced
     * is sent to the message bus.
     */
    private function handleLanguageIndexIteratorMessage(ElasticsearchLanguageIndexIteratorMessage $message): void
    {
        $criteria = new Criteria([$message->getLanguageId()]);

        /** @var LanguageEntity|null $language */
        $language = $this->languageRepository->search($criteria, Context::createDefaultContext())->first();

        if ($language === null) {
            return;
        }

        $timestamp = new \DateTime();
        $this->createLanguageIndex($language, $timestamp);

        $offset = new IndexerOffset([$language->getId()], $this->registry->getDefinitions(), $timestamp->getTimestamp());

        while (($indexingMessage = $this->iterate($offset)) !== null) {
            // continue from the offset carried by the message that was just created
            $offset = $indexingMessage->getOffset();

            $this->bus->dispatch($indexingMessage);
        }
    }
420
}
421