Completed
Pull Request — master (#640)
by
unknown
02:59
created

Manager::msearch()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 22
rs 9.2
cc 3
eloc 13
nc 3
nop 1
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
17
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
18
use ONGR\ElasticsearchBundle\Result\Converter;
19
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
20
use ONGR\ElasticsearchBundle\Result\RawIterator;
21
use ONGR\ElasticsearchBundle\Result\Result;
22
use ONGR\ElasticsearchDSL\Search;
23
24
/**
 * Manager class.
 *
 * Binds an Elasticsearch client to a single index (taken from the index settings) and
 * offers search, bulk indexing, multi search, scrolling and index maintenance on top of it.
 * Bulk operations and multi search requests are queued in memory and sent in batches.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries
     */
    private $bulkQueries = [];

    /**
     * @var array Container for msearch queries
     */
    private $msearchQueries = [];

    /**
     * @var array Extra parameters merged into every bulk request (see setBulkParams())
     */
    private $bulkParams = [];

    /**
     * @var array Extra parameters merged into every msearch request
     */
    private $msearchParams = [];

    /**
     * @var array Index settings; the 'index' key holds the index name
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * After commit to make data available the refresh or flush operation is needed
     * so one of those methods has to be defined, the default is refresh.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * The size that defines after how many queued bulk operations commit() is called automatically.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * The size that defines how many searches can be executed with msearch at once.
     *
     * @var int
     */
    private $msearchSize = 100;

    /**
     * Container to count how many documents were passed to the bulk query.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * Container to count how many queries were passed to the msearch.
     *
     * @var int
     */
    private $msearchCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved document class name
     */
    private $repositories = [];

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * Returns the manager name given at construction.
     *
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * Returns the parameters that are merged into every msearch request.
     *
     * @return array
     */
    public function getMsearchParams()
    {
        return $this->msearchParams;
    }

    /**
     * Sets the parameters that are merged into every msearch request.
     *
     * @param array $msearchParams
     */
    public function setMsearchParams($msearchParams)
    {
        $this->msearchParams = $msearchParams;
    }

    /**
     * Returns how many queued searches trigger an automatic msearch() call.
     *
     * @return int
     */
    public function getMsearchSize()
    {
        return $this->msearchSize;
    }

    /**
     * Sets how many queued searches trigger an automatic msearch() call.
     *
     * @param int $msearchSize
     */
    public function setMsearchSize($msearchSize)
    {
        $this->msearchSize = $msearchSize;
    }

    /**
     * Returns the manager configuration given at construction.
     *
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Returns repository by document class.
     *
     * Repositories are created on first request and cached per resolved class name.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @throws \InvalidArgumentException When $className is not a string.
     *
     * @return Repository
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (isset($this->repositories[$namespace])) {
            return $this->repositories[$namespace];
        }

        $repository = $this->createRepository($namespace);
        $this->repositories[$namespace] = $repository;

        return $repository;
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * Returns the operation executed after a bulk commit: refresh, flush or none.
     *
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * Sets the operation executed after a bulk commit.
     *
     * @param string $commitMode One of: refresh, flush, none.
     *
     * @throws \LogicException When an unsupported mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if (!in_array($commitMode, ['refresh', 'flush', 'none'], true)) {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * Returns how many queued bulk operations trigger an automatic commit().
     *
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * Sets how many queued bulk operations trigger an automatic commit().
     *
     * @param int $bulkCommitSize
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = [];
        $params['index'] = $this->getIndexName();
        $params['type'] = implode(',', $types);
        $params['body'] = $query;

        if (!empty($queryStringParams)) {
            // $params is merged last, so index, type and body cannot be overridden
            // by the query string parameters.
            $params = array_merge($queryStringParams, $params);
        }

        return $this->client->search($params);
    }

    /**
     * Adds document to next flush.
     *
     * The document is converted to an array and queued as a bulk "index" operation.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the converted document has no "_id".
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, ['_id' => $data['_id']]);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params Extra request parameters; they may override the default index.
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params Extra request parameters; they may override the default index.
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Inserts the current query container to the index, used for bulk queries execution.
     *
     * After the bulk request the queue and its counter are reset and, depending on the
     * commit mode, the index is flushed or refreshed.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array Bulk response, or null when nothing was queued.
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);

        $bulkResponse = $this->client->bulk($bulkQueries);
        $this->bulkQueries = [];
        $this->bulkCount = 0;

        switch ($this->getCommitMode()) {
            case 'flush':
                $this->flush($params);
                break;
            case 'refresh':
                $this->refresh($params);
                break;
        }

        return $bulkResponse;
    }

    /**
     * Executes multi search queries.
     *
     * Sends every queued search in one request, resets the queue and replaces each raw
     * response with its parsed form under the same numeric position.
     *
     * @param string $resultsType
     *
     * @return null|array Parsed results indexed by query position, or null when nothing was queued.
     */
    public function msearch($resultsType = Result::RESULTS_OBJECT)
    {
        if (empty($this->msearchQueries)) {
            return null;
        }

        $request = array_merge($this->msearchQueries, $this->msearchParams);

        $msearchResponse = $this->client->msearch($request);
        $this->msearchQueries = [];
        $this->msearchCount = 0;

        // Guard against a response without the "responses" key instead of iterating null.
        $responses = isset($msearchResponse['responses']) ? $msearchResponse['responses'] : [];

        foreach ($responses as $key => $result) {
            $msearchResponse[$key] = $this->parseResult($result, $resultsType);
        }
        unset($msearchResponse['responses']);

        return $msearchResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException
     *
     * @return null|array Bulk response when the queue was committed automatically, null otherwise.
     */
    public function bulk($operation, $type, array $query)
    {
        // Strict comparison: loose in_array() would accept values such as true.
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $meta = [
            '_index' => $this->getIndexName(),
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        // Remove absent entries, but keep numeric values: a bare array_filter() would
        // also throw away valid identifiers such as 0 or "0".
        $meta = array_filter(
            $meta,
            function ($value) {
                return is_numeric($value) || !empty($value);
            }
        );

        $this->bulkQueries['body'][] = [$operation => $meta];
        unset($query['_id'], $query['_ttl'], $query['_parent']);

        switch ($operation) {
            case 'index':
            case 'create':
            case 'update':
                $this->bulkQueries['body'][] = $query;
                break;
            case 'delete':
                // Body for delete operation is not needed to apply.
            default:
                // Do nothing.
                break;
        }

        // We are using a counter because it is too difficult to resolve this from the bulkQueries array.
        $this->bulkCount++;

        // Compare as integers with ">=" so that a size given as a string, or lowered below
        // the current count, still triggers the commit. Non-positive sizes never trigger it.
        $commitSize = (int) $this->bulkCommitSize;
        if ($commitSize > 0 && $this->bulkCount >= $commitSize) {
            return $this->commit();
        }

        return null;
    }

    /**
     * Adds query to msearch queries container.
     *
     * The header always receives the index name of this manager. When the number of queued
     * searches reaches the msearch size the queue is executed with the default results type.
     *
     * @param Search $search DSL to execute.
     * @param array  $header Search header.
     *
     * @return null|array Result of msearch() when the queue was executed automatically, null otherwise.
     */
    public function addSearch(Search $search, $header = [])
    {
        $header['index'] = $this->getIndexName();
        $searchBody = $search->toArray();

        $this->msearchQueries['body'][] = $header;
        $this->msearchQueries['body'][] = $searchBody;

        // We are using a counter because it is too difficult to resolve this from the msearchQueries array.
        $this->msearchCount++;

        // Same trigger rule as bulk(): integer ">=" comparison, non-positive size disables it.
        $batchSize = (int) $this->msearchSize;
        if ($batchSize > 0 && $this->msearchCount >= $batchSize) {
            return $this->msearch();
        }

        return null;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * Note that passing true removes the mappings from the stored index settings for the
     * lifetime of this manager, not only for this call.
     *
     * @param bool $noMapping Determines if mapping should be left out.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        if ($noMapping) {
            unset($this->indexSettings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($this->indexSettings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array Response of the delete index request.
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be left out.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Do nothing, our target is to create new index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     *
     * @return object|null
     */
    public function find($className, $id)
    {
        $type = $this->resolveTypeName($className);

        $params = [
            'index' => $this->getIndexName(),
            'type' => $type,
            'id' => $id,
        ];

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Executes given search.
     *
     * Every entry of $types may be a document class name or a plain type name.
     *
     * @param array  $types
     * @param Search $search
     * @param string $resultsType
     *
     * @return DocumentIterator|RawIterator|array
     */
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
    {
        // Build a new list instead of iterating by reference, which would leave a
        // dangling reference to the last element behind.
        $resolvedTypes = [];
        foreach ($types as $key => $type) {
            $resolvedTypes[$key] = $this->resolveTypeName($type);
        }

        $results = $this->search($resolvedTypes, $search->toArray(), $search->getQueryParams());

        return $this->parseResult($results, $resultsType, $search->getScroll());
    }

    /**
     * Parses raw result.
     *
     * @param array  $raw
     * @param string $resultsType
     * @param string $scrollDuration
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception When an unknown results type is requested.
     */
    private function parseResult($raw, $resultsType, $scrollDuration = null)
    {
        $scrollConfig = [];
        if (isset($raw['_scroll_id'])) {
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
            $scrollConfig['duration'] = $scrollDuration;
        }

        switch ($resultsType) {
            case Result::RESULTS_OBJECT:
                return new DocumentIterator($raw, $this, $scrollConfig);
            case Result::RESULTS_ARRAY:
                return $this->convertToNormalizedArray($raw);
            case Result::RESULTS_RAW:
                return $raw;
            case Result::RESULTS_RAW_ITERATOR:
                return new RawIterator($raw, $this, $scrollConfig);
            default:
                throw new \Exception('Wrong results type selected');
        }
    }

    /**
     * Normalizes response array.
     *
     * Returns the "_source" of a single document response, or a list with the "_source"
     * (or the first value of every requested field) of each hit of a search response.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            return $data['_source'];
        }

        $output = [];

        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            // reset() takes its argument by reference, so it is wrapped in a closure
            // rather than handed to array_map() as a callback name.
            $firstValue = function ($values) {
                return reset($values);
            };

            foreach ($data['hits']['hits'] as $item) {
                $output[] = array_map($firstValue, $item['fields']);
            }
        }

        return $output;
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     * @param string $resultsType
     *
     * @return AbstractResultsIterator
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m',
        $resultsType = Result::RESULTS_OBJECT
    ) {
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);

        return $this->parseResult($results, $resultsType, $scrollDuration);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Resolves type name by class name.
     *
     * Values containing ":" or "\" are treated as document class names and resolved through
     * the metadata collector; anything else is returned unchanged as a type name.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }
}
771