|
1
|
|
|
<?php declare(strict_types=1); |
|
2
|
|
|
|
|
3
|
|
|
namespace Shopware\Elasticsearch\Admin; |
|
4
|
|
|
|
|
5
|
|
|
use Doctrine\DBAL\Connection; |
|
6
|
|
|
use OpenSearch\Client; |
|
7
|
|
|
use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent; |
|
8
|
|
|
use Shopware\Core\Framework\Event\ProgressAdvancedEvent; |
|
9
|
|
|
use Shopware\Core\Framework\Event\ProgressFinishedEvent; |
|
10
|
|
|
use Shopware\Core\Framework\Event\ProgressStartedEvent; |
|
11
|
|
|
use Shopware\Core\Framework\Uuid\Uuid; |
|
12
|
|
|
use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer; |
|
13
|
|
|
use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException; |
|
14
|
|
|
use Symfony\Component\EventDispatcher\EventSubscriberInterface; |
|
15
|
|
|
use Symfony\Component\Messenger\Handler\MessageHandlerInterface; |
|
16
|
|
|
use Symfony\Component\Messenger\MessageBusInterface; |
|
17
|
|
|
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; |
|
18
|
|
|
|
|
19
|
|
|
/** |
|
20
|
|
|
* @package system-settings |
|
21
|
|
|
* |
|
22
|
|
|
* @internal |
|
23
|
|
|
* |
|
24
|
|
|
* @final |
|
25
|
|
|
*/ |
|
26
|
|
|
class AdminSearchRegistry implements MessageHandlerInterface, EventSubscriberInterface |
|
|
|
|
|
|
27
|
|
|
{ |
|
28
|
|
|
/** |
|
29
|
|
|
* @var array<string, mixed> |
|
30
|
|
|
*/ |
|
31
|
|
|
private array $indexer; |
|
32
|
|
|
|
|
33
|
|
|
private Connection $connection; |
|
34
|
|
|
|
|
35
|
|
|
private MessageBusInterface $queue; |
|
36
|
|
|
|
|
37
|
|
|
private EventDispatcherInterface $dispatcher; |
|
38
|
|
|
|
|
39
|
|
|
private Client $client; |
|
40
|
|
|
|
|
41
|
|
|
private AdminElasticsearchHelper $adminEsHelper; |
|
42
|
|
|
|
|
43
|
|
|
/** |
|
44
|
|
|
* @var array<mixed> |
|
45
|
|
|
*/ |
|
46
|
|
|
private array $config; |
|
47
|
|
|
|
|
48
|
|
|
/** |
|
49
|
|
|
* @var array<mixed> |
|
50
|
|
|
*/ |
|
51
|
|
|
private array $mapping; |
|
52
|
|
|
|
|
53
|
|
|
/**
 * Stores the collaborators and normalises the index settings.
 *
 * The indexers may be passed as an array or as any \Traversable; a traversable is materialised
 * into an array (keys preserved) so single indexers can be looked up by key later.
 *
 * Inside `$config['settings']['index']` the keys `number_of_shards` and `number_of_replicas` are
 * removed when they are present with a null value, so they are not sent along when an index is created.
 *
 * @param AbstractAdminIndexer[] $indexer
 * @param array<mixed> $config
 * @param array<mixed> $mapping
 */
public function __construct(
    $indexer,
    Connection $connection,
    MessageBusInterface $queue,
    EventDispatcherInterface $dispatcher,
    Client $client,
    AdminElasticsearchHelper $adminEsHelper,
    array $config,
    array $mapping
) {
    if ($indexer instanceof \Traversable) {
        $indexer = iterator_to_array($indexer);
    }
    $this->indexer = $indexer;

    $this->adminEsHelper = $adminEsHelper;
    $this->client = $client;
    $this->connection = $connection;
    $this->dispatcher = $dispatcher;
    $this->queue = $queue;
    $this->mapping = $mapping;

    if (isset($config['settings']['index'])) {
        foreach (['number_of_shards', 'number_of_replicas'] as $setting) {
            if (!\array_key_exists($setting, $config['settings']['index'])) {
                continue;
            }

            // Only an explicit null is dropped; every other value is kept untouched.
            if ($config['settings']['index'][$setting] !== null) {
                continue;
            }

            unset($config['settings']['index'][$setting]);
        }
    }

    $this->config = $config;
}
|
88
|
|
|
|
|
89
|
|
|
/**
 * Processes one indexing message: resolves the indexer registered for the message's entity,
 * lets it fetch the documents for the message's ids and pushes the result into the indices
 * carried by the message.
 */
public function __invoke(AdminSearchIndexingMessage $message): void
{
    $indexer = $this->getIndexer($message->getEntity());
    $ids = $message->getIds();

    $this->push($indexer, $message->getIndices(), $indexer->fetch($ids), $ids);
}
|
97
|
|
|
|
|
98
|
|
|
/**
 * Registers `refresh` as listener for entity-written container events with priority -1000.
 */
public static function getSubscribedEvents(): array
{
    $listeners = [
        ['refresh', -1000],
    ];

    return [EntityWrittenContainerEvent::class => $listeners];
}
|
106
|
|
|
|
|
107
|
|
|
/**
 * Lists the message classes this handler accepts; only the admin search indexing message.
 *
 * @return iterable<class-string>
 */
public static function getHandledMessages(): iterable
{
    return [AdminSearchIndexingMessage::class];
}
|
116
|
|
|
|
|
117
|
|
|
/**
 * Runs a full indexing pass over the registered indexers.
 *
 * Does nothing when the admin search helper reports the feature as disabled. Otherwise the set of
 * entities is taken from the registered indexer keys, restricted to the behavior's "only" list if
 * one is given, or else reduced by its "skip" list. New indices are created for these entities,
 * every id batch of every indexer is either handled directly or dispatched to the message bus,
 * progress events are emitted along the way, and finally the aliases are swapped to the new indices.
 */
public function iterate(AdminIndexingBehavior $indexingBehavior): void
{
    if ($this->adminEsHelper->getEnabled() === false) {
        return;
    }

    /** @var array<string> $entities */
    $entities = array_keys($this->indexer);

    $onlyEntities = $indexingBehavior->getOnlyEntities();
    if ($onlyEntities) {
        $entities = array_intersect($entities, $onlyEntities);
    } else {
        $skipEntities = $indexingBehavior->getSkipEntities();
        if ($skipEntities) {
            $entities = array_diff($entities, $skipEntities);
        }
    }

    $indices = $this->createIndices($entities);

    foreach ($entities as $entityName) {
        $indexer = $this->getIndexer($entityName);
        $iterator = $indexer->getIterator();

        $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));

        while (true) {
            $ids = $iterator->fetch();
            if (!$ids) {
                break;
            }

            $message = new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices, $ids);

            // we provide no queue when the data is sent by the admin
            if ($indexingBehavior->getNoQueue() === true) {
                $this->__invoke($message);
            } else {
                $this->queue->dispatch($message);
            }

            $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
        }

        $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
    }

    $this->swapAlias($indices);
}
|
156
|
|
|
|
|
157
|
|
|
/**
 * Keeps the search indices in sync after entities have been written.
 *
 * Returns early when the admin search is disabled or when the event contains no primary keys for
 * any registered indexer. If the helper asks for it, missing indices are created first. The
 * alias => index map is then read from `admin_elasticsearch_index_task`, and for every indexer
 * with written ids the documents are re-fetched and pushed.
 */
