Completed
Push — master ( 6efea1...0e768a )
by Simonas
04:33 queued 10s
created

Manager::msearch()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 5
nc 1
nop 1
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Event\Events;
17
use ONGR\ElasticsearchBundle\Event\BulkEvent;
18
use ONGR\ElasticsearchBundle\Event\CommitEvent;
19
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
20
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
21
use ONGR\ElasticsearchBundle\Result\Converter;
22
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
23
use Symfony\Component\Stopwatch\Stopwatch;
24
25
/**
26
 * Manager class.
27
 */
28
class Manager
29
{
30
    /**
31
     * @var string Manager name
32
     */
33
    private $name;
34
35
    /**
36
     * @var array Manager configuration
37
     */
38
    private $config = [];
39
40
    /**
41
     * @var Client
42
     */
43
    private $client;
44
45
    /**
46
     * @var Converter
47
     */
48
    private $converter;
49
50
    /**
51
     * @var array Container for bulk queries
52
     */
53
    private $bulkQueries = [];
54
55
    /**
56
     * @var array Holder for consistency, refresh and replication parameters
57
     */
58
    private $bulkParams = [];
59
60
    /**
61
     * @var array
62
     */
63
    private $indexSettings;
64
65
    /**
66
     * @var MetadataCollector
67
     */
68
    private $metadataCollector;
69
70
    /**
71
     * After commit to make data available the refresh or flush operation is needed
72
     * so one of those methods has to be defined, the default is refresh.
73
     *
74
     * @var string
75
     */
76
    private $commitMode = 'refresh';
77
78
    /**
79
     * The size that defines after how much document inserts call commit function.
80
     *
81
     * @var int
82
     */
83
    private $bulkCommitSize = 100;
84
85
    /**
86
     * Container to count how many documents was passed to the bulk query.
87
     *
88
     * @var int
89
     */
90
    private $bulkCount = 0;
91
92
    /**
93
     * @var Repository[] Repository local cache
94
     */
95
    private $repositories;
96
97
    /**
98
     * @var EventDispatcherInterface
99
     */
100
    private $eventDispatcher;
101
102
    /**
103
     * @var Stopwatch
104
     */
105
    private $stopwatch;
106
107
    /**
108
     * @param string            $name              Manager name
109
     * @param array             $config            Manager configuration
110
     * @param Client            $client
111
     * @param array             $indexSettings
112
     * @param MetadataCollector $metadataCollector
113
     * @param Converter         $converter
114
     */
115
    public function __construct(
116
        $name,
117
        array $config,
118
        $client,
119
        array $indexSettings,
120
        $metadataCollector,
121
        $converter
122
    ) {
123
        $this->name = $name;
124
        $this->config = $config;
125
        $this->client = $client;
126
        $this->indexSettings = $indexSettings;
127
        $this->metadataCollector = $metadataCollector;
128
        $this->converter = $converter;
129
    }
130
131
    /**
132
     * Returns Elasticsearch connection.
133
     *
134
     * @return Client
135
     */
136
    public function getClient()
137
    {
138
        return $this->client;
139
    }
140
141
    /**
142
     * @return string
143
     */
144
    public function getName()
145
    {
146
        return $this->name;
147
    }
148
149
    /**
150
     * @return array
151
     */
152
    public function getConfig()
153
    {
154
        return $this->config;
155
    }
156
157
    /**
158
     * @param EventDispatcherInterface $eventDispatcher
159
     */
160
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
161
    {
162
        $this->eventDispatcher = $eventDispatcher;
163
    }
164
165
    /**
166
     * @param Stopwatch $stopwatch
167
     */
168
    public function setStopwatch(Stopwatch $stopwatch)
169
    {
170
        $this->stopwatch = $stopwatch;
171
    }
172
173
    /**
174
     * Returns repository by document class.
175
     *
176
     * @param string $className FQCN or string in Bundle:Document format
177
     *
178
     * @return Repository
179
     */
180
    public function getRepository($className)
181
    {
182
        if (!is_string($className)) {
183
            throw new \InvalidArgumentException('Document class must be a string.');
184
        }
185
186
        $directory = null;
187
188
        if (strpos($className, ':')) {
189
            $bundle = explode(':', $className)[0];
190
191
            if (isset($this->config['mappings'][$bundle]['document_dir'])) {
192
                $directory = $this->config['mappings'][$bundle]['document_dir'];
193
            }
194
        }
195
196
        $namespace = $this->getMetadataCollector()->getClassName($className, $directory);
197
198
        if (isset($this->repositories[$namespace])) {
199
            return $this->repositories[$namespace];
200
        }
201
202
        $repository = $this->createRepository($namespace);
203
        $this->repositories[$namespace] = $repository;
204
205
        return $repository;
206
    }
207
208
    /**
209
     * @return MetadataCollector
210
     */
211
    public function getMetadataCollector()
212
    {
213
        return $this->metadataCollector;
214
    }
215
216
    /**
217
     * @return Converter
218
     */
219
    public function getConverter()
220
    {
221
        return $this->converter;
222
    }
223
224
    /**
225
     * @return string
226
     */
227
    public function getCommitMode()
228
    {
229
        return $this->commitMode;
230
    }
231
232
    /**
233
     * @param string $commitMode
234
     */
235
    public function setCommitMode($commitMode)
236
    {
237
        if ($commitMode === 'refresh' || $commitMode === 'flush' || $commitMode === 'none') {
238
            $this->commitMode = $commitMode;
239
        } else {
240
            throw new \LogicException('The commit method must be either refresh, flush or none.');
241
        }
242
    }
243
244
    /**
245
     * @return int
246
     */
247
    public function getBulkCommitSize()
248
    {
249
        return $this->bulkCommitSize;
250
    }
251
252
    /**
253
     * @param int $bulkCommitSize
254
     */
255
    public function setBulkCommitSize($bulkCommitSize)
256
    {
257
        $this->bulkCommitSize = $bulkCommitSize;
258
    }
259
260
    /**
261
     * Creates a repository.
262
     *
263
     * @param string $className
264
     *
265
     * @return Repository
266
     */
267
    private function createRepository($className)
268
    {
269
        return new Repository($this, $className);
270
    }
271
272
    /**
273
     * Executes search query in the index.
274
     *
275
     * @param array $types             List of types to search in.
276
     * @param array $query             Query to execute.
277
     * @param array $queryStringParams Query parameters.
278
     *
279
     * @return array
280
     */
281
    public function search(array $types, array $query, array $queryStringParams = [])
282
    {
283
        $params = [];
284
        $params['index'] = $this->getIndexName();
285
        
286
        $resolvedTypes = [];
287
        foreach ($types as $type) {
288
            $resolvedTypes[] = $this->resolveTypeName($type);
289
        }
290
        
291
        if (!empty($resolvedTypes)) {
292
            $params['type'] = implode(',', $resolvedTypes);
293
        }
294
        
295
        $params['body'] = $query;
296
297
        if (!empty($queryStringParams)) {
298
            $params = array_merge($queryStringParams, $params);
299
        }
300
301
        $this->stopwatch('start', 'search');
302
        $result = $this->client->search($params);
303
        $this->stopwatch('stop', 'search');
304
305
        return $result;
306
    }
307
308
    /**
309
     * Execute search queries using multisearch api
310
     * $body - is array of requests described in elastic Multi Search API
311
     *
312
     * @param $body
313
     * @return array
314
     */
315
    public function msearch(array $body)
316
    {
317
        $result = $this->client->msearch(
318
            [
319
                'index' => $this->getIndexName(), // set default index
320
                'body' => $body
321
            ]
322
        );
323
        return $result;
324
    }
325
326
    /**
327
     * Adds document to next flush.
328
     *
329
     * @param object $document
330
     */
331
    public function persist($document)
332
    {
333
        $documentArray = $this->converter->convertToArray($document);
334
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
335
336
        $this->bulk('index', $type, $documentArray);
337
    }
338
339
    /**
340
     * Adds document for removal.
341
     *
342
     * @param object $document
343
     */
344
    public function remove($document)
345
    {
346
        $data = $this->converter->convertToArray($document, [], ['_id', '_routing']);
347
348
        if (!isset($data['_id'])) {
349
            throw new \LogicException(
350
                'In order to use remove() method document class must have property with @Id annotation.'
351
            );
352
        }
353
354
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
355
356
        $this->bulk('delete', $type, $data);
357
    }
358
359
    /**
360
     * Flushes elasticsearch index.
361
     *
362
     * @param array $params
363
     *
364
     * @return array
365
     */
366
    public function flush(array $params = [])
367
    {
368
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
369
    }
370
371
    /**
372
     * Refreshes elasticsearch index.
373
     *
374
     * @param array $params
375
     *
376
     * @return array
377
     */
378
    public function refresh(array $params = [])
379
    {
380
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
381
    }
382
383
    /**
384
     * Inserts the current query container to the index, used for bulk queries execution.
385
     *
386
     * @param array $params Parameters that will be passed to the flush or refresh queries.
387
     *
388
     * @return null|array
389
     *
390
     * @throws BulkWithErrorsException
391
     */
392
    public function commit(array $params = [])
393
    {
394
        if (!empty($this->bulkQueries)) {
395
            $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
396
            $bulkQueries['index']['_index'] = $this->getIndexName();
397
            $this->eventDispatcher->dispatch(
398
                Events::PRE_COMMIT,
399
                new CommitEvent($this->getCommitMode(), $bulkQueries)
400
            );
401
402
            $this->stopwatch('start', 'bulk');
403
            $bulkResponse = $this->client->bulk($bulkQueries);
404
            $this->stopwatch('stop', 'bulk');
405
406
            if ($bulkResponse['errors']) {
407
                throw new BulkWithErrorsException(
408
                    json_encode($bulkResponse),
409
                    0,
410
                    null,
411
                    $bulkResponse
412
                );
413
            }
414
415
            $this->bulkQueries = [];
416
            $this->bulkCount = 0;
417
418
            $this->stopwatch('start', 'refresh');
419
420
            switch ($this->getCommitMode()) {
421
                case 'flush':
422
                    $this->flush($params);
423
                    break;
424
                case 'refresh':
425
                    $this->refresh($params);
426
                    break;
427
            }
428
429
            $this->eventDispatcher->dispatch(
430
                Events::POST_COMMIT,
431
                new CommitEvent($this->getCommitMode(), $bulkResponse)
432
            );
433
434
            $this->stopwatch('stop', 'refresh');
435
436
            return $bulkResponse;
437
        }
438
439
        return null;
440
    }
441
442
    /**
443
     * Adds query to bulk queries container.
444
     *
445
     * @param string       $operation One of: index, update, delete, create.
446
     * @param string|array $type      Elasticsearch type name.
447
     * @param array        $query     DSL to execute.
448
     *
449
     * @throws \InvalidArgumentException
450
     *
451
     * @return null|array
452
     */
453
    public function bulk($operation, $type, array $query)
454
    {
455
        if (!in_array($operation, ['index', 'create', 'update', 'delete'])) {
456
            throw new \InvalidArgumentException('Wrong bulk operation selected');
457
        }
458
459
        $this->eventDispatcher->dispatch(
460
            Events::BULK,
461
            new BulkEvent($operation, $type, $query)
462
        );
463
464
        $this->bulkQueries['body'][] = [
465
            $operation => array_filter(
466
                [
467
                    '_type' => $type,
468
                    '_id' => isset($query['_id']) ? $query['_id'] : null,
469
                    '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
470
                    '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
471
                    '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
472
                ]
473
            ),
474
        ];
475
        unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);
476
477
        switch ($operation) {
478
            case 'index':
479
            case 'create':
480
            case 'update':
481
                $this->bulkQueries['body'][] = $query;
482
                break;
483
            case 'delete':
484
                // Body for delete operation is not needed to apply.
485
            default:
486
                // Do nothing.
487
                break;
488
        }
489
490
        // We are using counter because there is to difficult to resolve this from bulkQueries array.
491
        $this->bulkCount++;
492
493
        $response = null;
494
495
        if ($this->bulkCommitSize === $this->bulkCount) {
496
            $response = $this->commit();
497
        }
498
499
        return $response;
500
    }
