Passed
Push — trunk ( 544851...4bb49d )
by Christian
10:04 queued 12s
created

AdminSearchRegistry::getHandledMessages()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 2
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 4
rs 10
1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Admin;
4
5
use Doctrine\DBAL\Connection;
6
use OpenSearch\Client;
7
use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
8
use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
9
use Shopware\Core\Framework\Event\ProgressFinishedEvent;
10
use Shopware\Core\Framework\Event\ProgressStartedEvent;
11
use Shopware\Core\Framework\Uuid\Uuid;
12
use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
13
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
14
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
15
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
16
use Symfony\Component\Messenger\MessageBusInterface;
17
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
18
19
/**
20
 * @package system-settings
21
 *
22
 * @internal
23
 *
24
 * @final
25
 */
26
class AdminSearchRegistry implements MessageHandlerInterface, EventSubscriberInterface
0 ignored issues
show
Deprecated Code introduced by
The interface Symfony\Component\Messenger\Handler\MessageHandlerInterface has been deprecated: since Symfony 6.2, use the {@see AsMessageHandler} attribute instead (Ignorable by Annotation)

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

26
class AdminSearchRegistry implements /** @scrutinizer ignore-deprecated */ MessageHandlerInterface, EventSubscriberInterface

This interface has been deprecated. The supplier of the interface has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the interface will be removed and what other interface to use instead.

Loading history...
27
{
28
    /**
     * Registered admin indexers; getIndexer() looks them up by their string key.
     *
     * @var array<string, AbstractAdminIndexer>
     */
31
    private array $indexer;
32
33
    private Connection $connection;
34
35
    private MessageBusInterface $queue;
36
37
    private EventDispatcherInterface $dispatcher;
38
39
    private Client $client;
40
41
    private AdminElasticsearchHelper $adminEsHelper;
42
43
    /**
44
     * @var array<mixed>
45
     */
46
    private array $config;
47
48
    /**
49
     * @var array<mixed>
50
     */
51
    private array $mapping;
52
53
    /**
     * Stores the collaborators and normalises the index configuration.
     *
     * The indexers may be handed over as a plain array or as any Traversable;
     * a Traversable is materialised with iterator_to_array(), which keeps its
     * keys so they can be used for lookups in getIndexer().
     *
     * @param iterable<string, AbstractAdminIndexer> $indexer
     * @param array<mixed> $config
     * @param array<mixed> $mapping
     */
    public function __construct(
        iterable $indexer,
        Connection $connection,
        MessageBusInterface $queue,
        EventDispatcherInterface $dispatcher,
        Client $client,
        AdminElasticsearchHelper $adminEsHelper,
        array $config,
        array $mapping
    ) {
        $this->indexer = $indexer instanceof \Traversable ? iterator_to_array($indexer) : $indexer;
        $this->connection = $connection;
        $this->queue = $queue;
        $this->dispatcher = $dispatcher;
        $this->client = $client;
        $this->adminEsHelper = $adminEsHelper;
        $this->mapping = $mapping;

        if (isset($config['settings']['index'])) {
            // Strip shard/replica settings that are present but explicitly null,
            // so the stored config (later sent as index body) does not carry them.
            foreach (['number_of_shards', 'number_of_replicas'] as $setting) {
                if (
                    \array_key_exists($setting, $config['settings']['index'])
                    && $config['settings']['index'][$setting] === null
                ) {
                    unset($config['settings']['index'][$setting]);
                }
            }
        }

        $this->config = $config;
    }
88
89
    /**
     * Message handler entry point: fetches the documents for the ids carried
     * by the message from the matching indexer and pushes them to the indices
     * named in the message.
     */
    public function __invoke(AdminSearchIndexingMessage $message): void
    {
        $indexer = $this->getIndexer($message->getEntity());
        $ids = $message->getIds();

        $this->push($indexer, $message->getIndices(), $indexer->fetch($ids), $ids);
    }
97
98
    /**
     * Registers refresh() as a listener for entity-written container events
     * with priority -1000.
     *
     * @return array<string, array<int, array{0: string, 1: int}>>
     */
    public static function getSubscribedEvents(): array
    {
        $listeners = [['refresh', -1000]];

        return [EntityWrittenContainerEvent::class => $listeners];
    }
106
107
    /**
     * Lists the message classes that this handler consumes via __invoke().
     *
     * @return iterable<class-string>
     */
    public static function getHandledMessages(): iterable
    {
        return [AdminSearchIndexingMessage::class];
    }
116
117
    /**
     * Performs a full (re-)index run for the selected entities.
     *
     * Does nothing when admin search is disabled. Otherwise it creates fresh
     * indices, walks every selected indexer batch by batch (emitting progress
     * events), and finally swaps the aliases over to the new indices.
     *
     * The behavior object selects the entities: a non-empty "only" list wins,
     * otherwise a non-empty "skip" list is subtracted from all known indexers.
     */
    public function iterate(AdminIndexingBehavior $indexingBehavior): void
    {
        if ($this->adminEsHelper->getEnabled() === false) {
            return;
        }

        /** @var array<string> $entityNames */
        $entityNames = array_keys($this->indexer);

        $only = $indexingBehavior->getOnlyEntities();
        if ($only) {
            $entityNames = array_intersect($entityNames, $only);
        } else {
            $skip = $indexingBehavior->getSkipEntities();
            if ($skip) {
                $entityNames = array_diff($entityNames, $skip);
            }
        }

        $indices = $this->createIndices($entityNames);

        foreach ($entityNames as $entityName) {
            $indexer = $this->getIndexer($entityName);
            $iterator = $indexer->getIterator();

            $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));

            while ($batch = $iterator->fetch()) {
                $message = new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $batch);

                if ($indexingBehavior->getNoQueue() === true) {
                    // No queue requested: index the batch synchronously.
                    $this->__invoke($message);
                } else {
                    $this->queue->dispatch($message);
                }

                $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($batch)));
            }

            $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
        }

        $this->swapAlias($indices);
    }