public function refresh(EntityWrittenContainerEvent $event): void
{
    if ($this->adminEsHelper->getEnabled() === false) {
        return;
    }

    if (!$this->isIndexedEntityWritten($event)) {
        return;
    }

    if ($this->adminEsHelper->getRefreshIndices()) {
        $this->refreshIndices();
    }

    /** @var array<string, string> $indices */
    $indices = $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');

    if (!$indices) {
        return;
    }

    foreach ($this->indexer as $indexer) {
        $ids = $event->getPrimaryKeys($indexer->getEntity());

        if ($ids) {
            $this->push($indexer, $indices, $indexer->fetch($ids), $ids);
        }
    }
}
|
185
|
|
|
|
|
186
|
|
|
/**
 * Exposes every registered indexer, keyed the same way they were handed to the constructor.
 *
 * @return AbstractAdminIndexer[]
 */
public function getIndexers(): iterable
{
    return $this->indexer;
}
|
193
|
|
|
|
|
194
|
|
|
/**
 * Looks up a single indexer by its registration key.
 *
 * @throws ElasticsearchIndexingException when nothing (or a falsy value) is registered under $name
 */
public function getIndexer(string $name): AbstractAdminIndexer
{
    $found = $this->indexer[$name] ?? null;

    if (!$found) {
        throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found', $name)]);
    }

    return $found;
}
|
203
|
|
|
|
|
204
|
|
|
/**
 * Tells whether the event carries primary keys for the entity of at least one registered indexer.
 */
private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
{
    foreach ($this->indexer as $indexer) {
        if ($event->getPrimaryKeys($indexer->getEntity())) {
            return true;
        }
    }

    return false;
}
|
216
|
|
|
|
|
217
|
|
|
/**
 * Sends one bulk request that indexes the fetched documents and deletes the ids that were
 * requested but for which no document came back.
 *
 * Nothing is sent when `$indices` has no entry for the indexer's alias, or when there is neither
 * a document to index nor an id to delete.
 *
 * @param array<string, string> $indices alias => concrete index name
 * @param array<string, array<string|int, string>> $data documents keyed by their id
 * @param array<string> $ids all ids that were requested from the indexer
 *
 * @throws ElasticsearchIndexingException when the bulk response reports errors
 */