501
502
    /**
503
     * Optional setter to change bulk query params.
504
     *
505
     * @param array $params Possible keys:
506
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
507
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
508
     *                      ['replication'] = (enum) Explicitly set the replication type.
509
     */
510
    public function setBulkParams(array $params)
511
    {
512
        $this->bulkParams = $params;
513
    }
514
515
    /**
516
     * Creates fresh elasticsearch index.
517
     *
518
     * @param bool $noMapping Determines if mapping should be included.
519
     *
520
     * @return array
521
     */
522
    public function createIndex($noMapping = false)
523
    {
524
        if ($noMapping) {
525
            unset($this->indexSettings['body']['mappings']);
526
        }
527
528
        return $this->getClient()->indices()->create($this->indexSettings);
529
    }
530
531
    /**
532
     * Drops elasticsearch index.
533
     */
534
    public function dropIndex()
535
    {
536
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
537
    }
538
539
    /**
540
     * Tries to drop and create fresh elasticsearch index.
541
     *
542
     * @param bool $noMapping Determines if mapping should be included.
543
     *
544
     * @return array
545
     */
546
    public function dropAndCreateIndex($noMapping = false)
547
    {
548
        try {
549
            if ($this->indexExists()) {
550
                $this->dropIndex();
551
            }
552
        } catch (\Exception $e) {
553
            // Do nothing, our target is to create new index.
554
        }
555
556
        return $this->createIndex($noMapping);
557
    }
