Passed
Push — trunk ( 05c644...7dd39b )
by Christian
10:24 queued 14s
created

AdminSearchRegistry::aliasExists()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Admin;
4
5
use Doctrine\DBAL\ArrayParameterType;
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\Exception;
8
use OpenSearch\Client;
9
use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
10
use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
11
use Shopware\Core\Framework\Event\ProgressFinishedEvent;
12
use Shopware\Core\Framework\Event\ProgressStartedEvent;
13
use Shopware\Core\Framework\Log\Package;
14
use Shopware\Core\Framework\Uuid\Uuid;
15
use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
16
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
17
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
18
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
19
use Symfony\Component\Messenger\MessageBusInterface;
20
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
21
22
/**
23
 * @internal
24
 *
25
 * @final
26
 */
27
#[Package('system-settings')]
28
#[AsMessageHandler(handles: AdminSearchIndexingMessage::class)]
29
class AdminSearchRegistry implements EventSubscriberInterface
30
{
31
    /**
32
     * @var array<string, AbstractAdminIndexer>
33
     */
34
    private readonly array $indexer;
35
36
    /**
37
     * @var array<string, mixed>
38
     */
39
    private readonly array $config;
40
41
    /**
     * Stores the injected indexers as a plain array and drops null shard/replica counts from the index config.
     *
     * @param array<AbstractAdminIndexer>|\Traversable<AbstractAdminIndexer> $indexer
     * @param array<string, mixed> $config
     * @param array<string, mixed> $mapping
     */
    public function __construct(
        $indexer,
        private readonly Connection $connection,
        private readonly MessageBusInterface $queue,
        private readonly EventDispatcherInterface $dispatcher,
        private readonly Client $client,
        private readonly AdminElasticsearchHelper $adminEsHelper,
        array $config,
        private readonly array $mapping
    ) {
        // Traversables are materialised once so the indexers can later be looked up by key.
        if ($indexer instanceof \Traversable) {
            $indexer = iterator_to_array($indexer);
        }
        $this->indexer = $indexer;

        if (isset($config['settings']['index'])) {
            // A setting that is present but null is removed from the config entirely.
            foreach (['number_of_shards', 'number_of_replicas'] as $nullableSetting) {
                $isPresent = \array_key_exists($nullableSetting, $config['settings']['index']);

                if ($isPresent && $config['settings']['index'][$nullableSetting] === null) {
                    unset($config['settings']['index'][$nullableSetting]);
                }
            }
        }

        $this->config = $config;
    }
70
71
    /**
     * Message handler entry point: fetches the documents for the ids carried by the message
     * and pushes them to the indices named in the message.
     */
    public function __invoke(AdminSearchIndexingMessage $message): void
    {
        $entityIndexer = $this->getIndexer($message->getEntity());
        $fetched = $entityIndexer->fetch($message->getIds());

        // The ids are handed over too, so push() can delete the ones for which no document was fetched.
        $this->push($entityIndexer, $message->getIndices(), $fetched, $message->getIds());
    }
79
80
    /**
     * Registers refresh() as a listener for entity-written container events with priority -1000.
     */
    public static function getSubscribedEvents(): array
    {
        $refreshListener = ['refresh', -1000];

        return [EntityWrittenContainerEvent::class => [$refreshListener]];
    }
88
89
    /**
     * Creates fresh indices for the selected entities, indexes all of their ids batch by batch
     * and finally swaps the aliases over to the new indices.
     *
     * Returns immediately when the admin Elasticsearch helper reports the feature as disabled.
     */
    public function iterate(AdminIndexingBehavior $indexingBehavior): void
    {
        if (!$this->adminEsHelper->getEnabled()) {
            return;
        }

        /** @var array<string> $selected */
        $selected = array_keys($this->indexer);

        // An "only" list wins over a "skip" list; without either, every registered indexer is used.
        if ($indexingBehavior->getOnlyEntities()) {
            $selected = array_intersect($selected, $indexingBehavior->getOnlyEntities());
        } elseif ($indexingBehavior->getSkipEntities()) {
            $selected = array_diff($selected, $indexingBehavior->getSkipEntities());
        }

        $indices = $this->createIndices($selected);

        foreach ($selected as $entityName) {
            $indexer = $this->getIndexer($entityName);
            $iterator = $indexer->getIterator();

            $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));

            // Keep pulling id batches until the iterator yields an empty/falsy result.
            for ($ids = $iterator->fetch(); $ids; $ids = $iterator->fetch()) {
                $message = new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids);

                if ($indexingBehavior->getNoQueue()) {
                    // No queue requested: handle the batch synchronously in this process.
                    $this->__invoke($message);
                } else {
                    $this->queue->dispatch($message);
                }

                $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
            }

            $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
        }

        $this->swapAlias($indices);
    }
