Passed
Push — trunk ( 05c644...7dd39b )
by Christian
10:24 queued 14s
created

ElasticsearchIndexer::getCurrencies()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Framework\Indexing;
4
5
use Doctrine\DBAL\Connection;
6
use OpenSearch\Client;
7
use Psr\EventDispatcher\EventDispatcherInterface;
8
use Psr\Log\LoggerInterface;
9
use Shopware\Core\Defaults;
10
use Shopware\Core\Framework\Api\Context\SystemSource;
11
use Shopware\Core\Framework\Context;
12
use Shopware\Core\Framework\DataAbstractionLayer\Dbal\Common\IteratorFactory;
13
use Shopware\Core\Framework\DataAbstractionLayer\EntityDefinition;
14
use Shopware\Core\Framework\DataAbstractionLayer\EntityRepository;
15
use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
16
use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
17
use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\NandFilter;
18
use Shopware\Core\Framework\DataAbstractionLayer\Search\Sorting\FieldSorting;
19
use Shopware\Core\Framework\Log\Package;
20
use Shopware\Core\Framework\Uuid\Uuid;
21
use Shopware\Core\System\Language\LanguageCollection;
22
use Shopware\Core\System\Language\LanguageEntity;
0 ignored issues
show
Bug introduced by
The type Shopware\Core\System\Language\LanguageEntity was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
23
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
24
use Shopware\Elasticsearch\Framework\ElasticsearchHelper;
25
use Shopware\Elasticsearch\Framework\ElasticsearchRegistry;
26
use Shopware\Elasticsearch\Framework\Indexing\Event\ElasticsearchIndexerLanguageCriteriaEvent;
27
use Symfony\Component\Finder\Finder;
0 ignored issues
show
Bug introduced by
The type Symfony\Component\Finder\Finder was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
28
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
29
use Symfony\Component\Messenger\MessageBusInterface;
30
31
/**
32
 * @internal
33
 *
34
 * @final
35
 */