558
559
    /**
560
     * Checks if connection index is already created.
561
     *
562
     * @return bool
563
     */
564
    public function indexExists()
565
    {
566
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
567
    }
568
569
    /**
570
     * Returns index name this connection is attached to.
571
     *
572
     * @return string
573
     */
574
    public function getIndexName()
575
    {
576
        return $this->indexSettings['index'];
577
    }
578
579
    /**
580
     * Sets index name for this connection.
581
     *
582
     * @param string $name
583
     */
584
    public function setIndexName($name)
585
    {
586
        $this->indexSettings['index'] = $name;
587
    }
588
589
    /**
590
     * Returns mappings of the index for this connection.
591
     *
592
     * @return array
593
     */
594
    public function getIndexMappings()
595
    {
596
        return $this->indexSettings['body']['mappings'];
597
    }
598
599
    /**
600
     * Returns Elasticsearch version number.
601
     *
602
     * @return string
603
     */
604
    public function getVersionNumber()
605
    {
606
        return $this->client->info()['version']['number'];
607
    }
608
609
    /**
610
     * Clears elasticsearch client cache.
611
     */
612
    public function clearCache()
613
    {
614
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
615
    }
616
617
    /**
618
     * Returns a single document by ID. Returns NULL if document was not found.
619
     *
620
     * @param string $className Document class name or Elasticsearch type name
621
     * @param string $id        Document ID to find
622
     * @param string $routing   Custom routing for the document
623
     *
624
     * @return object
625
     */
