Passed
Push — 6.4 ( be76f2...e19c4d )
by Christian
14:08 queued 13s
created

AdminSearchRegistry::__construct()   B

Complexity

Conditions 7
Paths 10

Size

Total Lines 29
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
eloc 13
c 1
b 0
f 0
nc 10
nop 8
dl 0
loc 29
rs 8.8333

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Admin;
4
5
use Doctrine\DBAL\Connection;
6
use Elasticsearch\Client;
7
use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
8
use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
9
use Shopware\Core\Framework\Event\ProgressFinishedEvent;
10
use Shopware\Core\Framework\Event\ProgressStartedEvent;
11
use Shopware\Core\Framework\Log\Package;
12
use Shopware\Core\Framework\Uuid\Uuid;
13
use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
14
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
15
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
16
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
17
use Symfony\Component\Messenger\MessageBusInterface;
18
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
19
20
/**
21
 * @internal
22
 *
23
 * @final
24
 */
25
#[Package('system-settings')]
26
class AdminSearchRegistry implements MessageHandlerInterface, EventSubscriberInterface
27
{
28
    /**
29
     * @var array<string, mixed>
30
     */
31
    private array $indexer;
32
33
    private Connection $connection;
34
35
    private MessageBusInterface $queue;
36
37
    private EventDispatcherInterface $dispatcher;
38
39
    private Client $client;
40
41
    private AdminElasticsearchHelper $adminEsHelper;
42
43
    /**
44
     * @var array<mixed>
45
     */
46
    private array $config;
47
48
    /**
49
     * @var array<mixed>
50
     */
51
    private array $mapping;
52
53
    /**
54
     * @param AbstractAdminIndexer[] $indexer
55
     * @param array<mixed> $config
56
     * @param array<mixed> $mapping
57
     */
58
    public function __construct(
59
        $indexer,
60
        Connection $connection,
61
        MessageBusInterface $queue,
62
        EventDispatcherInterface $dispatcher,
63
        Client $client,
64
        AdminElasticsearchHelper $adminEsHelper,
65
        array $config,
66
        array $mapping
67
    ) {
68
        $this->indexer = $indexer instanceof \Traversable ? iterator_to_array($indexer) : $indexer;
0 ignored issues
show
introduced by
$indexer is never a sub-type of Traversable.
Loading history...
69
        $this->connection = $connection;
70
        $this->queue = $queue;
71
        $this->dispatcher = $dispatcher;
72
        $this->client = $client;
73
        $this->adminEsHelper = $adminEsHelper;
74
        $this->mapping = $mapping;
75
76
        if (isset($config['settings']['index'])) {
77
            if (\array_key_exists('number_of_shards', $config['settings']['index']) && $config['settings']['index']['number_of_shards'] === null) {
78
                unset($config['settings']['index']['number_of_shards']);
79
            }
80
81
            if (\array_key_exists('number_of_replicas', $config['settings']['index']) && $config['settings']['index']['number_of_replicas'] === null) {
82
                unset($config['settings']['index']['number_of_replicas']);
83
            }
84
        }
85
86
        $this->config = $config;
87
    }
88
89
    public function __invoke(AdminSearchIndexingMessage $message): void
90
    {
91
        $indexer = $this->getIndexer($message->getEntity());
92
93
        $documents = $indexer->fetch($message->getIds());
94
95
        $this->push($indexer, $message->getIndices(), $documents, $message->getIds());
96
    }
97
98
    public static function getSubscribedEvents(): array
99
    {
100
        return [
101
            EntityWrittenContainerEvent::class => [
102
                ['refresh', -1000],
103
            ],
104
        ];
105
    }
106
107
    /**
108
     * @return iterable<class-string>
109
     */
110
    public static function getHandledMessages(): iterable
111
    {
112
        return [
113
            AdminSearchIndexingMessage::class,
114
        ];
115
    }
116
117
    public function iterate(AdminIndexingBehavior $indexingBehavior): void
118
    {
119
        if ($this->adminEsHelper->getEnabled() === false) {
120
            return;
121
        }
122
123
        /** @var array<string> $entities */
124
        $entities = array_keys($this->indexer);
125
126
        if ($indexingBehavior->getOnlyEntities()) {
127
            $entities = array_intersect($entities, $indexingBehavior->getOnlyEntities());
128
        } elseif ($indexingBehavior->getSkipEntities()) {
129
            $entities = array_diff($entities, $indexingBehavior->getSkipEntities());
130
        }
131
132
        $indices = $this->createIndices($entities);
133
134
        foreach ($entities as $entityName) {
135
            $indexer = $this->getIndexer($entityName);
136
            $iterator = $indexer->getIterator();
137
138
            $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));
139
140
            while ($ids = $iterator->fetch()) {
141
                // we provide no queue when the data is sent by the admin
142
                if ($indexingBehavior->getNoQueue() === true) {
143
                    $this->__invoke(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids));
144
                } else {
145
                    $this->queue->dispatch(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids));
146
                }