private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
{
    $alias = $this->adminEsHelper->getIndex($indexer->getName());

    if (!isset($indices[$alias])) {
        return;
    }

    // Requested ids without a document are treated as removed and get a delete action.
    $toRemove = array_filter($ids, static function (string $id) use ($data): bool {
        return !isset($data[$id]);
    });

    $documents = [];
    foreach ($data as $id => $document) {
        // A bulk "index" action is the action line followed by the document source.
        $documents[] = ['index' => ['_id' => $id]];

        // Defaults guarantee that every document carries the base fields; indexer values win.
        $documents[] = \array_replace(
            ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '', 'text' => ''],
            $document
        );
    }

    foreach ($toRemove as $id) {
        $documents[] = ['delete' => ['_id' => $id]];
    }

    // A bulk request with an empty body has nothing to do and would only produce a request error.
    if ($documents === []) {
        return;
    }

    $result = $this->client->bulk([
        'index' => $indices[$alias],
        'body' => $documents,
    ]);

    if (\is_array($result) && !empty($result['errors'])) {
        throw new ElasticsearchIndexingException($this->parseErrors($result));
    }
}
|
261
|
|
|
|
|
262
|
|
|
/**
 * Creates a new, timestamp-suffixed index for each given entity and records it in
 * `admin_elasticsearch_index_task`.
 *
 * An entity whose timestamped index already exists is skipped: no index is created for it, it is
 * not part of the returned map, and its existing task row is left in place so that `refresh()` can
 * still resolve its alias. Only the rows of entities that actually received a new index are
 * replaced.
 *
 * @param array<string> $entities keys under which the indexers are registered
 *
 * @throws \Doctrine\DBAL\Exception
 *
 * @return array<string, string> alias => newly created index name
 */
private function createIndices(array $entities): array
{
    $indexTasks = [];
    $indices = [];
    $recreated = [];

    foreach ($entities as $entityName) {
        $indexer = $this->getIndexer($entityName);
        $alias = $this->adminEsHelper->getIndex($indexer->getName());
        $index = $alias . '_' . time();

        if ($this->indexExists($index)) {
            continue;
        }

        $indices[$alias] = $index;

        $this->create($indexer, $index, $alias);

        $recreated[] = $entityName;
        $indexTasks[] = [
            'id' => Uuid::randomBytes(),
            '`entity`' => $indexer->getEntity(),
            '`index`' => $index,
            '`alias`' => $alias,
            '`doc_count`' => $indexer->getIterator()->fetchCount(),
        ];
    }

    // Nothing was created, so there is no task row to replace.
    if ($indexTasks === []) {
        return $indices;
    }

    // Remove only the rows that are re-inserted below; rows of skipped entities must survive.
    $this->connection->executeStatement(
        'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
        ['entities' => $recreated],
        ['entities' => Connection::PARAM_STR_ARRAY]
    );

    foreach ($indexTasks as $task) {
        $this->connection->insert('admin_elasticsearch_index_task', $task);
    }

    return $indices;
}
|
308
|
|
|
|
|
309
|
|
|
/**
 * Creates an index (plus alias) for every registered indexer whose alias does not exist yet and
 * replaces the matching rows in `admin_elasticsearch_index_task`.
 *
 * When every alias already exists there is nothing to record, so no statement is executed at all.
 *
 * @throws \Doctrine\DBAL\Exception
 */
private function refreshIndices(): void
{
    $entities = [];
    $indexTasks = [];

    foreach ($this->indexer as $indexer) {
        $alias = $this->adminEsHelper->getIndex($indexer->getName());

        if ($this->aliasExists($alias)) {
            continue;
        }

        $index = $alias . '_' . time();
        $this->create($indexer, $index, $alias);

        $entities[] = $indexer->getEntity();

        $indexTasks[] = [
            'id' => Uuid::randomBytes(),
            '`entity`' => $indexer->getEntity(),
            '`index`' => $index,
            '`alias`' => $alias,
            '`doc_count`' => $indexer->getIterator()->fetchCount(),
        ];
    }

    // Common case on entity writes: all aliases exist. A DELETE with an empty list would at best
    // match nothing, so skip the database round trip entirely.
    if ($indexTasks === []) {
        return;
    }

    $this->connection->executeStatement(
        'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
        ['entities' => $entities],
        ['entities' => Connection::PARAM_STR_ARRAY]
    );

    foreach ($indexTasks as $task) {
        $this->connection->insert('admin_elasticsearch_index_task', $task);
    }
}
|
345
|
|
|
|
|
346
|
|
|
/**
 * Creates the index `$index` and points `$alias` at it unless that alias already exists.
 *
 * The mapping starts from the base field definitions below, is passed through the indexer's
 * `mapping()` and is then merged with the configured mapping. The request body is the configured
 * index settings with the resulting mapping under `mappings`.
 */