626
    public function find($className, $id, $routing = null)
627
    {
628
        $type = $this->resolveTypeName($className);
629
630
        $params = [
631
            'index' => $this->getIndexName(),
632
            'type' => $type,
633
            'id' => $id,
634
        ];
635
636
        if ($routing) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $routing of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
637
            $params['routing'] = $routing;
638
        }
639
640
        try {
641
            $result = $this->getClient()->get($params);
642
        } catch (Missing404Exception $e) {
0 ignored issues
show
Bug introduced by
The class Elasticsearch\Common\Exc...ons\Missing404Exception does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
643
            return null;
644
        }
645
646
        return $this->getConverter()->convertToDocument($result, $this);
647
    }
648
649
    /**
650
     * Fetches next set of results.
651
     *
652
     * @param string $scrollId
653
     * @param string $scrollDuration
654
     *
655
     * @return mixed
656
     *
657
     * @throws \Exception
658
     */
659
    public function scroll(
660
        $scrollId,
661
        $scrollDuration = '5m'
662
    ) {
663
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
664
665
        return $results;
666
    }
667
668
    /**
669
     * Clears scroll.
670
     *
671
     * @param string $scrollId
672
     */
673
    public function clearScroll($scrollId)
674
    {
675
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
676
    }
677
678
    /**
679
     * Calls "Get Settings API" in Elasticsearch and will return you the currently configured settings.
680
     *
681
     * return array
682
     */
683
    public function getSettings()
684
    {
685
        return $this->getClient()->indices()->getSettings(['index' => $this->getIndexName()]);
686
    }
687
688
    /**
689
     * Gets Elasticsearch aliases information.
690
     * @param $params
691
     *
692
     * @return array
693
     */
694
    public function getAliases($params = [])
695
    {
696
        return $this->getClient()->indices()->getAliases(array_merge(['index' => $this->getIndexName()], $params));
697
    }
698
699
    /**
700
     * Resolves type name by class name.
701
     *
702
     * @param string $className
703
     *
704
     * @return string
705
     */
706
    private function resolveTypeName($className)
707
    {
708
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
709
            return $this->getMetadataCollector()->getDocumentType($className);
710
        }
711
712
        return $className;
713
    }
714
715
    /**
716
     * Starts and stops an event in the stopwatch
717
     *
718
     * @param string $action   only 'start' and 'stop'
719
     * @param string $name     name of the event
720
     */
721
    private function stopwatch($action, $name)
722
    {
723
        if (isset($this->stopwatch)) {
724
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
725
        }
726
    }
727
}
728