Completed
Pull Request — master (#640)
by
unknown
02:28
created

Manager::getMsearchParams()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
17
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
18
use ONGR\ElasticsearchBundle\Result\Converter;
19
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
20
use ONGR\ElasticsearchBundle\Result\RawIterator;
21
use ONGR\ElasticsearchBundle\Result\Result;
22
use ONGR\ElasticsearchDSL\Search;
23
24
/**
25
 * Manager class.
26
 */
27
class Manager
28
{
29
    /**
30
     * @var string Manager name
31
     */
32
    private $name;
33
34
    /**
35
     * @var array Manager configuration
36
     */
37
    private $config = [];
38
39
    /**
40
     * @var Client
41
     */
42
    private $client;
43
44
    /**
45
     * @var Converter
46
     */
47
    private $converter;
48
49
    /**
50
     * @var array Container for bulk queries
51
     */
52
    private $bulkQueries = [];
53
54
    /**
55
     * @var array Container for msearch queries
56
     */
57
    private $msearchQueries = [];
58
59
    /**
60
     * @var array Holder for consistency, refresh and replication parameters
61
     */
62
    private $bulkParams = [];
63
64
    /**
65
     * @var array Holder for consistency, refresh and replication parameters
66
     */
67
    private $msearchParams = [];
68
69
    /**
70
     * @var array
71
     */
72
    private $indexSettings;
73
74
    /**
75
     * @var MetadataCollector
76
     */
77
    private $metadataCollector;
78
79
    /**
80
     * After commit to make data available the refresh or flush operation is needed
81
     * so one of those methods has to be defined, the default is refresh.
82
     *
83
     * @var string
84
     */
85
    private $commitMode = 'refresh';
86
87
    /**
88
     * The size that defines after how much document inserts call commit function.
89
     *
90
     * @var int
91
     */
92
    private $bulkCommitSize = 100;
93
94
    /**
95
     * The size that defines how many searches can be executed with msearch at once.
96
     *
97
     * @var int
98
     */
99
    private $msearchSize = 100;
100
101
    /**
102
     * Container to count how many documents was passed to the bulk query.
103
     *
104
     * @var int
105
     */
106
    private $bulkCount = 0;
107
108
    /**
109
     * Container to count how many queries were passed to the msearch.
110
     *
111
     * @var int
112
     */
113
    private $msearchCount = 0;
114
115
    /**
116
     * @var Repository[] Repository local cache
117
     */
118
    private $repositories;
119
120
    /**
121
     * @param string            $name              Manager name
122
     * @param array             $config            Manager configuration
123
     * @param Client            $client
124
     * @param array             $indexSettings
125
     * @param MetadataCollector $metadataCollector
126
     * @param Converter         $converter
127
     */
128
    public function __construct(
129
        $name,
130
        array $config,
131
        $client,
132
        array $indexSettings,
133
        $metadataCollector,
134
        $converter
135
    ) {
136
        $this->name = $name;
137
        $this->config = $config;
138
        $this->client = $client;
139
        $this->indexSettings = $indexSettings;
140
        $this->metadataCollector = $metadataCollector;
141
        $this->converter = $converter;
142
    }
143
144
    /**
145
     * Returns Elasticsearch connection.
146
     *
147
     * @return Client
148
     */
149
    public function getClient()
150
    {
151
        return $this->client;
152
    }
153
154
    /**
155
     * @return string
156
     */
157
    public function getName()
158
    {
159
        return $this->name;
160
    }
161
162
    /**
163
     * @return array
164
     */
165
    public function getMsearchParams()
166
    {
167
        return $this->msearchParams;
168
    }
169
170
    /**
171
     * @param array $msearchParams
172
     */
173
    public function setMsearchParams($msearchParams)
174
    {
175
        $this->msearchParams = $msearchParams;
176
    }
177
178
    /**
179
     * @return int
180
     */
181
    public function getMsearchSize()
182
    {
183
        return $this->msearchSize;
184
    }
185
186
    /**
187
     * @param int $msearchSize
188
     */
189
    public function setMsearchSize($msearchSize)
190
    {
191
        $this->msearchSize = $msearchSize;
192
    }
193
194
    /**
195
     * @return array
196
     */
197
    public function getConfig()
198
    {
199
        return $this->config;
200
    }
201
202
    /**
203
     * Returns repository by document class.
204
     *
205
     * @param string $className FQCN or string in Bundle:Document format
206
     *
207
     * @return Repository
208
     */
209
    public function getRepository($className)
210
    {
211
        if (!is_string($className)) {
212
            throw new \InvalidArgumentException('Document class must be a string.');
213
        }
214
215
        $namespace = $this->getMetadataCollector()->getClassName($className);
216
217
        if (isset($this->repositories[$namespace])) {
218
            return $this->repositories[$namespace];
219
        }
220
221
        $repository = $this->createRepository($namespace);
222
        $this->repositories[$namespace] = $repository;
223
224
        return $repository;
225
    }
226
227
    /**
228
     * @return MetadataCollector
229
     */
230
    public function getMetadataCollector()
231
    {
232
        return $this->metadataCollector;
233
    }
234
235
    /**
236
     * @return Converter
237
     */
238
    public function getConverter()
239
    {
240
        return $this->converter;
241
    }
242
243
    /**
244
     * @return string
245
     */
246
    public function getCommitMode()
247
    {
248
        return $this->commitMode;
249
    }
250
251
    /**
252
     * @param string $commitMode
253
     */
254
    public function setCommitMode($commitMode)
255
    {
256
        if ($commitMode === 'refresh' || $commitMode === 'flush' || $commitMode === 'none') {
257
            $this->commitMode = $commitMode;
258
        } else {
259
            throw new \LogicException('The commit method must be either refresh, flush or none.');
260
        }
261
    }
262
263
    /**
264
     * @return int
265
     */
266
    public function getBulkCommitSize()
267
    {
268
        return $this->bulkCommitSize;
269
    }
270
271
    /**
272
     * @param int $bulkCommitSize
273
     */
274
    public function setBulkCommitSize($bulkCommitSize)
275
    {
276
        $this->bulkCommitSize = $bulkCommitSize;
277
    }
278
279
    /**
280
     * Creates a repository.
281
     *
282
     * @param string $className
283
     *
284
     * @return Repository
285
     */
286
    private function createRepository($className)
287
    {
288
        return new Repository($this, $className);
289
    }
290
291
    /**
292
     * Executes search query in the index.
293
     *
294
     * @param array $types             List of types to search in.
295
     * @param array $query             Query to execute.
296
     * @param array $queryStringParams Query parameters.
297
     *
298
     * @return array
299
     */
300
    public function search(array $types, array $query, array $queryStringParams = [])
301
    {
302
        $params = [];
303
        $params['index'] = $this->getIndexName();
304
        $params['type'] = implode(',', $types);
305
        $params['body'] = $query;
306
307
        if (!empty($queryStringParams)) {
308
            $params = array_merge($queryStringParams, $params);
309
        }
310
311
        return $this->client->search($params);
312
    }
313
314
    /**
315
     * Adds document to next flush.
316
     *
317
     * @param object $document
318
     */
319
    public function persist($document)
320
    {
321
        $documentArray = $this->converter->convertToArray($document);
322
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
323
324
        $this->bulk('index', $type, $documentArray);
325
    }
326
327
    /**
328
     * Adds document for removal.
329
     *
330
     * @param object $document
331
     */
332
    public function remove($document)
333
    {
334
        $data = $this->converter->convertToArray($document, [], ['_id']);
335
336
        if (!isset($data['_id'])) {
337
            throw new \LogicException(
338
                'In order to use remove() method document class must have property with @Id annotation.'
339
            );
340
        }
341
342
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
343
344
        $this->bulk('delete', $type, ['_id' => $data['_id']]);
345
    }
346
347
    /**
348
     * Flushes elasticsearch index.
349
     *
350
     * @param array $params
351
     *
352
     * @return array
353
     */
354
    public function flush(array $params = [])
355
    {
356
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
357
    }
358
359
    /**
360
     * Refreshes elasticsearch index.
361
     *
362
     * @param array $params
363
     *
364
     * @return array
365
     */
366
    public function refresh(array $params = [])
367
    {
368
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
369
    }
370
371
    /**
372
     * Inserts the current query container to the index, used for bulk queries execution.
373
     *
374
     * @param array $params Parameters that will be passed to the flush or refresh queries.
375
     *
376
     * @return null|array
377
     */
378
    public function commit(array $params = [])
379
    {
380
        if (!empty($this->bulkQueries)) {
381
            $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
382
383
            $bulkResponse = $this->client->bulk($bulkQueries);
384
            $this->bulkQueries = [];
385
            $this->bulkCount = 0;
386
387
            switch ($this->getCommitMode()) {
388
                case 'flush':
389
                    $this->flush($params);
390
                    break;
391
                case 'refresh':
392
                    $this->refresh($params);
393
                    break;
394
            }
395
396
            return $bulkResponse;
397
        }
398
399
        return null;
400
    }
401
402
    /**
403
     * Executes multi search queries
404
     *
405
     * @param string $resultsType
406
     *
407
     * @return null|array
408
     */
409
    public function msearch($resultsType = Result::RESULTS_OBJECT)
410
    {
411
        if (!empty($this->msearchQueries)) {
412
            $bulkQueries = array_merge($this->msearchQueries, $this->msearchParams);
413
414
            $msearchResponse = $this->client->msearch($bulkQueries);
415
            $this->msearchQueries = [];
416
            $this->msearchCount = 0;
417
418
            foreach ($msearchResponse['responses'] as $key => $result) {
419
                $msearchResponse[$key] = $this->parseResult(
420
                    $result,
421
                    $resultsType
422
                );
423
            }
424
            unset($msearchResponse['responses']);
425
426
            return $msearchResponse;
427
        }
428
429
        return null;
430
    }
431
432
    /**
433
     * Adds query to bulk queries container.
434
     *
435
     * @param string       $operation One of: index, update, delete, create.
436
     * @param string|array $type      Elasticsearch type name.
437
     * @param array        $query     DSL to execute.
438
     *
439
     * @throws \InvalidArgumentException
440
     *
441
     * @return null|array
442
     */
443
    public function bulk($operation, $type, array $query)
444
    {
445
        if (!in_array($operation, ['index', 'create', 'update', 'delete'])) {
446
            throw new \InvalidArgumentException('Wrong bulk operation selected');
447
        }
448
449
        $this->bulkQueries['body'][] = [
450
            $operation => array_filter(
451
                [
452
                    '_index' => $this->getIndexName(),
453
                    '_type' => $type,
454
                    '_id' => isset($query['_id']) ? $query['_id'] : null,
455
                    '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
456
                    '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
457
                ]
458
            ),
459
        ];
460
        unset($query['_id'], $query['_ttl'], $query['_parent']);
461
462
        switch ($operation) {
463
            case 'index':
464
            case 'create':
465
            case 'update':
466
                $this->bulkQueries['body'][] = $query;
467
                break;
468
            case 'delete':
469
                // Body for delete operation is not needed to apply.
470
            default:
471
                // Do nothing.
472
                break;
473
        }
474
475
        // We are using counter because there is to difficult to resolve this from bulkQueries array.
476
        $this->bulkCount++;
477
478
        $response = null;
479
        if ($this->bulkCommitSize === $this->bulkCount) {
480
            $response = $this->commit();
481
        }
482
483
        return $response;
484
    }
485
486
    /**
487
     * Adds query to bulk queries container.
488
     *
489
     * @param Search   $search     DSL to execute.
490
     * @param array    $header    Search header.
491
     *
492
     * @return null|array
493
     */
494
    public function addSearch(Search $search, $header = [])
495
    {
496
        $header['index'] = $this->getIndexName();
497
        $search = $search->toArray();
0 ignored issues
show
Coding Style introduced by
Consider using a different name than the parameter $search. This often makes code more readable.
Loading history...
498
499
        $this->msearchQueries['body'][] = $header;
500
        $this->msearchQueries['body'][] = $search;
501
502
        $this->msearchCount++;
503
504
        $response = null;
505
        if ($this->msearchSize === $this->msearchCount) {
506
            $response = $this->msearch();
507
        }
508
509
        return $response;
510
    }
511
512
    /**
513
     * Optional setter to change bulk query params.
514
     *
515
     * @param array $params Possible keys:
516
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
517
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
518
     *                      ['replication'] = (enum) Explicitly set the replication type.
519
     */
520
    public function setBulkParams(array $params)
521
    {
522
        $this->bulkParams = $params;
523
    }
524
525
    /**
526
     * Creates fresh elasticsearch index.
527
     *
528
     * @param bool $noMapping Determines if mapping should be included.
529
     *
530
     * @return array
531
     */
532
    public function createIndex($noMapping = false)
533
    {
534
        if ($noMapping) {
535
            unset($this->indexSettings['body']['mappings']);
536
        }
537
538
        return $this->getClient()->indices()->create($this->indexSettings);
539
    }
540
541
    /**
542
     * Drops elasticsearch index.
543
     */
544
    public function dropIndex()
545
    {
546
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
547
    }
548
549
    /**
550
     * Tries to drop and create fresh elasticsearch index.
551
     *
552
     * @param bool $noMapping Determines if mapping should be included.
553
     *
554
     * @return array
555
     */
556
    public function dropAndCreateIndex($noMapping = false)
557
    {
558
        try {
559
            $this->dropIndex();
560
        } catch (\Exception $e) {
561
            // Do nothing, our target is to create new index.
562
        }
563
564
        return $this->createIndex($noMapping);
565
    }
566
567
    /**
568
     * Checks if connection index is already created.
569
     *
570
     * @return bool
571
     */
572
    public function indexExists()
573
    {
574
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
575
    }
576
577
    /**
578
     * Returns index name this connection is attached to.
579
     *
580
     * @return string
581
     */
582
    public function getIndexName()
583
    {
584
        return $this->indexSettings['index'];
585
    }
586
587
    /**
588
     * Sets index name for this connection.
589
     *
590
     * @param string $name
591
     */
592
    public function setIndexName($name)
593
    {
594
        $this->indexSettings['index'] = $name;
595
    }
596
597
    /**
598
     * Returns Elasticsearch version number.
599
     *
600
     * @return string
601
     */
602
    public function getVersionNumber()
603
    {
604
        return $this->client->info()['version']['number'];
605
    }
606
607
    /**
608
     * Clears elasticsearch client cache.
609
     */
610
    public function clearCache()
611
    {
612
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
613
    }
614
615
    /**
616
     * Returns a single document by ID. Returns NULL if document was not found.
617
     *
618
     * @param string $className Document class name or Elasticsearch type name
619
     * @param string $id        Document ID to find
620
     *
621
     * @return object
622
     */
623
    public function find($className, $id)
624
    {
625
        $type = $this->resolveTypeName($className);
626
627
        $params = [
628
            'index' => $this->getIndexName(),
629
            'type' => $type,
630
            'id' => $id,
631
        ];
632
633
        try {
634
            $result = $this->getClient()->get($params);
635
        } catch (Missing404Exception $e) {
636
            return null;
637
        }
638
639
        return $this->getConverter()->convertToDocument($result, $this);
0 ignored issues
show
Documentation introduced by
$result is of type callable, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
640
    }
641
642
    /**
643
     * Executes given search.
644
     *
645
     * @param array  $types
646
     * @param Search $search
647
     * @param string $resultsType
648
     *
649
     * @return DocumentIterator|RawIterator|array
650
     */
651
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
652
    {
653
        foreach ($types as &$type) {
654
            $type = $this->resolveTypeName($type);
655
        }
656
657
        $results = $this->search($types, $search->toArray(), $search->getQueryParams());
658
659
        return $this->parseResult($results, $resultsType, $search->getScroll());
0 ignored issues
show
Documentation introduced by
$results is of type callable, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
660
    }
661
662
    /**
663
     * Parses raw result.
664
     *
665
     * @param array  $raw
666
     * @param string $resultsType
667
     * @param string $scrollDuration
668
     *
669
     * @return DocumentIterator|RawIterator|array
670
     *
671
     * @throws \Exception
672
     */
673
    private function parseResult($raw, $resultsType, $scrollDuration = null)
674
    {
675
        $scrollConfig = [];
676
        if (isset($raw['_scroll_id'])) {
677
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
678
            $scrollConfig['duration'] = $scrollDuration;
679
        }
680
681
        switch ($resultsType) {
682
            case Result::RESULTS_OBJECT:
683
                return new DocumentIterator($raw, $this, $scrollConfig);
684
            case Result::RESULTS_ARRAY:
685
                return $this->convertToNormalizedArray($raw);
686
            case Result::RESULTS_RAW:
687
                return $raw;
688
            case Result::RESULTS_RAW_ITERATOR:
689
                return new RawIterator($raw, $this, $scrollConfig);
690
            default:
691
                throw new \Exception('Wrong results type selected');
692
        }
693
    }
694
695
    /**
696
     * Normalizes response array.
697
     *
698
     * @param array $data
699
     *
700
     * @return array
701
     */
702
    private function convertToNormalizedArray($data)
703
    {
704
        if (array_key_exists('_source', $data)) {
705
            return $data['_source'];
706
        }
707
708
        $output = [];
709
710
        if (isset($data['hits']['hits'][0]['_source'])) {
711
            foreach ($data['hits']['hits'] as $item) {
712
                $output[] = $item['_source'];
713
            }
714
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
715
            foreach ($data['hits']['hits'] as $item) {
716
                $output[] = array_map('reset', $item['fields']);
717
            }
718
        }
719
720
        return $output;
721
    }
722
723
    /**
724
     * Fetches next set of results.
725
     *
726
     * @param string $scrollId
727
     * @param string $scrollDuration
728
     * @param string $resultsType
729
     *
730
     * @return AbstractResultsIterator
731
     *
732
     * @throws \Exception
733
     */
734
    public function scroll(
735
        $scrollId,
736
        $scrollDuration = '5m',
737
        $resultsType = Result::RESULTS_OBJECT
738
    ) {
739
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
740
741
        return $this->parseResult($results, $resultsType, $scrollDuration);
0 ignored issues
show
Documentation introduced by
$results is of type callable, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
742
    }
743
744
    /**
745
     * Clears scroll.
746
     *
747
     * @param string $scrollId
748
     */
749
    public function clearScroll($scrollId)
750
    {
751
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
752
    }
753
754
    /**
755
     * Resolves type name by class name.
756
     *
757
     * @param string $className
758
     *
759
     * @return string
760
     */
761
    private function resolveTypeName($className)
762
    {
763
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
764
            return $this->getMetadataCollector()->getDocumentType($className);
765
        }
766
767
        return $className;
768
    }
769
}
770