147
148
                $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
149
            }
150
151
            $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
152
        }
153
154
        $this->swapAlias($indices);
155
    }
156
157
    public function refresh(EntityWrittenContainerEvent $event): void
158
    {
159
        if ($this->adminEsHelper->getEnabled() === false || !$this->isIndexedEntityWritten($event)) {
160
            return;
161
        }
162
163
        if ($this->adminEsHelper->getRefreshIndices()) {
164
            $this->refreshIndices();
165
        }
166
167
        /** @var array<string, string> $indices */
168
        $indices = $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');
169
170
        if (empty($indices)) {
171
            return;
172
        }
173
174
        foreach ($this->indexer as $indexer) {
175
            $ids = $event->getPrimaryKeys($indexer->getEntity());
176
177
            if (empty($ids)) {
178
                continue;
179
            }
180
            $documents = $indexer->fetch($ids);
181
182
            $this->push($indexer, $indices, $documents, $ids);
183
        }
184
    }
185
186
    /**
187
     * @return AbstractAdminIndexer[]
188
     */
189
    public function getIndexers(): iterable
190
    {
191
        return $this->indexer;
192
    }
193
194
    public function getIndexer(string $name): AbstractAdminIndexer
195
    {
196
        $indexer = $this->indexer[$name] ?? null;
197
        if ($indexer) {
198
            return $indexer;
199
        }
200
201
        throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found', $name)]);
202
    }
203
204
    private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
205
    {
206
        foreach ($this->indexer as $indexer) {
207
            $ids = $event->getPrimaryKeys($indexer->getEntity());
208
209
            if (!empty($ids)) {
210
                return true;
211
            }
212
        }
213
214
        return false;
215
    }
216
217
    /**
218
     * @param array<string, string> $indices
219
     * @param array<string, array<string|int, string>> $data
220
     * @param array<string> $ids
221
     */
222
    private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
223
    {
224
        $alias = $this->adminEsHelper->getIndex($indexer->getName());
225
226
        if (!isset($indices[$alias])) {
227
            return;
228
        }
229
230
        $toRemove = array_filter($ids, static function (string $id) use ($data): bool {
231
            return !isset($data[$id]);
232
        });
233
234
        $documents = [];
235
        foreach ($data as $id => $document) {
236
            $documents[] = ['index' => ['_id' => $id]];
237
238
            $documents[] = \array_replace(
239
                ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '', 'text' => ''],
240
                $document
241
            );
242
        }
243
244
        foreach ($toRemove as $id) {
245
            $documents[] = ['delete' => ['_id' => $id]];
246
        }
247
248
        $arguments = [
249
            'index' => $indices[$alias],
250
            'body' => $documents,
251
        ];
252
253
        $result = $this->client->bulk($arguments);
254
255
        if (\is_array($result) && !empty($result['errors'])) {
256
            $errors = $this->parseErrors($result);
257
258
            throw new ElasticsearchIndexingException($errors);
259
        }
260
    }
261
262
    /**
263
     * @param array<string> $entities
264
     *
265
     * @throws \Doctrine\DBAL\Exception
266
     *
267
     * @return array<string, string>
268
     */
269
    private function createIndices(array $entities): array
270
    {
271
        $indexTasks = [];
272
        $indices = [];
273
        foreach ($entities as $entityName) {
274
            $indexer = $this->getIndexer($entityName);
275
            $alias = $this->adminEsHelper->getIndex($indexer->getName());
276
            $index = $alias . '_' . time();
277
278
            if ($this->indexExists($index)) {
279
                continue;
280
            }
281
282
            $indices[$alias] = $index;
283
284
            $this->create($indexer, $index, $alias);
285
286
            $iterator = $indexer->getIterator();
287
            $indexTasks[] = [
288
                'id' => Uuid::randomBytes(),
289
                '`entity`' => $indexer->getEntity(),
290
                '`index`' => $index,
291
                '`alias`' => $alias,
292
                '`doc_count`' => $iterator->fetchCount(),
293
            ];
294
        }
295
296
        $this->connection->executeStatement(
297
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
298
            ['entities' => $entities],
299
            ['entities' => Connection::PARAM_STR_ARRAY]
300
        );
301
302
        foreach ($indexTasks as $task) {
303
            $this->connection->insert('admin_elasticsearch_index_task', $task);
304
        }
305
306
        return $indices;
307
    }