156
157
    /**
     * Event listener that keeps the indices in sync with written entities.
     *
     * Skipped when admin search is disabled or when the event touches none of
     * the indexed entities. The target indices are read from the
     * admin_elasticsearch_index_task table (alias => index); for every indexer
     * with written primary keys the documents are re-fetched and pushed.
     */
    public function refresh(EntityWrittenContainerEvent $event): void
    {
        if ($this->adminEsHelper->getEnabled() === false) {
            return;
        }

        if (!$this->isIndexedEntityWritten($event)) {
            return;
        }

        if ($this->adminEsHelper->getRefreshIndices()) {
            $this->refreshIndices();
        }

        /** @var array<string, string> $indices */
        $indices = $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');

        if (empty($indices)) {
            return;
        }

        foreach ($this->indexer as $indexer) {
            $writtenIds = $event->getPrimaryKeys($indexer->getEntity());

            if (!empty($writtenIds)) {
                $this->push($indexer, $indices, $indexer->fetch($writtenIds), $writtenIds);
            }
        }
    }
185
186
    /**
     * Exposes all indexers held by this registry, keyed as they were registered.
     *
     * @return AbstractAdminIndexer[]
     */
    public function getIndexers(): iterable
    {
        return $this->indexer;
    }
193
194
    /**
     * Looks up a single indexer by its registry key.
     *
     * @throws ElasticsearchIndexingException when no indexer is registered under $name
     */
    public function getIndexer(string $name): AbstractAdminIndexer
    {
        // empty() covers both a missing key and a falsy entry.
        if (empty($this->indexer[$name])) {
            throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found', $name)]);
        }

        return $this->indexer[$name];
    }
203
204
    /**
     * Tells whether the event contains primary keys for at least one entity
     * that has a registered indexer.
     */
    private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
    {
        foreach ($this->indexer as $indexer) {
            if (empty($event->getPrimaryKeys($indexer->getEntity()))) {
                continue;
            }

            return true;
        }

        return false;
    }
