Passed
Push — 6.4 ( be76f2...e19c4d )
by Christian
14:08 queued 13s
created

AdminSearchRegistry::push()   B

Complexity

Conditions 6
Paths 9

Size

Total Lines 37
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
eloc 20
c 1
b 0
f 0
nc 9
nop 4
dl 0
loc 37
rs 8.9777
1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Admin;
4
5
use Doctrine\DBAL\Connection;
6
use Elasticsearch\Client;
7
use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
8
use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
9
use Shopware\Core\Framework\Event\ProgressFinishedEvent;
10
use Shopware\Core\Framework\Event\ProgressStartedEvent;
11
use Shopware\Core\Framework\Log\Package;
12
use Shopware\Core\Framework\Uuid\Uuid;
13
use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
14
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
15
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
16
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
17
use Symfony\Component\Messenger\MessageBusInterface;
18
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
19
20
/**
21
 * @internal
22
 *
23
 * @final
24
 */
25
#[Package('system-settings')]
26
class AdminSearchRegistry implements MessageHandlerInterface, EventSubscriberInterface
27
{
28
    /**
     * Registered admin indexers, keyed by the name they are looked up with in getIndexer().
     *
     * @var array<string, AbstractAdminIndexer>
     */
    private array $indexer;

    private Connection $connection;

    private MessageBusInterface $queue;

    private EventDispatcherInterface $dispatcher;

    private Client $client;

    private AdminElasticsearchHelper $adminEsHelper;

    /**
     * Base body used when creating an index; the "mappings" key is added in create().
     *
     * @var array<mixed>
     */
    private array $config;

    /**
     * Additional mapping merged recursively into every indexer mapping in create().
     *
     * @var array<mixed>
     */
    private array $mapping;

    /**
     * @param iterable<string, AbstractAdminIndexer> $indexer array or Traversable of indexers; keys are preserved
     * @param array<mixed> $config index creation settings
     * @param array<mixed> $mapping mapping additions applied to every created index
     */
    public function __construct(
        iterable $indexer,
        Connection $connection,
        MessageBusInterface $queue,
        EventDispatcherInterface $dispatcher,
        Client $client,
        AdminElasticsearchHelper $adminEsHelper,
        array $config,
        array $mapping
    ) {
        // Both plain arrays and lazy iterators are accepted; the registry always
        // stores a keyed array so lookups by name work.
        $this->indexer = $indexer instanceof \Traversable ? iterator_to_array($indexer) : $indexer;
        $this->connection = $connection;
        $this->queue = $queue;
        $this->dispatcher = $dispatcher;
        $this->client = $client;
        $this->adminEsHelper = $adminEsHelper;
        $this->mapping = $mapping;

        if (isset($config['settings']['index'])) {
            // A setting that is present but explicitly null is dropped so it is
            // not sent as part of the index creation body.
            foreach (['number_of_shards', 'number_of_replicas'] as $setting) {
                if (\array_key_exists($setting, $config['settings']['index']) && $config['settings']['index'][$setting] === null) {
                    unset($config['settings']['index'][$setting]);
                }
            }
        }

        $this->config = $config;
    }
88
89
    /**
     * Handles an indexing message: loads the documents for the message ids from the
     * indexer registered for the message entity and pushes them to the message indices.
     */
    public function __invoke(AdminSearchIndexingMessage $message): void
    {
        $indexer = $this->getIndexer($message->getEntity());
        $ids = $message->getIds();
        $documents = $indexer->fetch($ids);

        $this->push($indexer, $message->getIndices(), $documents, $ids);
    }
97
98
    /**
     * Subscribes refresh() to entity write events with priority -1000.
     */
    public static function getSubscribedEvents(): array
    {
        $subscriptions = [];
        $subscriptions[EntityWrittenContainerEvent::class] = [['refresh', -1000]];

        return $subscriptions;
    }
106
107
    /**
     * Lists the message classes this handler consumes.
     *
     * @return iterable<class-string>
     */
    public static function getHandledMessages(): iterable
    {
        return [AdminSearchIndexingMessage::class];
    }
116
117
    /**
     * Runs a full indexing pass: creates new indices for the selected entities, sends
     * every id batch of each indexer either straight to __invoke() or to the message
     * queue, and finally swaps the aliases over to the new indices.
     *
     * Does nothing when admin Elasticsearch is disabled.
     */
    public function iterate(AdminIndexingBehavior $indexingBehavior): void
    {
        if ($this->adminEsHelper->getEnabled() === false) {
            return;
        }

        /** @var array<string> $selected */
        $selected = array_keys($this->indexer);

        // "only" takes precedence over "skip" when both are configured.
        if ($indexingBehavior->getOnlyEntities()) {
            $selected = array_intersect($selected, $indexingBehavior->getOnlyEntities());
        } elseif ($indexingBehavior->getSkipEntities()) {
            $selected = array_diff($selected, $indexingBehavior->getSkipEntities());
        }

        $indices = $this->createIndices($selected);

        foreach ($selected as $name) {
            $indexer = $this->getIndexer($name);
            $iterator = $indexer->getIterator();

            $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));

            while ($batch = $iterator->fetch()) {
                $message = new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $batch);

                // we provide no queue when the data is sent by the admin
                if ($indexingBehavior->getNoQueue() === true) {
                    $this->__invoke($message);
                } else {
                    $this->queue->dispatch($message);
                }

                $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($batch)));
            }

            $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
        }

        $this->swapAlias($indices);
    }