308
309
    private function refreshIndices(): void
310
    {
311
        $entities = [];
312
        $indexTasks = [];
313
        foreach ($this->indexer as $indexer) {
314
            $alias = $this->adminEsHelper->getIndex($indexer->getName());
315
316
            if ($this->aliasExists($alias)) {
317
                continue;
318
            }
319
320
            $index = $alias . '_' . time();
321
            $this->create($indexer, $index, $alias);
322
323
            $entities[] = $indexer->getEntity();
324
325
            $iterator = $indexer->getIterator();
326
            $indexTasks[] = [
327
                'id' => Uuid::randomBytes(),
328
                '`entity`' => $indexer->getEntity(),
329
                '`index`' => $index,
330
                '`alias`' => $alias,
331
                '`doc_count`' => $iterator->fetchCount(),
332
            ];
333
        }
334
335
        $this->connection->executeStatement(
336
            'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
337
            ['entities' => $entities],
338
            ['entities' => Connection::PARAM_STR_ARRAY]
339
        );
340
341
        foreach ($indexTasks as $task) {
342
            $this->connection->insert('admin_elasticsearch_index_task', $task);
343
        }
344
    }
345
346
    private function create(AbstractAdminIndexer $indexer, string $index, string $alias): void
347
    {
348
        $mapping = $indexer->mapping([
349
            'properties' => [
350
                'id' => ['type' => 'keyword'],
351
                'textBoosted' => ['type' => 'text'],
352
                'text' => ['type' => 'text'],
353
                'entityName' => ['type' => 'keyword'],
354
                'parameters' => ['type' => 'keyword'],
355
            ],
356
        ]);
357
358
        $mapping = array_merge_recursive($mapping, $this->mapping);
359
360
        $body = array_merge(
361
            $this->config,
362
            ['mappings' => $mapping]
363
        );
364
365
        $this->client->indices()->create([
366
            'index' => $index,
367
            'body' => $body,
368
        ]);
369
370
        $this->createAliasIfNotExisting($index, $alias);
371
    }
372
373
    private function indexExists(string $name): bool
374
    {
375
        return $this->client->indices()->exists(['index' => $name]);
376
    }
377
378
    private function aliasExists(string $alias): bool
379
    {
380
        return $this->client->indices()->existsAlias(['name' => $alias]);
381
    }
382
383
    /**
384
     * @param array<string, array<array<string, mixed>>> $result
385
     *
386
     * @return array<array{reason: string}|string>
387
     */
388
    private function parseErrors(array $result): array
389
    {
390
        $errors = [];
391
        foreach ($result['items'] as $item) {
392
            $item = $item['index'] ?? $item['delete'];
393
394
            if (\in_array($item['status'], [200, 201], true)) {
395
                continue;
396
            }
397
398
            $errors[] = [
399
                'index' => $item['_index'],
400
                'id' => $item['_id'],
401
                'type' => $item['error']['type'] ?? $item['_type'],
402
                'reason' => $item['error']['reason'] ?? $item['result'],
403
            ];
404
        }
405
406
        return $errors;
407
    }
408
409
    private function createAliasIfNotExisting(string $index, string $alias): void
410
    {
411
        $exist = $this->client->indices()->existsAlias(['name' => $alias]);
412
413
        if ($exist) {
414
            return;
415
        }
416
417
        $this->putAlias($index, $alias);
418
    }
419
420
    /**
421
     * @param array<string, string> $indices
422
     */
423
    private function swapAlias($indices): void
424
    {
425
        foreach ($indices as $alias => $index) {
426
            $exist = $this->client->indices()->existsAlias(['name' => $alias]);
427
428
            if (!$exist) {
429
                $this->putAlias($index, $alias);
430
431
                return;
432
            }
433
434
            $current = $this->client->indices()->getAlias(['name' => $alias]);
435
436
            if (!isset($current[$index])) {
437
                $this->putAlias($index, $alias);
438
            }
439
440
            unset($current[$index]);
441
            $current = array_keys($current);
442
443
            foreach ($current as $value) {
444
                $this->client->indices()->delete(['index' => $value]);
445
            }
446
        }
447
    }
448
449
    private function putAlias(string $index, string $alias): void
450
    {
451
        $this->client->indices()->refresh([
452
            'index' => $index,
453
        ]);
454
        $this->client->indices()->putAlias(['index' => $index, 'name' => $alias]);
455
    }
456
}
457