1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
/*
 * This file is part of the ONGR package.
 *
 * (c) NFQ Technologies UAB <info@nfq.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
11
|
|
|
|
12
|
|
|
namespace ONGR\ElasticsearchBundle\Service; |
13
|
|
|
|
14
|
|
|
use Elasticsearch\Client; |
15
|
|
|
use Elasticsearch\ClientBuilder; |
16
|
|
|
use ONGR\ElasticsearchBundle\Event\BulkEvent; |
17
|
|
|
use ONGR\ElasticsearchBundle\Event\CommitEvent; |
18
|
|
|
use ONGR\ElasticsearchBundle\Event\Events; |
19
|
|
|
use ONGR\ElasticsearchBundle\Event\PostCreateClientEvent; |
20
|
|
|
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException; |
21
|
|
|
use ONGR\ElasticsearchBundle\Result\ArrayIterator; |
22
|
|
|
use ONGR\ElasticsearchBundle\Result\Converter; |
23
|
|
|
use ONGR\ElasticsearchBundle\Result\RawIterator; |
24
|
|
|
use ONGR\ElasticsearchDSL\Query\FullText\QueryStringQuery; |
25
|
|
|
use ONGR\ElasticsearchDSL\Query\Compound\BoolQuery; |
26
|
|
|
use ONGR\ElasticsearchDSL\Query\TermLevel\IdsQuery; |
27
|
|
|
use ONGR\ElasticsearchDSL\Search; |
28
|
|
|
use ONGR\ElasticsearchDSL\Sort\FieldSort; |
29
|
|
|
use ONGR\ElasticsearchBundle\Result\DocumentIterator; |
30
|
|
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
31
|
|
|
|
32
|
|
|
/**
 * Document repository class.
 *
 * Wraps a lazily built Elasticsearch client and exposes index management,
 * search, single-document CRUD and buffered bulk operations.
 */
class IndexService
{
    /**
     * Elasticsearch client, built on first call to getClient().
     */
    private $client;

    /**
     * NOTE(review): no code in this class assigns this property, although
     * getIndexName() is declared to return a string — confirm where the
     * index name is supposed to come from.
     */
    private $indexName;

    /**
     * Raw configuration array passed to the constructor ('hosts' and 'type' are read from it).
     */
    private $config;

    private $converter;

    private $eventDispatcher;

    /**
     * Optional stopwatch; only its start()/stop() methods are called, and only when set.
     */
    private $stopwatch;

    /**
     * Threshold used by bulk() to decide when the queue is committed automatically.
     */
    private $bulkCommitSize = 100;

    /**
     * Queued bulk request lines (headers and bodies) waiting for commit().
     */
    private $bulkQueries = [];

    /**
     * Action performed after a successful bulk request: 'flush', 'flush_synced' or 'refresh';
     * any other value performs no post-commit action.
     */
    private $commitMode = 'refresh';

    /**
     * @deprecated will be removed in v7 since there will be no more types in the indexes.
     */
    private $typeName;