private function create(AbstractAdminIndexer $indexer, string $index, string $alias): void
{
    $baseMapping = [
        'properties' => [
            'id' => ['type' => 'keyword'],
            'textBoosted' => ['type' => 'text'],
            'text' => ['type' => 'text'],
            'entityName' => ['type' => 'keyword'],
            'parameters' => ['type' => 'keyword'],
        ],
    ];

    // NOTE(review): array_merge_recursive combines scalars found under the same string key on both
    // sides into a list of both values instead of overriding; confirm the configured mapping never
    // redefines a leaf (e.g. a field's `type`) that the indexer mapping already sets.
    $mapping = array_merge_recursive($indexer->mapping($baseMapping), $this->mapping);

    $request = [
        'index' => $index,
        'body' => array_merge($this->config, ['mappings' => $mapping]),
    ];

    $this->client->indices()->create($request);

    $this->createAliasIfNotExisting($index, $alias);
}
|
372
|
|
|
|
|
373
|
|
|
/**
 * Asks the search server whether an index with the given name exists.
 */
private function indexExists(string $name): bool
{
    $query = ['index' => $name];

    return $this->client->indices()->exists($query);
}
|
377
|
|
|
|
|
378
|
|
|
/**
 * Asks the search server whether an alias with the given name exists.
 */
private function aliasExists(string $alias): bool
{
    $query = ['name' => $alias];

    return $this->client->indices()->existsAlias($query);
}
|
382
|
|
|
|
|
383
|
|
|
/**
 * Extracts the failed items from a bulk response.
 *
 * Every entry of `$result['items']` is unwrapped from its `index` or `delete` action key. Items
 * with status 200 or 201 are skipped; all others are reported. The error `type` and `reason` are
 * preferred; if the item has no `error` object, its `_type` and `result` are used instead, and
 * `'unknown'` when those are missing as well, so that an absent key never triggers an
 * "Undefined array key" warning or leaks a null into the error list.
 *
 * @param array<string, array<array<string, mixed>>> $result
 *
 * @return array<array{reason: string}|string>
 */
private function parseErrors(array $result): array
{
    $errors = [];
    foreach ($result['items'] as $item) {
        $item = $item['index'] ?? $item['delete'];

        if (\in_array($item['status'], [200, 201], true)) {
            continue;
        }

        $errors[] = [
            'index' => $item['_index'],
            'id' => $item['_id'],
            // `_type` is not guaranteed to be part of the response item, hence the last fallback.
            'type' => $item['error']['type'] ?? $item['_type'] ?? 'unknown',
            'reason' => $item['error']['reason'] ?? $item['result'] ?? 'unknown',
        ];
    }

    return $errors;
}
|
408
|
|
|
|
|
409
|
|
|
/**
 * Points `$alias` at `$index`, but only when no alias with that name exists yet.
 */
private function createAliasIfNotExisting(string $index, string $alias): void
{
    if (!$this->aliasExists($alias)) {
        $this->putAlias($index, $alias);
    }
}
|
419
|
|
|
|
|
420
|
|
|
/**
 * Switches every alias to its freshly built index and removes the indices it pointed to before.
 *
 * For an alias that does not exist yet, it is simply created and processing moves on to the next
 * entry (an early `return` here would leave all remaining aliases unswapped). For an existing
 * alias, the new index is attached if it is not already, and every other index currently behind
 * the alias is deleted.
 *
 * @param array<string, string> $indices alias => new index name
 */
private function swapAlias(array $indices): void
{
    foreach ($indices as $alias => $index) {
        if (!$this->aliasExists($alias)) {
            $this->putAlias($index, $alias);

            continue;
        }

        // The response is keyed by the names of the indices the alias currently points to.
        $current = $this->client->indices()->getAlias(['name' => $alias]);

        if (!isset($current[$index])) {
            $this->putAlias($index, $alias);
        }

        // Everything except the new index is outdated and gets dropped.
        unset($current[$index]);

        foreach (array_keys($current) as $outdatedIndex) {
            $this->client->indices()->delete(['index' => $outdatedIndex]);
        }
    }
}
|
448
|
|
|
|
|
449
|
|
|
/**
 * Refreshes `$index` and then adds `$alias` to it.
 */
private function putAlias(string $index, string $alias): void
{
    $refreshRequest = ['index' => $index];
    $aliasRequest = ['index' => $index, 'name' => $alias];

    $this->client->indices()->refresh($refreshRequest);
    $this->client->indices()->putAlias($aliasRequest);
}
|
456
|
|
|
} |
|
457
|
|
|
|
This interface has been deprecated. The supplier of the interface has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the interface will be removed and what other interface to use instead.