156
157
    /**
     * Event listener that keeps the admin indices in sync with written entities.
     *
     * For every indexer whose entity has primary keys in the event, the affected
     * documents are fetched again and pushed to the indices recorded in
     * admin_elasticsearch_index_task.
     */
    public function refresh(EntityWrittenContainerEvent $event): void
    {
        if ($this->adminEsHelper->getEnabled() === false) {
            return;
        }

        if (!$this->isIndexedEntityWritten($event)) {
            return;
        }

        if ($this->adminEsHelper->getRefreshIndices()) {
            $this->refreshIndices();
        }

        /** @var array<string, string> $indices */
        $indices = $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');

        // Without any recorded index there is nothing to push to.
        if (!$indices) {
            return;
        }

        foreach ($this->indexer as $indexer) {
            $ids = $event->getPrimaryKeys($indexer->getEntity());

            if ($ids) {
                $this->push($indexer, $indices, $indexer->fetch($ids), $ids);
            }
        }
    }
185
186
    /**
     * Returns all registered indexers, keyed as they were handed to the constructor.
     *
     * @return AbstractAdminIndexer[]
     */
    public function getIndexers(): iterable
    {
        $registered = $this->indexer;

        return $registered;
    }
193
194
    /**
     * Looks up a registered indexer by name.
     *
     * @throws ElasticsearchIndexingException when no indexer is registered under $name
     */
    public function getIndexer(string $name): AbstractAdminIndexer
    {
        $found = $this->indexer[$name] ?? null;

        if (!$found) {
            throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found', $name)]);
        }

        return $found;
    }
203
204
    /**
     * Tells whether the event contains primary keys for at least one entity that has
     * a registered indexer.
     */
    private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
    {
        foreach ($this->indexer as $candidate) {
            if ($event->getPrimaryKeys($candidate->getEntity())) {
                return true;
            }
        }

        return false;
    }
216
217
    /**
     * Sends one bulk request for the given indexer: every entry of $data is indexed
     * under its key, and every id of $ids that has no entry in $data is deleted.
     *
     * Nothing is sent when $indices has no index for the indexer's alias, or when
     * there is no operation to perform.
     *
     * @param array<string, string> $indices alias => index name
     * @param array<string, array<string|int, string>> $data documents keyed by id
     * @param array<string> $ids ids that were requested for indexing
     *
     * @throws ElasticsearchIndexingException when the bulk response reports errors
     */
    private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
    {
        $alias = $this->adminEsHelper->getIndex($indexer->getName());

        if (!isset($indices[$alias])) {
            return;
        }

        // Fields every document carries; values supplied by the indexer win.
        $defaults = ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '', 'text' => ''];

        $documents = [];
        foreach ($data as $id => $document) {
            $documents[] = ['index' => ['_id' => $id]];
            $documents[] = \array_replace($defaults, $document);
        }

        // Requested ids without a fetched document are removed from the index.
        foreach ($ids as $id) {
            if (!isset($data[$id])) {
                $documents[] = ['delete' => ['_id' => $id]];
            }
        }

        // A bulk request without any operation has an empty body, which the
        // server rejects; there is nothing to do in that case anyway.
        if ($documents === []) {
            return;
        }

        $result = $this->client->bulk([
            'index' => $indices[$alias],
            'body' => $documents,
        ]);

        if (\is_array($result) && !empty($result['errors'])) {
            throw new ElasticsearchIndexingException($this->parseErrors($result));
        }
    }