    /**
     * @param Converter                $converter       Converts between raw arrays and document objects.
     * @param EventDispatcherInterface $eventDispatcher Receives client, bulk and commit events.
     * @param array                    $config          Must contain 'type'; 'hosts' is read by getClient().
     */
    public function __construct(Converter $converter, EventDispatcherInterface $eventDispatcher, array $config = [])
    {
        $this->config = $config;
        // NOTE(review): with the default empty $config this key is missing — confirm callers always pass it.
        $this->typeName = $config['type'];
        $this->converter = $converter;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * @deprecated will be removed in v7 since there will be no more types in the indexes.
     */
    public function getTypeName(): string
    {
        return $this->typeName;
    }

    /**
     * Returns the Elasticsearch client, building it on first use.
     *
     * The POST_CLIENT_CREATE event is dispatched with the client builder before
     * build() is called, so listeners can still adjust the builder.
     */
    public function getClient(): Client
    {
        if (!$this->client) {
            $builder = ClientBuilder::create();
            $builder->setHosts($this->config['hosts']);

            $this->eventDispatcher->dispatch(
                Events::POST_CLIENT_CREATE,
                new PostCreateClientEvent($builder, $this->config)
            );

            $this->client = $builder->build();
        }

        return $this->client;
    }

    public function getIndexName(): string
    {
        return $this->indexName;
    }

    public function getConverter(): Converter
    {
        return $this->converter;
    }

    public function getEventDispatcher(): EventDispatcherInterface
    {
        return $this->eventDispatcher;
    }

    public function getBulkCommitSize(): int
    {
        return $this->bulkCommitSize;
    }

    /**
     * @return $this
     */
    public function setBulkCommitSize(int $bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;

        return $this;
    }

    public function getCommitMode(): string
    {
        return $this->commitMode;
    }

    public function setCommitMode(string $commitMode): IndexService
    {
        $this->commitMode = $commitMode;

        return $this;
    }

    public function getStopwatch()
    {
        return $this->stopwatch;
    }

    /**
     * @param object|null $stopwatch Object exposing start($name, $category) and stop($name, ...).
     *
     * @return $this
     */
    public function setStopwatch($stopwatch)
    {
        $this->stopwatch = $stopwatch;

        return $this;
    }

    /**
     * Returns the request parameters handed to the "create index" API by createIndex().
     *
     * Previously the body was empty, so the declared array return type made every
     * call fail. Only the index name is returned for now.
     * TODO: add the settings/mappings 'body' once its source is decided.
     */
    public function getIndexSettings(): array
    {
        return ['index' => $this->getIndexName()];
    }

    /**
     * Creates the index using getIndexSettings().
     *
     * @param bool $noMapping Currently not used by the implementation.
     */
    public function createIndex($noMapping = false): array
    {
        return $this->getClient()->indices()->create($this->getIndexSettings());
    }

    public function dropIndex(): array
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Drops the index when it exists, then creates it again.
     *
     * @param bool $noMapping Forwarded to createIndex().
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            if ($this->indexExists()) {
                $this->dropIndex();
            }
        } catch (\Exception $e) {
            // Deliberately ignored: the goal is to end up with a freshly created index.
        }

