Passed
Push — trunk ( 325349...f71779 )
by Christian
12:37 queued 16s
created

MultilingualEsIndexer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 0

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 0
nc 1
nop 8
dl 0
loc 10
rs 10
c 1
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Framework\Indexing;
4
5
use Doctrine\DBAL\Connection;
6
use OpenSearch\Client;
7
use Psr\Log\LoggerInterface;
8
use Shopware\Core\Framework\Context;
9
use Shopware\Core\Framework\DataAbstractionLayer\Dbal\Common\IteratorFactory;
10
use Shopware\Core\Framework\DataAbstractionLayer\EntityDefinition;
11
use Shopware\Core\Framework\Log\Package;
12
use Shopware\Core\Framework\Uuid\Uuid;
13
use Shopware\Elasticsearch\Framework\ElasticsearchHelper;
14
use Shopware\Elasticsearch\Framework\ElasticsearchRegistry;
15
use Symfony\Component\Finder\Finder;
16
17
/**
 * Creates the search indices and scripts, splits the registered definitions into
 * ElasticsearchIndexingMessage batches and writes those batches to the cluster via bulk requests.
 *
 * @internal
 *
 * @deprecated tag:v6.6.0 - Will be removed, please transfer all public methods to ElasticsearchProductDefinition
 */
#[Package('core')]
class MultilingualEsIndexer
{
    /**
     * @internal
     *
     * @param int $indexingBatchSize forwarded to the iterator factory when the ids for an indexing message are fetched
     */
    public function __construct(
        private readonly Connection $connection,
        private readonly ElasticsearchHelper $helper,
        private readonly ElasticsearchRegistry $registry,
        private readonly IndexCreator $indexCreator,
        private readonly IteratorFactory $iteratorFactory,
        private readonly Client $client,
        private readonly LoggerInterface $logger,
        private readonly int $indexingBatchSize,
    ) {
    }

    /**
     * Handles the given indexing message. Does nothing while the helper does not allow indexing.
     */
    public function __invoke(ElasticsearchIndexingMessage $message): void
    {
        if (!$this->helper->allowIndexing()) {
            return;
        }

        $this->handleIndexingMessage($message);
    }

    /**
     * Returns the indexing message for the next batch of ids.
     *
     * Without an offset a fresh indexing run is initialised first (see init()).
     * Returns null when indexing is not allowed or when no definition has ids left.
     */
    public function iterate(?IndexerOffset $offset = null): ?ElasticsearchIndexingMessage
    {
        if (!$this->helper->allowIndexing()) {
            return null;
        }

        if ($offset === null) {
            $offset = $this->init();
        }

        return $this->createIndexingMessage($offset);
    }

    /**
     * Indexes the given ids of one definition right away.
     *
     * When the alias of the definition does not exist yet, a full indexing run is initialised before.
     *
     * @param array<string> $ids
     */
    public function updateIds(EntityDefinition $definition, array $ids): void
    {
        if (!$this->helper->allowIndexing()) {
            return;
        }

        $alias = $this->helper->getIndexName($definition);

        if (!$this->client->indices()->existsAlias(['name' => $alias])) {
            $this->init();
        }

        $this->__invoke($this->generateMessage($definition, $ids));
    }

    /**
     * Builds an indexing message without offset that targets the alias of the definition.
     *
     * @param array<string> $ids
     */
    private function generateMessage(EntityDefinition $definition, array $ids): ElasticsearchIndexingMessage
    {
        $context = Context::createDefaultContext();

        $alias = $this->helper->getIndexName($definition);

        $indexing = new IndexingDto($ids, $alias, $definition->getEntityName());

        return new ElasticsearchIndexingMessage($indexing, null, $context);
    }

    /**
     * Fetches the next batch of ids for the current definition of the offset and wraps it into a message.
     *
     * When the current definition has no ids left, the offset is moved to the next definition and the
     * lookup is repeated. Returns null once the last definition is exhausted. Throws the exception created
     * by ElasticsearchIndexingException::definitionNotFound() when the registry does not know the definition.
     */
    private function createIndexingMessage(IndexerOffset $offset): ?ElasticsearchIndexingMessage
    {
        $definition = $this->registry->get((string) $offset->getDefinition());

        if (!$definition) {
            throw ElasticsearchIndexingException::definitionNotFound((string) $offset->getDefinition());
        }

        $entity = $definition->getEntityDefinition()->getEntityName();

        $iterator = $this->iteratorFactory->createIterator(
            $definition->getEntityDefinition(),
            $offset->getLastId(),
            $this->indexingBatchSize
        );

        $ids = $iterator->fetch();

        if (empty($ids)) {
            if (!$offset->hasNextDefinition()) {
                return null;
            }

            // increment definition offset
            $offset->selectNextDefinition();

            // reset last id to start iterator at the beginning
            $offset->setLastId(null);

            return $this->createIndexingMessage($offset);
        }

        // increment last id with iterator offset
        $offset->setLastId($iterator->getOffset());

        $alias = $this->helper->getIndexName($definition->getEntityDefinition());

        // the timestamped index name matches the one built in createIndex()
        $index = $alias . '_' . $offset->getTimestamp();

        // return indexing message for current offset
        return new ElasticsearchIndexingMessage(
            new IndexingDto(array_values($ids), $index, $entity),
            $offset,
            Context::createDefaultContext()
        );
    }

