Completed
Pull Request — master (#869)
by
unknown
03:07 queued 01:21
created

IndexService::createSearch()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
rs 10
cc 1
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <info@nfq.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\ClientBuilder;
16
use ONGR\ElasticsearchBundle\Event\BulkEvent;
17
use ONGR\ElasticsearchBundle\Event\CommitEvent;
18
use ONGR\ElasticsearchBundle\Event\Events;
19
use ONGR\ElasticsearchBundle\Event\PostCreateClientEvent;
20
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
21
use ONGR\ElasticsearchBundle\Mapping\Converter;
22
use ONGR\ElasticsearchBundle\Mapping\IndexSettings;
23
use ONGR\ElasticsearchBundle\Result\ArrayIterator;
24
use ONGR\ElasticsearchBundle\Result\RawIterator;
25
use ONGR\ElasticsearchDSL\Query\TermLevel\IdsQuery;
26
use ONGR\ElasticsearchDSL\Query\TermLevel\TermQuery;
27
use ONGR\ElasticsearchDSL\Search;
28
use ONGR\ElasticsearchDSL\Sort\FieldSort;
29
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
30
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
31
use Symfony\Component\Serializer\Serializer;
32
33
class IndexService
34
{
35
    /**
     * @var Client|null Elasticsearch client; built on demand by getClient() and cleared by resetClient().
     */
    private $client;

    /**
     * @var string Namespace passed to the converter and to the client-creation event.
     */
    private $namespace;

    /**
     * @var Converter Converts between raw arrays and document objects.
     */
    private $converter;

    /**
     * @var EventDispatcherInterface Dispatcher used for client, bulk and commit events.
     */
    private $eventDispatcher;

    /**
     * @var object|null Optional stopwatch; only set through setStopwatch().
     */
    private $stopwatch;

    /**
     * @var int Threshold used by bulk() to decide when to auto-commit.
     */
    private $bulkCommitSize = 100;

    /**
     * @var array Queued bulk lines (action metadata and document bodies) awaiting commit().
     */
    private $bulkQueries = [];

    /**
     * Holds an IndexSettings object, not an array: the class calls getHosts(),
     * getIndexName(), getType() and getIndexMetadata() on it. It is always
     * assigned in the constructor, so no array default is declared.
     *
     * @var IndexSettings
     */
    private $indexSettings;

    /**
     * @var Serializer Serializer handed to the converter and to result iterators.
     */
    private $serializer;

    /**
     * @var mixed|null Optional tracer passed to the client builder when set.
     */
    private $tracer;
46
47
    /**
     * Stores the collaborators and builds the Elasticsearch client right away.
     *
     * @param string                   $namespace       Namespace later passed to the converter and client event.
     * @param Converter                $converter       Array/document converter.
     * @param EventDispatcherInterface $eventDispatcher Dispatcher for client, bulk and commit events.
     * @param Serializer               $serializer      Serializer passed to the converter and iterators.
     * @param IndexSettings            $indexSettings   Index configuration (hosts, index name, type, metadata).
     * @param mixed|null               $tracer          Optional tracer for the client builder.
     */
    public function __construct(
        string $namespace,
        Converter $converter,
        EventDispatcherInterface $eventDispatcher,
        Serializer $serializer,
        IndexSettings $indexSettings,
        $tracer = null
    ) {
        $this->tracer = $tracer;
        $this->indexSettings = $indexSettings;
        $this->serializer = $serializer;
        $this->eventDispatcher = $eventDispatcher;
        $this->converter = $converter;
        $this->namespace = $namespace;

        // Must run after every property is assigned: building the client reads
        // the settings, the tracer, the dispatcher and the namespace.
        $this->getClient();
    }
63
64
    /**
     * Returns the namespace this service was constructed with.
     */
    public function getNamespace(): string
    {
        return $this->namespace;
    }
68
69
    /**
     * Returns the type name reported by the index settings.
     *
     * @deprecated will be removed in v7 since there will be no more types in the indexes.
     */
    public function getTypeName(): string
    {
        $settings = $this->indexSettings;

        return $settings->getType();
    }
76
77
    /**
     * Returns the index settings currently held by this service.
     *
     * @return IndexSettings|mixed Whatever was last assigned by the constructor or setIndexSettings().
     */
    public function getIndexSettings()
    {
        return $this->indexSettings;
    }
81
82
    /**
     * Replaces the index settings held by this service.
     *
     * An already-built client is left untouched by this call.
     *
     * @param mixed $indexSettings New settings; the rest of the class calls IndexSettings methods on it.
     *
     * @return $this
     */
    public function setIndexSettings($indexSettings): self
    {
        $this->indexSettings = $indexSettings;

        return $this;
    }
87
88
    /**
     * Returns the Elasticsearch client, building it on first use.
     *
     * The builder is configured with the hosts from the index settings and,
     * when a tracer was supplied, with that tracer. A POST_CLIENT_CREATE event
     * carrying the builder is dispatched before the client is built.
     */
    public function getClient(): Client
    {
        if ($this->client) {
            return $this->client;
        }

        $builder = ClientBuilder::create();
        $builder->setHosts($this->indexSettings->getHosts());

        if ($this->tracer) {
            $builder->setTracer($this->tracer);
        }

        // The event receives the builder, not the finished client.
        $this->eventDispatcher->dispatch(
            Events::POST_CLIENT_CREATE,
            new PostCreateClientEvent($this->namespace, $builder)
        );

        $this->client = $builder->build();

        return $this->client;
    }
104
105
    /**
     * Returns the index name reported by the index settings.
     */
    public function getIndexName(): string
    {
        $settings = $this->indexSettings;

        return $settings->getIndexName();
    }
109
110
    /**
     * Returns the event dispatcher injected through the constructor.
     */
    public function getEventDispatcher(): EventDispatcherInterface
    {
        return $this->eventDispatcher;
    }
114
115
    /**
     * Returns the converter injected through the constructor.
     */
    public function getConverter(): Converter
    {
        return $this->converter;
    }
119
120
    /**
     * Returns the bulk commit size (100 unless changed via setBulkCommitSize()).
     */
    public function getBulkCommitSize(): int
    {
        return $this->bulkCommitSize;
    }
124
125
    /**
     * Sets the bulk commit size consulted by bulk() when auto-commit is enabled.
     *
     * @param int $bulkCommitSize New threshold value.
     *
     * @return $this
     */
    public function setBulkCommitSize(int $bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;

        return $this;
    }
130
131
    /**
     * Returns the stopwatch set via setStopwatch(), or null when none was set.
     *
     * @return mixed|null
     */
    public function getStopwatch()
    {
        return $this->stopwatch;
    }
135
136
    /**
     * Stores a stopwatch for later use by the private stopwatch() helper.
     *
     * @param mixed $stopwatch Object exposing start()/stop() methods — presumably a Symfony Stopwatch; confirm with callers.
     *
     * @return $this
     */
    public function setStopwatch($stopwatch)
    {
        $this->stopwatch = $stopwatch;

        return $this;
    }
141
142
    /**
     * Creates the index through the client's indices API.
     *
     * @param bool  $noMapping When true, an empty body is used instead of the settings' index metadata.
     * @param array $params    Request parameters merged over the defaults; same-named keys win.
     *
     * @return array Client response.
     */
    public function createIndex($noMapping = false, $params = []): array
    {
        $defaults = [
            'index' => $this->getIndexName(),
            'body' => $noMapping ? [] : $this->indexSettings->getIndexMetadata(),
        ];

        // array_filter() strips falsy entries, so an empty body is not sent at all.
        $request = array_filter(array_merge($defaults, $params));

        #TODO Add event here.

        return $this->getClient()->indices()->create($request);
    }
153
154
    /**
     * Deletes the index, or the indices behind it when the name is an alias.
     *
     * @return array Client response of the delete call.
     */
    public function dropIndex(): array
    {
        $target = $this->getIndexName();

        $aliasRequest = ['name' => $this->getIndexName()];
        if ($this->getClient()->indices()->existsAlias($aliasRequest)) {
            // The name is an alias: delete the entries keyed in the getAlias()
            // response instead of the alias name itself.
            $aliasInfo = $this->getClient()->indices()->getAlias(['name' => $this->getIndexName()]);
            $target = array_keys($aliasInfo);
        }

        return $this->getClient()->indices()->delete(['index' => $target]);
    }
165
166
    /**
     * Drops the index when it exists and then creates it again.
     *
     * Any exception raised while checking for or dropping the old index is
     * deliberately ignored; creation is attempted regardless.
     *
     * @param bool  $noMapping Forwarded to createIndex().
     * @param array $params    Forwarded to createIndex().
     *
     * @return array Response of createIndex().
     */
    public function dropAndCreateIndex($noMapping = false, $params = []): array
    {
        try {
            $exists = $this->indexExists();
            if ($exists) {
                $this->dropIndex();
            }
        } catch (\Exception $ignored) {
            // Do nothing, our target is to create the new index.
        }

        return $this->createIndex($noMapping, $params);
    }
178
179
    /**
     * Asks the client whether the configured index exists.
     */
    public function indexExists(): bool
    {
        $request = ['index' => $this->getIndexName()];

        return $this->getClient()->indices()->exists($request);
    }
183
184
    /**
     * Clears the cache of the configured index via the client's indices API.
     *
     * @return array Client response.
     */
    public function clearCache(): array
    {
        $request = ['index' => $this->getIndexName()];

        return $this->getClient()->indices()->clearCache($request);
    }
188
189
    /**
     * Returns a single document by provided ID or null if a document was not found.
     *
     * @param mixed $id     Document ID.
     * @param array $params Extra request parameters merged over index/type/id; same-named keys win.
     *
     * @return object|null Converted document, or null when it does not exist.
     */
    public function find($id, $params = [])
    {
        $requestParams = array_merge(
            [
                'index' => $this->getIndexName(),
                'type' => $this->getTypeName(),
                'id' => $id,
            ],
            $params
        );

        try {
            $result = $this->getClient()->get($requestParams);
        } catch (\Elasticsearch\Common\Exceptions\Missing404Exception $e) {
            // The client reports a 404 as an exception rather than a
            // "found => false" response, so map it to the documented null.
            // NOTE(review): a missing index also answers 404 and is now
            // reported as null too — confirm that is acceptable.
            return null;
        }

        if (!$result['found']) {
            return null;
        }

        // Expose the document ID to the converter alongside the source fields.
        $result['_source']['_id'] = $result['_id'];

        return $this->converter->convertArrayToDocument($this->namespace, $result['_source'], $this->serializer);
    }
212
213
    /**
     * Finds documents whose IDs are in the given list.
     *
     * @param array $ids Document IDs passed to an IdsQuery.
     */
    public function findByIds(array $ids): DocumentIterator
    {
        $idsSearch = $this->createSearch();
        $idsQuery = new IdsQuery($ids);
        $idsSearch->addQuery($idsQuery);

        return $this->findDocuments($idsSearch);
    }
220
221
    /**
     * Finds documents by a set of criteria.
     *
     * Every criteria entry is added as a TermQuery and every orderBy entry as
     * a FieldSort on the same search.
     *
     * @param array $criteria Field => value pairs. Example: ['job' => 'medic'].
     * @param array $orderBy  Field => direction pairs. Example: ['name' => 'ASC', 'surname' => 'DESC'].
     * @param int   $limit    Maximum number of hits; default is 10.
     * @param int   $offset   Number of hits to skip; default is 0.
     *
     * @return DocumentIterator The matching documents.
     */
    public function findBy(
        array $criteria,
        array $orderBy = [],
        int $limit = 10,
        int $offset = 0
    ) {
        $query = $this->createSearch();
        $query->setSize($limit);
        $query->setFrom($offset);

        foreach ($criteria as $fieldName => $expected) {
            $query->addQuery(new TermQuery($fieldName, $expected));
        }

        foreach ($orderBy as $sortField => $sortDirection) {
            $query->addSort(new FieldSort($sortField, $sortDirection));
        }

        return $this->findDocuments($query);
    }
251
252
    /**
     * Finds a single document by a set of criteria.
     *
     * Delegates to findBy() with a limit of 1 and returns the iterator's
     * current element.
     *
     * @param array $criteria Field => value pairs. Example: ['job' => 'medic'].
     * @param array $orderBy  Field => direction pairs. Example: ['name' => 'ASC', 'surname' => 'DESC'].
     *
     * @return object|null The object.
     */
    public function findOneBy(array $criteria, array $orderBy = [])
    {
        $matches = $this->findBy($criteria, $orderBy, 1);

        return $matches->current();
    }
264
265
    /**
     * Creates a new, empty Search object for building a query.
     */
    public function createSearch(): Search
    {
        $search = new Search();

        return $search;
    }
269
270
    /**
     * Builds the scroll configuration handed to result iterators.
     *
     * @param array|mixed $raw            Raw search response; only its '_scroll_id' entry is read.
     * @param mixed       $scrollDuration Scroll duration copied into the 'duration' key.
     *
     * @return array Empty when the response has no '_scroll_id'; otherwise the
     *               '_scroll_id' and 'duration' entries.
     */
    public function getScrollConfiguration($raw, $scrollDuration): array
    {
        if (!isset($raw['_scroll_id'])) {
            return [];
        }

        return [
            '_scroll_id' => $raw['_scroll_id'],
            'duration' => $scrollDuration,
        ];
    }
280
281 View Code Duplication
    /**
     * Executes the search and wraps the raw response in a DocumentIterator.
     *
     * @param Search $search Search to execute.
     */
    public function findDocuments(Search $search): DocumentIterator
    {
        $rawResponse = $this->executeSearch($search);
        $scrollConfig = $this->getScrollConfiguration($rawResponse, $search->getScroll());

        return new DocumentIterator($rawResponse, $this, $this->converter, $this->serializer, $scrollConfig);
    }
293
294 View Code Duplication
    /**
     * Executes the search and wraps the raw response in an ArrayIterator.
     *
     * @param Search $search Search to execute.
     */
    public function findArray(Search $search): ArrayIterator
    {
        $rawResponse = $this->executeSearch($search);
        $scrollConfig = $this->getScrollConfiguration($rawResponse, $search->getScroll());

        return new ArrayIterator($rawResponse, $this, $this->converter, $this->serializer, $scrollConfig);
    }
306
307 View Code Duplication
    /**
     * Executes the search and wraps the raw response in a RawIterator.
     *
     * @param Search $search Search to execute.
     */
    public function findRaw(Search $search): RawIterator
    {
        $rawResponse = $this->executeSearch($search);
        $scrollConfig = $this->getScrollConfiguration($rawResponse, $search->getScroll());

        return new RawIterator($rawResponse, $this, $this->converter, $this->serializer, $scrollConfig);
    }
319
320
    /**
     * Runs a Search object through search(), using its array form as the
     * query body and its URI parameters as request parameters.
     *
     * @return array Raw client response.
     */
    private function executeSearch(Search $search): array
    {
        $queryBody = $search->toArray();
        $uriParams = $search->getUriParams();

        return $this->search($queryBody, $uriParams);
    }
324
325
    /**
     * Returns the 'count' value of a count request with an empty body against
     * the configured index and type.
     */
    public function getIndexDocumentCount(): int
    {
        $countRequest = [
            'index' => $this->getIndexName(),
            'type' => $this->getTypeName(),
            'body' => [],
        ];

        $countResponse = $this->getClient()->count($countRequest);

        return $countResponse['count'];
    }
337
338 View Code Duplication
    /**
     * Deletes a single document by ID.
     *
     * @param mixed      $id      Document ID.
     * @param mixed|null $routing Routing value; only sent when truthy.
     *
     * @return array|mixed Client response of the delete call.
     */
    public function remove($id, $routing = null)
    {
        $deleteRequest = [
            'index' => $this->getIndexName(),
            'type' => $this->getTypeName(),
            'id' => $id,
        ];

        if ($routing) {
            $deleteRequest['routing'] = $routing;
        }

        return $this->getClient()->delete($deleteRequest);
    }
354
355
    /**
     * Sends an update request for a single document.
     *
     * @param mixed      $id     Document ID.
     * @param array      $fields Sent as 'doc'; omitted from the body when empty.
     * @param mixed|null $script Sent as 'script'; omitted from the body when falsy.
     * @param array      $params Extra request parameters merged over the defaults; same-named keys win.
     *
     * @return array Client response.
     */
    public function update($id, array $fields = [], $script = null, array $params = []): array
    {
        // array_filter() removes whichever of 'doc'/'script' is empty.
        $updateBody = array_filter(['doc' => $fields, 'script' => $script]);

        $defaults = [
            'id' => $id,
            'index' => $this->getIndexName(),
            'type' => $this->getTypeName(),
            'body' => $updateBody,
        ];
        $request = array_merge($defaults, $params);

        return $this->getClient()->update($request);
    }
376
377 View Code Duplication
    /**
     * Executes a raw search against the configured index and type.
     *
     * @param array $query  Query body.
     * @param array $params Extra request parameters merged over index/type/body; same-named keys win.
     *
     * @return array Raw client response.
     */
    public function search(array $query, array $params = []): array
    {
        $defaults = [
            'index' => $this->getIndexName(),
            'type' => $this->getTypeName(),
            'body' => $query,
        ];

        // Merging an empty array leaves the defaults unchanged, so no emptiness check is needed.
        $request = array_merge($defaults, $params);

        return $this->getClient()->search($request);
    }
396
397
    /**
     * Queues one bulk operation and optionally commits the queue.
     *
     * Usage example
     *
     * $im->bulk('index', ['_id' => 1, 'title' => 'foo']);
     * $im->bulk('delete', ['_id' => 2]);
     * $im->bulk('create', ['title' => 'foo']);
     *
     * @param string $operation  Bulk action name, used as the key of the action line.
     * @param array  $data       Document data; '_id' is moved into the action metadata and
     *                           '_index'/'_type' are discarded.
     * @param bool   $autoCommit When true, commit() runs once the queue reaches the commit threshold.
     *
     * @return array The commit() response when a commit happened, otherwise an empty array.
     */
    public function bulk(string $operation, array $data = [], $autoCommit = true): array
    {
        $bulkParams = [
            '_index' => $this->getIndexName(),
            '_type' => $this->getTypeName(),
            '_id' => $data['_id'] ?? null,
        ];

        unset($data['_index'], $data['_type'], $data['_id']);

        $this->eventDispatcher->dispatch(
            Events::BULK,
            new BulkEvent($operation, $bulkParams, $data)
        );

        $this->bulkQueries[] = [$operation => $bulkParams];

        if (!empty($data)) {
            $this->bulkQueries[] = $data;
        }

        // The previous check compared the commit size against
        // "count % size / 2", which is always smaller than a positive size
        // (so it never fired) and divided by zero for a size of 0.
        // An operation with a body occupies two queue lines, so the threshold
        // is twice the commit size; body-less operations make this approximate.
        $commitSize = $this->getBulkCommitSize();
        if ($autoCommit && $commitSize > 0 && count($this->bulkQueries) >= 2 * $commitSize) {
            return $this->commit();
        }

        return [];
    }
434
435
    /**
     * Adds document to next flush.
     *
     * The document is converted to an array and queued as an 'index' bulk
     * operation. Only null fields are stripped: a callback-less array_filter()
     * would also drop legitimate values such as 0, false, '' and '0'.
     *
     * @param object $document
     */
    public function persist($document): void
    {
        $documentArray = array_filter(
            $this->converter->convertDocumentToArray($document, $this->serializer),
            static function ($value): bool {
                return $value !== null;
            }
        );

        $this->bulk('index', $documentArray);
    }
446
447
    /**
     * Sends all queued bulk operations and then flushes or refreshes the index.
     *
     * Does nothing and returns an empty array when the queue is empty.
     * PRE_COMMIT and POST_COMMIT events are dispatched around the request.
     * The queue is cleared only after a successful commit.
     *
     * @param string $commitMode 'flush', 'flush_synced' or 'refresh'; any other value skips that step.
     * @param array  $params     Extra bulk request parameters merged over index/body; same-named keys win.
     *
     * @return array Bulk response, or an empty array when nothing was queued.
     *
     * @throws BulkWithErrorsException When the bulk response reports errors.
     */
    public function commit($commitMode = 'flush', array $params = []): array
    {
        if (empty($this->bulkQueries)) {
            return [];
        }

        $this->eventDispatcher->dispatch(
            Events::PRE_COMMIT,
            new CommitEvent($commitMode, $this->bulkQueries, [])
        );

        // Go through getClient() so the client is rebuilt after resetClient();
        // reading the property directly would call a method on null.
        $client = $this->getClient();

        $bulkResponse = $client->bulk(
            array_merge(
                [
                    'index' => $this->getIndexName(),
                    'body' => $this->bulkQueries,
                ],
                $params
            )
        );

        if ($bulkResponse['errors']) {
            throw new BulkWithErrorsException(
                json_encode($bulkResponse),
                0,
                null,
                $bulkResponse
            );
        }

        // Every queued action targets this service's index, so restrict the
        // flush/refresh to it (as flush() and refresh() do) rather than
        // touching every index.
        $indexTarget = ['index' => $this->getIndexName()];

        switch ($commitMode) {
            case 'flush':
                $client->indices()->flush($indexTarget);
                break;
            case 'flush_synced':
                $client->indices()->flushSynced($indexTarget);
                break;
            case 'refresh':
                $client->indices()->refresh($indexTarget);
                break;
        }

        $this->eventDispatcher->dispatch(
            Events::POST_COMMIT,
            new CommitEvent($commitMode, $this->bulkQueries, $bulkResponse)
        );

        $this->bulkQueries = [];

        return $bulkResponse;
    }
502
503
    /**
     * Flushes the configured index.
     *
     * @param array $params Extra request parameters merged over the index name; same-named keys win.
     *
     * @return array Client response.
     */
    public function flush(array $params = []): array
    {
        // getClient() rebuilds the client after resetClient(); the raw property would be null then.
        return $this->getClient()->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }
507
508
    /**
     * Refreshes the configured index.
     *
     * @param array $params Extra request parameters merged over the index name; same-named keys win.
     *
     * @return array Client response.
     */
    public function refresh(array $params = []): array
    {
        // getClient() rebuilds the client after resetClient(); the raw property would be null then.
        return $this->getClient()->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }
512
513
    /**
     * Fetches the next batch of results for an open scroll.
     *
     * @param mixed  $scrollId       Scroll ID sent as 'scroll_id'.
     * @param string $scrollDuration Value sent as 'scroll'; defaults to '5m'.
     *
     * @return array Raw client response.
     */
    public function scroll($scrollId, $scrollDuration = '5m'): array
    {
        $scrollRequest = [
            'scroll_id' => $scrollId,
            'scroll' => $scrollDuration,
        ];

        return $this->getClient()->scroll($scrollRequest);
    }
519
520
    /**
     * Asks the client to clear the scroll with the given ID.
     *
     * @param mixed $scrollId Scroll ID sent as 'scroll_id'.
     *
     * @return array Client response.
     */
    public function clearScroll($scrollId): array
    {
        $clearRequest = ['scroll_id' => $scrollId];

        return $this->getClient()->clearScroll($clearRequest);
    }
524
525
    /**
     * Discards the current client so the next getClient() call builds a new one.
     */
    public function resetClient(): void
    {
        $this->client = null;
    }
529
530
    /**
     * Clears the cache of the configured index; issues the same request as clearCache().
     *
     * @return array Client response.
     */
    public function clearElasticIndexCache(): array
    {
        $request = ['index' => $this->getIndexName()];

        return $this->getClient()->indices()->clearCache($request);
    }
534
535
    /**
     * Starts or stops a stopwatch event named 'ongr_es: <name>' in the
     * 'ongr_es' category.
     *
     * Does nothing when no stopwatch is set or when the action is neither
     * 'start' nor 'stop'.
     *
     * @param string $action 'start' or 'stop'; used as the method name called on the stopwatch.
     * @param string $name   Suffix of the event name.
     */
    private function stopwatch($action, $name): void
    {
        if (!$this->stopwatch) {
            return;
        }

        if ($action != 'start' && $action != 'stop') {
            return;
        }

        $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
    }
541
}
542