Completed
Push — 6.0-dev ( 1ef812...ceea66 )
by Simonas
01:30
created

IndexService::getNamespace()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\ClientBuilder;
16
use ONGR\ElasticsearchBundle\Event\BulkEvent;
17
use ONGR\ElasticsearchBundle\Event\CommitEvent;
18
use ONGR\ElasticsearchBundle\Event\Events;
19
use ONGR\ElasticsearchBundle\Event\PostCreateClientEvent;
20
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
21
use ONGR\ElasticsearchBundle\Mapping\Converter;
22
use ONGR\ElasticsearchBundle\Mapping\DocumentParser;
23
use ONGR\ElasticsearchBundle\Result\ArrayIterator;
24
use ONGR\ElasticsearchBundle\Result\RawIterator;
25
use ONGR\ElasticsearchDSL\Query\TermLevel\IdsQuery;
26
use ONGR\ElasticsearchDSL\Query\TermLevel\TermQuery;
27
use ONGR\ElasticsearchDSL\Search;
28
use ONGR\ElasticsearchDSL\Sort\FieldSort;
29
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
30
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
31
use Symfony\Component\Serializer\Serializer;
32
33
class IndexService
34
{
35
    private $client;
36
    private $namespace;
37
    private $converter;
38
    private $parser;
39
    private $eventDispatcher;
40
41
    private $stopwatch;
42
    private $bulkCommitSize = 100;
43
    private $bulkQueries = [];
44
    private $serializer;
45
    private $tracer;
46
47
    public function __construct(
48
        string $namespace,
49
        Converter $converter,
50
        DocumentParser $parser,
51
        EventDispatcherInterface $eventDispatcher,
52
        Serializer $serializer,
53
        $tracer = null
54
    ) {
55
        $this->namespace = $namespace;
56
        $this->converter = $converter;
57
        $this->parser = $parser;
58
        $this->eventDispatcher = $eventDispatcher;
59
        $this->serializer = $serializer;
60
        $this->tracer = $tracer;
61
    }
62
63
    public function getNamespace(): string
64
    {
65
        return $this->namespace;
66
    }
67
68
    /**
69
     * @deprecated will be removed in v7 since there will be no more types in the indexes.
70
     */
71
    public function getTypeName(): string
72
    {
73
        return $this->parser->getTypeName($this->namespace);
0 ignored issues
show
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...ntParser::getTypeName() has been deprecated with message: will be deleted in v7. Types are deleted from elasticsearch.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
74
    }
75
76
    public function getClient(): Client
77
    {
78
        if (!$this->client) {
79
            $document = $this->parser->getParsedDocument($this->namespace);
80
            $client = ClientBuilder::create();
81
            $client->setHosts($document->hosts);
82
            $this->tracer && $client->setTracer($this->tracer);
83
//            $client->setLogger()
84
85
            $this->eventDispatcher->dispatch(
86
                Events::POST_CLIENT_CREATE,
87
                new PostCreateClientEvent($this->namespace, $client)
88
            );
89
            $this->client = $client->build();
90
        }
91
        return $this->client;
92
    }
93
94
    public function getIndexName(): string
95
    {
96
        return $this->parser->getIndexAliasName($this->namespace);
97
    }
98
99
    public function getEventDispatcher(): EventDispatcherInterface
100
    {
101
        return $this->eventDispatcher;
102
    }
103
104
    public function getConverter(): Converter
105
    {
106
        return $this->converter;
107
    }
108
109
    public function getParser(): DocumentParser
110
    {
111
        return $this->parser;
112
    }
113
114
    public function getBulkCommitSize(): int
115
    {
116
        return $this->bulkCommitSize;
117
    }
118
119
    public function setBulkCommitSize(int $bulkCommitSize)
120
    {
121
        $this->bulkCommitSize = $bulkCommitSize;
122
        return $this;
123
    }
124
125
    public function getStopwatch()
126
    {
127
        return $this->stopwatch;
128
    }
129
130
    public function setStopwatch($stopwatch)
131
    {
132
        $this->stopwatch = $stopwatch;
133
        return $this;
134
    }
135
136
    public function createIndex($noMapping = false, $params = []): array
137
    {
138
        $params = array_merge([
139
            'index' => $this->getIndexName(),
140
            'body' => $noMapping ? [] : $this->parser->getIndexMetadata($this->namespace),
141
        ], $params);
142
143
        #TODO Add event here.
144
145
        return $this->getClient()->indices()->create(array_filter($params));
146
    }
147
148
    public function dropIndex(): array
149
    {
150
        $indexName = $this->getIndexName();
151
152
        if ($this->getClient()->indices()->existsAlias(['name' => $this->getIndexName()])) {
153
            $aliases = $this->getClient()->indices()->getAlias(['name' => $this->getIndexName()]);
154
            $indexName = array_keys($aliases);
155
        }
156
157
        return $this->getClient()->indices()->delete(['index' => $indexName]);
158
    }
159
160
    public function dropAndCreateIndex($noMapping = false, $params = []): array
161
    {
162
        try {
163
            if ($this->indexExists()) {
164
                $this->dropIndex();
165
            }
166
        } catch (\Exception $e) {
167
            // Do nothing, our target is to create the new index.
168
        }
169
170
        return $this->createIndex($noMapping, $params);
171
    }
172
173
    public function indexExists(): bool
174
    {
175
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
176
    }
177
178
    public function clearCache(): array
179
    {
180
        return $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
181
    }
182
183
    /**
184
     * Returns a single document by provided ID or null if a document was not found.
185
     */
186
    public function find($id, $params = [])
187
    {
188
        $requestParams = [
189
            'index' => $this->getIndexName(),
190
            'type' => $this->getTypeName(),
0 ignored issues
show
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...xService::getTypeName() has been deprecated with message: will be removed in v7 since there will be no more types in the indexes.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
191
            'id' => $id,
192
        ];
193
194
        $requestParams = array_merge($requestParams, $params);
195
196
        $result = $this->getClient()->get($requestParams);
197
198
        if (!$result['found']) {
199
            return null;
200
        }
201
202
        $result['_source']['_id'] = $result['_id'];
203
204
        return $this->converter->convertArrayToDocument($this->namespace, $result['_source'], $this->serializer);
205
    }
206
207
    public function findByIds(array $ids): DocumentIterator
208
    {
209
        $search = $this->createSearch();
210
        $search->addQuery(new IdsQuery($ids));
211
212
        return $this->findDocuments($search);
213
    }
214
215
    /**
216
     * Finds documents by a set of criteria.
217
     *
218
     * @param array      $criteria   Example: ['group' => ['best', 'worst'], 'job' => 'medic'].
219
     * @param array|null $orderBy    Example: ['name' => 'ASC', 'surname' => 'DESC'].
220
     * @param int|null   $limit      Default is 10.
221
     * @param int|null   $offset     Default is 0.
222
     *
223
     * @return array|DocumentIterator The objects.
224
     */
225
    public function findBy(
226
        array $criteria,
227
        array $orderBy = [],
228
        int $limit = 10,
229
        int $offset = 0
230
    ) {
231
        $search = $this->createSearch();
232
        $search->setSize($limit);
233
        $search->setFrom($offset);
234
235
        foreach ($criteria as $field => $value) {
236
            $search->addQuery(new TermQuery($field, $value));
237
        }
238
239
        foreach ($orderBy as $field => $direction) {
240
            $search->addSort(new FieldSort($field, $direction));
241
        }
242
243
        return $this->findDocuments($search);
244
    }
245
246
    /**
247
     * Finds a single document by a set of criteria.
248
     *
249
     * @param array      $criteria   Example: ['group' => ['best', 'worst'], 'job' => 'medic'].
250
     * @param array|null $orderBy    Example: ['name' => 'ASC', 'surname' => 'DESC'].
251
     *
252
     * @return object|null The object.
253
     */
254
    public function findOneBy(array $criteria, array $orderBy = [])
255
    {
256
        return $this->findBy($criteria, $orderBy, 1)->current();
257
    }
258
259
    public function createSearch(): Search
260
    {
261
        return new Search();
262
    }
263
264
    public function getScrollConfiguration($raw, $scrollDuration): array
265
    {
266
        $scrollConfig = [];
267
        if (isset($raw['_scroll_id'])) {
268
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
269
            $scrollConfig['duration'] = $scrollDuration;
270
        }
271
272
        return $scrollConfig;
273
    }
274
275 View Code Duplication
    public function findDocuments(Search $search): DocumentIterator
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
276
    {
277
        $results = $this->executeSearch($search);
278
279
        return new DocumentIterator(
280
            $results,
281
            $this,
282
            $this->converter,
283
            $this->serializer,
284
            $this->getScrollConfiguration($results, $search->getScroll())
285
        );
286
    }
287
288 View Code Duplication
    public function findArray(Search $search): ArrayIterator
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
289
    {
290
        $results = $this->executeSearch($search);
291
292
        return new ArrayIterator(
293
            $results,
294
            $this,
295
            $this->converter,
296
            $this->serializer,
297
            $this->getScrollConfiguration($results, $search->getScroll())
298
        );
299
    }
300
301 View Code Duplication
    public function findRaw(Search $search): RawIterator
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
302
    {
303
        $results = $this->executeSearch($search);
304
305
        return new RawIterator(
306
            $results,
307
            $this,
308
            $this->converter,
309
            $this->serializer,
310
            $this->getScrollConfiguration($results, $search->getScroll())
311
        );
312
    }
313
314
    private function executeSearch(Search $search): array
315
    {
316
        return $this->search($search->toArray(), $search->getUriParams());
317
    }
318
319
    public function getIndexDocumentCount(): int
320
    {
321
        $body = [
322
            'index' => $this->getIndexName(),
323
            'type' => $this->getTypeName(),
0 ignored issues
show
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...xService::getTypeName() has been deprecated with message: will be removed in v7 since there will be no more types in the indexes.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
324
            'body' => [],
325
        ];
326
327
        $results = $this->getClient()->count($body);
328
329
        return $results['count'];
330
    }
331
332 View Code Duplication
    public function remove($id, $routing = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
333
    {
334
        $params = [
335
            'index' => $this->getIndexName(),
336
            'type' => $this->getTypeName(),
0 ignored issues
show
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...xService::getTypeName() has been deprecated with message: will be removed in v7 since there will be no more types in the indexes.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
337
            'id' => $id,
338
        ];
339
340
        if ($routing) {
341
            $params['routing'] = $routing;
342
        }
343
344
        $response = $this->getClient()->delete($params);
345
346
        return $response;
347
    }
348
349
    public function update($id, array $fields = [], $script = null, array $params = []): array
350
    {
351
        $body = array_filter(
352
            [
353
                'doc' => $fields,
354
                'script' => $script,
355
            ]
356
        );
357
358
        $params = array_merge(
359
            [
360
                'id' => $id,
361
                'index' => $this->getIndexName(),
362
                'type' => $this->getTypeName(),
0 ignored issues
show
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...xService::getTypeName() has been deprecated with message: will be removed in v7 since there will be no more types in the indexes.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
363
                'body' => $body,
364
            ],
365
            $params
366
        );
367
368
        return $this->getClient()->update($params);
369
    }
370
371 View Code Duplication
    public function search(array $query, array $params = []): array
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
372
    {
373
        $requestParams = [
374
            'index' => $this->getIndexName(),
375
            'type' => $this->getTypeName(),
0 ignored issues
show
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...xService::getTypeName() has been deprecated with message: will be removed in v7 since there will be no more types in the indexes.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
376
            'body' => $query,
377
        ];
378
379
380
        if (!empty($params)) {
381
            $requestParams = array_merge($requestParams, $params);
382
        }
383
384
//        $this->stopwatch('start', 'search');
385
        $result = $this->getClient()->search($requestParams);
386
//        $this->stopwatch('stop', 'search');
387
388
        return $result;
389
    }
390
391
    /**
392
     * Usage example
393
     *
394
     * $im->bulk('index', ['_id' => 1, 'title' => 'foo']);
395
     * $im->bulk('delete', ['_id' => 2]);
396
     * $im->bulk('create', ['title' => 'foo']);
397
     */
398
    public function bulk(string $operation, array $data = [], $autoCommit = true): array
399
    {
400
        $bulkParams = [
401
            '_index' => $this->getIndexName(),
402
            '_type' => $this->getTypeName(),
0 ignored issues
show
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...xService::getTypeName() has been deprecated with message: will be removed in v7 since there will be no more types in the indexes.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
403
            '_id' => $data['_id'] ?? null,
404
        ];
405
406
        unset($data['_index'], $data['_type'], $data['_id']);
407
408
        $this->eventDispatcher->dispatch(
409
            Events::BULK,
410
            new BulkEvent($operation, $bulkParams, $data)
411
        );
412
413
        $this->bulkQueries[] = [ $operation => $bulkParams];
414
415
        if (!empty($data)) {
416
            $this->bulkQueries[] = $data;
417
        }
418
419
        $response = [];
420
421
        // %X is not very accurate, but better than use counter. This place is experimental for now.
422
        if ($autoCommit && $this->getBulkCommitSize() <= count($this->bulkQueries) % $this->getBulkCommitSize() / 2) {
423
            $response = $this->commit();
424
        }
425
426
        return $response;
427
    }
428
429
    /**
430
     * Adds document to next flush.
431
     *
432
     * @param object $document
433
     */
434
    public function persist($document): void
435
    {
436
        $documentArray = array_filter($this->converter->convertDocumentToArray($document, $this->serializer));
437
438
        $this->bulk('index', $documentArray);
439
    }
440
441
    public function commit($commitMode = 'flush', array $params = []): array
442
    {
443
        $bulkResponse = [];
444
        if (!empty($this->bulkQueries)) {
445
            $this->eventDispatcher->dispatch(
446
                Events::PRE_COMMIT,
447
                new CommitEvent($commitMode, $this->bulkQueries, [])
448
            );
449
450
//            $this->stopwatch('start', 'bulk');
451
            $bulkResponse = $this->client->bulk(
452
                array_merge(
453
                    [
454
                    'index' => $this->getIndexName(),
455
                    'body' => $this->bulkQueries,
456
                    ],
457
                    $params
458
                )
459
            );
460
//            $this->stopwatch('stop', 'bulk');
461
462
            if ($bulkResponse['errors']) {
463
                throw new BulkWithErrorsException(
464
                    json_encode($bulkResponse),
465
                    0,
466
                    null,
467
                    $bulkResponse
468
                );
469
            }
470
471
//            $this->stopwatch('start', 'refresh');
472
            switch ($commitMode) {
473
                case 'flush':
474
                    $this->getClient()->indices()->flush();
475
                    break;
476
                case 'flush_synced':
477
                    $this->getClient()->indices()->flushSynced();
478
                    break;
479
                case 'refresh':
480
                    $this->getClient()->indices()->refresh();
481
                    break;
482
            }
483
484
            $this->eventDispatcher->dispatch(
485
                Events::POST_COMMIT,
486
                new CommitEvent($commitMode, $this->bulkQueries, $bulkResponse)
487
            );
488
489
            $this->bulkQueries = [];
490
491
//            $this->stopwatch('stop', $this->getCommitMode());
492
        }
493
494
        return $bulkResponse;
495
    }
496
497
    public function flush(array $params = []): array
498
    {
499
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
500
    }
501
502
    public function refresh(array $params = []): array
503
    {
504
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
505
    }
506
507
    public function scroll($scrollId, $scrollDuration = '5m'): array
508
    {
509
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
510
511
        return $results;
512
    }
513
514
    public function clearScroll($scrollId): array
515
    {
516
        return $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
517
    }
518
519
    public function resetClient(): void
520
    {
521
        $this->client = null;
522
    }
523
524
    public function clearElasticIndexCache(): array
525
    {
526
        return $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
527
    }
528
529
    private function stopwatch($action, $name): void
530
    {
531
        if ($this->stopwatch && ($action == 'start' || $action == 'stop')) {
532
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
533
        }
534
    }
535
}
536