261
262
    /**
     * Creates a new timestamped index for each given entity and records it in
     * admin_elasticsearch_index_task, replacing the previous task rows of those entities.
     *
     * An entity whose target index name already exists is skipped: no index is
     * created, it is absent from the returned map, and its existing task row is
     * left untouched.
     *
     * @param array<string> $entities names of the indexers to create indices for
     *
     * @throws \Doctrine\DBAL\Exception
     *
     * @return array<string, string> alias => newly created index name
     */
    private function createIndices(array $entities): array
    {
        $indexTasks = [];
        $indices = [];
        $replacedEntities = [];

        foreach ($entities as $entityName) {
            $indexer = $this->getIndexer($entityName);
            $alias = $this->adminEsHelper->getIndex($indexer->getName());
            $index = $alias . '_' . time();

            if ($this->indexExists($index)) {
                continue;
            }

            $indices[$alias] = $index;

            $this->create($indexer, $index, $alias);

            $replacedEntities[] = $indexer->getEntity();
            $indexTasks[] = [
                'id' => Uuid::randomBytes(),
                '`entity`' => $indexer->getEntity(),
                '`index`' => $index,
                '`alias`' => $alias,
                '`doc_count`' => $indexer->getIterator()->fetchCount(),
            ];
        }

        if ($indexTasks === []) {
            return $indices;
        }

        // Only rows that are re-inserted below are removed. Deleting the rows of
        // skipped entities as well would drop their alias/index mapping without
        // a replacement.
        $this->connection->executeStatement(
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
            ['entities' => $replacedEntities],
            ['entities' => Connection::PARAM_STR_ARRAY]
        );

        foreach ($indexTasks as $task) {
            $this->connection->insert('admin_elasticsearch_index_task', $task);
        }

        return $indices;
    }
308
309
    /**
     * Creates an index (and task row) for every registered indexer whose alias does
     * not exist yet. Indexers with an existing alias are left alone.
     *
     * @throws \Doctrine\DBAL\Exception
     */
    private function refreshIndices(): void
    {
        $entities = [];
        $indexTasks = [];

        foreach ($this->indexer as $indexer) {
            $alias = $this->adminEsHelper->getIndex($indexer->getName());

            if ($this->aliasExists($alias)) {
                continue;
            }

            $index = $alias . '_' . time();
            $this->create($indexer, $index, $alias);

            $entities[] = $indexer->getEntity();
            $indexTasks[] = [
                'id' => Uuid::randomBytes(),
                '`entity`' => $indexer->getEntity(),
                '`index`' => $index,
                '`alias`' => $alias,
                '`doc_count`' => $indexer->getIterator()->fetchCount(),
            ];
        }

        // This method is reached from the entity-written listener, so the common
        // case is that every alias already exists; skip the database round trip
        // instead of issuing a DELETE with an empty list.
        if ($indexTasks === []) {
            return;
        }

        $this->connection->executeStatement(
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
            ['entities' => $entities],
            ['entities' => Connection::PARAM_STR_ARRAY]
        );

        foreach ($indexTasks as $task) {
            $this->connection->insert('admin_elasticsearch_index_task', $task);
        }
    }
