Passed
Push — trunk ( 325349...f71779 )
by Christian
12:37 queued 16s
created

MultilingualEsIndexer::handleIndexingMessage()   B

Complexity

Conditions 8
Paths 10

Size

Total Lines 53
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 8
eloc 28
nc 10
nop 1
dl 0
loc 53
rs 8.4444
c 1
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

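Commonly applied refactorings include Extract Method: move a coherent part of the method body into a new, well-named (usually private) method and call it from the original place.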

1
<?php declare(strict_types=1);
2
3
namespace Shopware\Elasticsearch\Framework\Indexing;
4
5
use Doctrine\DBAL\Connection;
6
use OpenSearch\Client;
7
use Psr\Log\LoggerInterface;
8
use Shopware\Core\Framework\Context;
9
use Shopware\Core\Framework\DataAbstractionLayer\Dbal\Common\IteratorFactory;
10
use Shopware\Core\Framework\DataAbstractionLayer\EntityDefinition;
11
use Shopware\Core\Framework\Log\Package;
12
use Shopware\Core\Framework\Uuid\Uuid;
13
use Shopware\Elasticsearch\Framework\ElasticsearchHelper;
14
use Shopware\Elasticsearch\Framework\ElasticsearchRegistry;
15
use Symfony\Component\Finder\Finder;
16
17
/**
 * Creates the Elasticsearch/OpenSearch indices for all definitions of the ElasticsearchRegistry
 * and writes entity documents into them via bulk requests.
 *
 * @internal
 *
 * @deprecated tag:v6.6.0 - Will be removed, please transfer all public methods to ElasticsearchProductDefinition
 */
#[Package('core')]
class MultilingualEsIndexer
{
    /**
     * @internal
     */
    public function __construct(
        private readonly Connection $connection,
        private readonly ElasticsearchHelper $helper,
        private readonly ElasticsearchRegistry $registry,
        private readonly IndexCreator $indexCreator,
        private readonly IteratorFactory $iteratorFactory,
        private readonly Client $client,
        private readonly LoggerInterface $logger,
        private readonly int $indexingBatchSize
    ) {
    }

    /**
     * Writes the documents described by $message into its target index.
     * Does nothing when ElasticsearchHelper::allowIndexing() returns false.
     */
    public function __invoke(ElasticsearchIndexingMessage $message): void
    {
        if (!$this->helper->allowIndexing()) {
            return;
        }

        $this->handleIndexingMessage($message);
    }

    /**
     * Returns the next indexing message for $offset.
     *
     * With a null $offset, indexing is (re)initialised first: all rows of `elasticsearch_index_task`
     * are deleted, scripts are uploaded and a new timestamped index is created per definition.
     *
     * @return ElasticsearchIndexingMessage|null null when indexing is disabled or every definition is exhausted
     */
    public function iterate(?IndexerOffset $offset = null): ?ElasticsearchIndexingMessage
    {
        if (!$this->helper->allowIndexing()) {
            return null;
        }

        if ($offset === null) {
            $offset = $this->init();
        }

        return $this->createIndexingMessage($offset);
    }

    /**
     * Synchronously (re)indexes the given ids of $definition into its alias.
     * If the alias does not exist yet, init() is run first (new indices for all registered definitions).
     *
     * @param array<string> $ids
     */
    public function updateIds(EntityDefinition $definition, array $ids): void
    {
        if (!$this->helper->allowIndexing()) {
            return;
        }

        $alias = $this->helper->getIndexName($definition);

        if (!$this->client->indices()->existsAlias(['name' => $alias])) {
            $this->init();
        }

        $this->__invoke($this->generateMessage($definition, $ids));
    }

    /**
     * Builds a message without offset that targets the alias of $definition.
     *
     * @param array<string> $ids
     */
    private function generateMessage(EntityDefinition $definition, array $ids): ElasticsearchIndexingMessage
    {
        $alias = $this->helper->getIndexName($definition);

        $indexing = new IndexingDto($ids, $alias, $definition->getEntityName());

        return new ElasticsearchIndexingMessage($indexing, null, Context::createDefaultContext());
    }

    /**
     * Fetches the next batch of ids for the definition selected by $offset. When that definition
     * has no ids left, it moves on to the next definition. Mutates $offset (last id / definition).
     *
     * @throws ElasticsearchIndexingException when the offset references an unknown definition
     */
    private function createIndexingMessage(IndexerOffset $offset): ?ElasticsearchIndexingMessage
    {
        $definition = $this->registry->get((string) $offset->getDefinition());

        if (!$definition) {
            throw ElasticsearchIndexingException::definitionNotFound((string) $offset->getDefinition());
        }

        $entityDefinition = $definition->getEntityDefinition();

        $iterator = $this->iteratorFactory->createIterator($entityDefinition, $offset->getLastId(), $this->indexingBatchSize);

        $ids = $iterator->fetch();

        if (empty($ids)) {
            if (!$offset->hasNextDefinition()) {
                return null;
            }

            // increment definition offset
            $offset->selectNextDefinition();

            // reset last id to start iterator at the beginning
            $offset->setLastId(null);

            return $this->createIndexingMessage($offset);
        }

        // increment last id with iterator offset
        $offset->setLastId($iterator->getOffset());

        $alias = $this->helper->getIndexName($entityDefinition);

        // the index created by init() for this run is suffixed with the offset's timestamp
        $index = $alias . '_' . $offset->getTimestamp();

        return new ElasticsearchIndexingMessage(
            new IndexingDto(array_values($ids), $index, $entityDefinition->getEntityName()),
            $offset,
            Context::createDefaultContext()
        );
    }

    /**
     * Starts a new indexing run: clears pending index tasks, uploads scripts and creates
     * a new timestamped index for every registered definition.
     */
    private function init(): IndexerOffset
    {
        $this->connection->executeStatement('DELETE FROM elasticsearch_index_task');

        $this->createScripts();

        $timestamp = new \DateTimeImmutable();

        $this->createIndex($timestamp);

        return new IndexerOffset(
            [],
            $this->registry->getDefinitions(),
            $timestamp->getTimestamp()
        );
    }

    /**
     * Collects and logs every bulk response item whose status is not 200/201.
     *
     * @param array<mixed> $result decoded bulk response
     *
     * @return array{index: string, id: string, type: string, reason: string}[]
     */
    private function parseErrors(array $result): array
    {
        $errors = [];
        foreach ($result['items'] as $item) {
            // handleIndexingMessage() only sends `index` and `delete` actions
            $item = $item['index'] ?? $item['delete'];

            if (\in_array($item['status'], [200, 201], true)) {
                continue;
            }

            // `error`, `_type` and `result` may be missing on an item; fall back instead of
            // raising an undefined-key warning while reporting the actual failure
            $reason = $item['error']['reason'] ?? $item['result'] ?? 'unknown';

            $errors[] = [
                'index' => $item['_index'],
                'id' => $item['_id'],
                'type' => $item['error']['type'] ?? $item['_type'] ?? 'unknown',
                'reason' => $reason,
            ];

            $this->logger->error($reason);
        }

        return $errors;
    }

    /**
     * Uploads every `Scripts/*.groovy` file as a stored painless script, using the
     * file name (without extension) as the script id.
     */
    private function createScripts(): void
    {
        $finder = (new Finder())
            ->files()
            ->in(__DIR__ . '/Scripts')
            ->name('*.groovy');

        foreach ($finder as $file) {
            $this->client->putScript([
                'id' => pathinfo($file->getFilename(), \PATHINFO_FILENAME),
                'body' => [
                    'script' => [
                        'lang' => 'painless',
                        // getContents() throws instead of silently returning false on a read error
                        'source' => $file->getContents(),
                    ],
                ],
            ]);
        }
    }

    /**
     * Creates `<alias>_<timestamp>` for every registered definition. It also records an
     * `elasticsearch_index_task` row for every alias that already existed.
     */
    private function createIndex(\DateTimeInterface $timestamp): void
    {
        $context = Context::createDefaultContext();

        foreach ($this->registry->getDefinitions() as $definition) {
            $entityDefinition = $definition->getEntityDefinition();

            $alias = $this->helper->getIndexName($entityDefinition);

            $index = $alias . '_' . $timestamp->getTimestamp();

            // checked before createIndex() so it reflects the state prior to this run
            $hasAlias = $this->indexCreator->aliasExists($alias);

            $this->indexCreator->createIndex($definition, $index, $alias, $context);

            // We don't need an index task when it's the first indexing. This allows alias swapping to nothing
            if (!$hasAlias) {
                continue;
            }

            $iterator = $this->iteratorFactory->createIterator($entityDefinition);

            $this->connection->insert('elasticsearch_index_task', [
                'id' => Uuid::randomBytes(),
                '`entity`' => $entityDefinition->getEntityName(),
                '`index`' => $index,
                '`alias`' => $alias,
                '`doc_count`' => $iterator->fetchCount(),
            ]);
        }
    }

    /**
     * Writes the ids of $message into its index. Ids that the definition no longer returns
     * are deleted from the index.
     *
     * @throws ElasticsearchIndexingException on an unknown entity or when the bulk request reports errors
     */
    private function handleIndexingMessage(ElasticsearchIndexingMessage $message): void
    {
        $task = $message->getData();

        $ids = $task->getIds();

        $index = $task->getIndex();

        $this->decrementIndexTaskDocCount($index, \count($ids));

        if (!$this->client->indices()->exists(['index' => $index])) {
            return;
        }

        $entity = $task->getEntity();

        $definition = $this->registry->get($entity);

        if (!$definition) {
            throw ElasticsearchIndexingException::definitionNotFound($entity);
        }

        // nothing to write; the bulk API also expects a non-empty body
        if ($ids === []) {
            return;
        }

        $data = $definition->fetch($ids, $message->getContext());

        $this->bulkWrite($index, $this->buildBulkDocuments($ids, $data));
    }

    /**
     * Lowers the remaining `doc_count` of the index task for $index by $count.
     */
    private function decrementIndexTaskDocCount(string $index, int $count): void
    {
        $this->connection->executeStatement(
            'UPDATE elasticsearch_index_task SET `doc_count` = `doc_count` - :idCount WHERE `index` = :index',
            [
                'idCount' => $count,
                'index' => $index,
            ]
        );
    }

    /**
     * Builds the bulk body. Each fetched document gets an `index` action followed by the
     * document itself. Each requested id missing from $data gets a `delete` action.
     *
     * @param array<string> $ids requested ids
     * @param array<mixed> $data fetched documents keyed by id
     *
     * @return list<array<mixed>>
     */
    private function buildBulkDocuments(array $ids, array $data): array
    {
        $documents = [];

        foreach ($data as $id => $document) {
            $documents[] = ['index' => ['_id' => $id]];
            $documents[] = $document;
        }

        foreach ($ids as $id) {
            if (!isset($data[$id])) {
                $documents[] = ['delete' => ['_id' => $id]];
            }
        }

        return $documents;
    }

    /**
     * Sends $documents as one bulk request to $index.
     *
     * @param list<array<mixed>> $documents
     *
     * @throws ElasticsearchIndexingException when the response flags errors
     */
    private function bulkWrite(string $index, array $documents): void
    {
        $result = $this->client->bulk([
            'index' => $index,
            'body' => $documents,
        ]);

        if (\is_array($result) && !empty($result['errors'])) {
            throw ElasticsearchIndexingException::indexingError($this->parseErrors($result));
        }
    }
}
283