    /**
     * Starts a new indexing run: removes all rows from elasticsearch_index_task, uploads the scripts,
     * creates one timestamped index per registered definition and returns the initial offset.
     */
    private function init(): IndexerOffset
    {
        $this->connection->executeStatement('DELETE FROM elasticsearch_index_task');

        $this->createScripts();

        $timestamp = new \DateTimeImmutable();

        $this->createIndex($timestamp);

        return new IndexerOffset(
            [],
            $this->registry->getDefinitions(),
            $timestamp->getTimestamp()
        );
    }

    /**
     * Collects every bulk item whose status is neither 200 nor 201 and logs its reason as an error.
     *
     * @param array<mixed> $result
     *
     * @return array{index: string, id: string, type: string, reason: string}[]
     */
    private function parseErrors(array $result): array
    {
        $errors = [];
        foreach ($result['items'] as $item) {
            $item = $item['index'] ?? $item['delete'];

            if (\in_array($item['status'], [200, 201], true)) {
                continue;
            }

            // Items without an `error` entry fall back to `result`; the final fallback keeps the
            // lookup from raising an "undefined array key" warning when neither key is present.
            $reason = $item['error']['reason'] ?? $item['result'] ?? 'unknown';

            $errors[] = [
                'index' => $item['_index'],
                'id' => $item['_id'],
                // `_type` is not guaranteed to be part of the response item
                'type' => $item['error']['type'] ?? $item['_type'] ?? 'unknown',
                'reason' => $reason,
            ];

            $this->logger->error($reason);
        }

        return $errors;
    }

    /**
     * Uploads every *.groovy file of the Scripts directory as a stored script.
     *
     * The script id is the file name without extension; the scripts are registered with lang "painless".
     */
    private function createScripts(): void
    {
        $finder = (new Finder())
            ->files()
            ->in(__DIR__ . '/Scripts')
            ->name('*.groovy');

        foreach ($finder as $file) {
            $name = pathinfo($file->getFilename(), \PATHINFO_FILENAME);

            $this->client->putScript([
                'id' => $name,
                'body' => [
                    'script' => [
                        'lang' => 'painless',
                        // getContents() throws when the file cannot be read instead of handing `false` to the client
                        'source' => $file->getContents(),
                    ],
                ],
            ]);
        }
    }

    /**
     * Creates the index "<alias>_<timestamp>" for every registered definition.
     *
     * For definitions whose alias already existed, a row with the expected document count is written to
     * elasticsearch_index_task.
     */
    private function createIndex(\DateTimeInterface $timestamp): void
    {
        $context = Context::createDefaultContext();

        foreach ($this->registry->getDefinitions() as $definition) {
            $alias = $this->helper->getIndexName($definition->getEntityDefinition());

            $index = $alias . '_' . $timestamp->getTimestamp();

            // checked before the new index gets created
            $hasAlias = $this->indexCreator->aliasExists($alias);

            $this->indexCreator->createIndex($definition, $index, $alias, $context);

            $iterator = $this->iteratorFactory->createIterator($definition->getEntityDefinition());

            // We don't need an index task, when it's the first indexing. This will allow alias swapping to nothing
            if ($hasAlias) {
                $this->connection->insert('elasticsearch_index_task', [
                    'id' => Uuid::randomBytes(),
                    '`entity`' => $definition->getEntityDefinition()->getEntityName(),
                    '`index`' => $index,
                    '`alias`' => $alias,
                    '`doc_count`' => $iterator->fetchCount(),
                ]);
            }
        }
    }

    /**
     * Writes the entities of the message to its target index with a single bulk request.
     *
     * The doc_count of the matching index task is decremented first, even when the index does not exist
     * (anymore) and the message is skipped. Ids returned by the definition's fetch() are indexed, ids
     * missing in that result are deleted from the index. Throws the exception created by
     * ElasticsearchIndexingException::definitionNotFound() for an unknown entity and the one created by
     * ElasticsearchIndexingException::indexingError() when the bulk response reports errors.
     */
    private function handleIndexingMessage(ElasticsearchIndexingMessage $message): void
    {
        $task = $message->getData();

        $ids = $task->getIds();

        $index = $task->getIndex();

        $this->connection->executeStatement('UPDATE elasticsearch_index_task SET `doc_count` = `doc_count` - :idCount WHERE `index` = :index', [
            'idCount' => \count($ids),
            'index' => $index,
        ]);

        if (!$this->client->indices()->exists(['index' => $index])) {
            return;
        }

        $entity = $task->getEntity();

        $definition = $this->registry->get($entity);

        $context = $message->getContext();

        if (!$definition) {
            throw ElasticsearchIndexingException::definitionNotFound($entity);
        }

        $data = $definition->fetch($ids, $context);

        $toRemove = array_filter($ids, static fn (string $id): bool => !isset($data[$id]));

        $documents = [];

        foreach ($data as $id => $document) {
            // the bulk body alternates between the action line and the document source
            $documents[] = ['index' => ['_id' => $id]];
            $documents[] = $document;
        }

        foreach ($toRemove as $id) {
            $documents[] = ['delete' => ['_id' => $id]];
        }

        if ($documents === []) {
            // nothing to index or delete - do not send a bulk request without any operation
            return;
        }

        $result = $this->client->bulk([
            'index' => $index,
            'body' => $documents,
        ]);

        if (\is_array($result) && !empty($result['errors'])) {
            $errors = $this->parseErrors($result);

            throw ElasticsearchIndexingException::indexingError($errors);
        }
    }
}
283