345
346
    /**
     * Creates the index $index and points $alias at it if the alias does not exist yet.
     *
     * The index body is the configured settings plus a mapping built from the base
     * properties below, passed through the indexer's mapping() and merged recursively
     * with the globally configured mapping.
     */
    private function create(AbstractAdminIndexer $indexer, string $index, string $alias): void
    {
        $baseProperties = [
            'id' => ['type' => 'keyword'],
            'textBoosted' => ['type' => 'text'],
            'text' => ['type' => 'text'],
            'entityName' => ['type' => 'keyword'],
            'parameters' => ['type' => 'keyword'],
        ];

        $indexerMapping = $indexer->mapping(['properties' => $baseProperties]);
        $mappings = array_merge_recursive($indexerMapping, $this->mapping);

        $request = [
            'index' => $index,
            'body' => array_merge($this->config, ['mappings' => $mappings]),
        ];

        $this->client->indices()->create($request);

        $this->createAliasIfNotExisting($index, $alias);
    }
372
373
    /**
     * Asks the Elasticsearch client whether an index with the given name exists.
     */
    private function indexExists(string $name): bool
    {
        $indicesApi = $this->client->indices();

        return $indicesApi->exists(['index' => $name]);
    }
377
378
    /**
     * Asks the Elasticsearch client whether the given alias exists.
     */
    private function aliasExists(string $alias): bool
    {
        $indicesApi = $this->client->indices();

        return $indicesApi->existsAlias(['name' => $alias]);
    }
382
383
    /**
     * Collects the failed items of a bulk response.
     *
     * Items with status 200 or 201 are treated as successful. For every other item
     * the error type and reason are reported; when the item carries no error object,
     * the item's "_type" and "result" values are used instead if present.
     *
     * @param array<string, array<array<string, mixed>>> $result decoded bulk response
     *
     * @return array<int, array<string, mixed>> one entry with "index", "id", "type" and "reason" per failed item
     */
    private function parseErrors(array $result): array
    {
        $errors = [];
        foreach ($result['items'] as $item) {
            // push() only issues "index" and "delete" operations.
            $item = $item['index'] ?? $item['delete'];

            if (\in_array($item['status'], [200, 201], true)) {
                continue;
            }

            $errors[] = [
                'index' => $item['_index'],
                'id' => $item['_id'],
                // "_type" / "result" are not guaranteed to be part of an item
                // (typeless responses omit "_type"); fall back to null instead of
                // raising an undefined-key warning while building the error report.
                'type' => $item['error']['type'] ?? $item['_type'] ?? null,
                'reason' => $item['error']['reason'] ?? $item['result'] ?? null,
            ];
        }

        return $errors;
    }
408
409
    /**
     * Points $alias at $index unless the alias already exists.
     */
    private function createAliasIfNotExisting(string $index, string $alias): void
    {
        $alreadyThere = $this->client->indices()->existsAlias(['name' => $alias]);

        if (!$alreadyThere) {
            $this->putAlias($index, $alias);
        }
    }
419
420
    /**
     * Moves every alias to its new index and deletes the indices the alias pointed
     * to before.
     *
     * For an alias that does not exist yet, the alias is simply created and there is
     * nothing to clean up.
     *
     * @param array<string, string> $indices alias => new index name
     */
    private function swapAlias(array $indices): void
    {
        foreach ($indices as $alias => $index) {
            $exist = $this->client->indices()->existsAlias(['name' => $alias]);

            if (!$exist) {
                $this->putAlias($index, $alias);

                // Move on to the next alias; returning here would leave all
                // remaining aliases pointing at their old indices.
                continue;
            }

            // Keyed by the names of the indices currently behind the alias.
            $current = $this->client->indices()->getAlias(['name' => $alias]);

            if (!isset($current[$index])) {
                $this->putAlias($index, $alias);
            }

            // Everything except the new index is stale and gets removed.
            unset($current[$index]);

            foreach (array_keys($current) as $staleIndex) {
                $this->client->indices()->delete(['index' => $staleIndex]);
            }
        }
    }
448
449
    /**
     * Refreshes $index and then adds $alias to it.
     */
    private function putAlias(string $index, string $alias): void
    {
        $indicesApi = $this->client->indices();

        $indicesApi->refresh(['index' => $index]);
        $indicesApi->putAlias([
            'index' => $index,
            'name' => $alias,
        ]);
    }
456
}
457