216
217
    /**
     * Sends one bulk request that indexes the fetched documents and deletes
     * every requested id for which no document was returned.
     *
     * Nothing is sent when the indexer's alias is not part of $indices, or
     * when there is neither a document to index nor an id to delete (a bulk
     * request without operations is rejected by the search server).
     *
     * @param array<string, string> $indices alias => concrete index name
     * @param array<string, array<string|int, string>> $data documents keyed by id
     * @param array<string> $ids ids that were requested from the indexer
     *
     * @throws ElasticsearchIndexingException when the bulk response reports errors
     */
    private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
    {
        $alias = $this->adminEsHelper->getIndex($indexer->getName());

        if (!isset($indices[$alias])) {
            return;
        }

        // Requested ids without a fetched document are removed from the index.
        $toRemove = array_filter($ids, static function (string $id) use ($data): bool {
            return !isset($data[$id]);
        });

        $documents = [];
        foreach ($data as $id => $document) {
            // Bulk format: an action line followed by the document source.
            $documents[] = ['index' => ['_id' => $id]];

            // Defaults guarantee the base fields exist; the document overrides them.
            $documents[] = \array_replace(
                ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '', 'text' => ''],
                $document
            );
        }

        foreach ($toRemove as $id) {
            $documents[] = ['delete' => ['_id' => $id]];
        }

        if ($documents === []) {
            // Nothing to index or delete; avoid an invalid empty bulk request.
            return;
        }

        $result = $this->client->bulk([
            'index' => $indices[$alias],
            'body' => $documents,
        ]);

        if (\is_array($result) && !empty($result['errors'])) {
            throw new ElasticsearchIndexingException($this->parseErrors($result));
        }
    }
261
262
    /**
     * Creates a new timestamp-suffixed index for each given entity and replaces
     * the matching rows in admin_elasticsearch_index_task.
     *
     * An entity is skipped when an index with the generated name already
     * exists; its existing task row is still deleted together with the others.
     *
     * @param array<string> $entities
     *
     * @throws \Doctrine\DBAL\Exception
     *
     * @return array<string, string> alias => newly created index name
     */
    private function createIndices(array $entities): array
    {
        $created = [];
        $tasks = [];

        foreach ($entities as $entityName) {
            $indexer = $this->getIndexer($entityName);
            $alias = $this->adminEsHelper->getIndex($indexer->getName());
            $index = $alias . '_' . time();

            if (!$this->indexExists($index)) {
                $created[$alias] = $index;

                $this->create($indexer, $index, $alias);

                $docCount = $indexer->getIterator()->fetchCount();
                $tasks[] = [
                    'id' => Uuid::randomBytes(),
                    '`entity`' => $indexer->getEntity(),
                    '`index`' => $index,
                    '`alias`' => $alias,
                    '`doc_count`' => $docCount,
                ];
            }
        }

        // Remove the previous task rows of all requested entities before inserting the new ones.
        $this->connection->executeStatement(
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
            ['entities' => $entities],
            ['entities' => Connection::PARAM_STR_ARRAY]
        );

        foreach ($tasks as $row) {
            $this->connection->insert('admin_elasticsearch_index_task', $row);
        }

        return $created;
    }
308
309
    /**
     * Creates an index (and task row) for every indexer whose alias does not
     * exist yet; indexers with an existing alias are left untouched.
     *
     * When all aliases already exist there is nothing to persist, so the
     * method returns without touching the database.
     *
     * @throws \Doctrine\DBAL\Exception
     */
    private function refreshIndices(): void
    {
        $entities = [];
        $indexTasks = [];
        foreach ($this->indexer as $indexer) {
            $alias = $this->adminEsHelper->getIndex($indexer->getName());

            if ($this->aliasExists($alias)) {
                continue;
            }

            $index = $alias . '_' . time();
            $this->create($indexer, $index, $alias);

            $entities[] = $indexer->getEntity();

            $iterator = $indexer->getIterator();
            $indexTasks[] = [
                'id' => Uuid::randomBytes(),
                '`entity`' => $indexer->getEntity(),
                '`index`' => $index,
                '`alias`' => $alias,
                '`doc_count`' => $iterator->fetchCount(),
            ];
        }

        if ($indexTasks === []) {
            // No index was created: skip the DELETE with an empty entity list,
            // which would match no rows anyway.
            return;
        }

        // Replace the task rows of the entities that just received a new index.
        $this->connection->executeStatement(
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
            ['entities' => $entities],
            ['entities' => Connection::PARAM_STR_ARRAY]
        );

        foreach ($indexTasks as $task) {
            $this->connection->insert('admin_elasticsearch_index_task', $task);
        }
    }