36
#[AsMessageHandler]
37
#[Package('core')]
38
class ElasticsearchIndexer
39
{
40
    /**
41
     * @internal
42
     */
43
    public function __construct(
44
        private readonly Connection $connection,
45
        private readonly ElasticsearchHelper $helper,
46
        private readonly ElasticsearchRegistry $registry,
47
        private readonly IndexCreator $indexCreator,
48
        private readonly IteratorFactory $iteratorFactory,
49
        private readonly Client $client,
50
        private readonly LoggerInterface $logger,
51
        private readonly EntityRepository $currencyRepository,
52
        private readonly EntityRepository $languageRepository,
53
        private readonly EventDispatcherInterface $eventDispatcher,
54
        private readonly int $indexingBatchSize,
55
        private readonly MessageBusInterface $bus
56
    ) {
57
    }
58
59
    public function __invoke(ElasticsearchIndexingMessage|ElasticsearchLanguageIndexIteratorMessage $message): void
60
    {
61
        if (!$this->helper->allowIndexing()) {
62
            return;
63
        }
64
65
        if ($message instanceof ElasticsearchLanguageIndexIteratorMessage) {
66
            $this->handleLanguageIndexIteratorMessage($message);
67
68
            return;
69
        }
70
71
        $this->handleIndexingMessage($message);
72
    }
73
74
    /**
75
     * @param IndexerOffset|null $offset
76
     */
77
    public function iterate($offset): ?ElasticsearchIndexingMessage
78
    {
79
        if (!$this->helper->allowIndexing()) {
80
            return null;
81
        }
82
83
        if ($offset === null) {
84
            $offset = $this->init();
85
        }
86
87
        if ($offset->getLanguageId() === null) {
88
            return null;
89
        }
90
91
        $language = $this->getLanguageForId($offset->getLanguageId());
92
93
        if (!$language) {
94
            return null;
95
        }
96
97
        $context = $this->createLanguageContext($language);
98
99
        // current language has next message?
100
        $message = $this->createIndexingMessage($offset, $context);
101
        if ($message) {
102
            return $message;
103
        }
104
105
        // all definitions in all languages indexed
106
        if (!$offset->hasNextLanguage()) {
107
            return null;
108
        }
109
110
        // all definitions are indexed in current language, start again with next language
111
        $offset->setNextLanguage();
112
        $offset->resetDefinitions();
113
        $offset->setLastId(null);
114
115
        return $this->iterate($offset);
116
    }
117
118
    /**
119
     * @param array<string> $ids
120
     */
121
    public function updateIds(EntityDefinition $definition, array $ids): void
122
    {
123
        if (!$this->helper->allowIndexing()) {
124
            return;
125
        }
126
127
        $alias = $this->helper->getIndexName($definition, Defaults::LANGUAGE_SYSTEM);
128
129
        if (!$this->client->indices()->existsAlias(['name' => $alias])) {
130
            $this->init();
131
        }
132
133
        $messages = $this->generateMessages($definition, $ids);
134
135
        /** @var ElasticsearchIndexingMessage $message */
136
        foreach ($messages as $message) {
137
            $this->__invoke($message);
138
        }
139
    }
140
141
    /**
142
     * @param array<string> $ids
143
     *
144
     * @return ElasticsearchIndexingMessage[]
145
     */
146
    private function generateMessages(EntityDefinition $definition, array $ids): array
147
    {
148
        $languages = $this->getLanguages();
149
150
        $messages = [];
151
        foreach ($languages as $language) {
152
            $context = $this->createLanguageContext($language);
153
154
            $alias = $this->helper->getIndexName($definition, $language->getId());
155
156
            $indexing = new IndexingDto($ids, $alias, $definition->getEntityName());
157
158
            $message = new ElasticsearchIndexingMessage($indexing, null, $context);
159
160
            $messages[] = $message;
161
        }
162
163
        return $messages;
164
    }
165
166
    private function createIndexingMessage(IndexerOffset $offset, Context $context): ?ElasticsearchIndexingMessage
167
    {
168
        $definition = $this->registry->get((string) $offset->getDefinition());
169
170
        if (!$definition) {
171
            throw new \RuntimeException(sprintf('Definition %s not found', $offset->getDefinition()));
172
        }
173
174
        $entity = $definition->getEntityDefinition()->getEntityName();
175
176
        $iterator = $this->iteratorFactory->createIterator($definition->getEntityDefinition(), $offset->getLastId(), $this->indexingBatchSize);
177
178
        $ids = $iterator->fetch();
179
180
        // current definition in current language has more ids to index
181
        if (!empty($ids)) {
182
            // increment last id with iterator offset
183
            $offset->setLastId($iterator->getOffset());
184
185
            $alias = $this->helper->getIndexName($definition->getEntityDefinition(), (string) $offset->getLanguageId());
186
187
            $index = $alias . '_' . $offset->getTimestamp();
188
189
            // return indexing message for current offset
190
            return new ElasticsearchIndexingMessage(new IndexingDto(array_values($ids), $index, $entity), $offset, $context);
191
        }
192
193
        if (!$offset->hasNextDefinition()) {
194
            return null;
195
        }
196
197
        // increment definition offset
198
        $offset->setNextDefinition();
199
200
        // reset last id to start iterator at the beginning
201
        $offset->setLastId(null);
202
203
        return $this->createIndexingMessage($offset, $context);
204
    }
205
206
    private function init(): IndexerOffset
207
    {
208
        $this->connection->executeStatement('DELETE FROM elasticsearch_index_task');
209
210
        $this->createScripts();
211
212
        $languages = $this->getLanguages();
213
214
        $timestamp = new \DateTime();
215
216
        foreach ($languages as $language) {
217
            $this->createLanguageIndex($language, $timestamp);
218
        }
219
220
        return new IndexerOffset(
221
            array_values($languages->getIds()),
222
            $this->registry->getDefinitions(),
223
            $timestamp->getTimestamp()
224
        );
225
    }
226
227
    /**
228
     * @param array<mixed> $result
229
     *
230
     * @return array{index: string, id: string, type: string, reason: string}[]
231
     */
232
    private function parseErrors(array $result): array
233
    {
234
        $errors = [];
235
        foreach ($result['items'] as $item) {
236
            $item = $item['index'] ?? $item['delete'];
237
238
            if (\in_array($item['status'], [200, 201], true)) {
239
                continue;
240
            }
241
242
            $errors[] = [
243
                'index' => $item['_index'],
244
                'id' => $item['_id'],
245
                'type' => $item['error']['type'] ?? $item['_type'],
246
                'reason' => $item['error']['reason'] ?? $item['result'],
247
            ];
248
249
            $this->logger->error($item['error']['reason'] ?? $item['result']);
250
        }
251
252
        return $errors;
253
    }
254
255
    private function getLanguages(): LanguageCollection
256
    {
257
        $context = Context::createDefaultContext();
258
        $criteria = new Criteria();
259
        $criteria->addFilter(new NandFilter([new EqualsFilter('salesChannels.id', null)]));
260
        $criteria->addSorting(new FieldSorting('id'));
261
262
        $this->eventDispatcher->dispatch(new ElasticsearchIndexerLanguageCriteriaEvent($criteria, $context));
263
264
        /** @var LanguageCollection $languages */
265
        $languages = $this->languageRepository
266
            ->search($criteria, $context)
267
            ->getEntities();
268
269
        return $languages;
270
    }
271
272
    private function createLanguageContext(LanguageEntity $language): Context
273
    {
274
        return new Context(
275
            new SystemSource(),
276
            [],
277
            Defaults::CURRENCY,
278
            array_filter([$language->getId(), $language->getParentId(), Defaults::LANGUAGE_SYSTEM])
279
        );
280
    }
281
282
    private function getLanguageForId(string $languageId): ?LanguageEntity
283
    {
284
        $context = Context::createDefaultContext();
285
        $criteria = new Criteria([$languageId]);
286
287
        /** @var LanguageCollection $languages */
288
        $languages = $this->languageRepository
289
            ->search($criteria, $context);
290
291
        return $languages->get($languageId);
292
    }
293
294
    private function createScripts(): void
295
    {
296
        $finder = (new Finder())
297
            ->files()
298
            ->in(__DIR__ . '/Scripts')
299
            ->name('*.groovy');
300
301
        foreach ($finder as $file) {
302
            $name = pathinfo($file->getFilename(), \PATHINFO_FILENAME);
303
304
            $this->client->putScript([
305
                'id' => $name,
306
                'body' => [
307
                    'script' => [
308
                        'lang' => 'painless',
309
                        'source' => file_get_contents($file->getPathname()),
310
                    ],
311
                ],
312
            ]);
313
        }
314
    }
315
316
    private function createLanguageIndex(LanguageEntity $language, \DateTime $timestamp): void
317
    {
318
        $context = $this->createLanguageContext($language);
319
320
        foreach ($this->registry->getDefinitions() as $definition) {
321
            $alias = $this->helper->getIndexName($definition->getEntityDefinition(), $language->getId());
322
323
            $index = $alias . '_' . $timestamp->getTimestamp();
324
325
            $hasAlias = $this->indexCreator->aliasExists($alias);
326
327
            $this->indexCreator->createIndex($definition, $index, $alias, $context);
328
329
            $iterator = $this->iteratorFactory->createIterator($definition->getEntityDefinition());
330
331
            // We don't need an index task, when it's the first indexing. This will allow alias swapping to nothing
332
            if ($hasAlias) {
333
                $this->connection->insert('elasticsearch_index_task', [
334
                    'id' => Uuid::randomBytes(),
335
                    '`entity`' => $definition->getEntityDefinition()->getEntityName(),
336
                    '`index`' => $index,
337
                    '`alias`' => $alias,
338
                    '`doc_count`' => $iterator->fetchCount(),
339
                ]);
340
            }
341
        }
342
    }
343
344
    private function handleIndexingMessage(ElasticsearchIndexingMessage $message): void
345
    {
346
        $task = $message->getData();
347
348
        $ids = $task->getIds();
349
350
        $index = $task->getIndex();
351
352
        $this->connection->executeStatement('UPDATE elasticsearch_index_task SET `doc_count` = `doc_count` - :idCount WHERE `index` = :index', [
353
            'idCount' => \count($ids),
354
            'index' => $index,
355
        ]);
356
357
        if (!$this->client->indices()->exists(['index' => $index])) {
358
            return;
359
        }
360
361
        $entity = $task->getEntity();
362
363
        $definition = $this->registry->get($entity);
364
365
        $context = $message->getContext();
366
367
        $context->addExtension('currencies', $this->currencyRepository->search(new Criteria(), Context::createDefaultContext()));
368
369
        if (!$definition) {
370
            throw new \RuntimeException(sprintf('Entity %s has no registered elasticsearch definition', $entity));
371
        }
372
373
        $data = $definition->fetch(Uuid::fromHexToBytesList($ids), $context);
374
375
        $toRemove = array_filter($ids, fn (string $id) => !isset($data[$id]));
376
377
        $documents = [];
378
        foreach ($data as $id => $document) {
379
            $documents[] = ['index' => ['_id' => $id]];
380
            $documents[] = $document;
381
        }
382
383
        foreach ($toRemove as $id) {
384
            $documents[] = ['delete' => ['_id' => $id]];
385
        }
386
387
        $arguments = [
388
            'index' => $index,
389
            'body' => $documents,
390
        ];
391
392
        $result = $this->client->bulk($arguments);
393
394
        if (\is_array($result) && isset($result['errors']) && $result['errors']) {
395
            $errors = $this->parseErrors($result);
396
397
            throw new ElasticsearchIndexingException($errors);
398
        }
399
    }
400
401
    private function handleLanguageIndexIteratorMessage(ElasticsearchLanguageIndexIteratorMessage $message): void
402
    {
403
        /** @var LanguageEntity|null $language */
404
        $language = $this->languageRepository->search(new Criteria([$message->getLanguageId()]), Context::createDefaultContext())->first();
405
406
        if ($language === null) {
407
            return;
408
        }
409
410
        $timestamp = new \DateTime();
411
        $this->createLanguageIndex($language, $timestamp);
412
413
        $offset = new IndexerOffset([$language->getId()], $this->registry->getDefinitions(), $timestamp->getTimestamp());
414
        while ($message = $this->iterate($offset)) {
415
            $offset = $message->getOffset();
416
417
            $this->bus->dispatch($message);
418
        }
419
    }
420
}
421