        return $this->createIndex($noMapping);
    }

    public function indexExists(): bool
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by provided ID or null if a document was not found.
     *
     * @param string $id     Document ID to find
     * @param array  $params Custom parameters added to the query url
     *
     * @return object|null
     */
    public function find($id, $params = [])
    {
        $requestParams = array_merge(
            [
                'index' => $this->getIndexName(),
                'type' => $this->getTypeName(),
                'id' => $id,
            ],
            $params
        );

        $result = $this->getClient()->get($requestParams);

        // A response can report a miss instead of raising (e.g. when 404 is ignored by the client).
        if (isset($result['found']) && !$result['found']) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Finds documents whose IDs are in the given list.
     */
    public function findByIds(array $ids): DocumentIterator
    {
        $search = $this->createSearch();
        $search->addQuery(new IdsQuery($ids));

        return $this->findDocuments($search);
    }

    /**
     * Finds documents by a set of criteria.
     *
     * A field name prefixed with "!" is added as a must-not clause, every other
     * field as a must clause. Array values are joined with " OR ".
     *
     * @param array $criteria Example: ['group' => ['best', 'worst'], 'job' => 'medic'].
     * @param array $orderBy  Example: ['name' => 'ASC', 'surname' => 'DESC'].
     * @param int   $limit    Default is 10.
     * @param int   $offset   Default is 0.
     *
     * @return array|DocumentIterator The objects.
     */
    public function findBy(
        array $criteria,
        array $orderBy = [],
        int $limit = 10,
        int $offset = 0
    ) {
        $search = $this->createSearch();
        $search->setSize($limit);
        $search->setFrom($offset);

        foreach ($criteria as $field => $value) {
            if (preg_match('/^!(.+)$/', $field)) {
                $boolType = BoolQuery::MUST_NOT;
                $field = preg_replace('/^!/', '', $field);
            } else {
                $boolType = BoolQuery::MUST;
            }

            $search->addQuery(
                new QueryStringQuery(is_array($value) ? implode(' OR ', $value) : $value, ['default_field' => $field]),
                $boolType
            );
        }

        foreach ($orderBy as $field => $direction) {
            $search->addSort(new FieldSort($field, $direction));
        }

        return $this->findDocuments($search);
    }

    /**
     * Finds a single document by a set of criteria.
     *
     * @param array $criteria Example: ['group' => ['best', 'worst'], 'job' => 'medic'].
     * @param array $orderBy  Example: ['name' => 'ASC', 'surname' => 'DESC'].
     *
     * @return object|null The object.
     */
    public function findOneBy(array $criteria, array $orderBy = [])
    {
        // Offset must be an int: findBy() declares "int $offset", so null raised a TypeError.
        return $this->findBy($criteria, $orderBy, 1, 0)->current();
    }

    public function createSearch(): Search
    {
        return new Search();
    }

    /**
     * Builds the scroll configuration passed to result iterators.
     *
     * @param array  $raw            Raw search response.
     * @param string $scrollDuration Scroll duration taken from the search object.
     *
     * @return array Empty when the response carries no '_scroll_id'.
     */
    public function getScrollConfiguration($raw, $scrollDuration): array
    {
        if (!isset($raw['_scroll_id'])) {
            return [];
        }

        return [
            '_scroll_id' => $raw['_scroll_id'],
            'duration' => $scrollDuration,
        ];
    }

    /**
     * Executes the search and wraps the raw response in a DocumentIterator.
     */
    public function findDocuments(Search $search): DocumentIterator
    {
        $results = $this->executeSearch($search);

        return new DocumentIterator(
            $results,
            // NOTE(review): getManager() is not defined in this class — confirm what the iterator expects here.
            $this->getManager(),
            $this->getScrollConfiguration($results, $search->getScroll())
        );
    }

    /**
     * Executes the search and wraps the raw response in an ArrayIterator.
     */
    public function findArray(Search $search): ArrayIterator
    {
        $results = $this->executeSearch($search);

        return new ArrayIterator(
            $results,
            // NOTE(review): getManager() is not defined in this class — confirm what the iterator expects here.
            $this->getManager(),
            $this->getScrollConfiguration($results, $search->getScroll())
        );
    }

    /**
     * Executes the search and wraps the raw response in a RawIterator.
     */
    public function findRaw(Search $search): RawIterator
    {
        $results = $this->executeSearch($search);

        return new RawIterator(
            $results,
            // NOTE(review): getManager() is not defined in this class — confirm what the iterator expects here.
            $this->getManager(),
            $this->getScrollConfiguration($results, $search->getScroll())
        );
    }

    /**
     * Runs the given search object through search().
     *
     * search() takes (query body, url params) and adds index and type itself;
     * the former three-argument call shifted the query into the params slot.
     */
    private function executeSearch(Search $search): array
    {
        return $this->search($search->toArray(), $search->getUriParams());
    }

    /**
     * Returns the value of 'count' from the count API response for this index and type.
     */
    public function getIndexDocumentCount(): int
    {
        $results = $this->getClient()->count(
            [
                'index' => $this->getIndexName(),
                'type' => $this->getTypeName(),
                'body' => [],
            ]
        );

        return $results['count'];
    }

    /**
     * Deletes a single document by ID.
     *
     * @param string      $id      Document ID.
     * @param string|null $routing Added to the request only when truthy.
     *
     * @return array Raw client response.
     */
    public function remove($id, $routing = null)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->getTypeName(),
            'id' => $id,
        ];

        if ($routing) {
            $params['routing'] = $routing;
        }

        return $this->getClient()->delete($params);
    }

    /**
     * Partially updates a document.
     *
     * @param string      $id     Document ID.
     * @param array       $fields Sent as 'doc'; left out of the body when empty.
     * @param string|null $script Sent as 'script'; left out of the body when empty.
     * @param array       $params Extra request parameters; they override the defaults.
     */
    public function update($id, array $fields = [], $script = null, array $params = []): array
    {
        // array_filter() drops the empty 'doc' / 'script' entries.
        $body = array_filter(
            [
                'doc' => $fields,
                'script' => $script,
            ]
        );

        $params = array_merge(
            [
                'id' => $id,
                'index' => $this->getIndexName(),
                'type' => $this->getTypeName(),
                'body' => $body,
            ],
            $params
        );

        return $this->getClient()->update($params);
    }

    /**
     * Executes a raw search request against this index and type.
     *
     * @param array $query  Request body.
     * @param array $params Extra request (url) parameters; they override the defaults.
     *
     * @return array Raw client response.
     */
    public function search(array $query, array $params = []): array
    {
        // The merged array is what gets sent; previously the merge result was
        // stored in $params and never used, so extra parameters were dropped.
        $requestParams = array_merge(
            [
                'index' => $this->getIndexName(),
                'type' => $this->getTypeName(),
                'body' => $query,
            ],
            $params
        );

        // getClient() rather than the bare property: the client is built lazily.
        return $this->getClient()->search($requestParams);
    }

    /**
     * Queues one bulk operation and commits once the queue is large enough.
     *
     * @param string $operation Operation name passed to the BULK event.
     * @param array  $header    Bulk action line; queued as given.
     * @param array  $query     Optional bulk source line; queued when not empty.
     *
     * @return array Response of commit() when a commit was triggered, otherwise an empty array.
     */
    public function bulk(string $operation, array $header, array $query = []): array
    {
        $this->eventDispatcher->dispatch(
            Events::BULK,
            new BulkEvent($operation, $this->getTypeName(), $header, $query)
        );

        $this->bulkQueries[] = $header;

        if (!empty($query)) {
            $this->bulkQueries[] = $query;
        }

        // An operation usually occupies two queue entries (header + body), so the
        // operation count is approximated as half the queue length. This is not
        // exact for header-only operations, but avoids a separate counter.
        if (count($this->bulkQueries) >= 2 * $this->getBulkCommitSize()) {
            return $this->commit();
        }

        return [];
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        // NOTE(review): getMetadataCollector() is not defined in this class, and bulk()
        // requires an array as its second argument — confirm what $type is meant to be.
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Sends all queued bulk operations and performs the configured post-commit action.
     *
     * Dispatches PRE_COMMIT before and POST_COMMIT after the request; the queue
     * is emptied only after a successful run.
     *
     * @param array $params Extra parameters merged into the bulk request.
     *
     * @return array Raw bulk response, or an empty array when nothing was queued.
     *
     * @throws BulkWithErrorsException When the bulk response has 'errors' set.
     */
    public function commit(array $params = []): array
    {
        if (empty($this->bulkQueries)) {
            return [];
        }

        $commitMode = $this->getCommitMode();

        $this->eventDispatcher->dispatch(
            Events::PRE_COMMIT,
            new CommitEvent($commitMode, $this->bulkQueries)
        );

        $bulkResponse = $this->getClient()->bulk(
            array_merge(
                [
                    'index' => $this->getIndexName(),
                    'body' => $this->bulkQueries,
                ],
                $params
            )
        );

        if ($bulkResponse['errors']) {
            throw new BulkWithErrorsException(
                json_encode($bulkResponse),
                0,
                null,
                $bulkResponse
            );
        }

        // Start and stop must name the same event; previously 'refresh' was
        // started while the commit mode name was stopped.
        $this->stopwatch('start', $commitMode);

        switch ($commitMode) {
            case 'flush':
                $this->getClient()->indices()->flush();
                break;
            case 'flush_synced':
                $this->getClient()->indices()->flushSynced();
                break;
            case 'refresh':
                $this->getClient()->indices()->refresh();
                break;
        }

        $this->eventDispatcher->dispatch(
            Events::POST_COMMIT,
            new CommitEvent($commitMode, $this->bulkQueries, $bulkResponse)
        );

        $this->bulkQueries = [];

        $this->stopwatch('stop', $commitMode);

        return $bulkResponse;
    }

    /**
     * Flushes this index.
     *
     * @param array $params Extra request parameters; they override the defaults.
     */
    public function flush(array $params = []): array
    {
        return $this->getClient()->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Refreshes this index.
     *
     * @param array $params Extra request parameters; they override the defaults.
     */
    public function refresh(array $params = []): array
    {
        return $this->getClient()->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Fetches the next page of a scroll.
     *
     * @param string $scrollId       Scroll ID from a previous response.
     * @param string $scrollDuration Sent as the 'scroll' parameter.
     */
    public function scroll($scrollId, $scrollDuration = '5m'): array
    {
        return $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
    }

    public function clearScroll($scrollId): array
    {
        return $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    public function clearElasticIndexCache(): array
    {
        return $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Starts or stops a stopwatch event; does nothing when no stopwatch is set
     * or the action is neither 'start' nor 'stop'.
     *
     * @param string $action 'start' or 'stop'.
     * @param string $name   Event name suffix.
     */
    private function stopwatch($action, $name): void
    {
        if ($this->stopwatch && in_array($action, ['start', 'stop'], true)) {
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
        }
    }
}
508
|
|
|
|
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.