345
346
    /**
     * Creates the index $index and attaches $alias to it if that alias does
     * not exist yet.
     *
     * The index body is the configured settings plus a mapping: the base
     * field mapping is passed through the indexer's mapping() and then merged
     * recursively with the globally configured mapping.
     */
    private function create(AbstractAdminIndexer $indexer, string $index, string $alias): void
    {
        $baseProperties = [
            'id' => ['type' => 'keyword'],
            'textBoosted' => ['type' => 'text'],
            'text' => ['type' => 'text'],
            'entityName' => ['type' => 'keyword'],
            'parameters' => ['type' => 'keyword'],
        ];

        $indexerMapping = $indexer->mapping(['properties' => $baseProperties]);
        $mappings = array_merge_recursive($indexerMapping, $this->mapping);

        $this->client->indices()->create([
            'index' => $index,
            'body' => array_merge($this->config, ['mappings' => $mappings]),
        ]);

        $this->createAliasIfNotExisting($index, $alias);
    }
372
373
    /**
     * Asks the search client whether an index with the given name exists.
     */
    private function indexExists(string $name): bool
    {
        $params = ['index' => $name];

        return $this->client->indices()->exists($params);
    }
377
378
    /**
     * Asks the search client whether the given alias exists.
     */
    private function aliasExists(string $alias): bool
    {
        $params = ['name' => $alias];

        return $this->client->indices()->existsAlias($params);
    }
382
383
    /**
     * Collects the failed operations from a bulk response.
     *
     * Each item is read from its "index" entry, falling back to "delete".
     * Items with status 200 or 201 are treated as successful; every other item
     * is reported with its index, id, error type and reason (falling back to
     * the item's _type / result when no error details are present).
     *
     * @param array<string, array<array<string, mixed>>> $result
     *
     * @return array<array{reason: string}|string>
     */
    private function parseErrors(array $result): array
    {
        $failures = [];

        foreach ($result['items'] as $entry) {
            $operation = $entry['index'] ?? $entry['delete'];

            $succeeded = \in_array($operation['status'], [200, 201], true);
            if (!$succeeded) {
                $failures[] = [
                    'index' => $operation['_index'],
                    'id' => $operation['_id'],
                    'type' => $operation['error']['type'] ?? $operation['_type'],
                    'reason' => $operation['error']['reason'] ?? $operation['result'],
                ];
            }
        }

        return $failures;
    }
408
409
    /**
     * Points $alias at $index, but only when the alias does not exist yet.
     */
    private function createAliasIfNotExisting(string $index, string $alias): void
    {
        if (!$this->client->indices()->existsAlias(['name' => $alias])) {
            $this->putAlias($index, $alias);
        }
    }
419
420
    /**
     * Moves every alias onto its newly built index and deletes the indices
     * that were attached to the alias before.
     *
     * For each alias => index pair:
     *  - if the alias does not exist, it is created on the new index;
     *  - otherwise the new index is added to the alias when missing, and all
     *    other indices currently behind the alias are deleted.
     *
     * @param array<string, string> $indices alias => new index name
     */
    private function swapAlias(array $indices): void
    {
        foreach ($indices as $alias => $index) {
            $exist = $this->client->indices()->existsAlias(['name' => $alias]);

            if (!$exist) {
                $this->putAlias($index, $alias);

                // Nothing old to clean up for this alias; carry on with the
                // remaining aliases instead of aborting the whole swap.
                continue;
            }

            // The response is keyed by the index names attached to the alias.
            $current = $this->client->indices()->getAlias(['name' => $alias]);

            if (!isset($current[$index])) {
                $this->putAlias($index, $alias);
            }

            // Everything except the new index is stale and gets removed.
            unset($current[$index]);

            foreach (array_keys($current) as $staleIndex) {
                $this->client->indices()->delete(['index' => $staleIndex]);
            }
        }
    }
448
449
    /**
     * Refreshes $index and then attaches $alias to it.
     */
    private function putAlias(string $index, string $alias): void
    {
        $refreshParams = ['index' => $index];
        $this->client->indices()->refresh($refreshParams);

        $aliasParams = ['index' => $index, 'name' => $alias];
        $this->client->indices()->putAlias($aliasParams);
    }
456
}
457