128
129
    /**
     * Re-indexes the documents of every registered indexer whose entity was written in the given event.
     *
     * Nothing happens when admin search is disabled, when none of the indexed entities was written,
     * or when the admin_elasticsearch_index_task table yields no alias/index rows.
     */
    public function refresh(EntityWrittenContainerEvent $event): void
    {
        if (!$this->adminEsHelper->getEnabled()) {
            return;
        }

        if (!$this->isIndexedEntityWritten($event)) {
            return;
        }

        if ($this->adminEsHelper->getRefreshIndices()) {
            $this->refreshIndices();
        }

        /** @var array<string, string> $aliasToIndex */
        $aliasToIndex = $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');

        if (!$aliasToIndex) {
            return;
        }

        foreach ($this->indexer as $indexer) {
            $writtenIds = $event->getPrimaryKeys($indexer->getEntity());

            // Indexers whose entity was not part of this write are left alone.
            if ($writtenIds) {
                $this->push($indexer, $aliasToIndex, $indexer->fetch($writtenIds), $writtenIds);
            }
        }
    }
157
158
    /**
     * Exposes every indexer that was handed to the constructor.
     *
     * @return AbstractAdminIndexer[]
     */
    public function getIndexers(): iterable
    {
        return $this->indexer;
    }
165
166
    /**
     * Looks up a registered indexer by its array key.
     *
     * @throws ElasticsearchIndexingException when nothing (or a falsy value) is registered under $name
     */
    public function getIndexer(string $name): AbstractAdminIndexer
    {
        $found = $this->indexer[$name] ?? null;

        if (!$found) {
            throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found', $name)]);
        }

        return $found;
    }
175
176
    /**
     * Tells whether the event carries primary keys for at least one entity handled by a registered indexer.
     */
    private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
    {
        foreach ($this->indexer as $candidate) {
            // The first indexer with written primary keys settles the question.
            if ($event->getPrimaryKeys($candidate->getEntity())) {
                return true;
            }
        }

        return false;
    }
188
189
    /**
     * Sends one bulk request that indexes the fetched documents and deletes the requested ids
     * for which no document was fetched.
     *
     * Returns without contacting the search server when no target index is known for the
     * indexer's alias, or when there is nothing to index or delete.
     *
     * @param array<string, string> $indices alias => index name
     * @param array<string, array<string|int, string>> $data fetched documents keyed by id
     * @param array<string> $ids all requested ids; ids missing from $data are removed from the index
     *
     * @throws ElasticsearchIndexingException when the bulk response reports errors
     */
    private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
    {
        $alias = $this->adminEsHelper->getIndex($indexer->getName());

        if (!isset($indices[$alias])) {
            return;
        }

        $toRemove = array_filter($ids, static fn (string $id): bool => !isset($data[$id]));

        $documents = [];
        foreach ($data as $id => $document) {
            $documents[] = ['index' => ['_id' => $id]];

            // Defaults guarantee the base fields exist; values from the fetched document take precedence.
            $documents[] = \array_replace(
                ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '', 'text' => ''],
                $document
            );
        }

        foreach ($toRemove as $id) {
            $documents[] = ['delete' => ['_id' => $id]];
        }

        // A bulk request without any operation cannot do useful work, so skip the round trip entirely.
        if ($documents === []) {
            return;
        }

        $result = $this->client->bulk([
            'index' => $indices[$alias],
            'body' => $documents,
        ]);

        if (\is_array($result) && !empty($result['errors'])) {
            throw new ElasticsearchIndexingException($this->parseErrors($result));
        }
    }
231
232
    /**
     * Creates a new timestamp-suffixed index for each given entity and records it in the
     * admin_elasticsearch_index_task table.
     *
     * An entity whose timestamped index already exists is skipped: it is absent from the returned
     * map and its existing task row is left untouched. Task rows are replaced only for entities
     * whose index was actually created.
     *
     * @param array<string> $entities keys of the registered indexers to (re)create indices for
     *
     * @throws Exception
     *
     * @return array<string, string> alias => newly created index name
     */
    private function createIndices(array $entities): array
    {
        $indexTasks = [];
        $indices = [];
        $recreatedEntities = [];

        foreach ($entities as $entityName) {
            $indexer = $this->getIndexer($entityName);
            $alias = $this->adminEsHelper->getIndex($indexer->getName());
            $index = $alias . '_' . time();

            if ($this->client->indices()->exists(['index' => $index])) {
                continue;
            }

            $indices[$alias] = $index;

            $this->create($indexer, $index, $alias);

            // Track the value that is written to the `entity` column so the DELETE below
            // hits exactly the rows that are about to be replaced.
            $recreatedEntities[] = $indexer->getEntity();

            $iterator = $indexer->getIterator();
            $indexTasks[] = [
                'id' => Uuid::randomBytes(),
                '`entity`' => $indexer->getEntity(),
                '`index`' => $index,
                '`alias`' => $alias,
                '`doc_count`' => $iterator->fetchCount(),
            ];
        }

        // Nothing was created, so there are no task rows to replace.
        if ($indexTasks === []) {
            return $indices;
        }

        $this->connection->executeStatement(
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
            ['entities' => $recreatedEntities],
            ['entities' => ArrayParameterType::STRING]
        );

        foreach ($indexTasks as $task) {
            $this->connection->insert('admin_elasticsearch_index_task', $task);
        }

        return $indices;
    }
278
279
    /**
     * Creates an index (and task row) for every registered indexer whose alias does not exist yet.
     *
     * When every alias already exists, the method returns without touching the database.
     */
    private function refreshIndices(): void
    {
        $entities = [];
        $indexTasks = [];

        foreach ($this->indexer as $indexer) {
            $alias = $this->adminEsHelper->getIndex($indexer->getName());

            if ($this->client->indices()->existsAlias(['name' => $alias])) {
                continue;
            }

            $index = $alias . '_' . time();
            $this->create($indexer, $index, $alias);

            $entities[] = $indexer->getEntity();

            $iterator = $indexer->getIterator();
            $indexTasks[] = [
                'id' => Uuid::randomBytes(),
                '`entity`' => $indexer->getEntity(),
                '`index`' => $index,
                '`alias`' => $alias,
                '`doc_count`' => $iterator->fetchCount(),
            ];
        }

        // No index was created: avoid issuing a DELETE with an empty entity list.
        if ($indexTasks === []) {
            return;
        }

        $this->connection->executeStatement(
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
            ['entities' => $entities],
            ['entities' => ArrayParameterType::STRING]
        );

        foreach ($indexTasks as $task) {
            $this->connection->insert('admin_elasticsearch_index_task', $task);
        }
    }
315
316
    /**
     * Creates the index with the configured settings plus the merged mapping, then points the alias
     * at it unless the alias already exists.
     */
    private function create(AbstractAdminIndexer $indexer, string $index, string $alias): void
    {
        // Base fields handed to the indexer, which returns the mapping to use.
        $baseMapping = [
            'properties' => [
                'id' => ['type' => 'keyword'],
                'textBoosted' => ['type' => 'text'],
                'text' => ['type' => 'text'],
                'entityName' => ['type' => 'keyword'],
                'parameters' => ['type' => 'keyword'],
            ],
        ];

        $mergedMapping = array_merge_recursive($indexer->mapping($baseMapping), $this->mapping);

        $createParams = [
            'index' => $index,
            'body' => array_merge($this->config, ['mappings' => $mergedMapping]),
        ];

        $this->client->indices()->create($createParams);

        $this->createAliasIfNotExisting($index, $alias);
    }
342
343
    /**
     * Extracts one error entry per bulk item whose status is neither 200 nor 201.
     *
     * Missing fields in a response item fall back to null / 'unknown' instead of triggering
     * undefined-key warnings.
     *
     * @param array<string, array<array<string, mixed>>> $result decoded bulk response
     *
     * @return array<array{reason: string}|string>
     */
    private function parseErrors(array $result): array
    {
        $errors = [];

        foreach ($result['items'] ?? [] as $item) {
            // Items are keyed by their action; fall back to the first entry for any other action name.
            $operation = $item['index'] ?? $item['delete'] ?? (\is_array($item) ? reset($item) : null);

            if (!\is_array($operation)) {
                continue;
            }

            if (\in_array($operation['status'] ?? null, [200, 201], true)) {
                continue;
            }

            $errors[] = [
                'index' => $operation['_index'] ?? null,
                'id' => $operation['_id'] ?? null,
                'type' => $operation['error']['type'] ?? $operation['_type'] ?? 'unknown',
                'reason' => $operation['error']['reason'] ?? $operation['result'] ?? 'unknown',
            ];
        }

        return $errors;
    }
368
369
    /**
     * Points the alias at the given index, but only when no alias of that name exists yet.
     */
    private function createAliasIfNotExisting(string $index, string $alias): void
    {
        $aliasExists = $this->client->indices()->existsAlias(['name' => $alias]);

        if (!$aliasExists) {
            $this->putAlias($index, $alias);
        }
    }
377
378
    /**
     * Makes every alias point at its new index and deletes the indices the alias referenced before.
     *
     * Each alias is handled independently: a missing alias is simply created and processing
     * continues with the next one.
     *
     * @param array<string, string> $indices alias => new index name
     */
    private function swapAlias(array $indices): void
    {
        foreach ($indices as $alias => $index) {
            if (!$this->client->indices()->existsAlias(['name' => $alias])) {
                $this->putAlias($index, $alias);

                // Nothing old to clean up for this alias; move on to the remaining aliases.
                continue;
            }

            $current = $this->client->indices()->getAlias(['name' => $alias]);

            if (!isset($current[$index])) {
                $this->putAlias($index, $alias);
            }

            // Every other index still attached to the alias is outdated and gets deleted.
            unset($current[$index]);

            foreach (array_keys($current) as $outdatedIndex) {
                $this->client->indices()->delete(['index' => $outdatedIndex]);
            }
        }
    }
404
405
    /**
     * Refreshes the index and then attaches the alias to it.
     */
    private function putAlias(string $index, string $alias): void
    {
        $refreshParams = ['index' => $index];
        $this->client->indices()->refresh($refreshParams);

        $aliasParams = ['index' => $index, 'name' => $alias];
        $this->client->indices()->putAlias($aliasParams);
    }
412
}
413