Completed
Push — master ( 09a07f...8de6a8 )
by Simonas
01:54
created

Service/Manager.php (6 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Event\Events;
17
use ONGR\ElasticsearchBundle\Event\BulkEvent;
18
use ONGR\ElasticsearchBundle\Event\CommitEvent;
19
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
20
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
21
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
22
use ONGR\ElasticsearchBundle\Result\Converter;
23
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
24
use ONGR\ElasticsearchBundle\Result\RawIterator;
25
use ONGR\ElasticsearchBundle\Result\Result;
26
use ONGR\ElasticsearchDSL\Search;
27
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
28
use Symfony\Component\Stopwatch\Stopwatch;
29
30
/**
31
 * Manager class.
32
 */
33
class Manager
34
{
35
    /**
36
     * @var string Manager name
37
     */
38
    private $name;
39
40
    /**
41
     * @var array Manager configuration
42
     */
43
    private $config = [];
44
45
    /**
46
     * @var Client
47
     */
48
    private $client;
49
50
    /**
51
     * @var Converter
52
     */
53
    private $converter;
54
55
    /**
56
     * @var array Container for bulk queries
57
     */
58
    private $bulkQueries = [];
59
60
    /**
61
     * @var array Holder for consistency, refresh and replication parameters
62
     */
63
    private $bulkParams = [];
64
65
    /**
66
     * @var array
67
     */
68
    private $indexSettings;
69
70
    /**
71
     * @var MetadataCollector
72
     */
73
    private $metadataCollector;
74
75
    /**
76
     * After commit to make data available the refresh or flush operation is needed
77
     * so one of those methods has to be defined, the default is refresh.
78
     *
79
     * @var string
80
     */
81
    private $commitMode = 'refresh';
82
83
    /**
84
     * The number of document inserts after which the commit function is called automatically.
85
     *
86
     * @var int
87
     */
88
    private $bulkCommitSize = 100;
89
90
    /**
91
     * Counter of how many documents were passed to the bulk query.
92
     *
93
     * @var int
94
     */
95
    private $bulkCount = 0;
96
97
    /**
98
     * @var Repository[] Repository local cache
99
     */
100
    private $repositories;
101
102
    /**
103
     * @var EventDispatcherInterface
104
     */
105
    private $eventDispatcher;
106
107
    /**
108
     * @var Stopwatch
109
     */
110
    private $stopwatch;
111
112
    /**
113
     * @param string            $name              Manager name
114
     * @param array             $config            Manager configuration
115
     * @param Client            $client
116
     * @param array             $indexSettings
117
     * @param MetadataCollector $metadataCollector
118
     * @param Converter         $converter
119
     */
120
    public function __construct(
121
        $name,
122
        array $config,
123
        $client,
124
        array $indexSettings,
125
        $metadataCollector,
126
        $converter
127
    ) {
128
        $this->name = $name;
129
        $this->config = $config;
130
        $this->client = $client;
131
        $this->indexSettings = $indexSettings;
132
        $this->metadataCollector = $metadataCollector;
133
        $this->converter = $converter;
134
    }
135
136
    /**
137
     * Returns Elasticsearch connection.
138
     *
139
     * @return Client
140
     */
141
    public function getClient()
142
    {
143
        return $this->client;
144
    }
145
146
    /**
147
     * @return string
148
     */
149
    public function getName()
150
    {
151
        return $this->name;
152
    }
153
154
    /**
155
     * @return array
156
     */
157
    public function getConfig()
158
    {
159
        return $this->config;
160
    }
161
162
    /**
163
     * @param EventDispatcherInterface $eventDispatcher
164
     */
165
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
166
    {
167
        $this->eventDispatcher = $eventDispatcher;
168
    }
169
170
    /**
171
     * @param Stopwatch $stopwatch
172
     */
173
    public function setStopwatch(Stopwatch $stopwatch)
174
    {
175
        $this->stopwatch = $stopwatch;
176
    }
177
178
    /**
179
     * Returns repository by document class.
180
     *
181
     * @param string $className FQCN or string in Bundle:Document format
182
     *
183
     * @return Repository
184
     */
185
    public function getRepository($className)
186
    {
187
        if (!is_string($className)) {
188
            throw new \InvalidArgumentException('Document class must be a string.');
189
        }
190
191
        $namespace = $this->getMetadataCollector()->getClassName($className);
192
193
        if (isset($this->repositories[$namespace])) {
194
            return $this->repositories[$namespace];
195
        }
196
197
        $repository = $this->createRepository($namespace);
198
        $this->repositories[$namespace] = $repository;
199
200
        return $repository;
201
    }
202
203
    /**
204
     * @return MetadataCollector
205
     */
206
    public function getMetadataCollector()
207
    {
208
        return $this->metadataCollector;
209
    }
210
211
    /**
212
     * @return Converter
213
     */
214
    public function getConverter()
215
    {
216
        return $this->converter;
217
    }
218
219
    /**
220
     * @return string
221
     */
222
    public function getCommitMode()
223
    {
224
        return $this->commitMode;
225
    }
226
227
    /**
228
     * @param string $commitMode
229
     */
230
    public function setCommitMode($commitMode)
231
    {
232
        if ($commitMode === 'refresh' || $commitMode === 'flush' || $commitMode === 'none') {
233
            $this->commitMode = $commitMode;
234
        } else {
235
            throw new \LogicException('The commit method must be either refresh, flush or none.');
236
        }
237
    }
238
239
    /**
240
     * @return int
241
     */
242
    public function getBulkCommitSize()
243
    {
244
        return $this->bulkCommitSize;
245
    }
246
247
    /**
248
     * @param int $bulkCommitSize
249
     */
250
    public function setBulkCommitSize($bulkCommitSize)
251
    {
252
        $this->bulkCommitSize = $bulkCommitSize;
253
    }
254
255
    /**
256
     * Creates a repository.
257
     *
258
     * @param string $className
259
     *
260
     * @return Repository
261
     */
262
    private function createRepository($className)
263
    {
264
        return new Repository($this, $className);
265
    }
266
267
    /**
268
     * Executes search query in the index.
269
     *
270
     * @param array $types             List of types to search in.
271
     * @param array $query             Query to execute.
272
     * @param array $queryStringParams Query parameters.
273
     *
274
     * @return array
275
     */
276
    public function search(array $types, array $query, array $queryStringParams = [])
277
    {
278
        $params = [];
279
        $params['index'] = $this->getIndexName();
280
        
281
        if (!empty($types)) {
282
            $params['type'] = implode(',', $types);
283
        }
284
        
285
        $params['body'] = $query;
286
287
        if (!empty($queryStringParams)) {
288
            $params = array_merge($queryStringParams, $params);
289
        }
290
291
        $this->stopwatch('start', 'search');
292
        $result = $this->client->search($params);
293
        $this->stopwatch('stop', 'search');
294
295
        return $result;
296
    }
297
298
    /**
299
     * Adds document to next flush.
300
     *
301
     * @param object $document
302
     */
303
    public function persist($document)
304
    {
305
        $documentArray = $this->converter->convertToArray($document);
306
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
307
308
        $this->bulk('index', $type, $documentArray);
309
    }
310
311
    /**
312
     * Adds document for removal.
313
     *
314
     * @param object $document
315
     */
316
    public function remove($document)
317
    {
318
        $data = $this->converter->convertToArray($document, [], ['_id', '_routing']);
319
320
        if (!isset($data['_id'])) {
321
            throw new \LogicException(
322
                'In order to use remove() method document class must have property with @Id annotation.'
323
            );
324
        }
325
326
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
327
328
        $this->bulk('delete', $type, $data);
329
    }
330
331
    /**
332
     * Flushes elasticsearch index.
333
     *
334
     * @param array $params
335
     *
336
     * @return array
337
     */
338
    public function flush(array $params = [])
339
    {
340
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
341
    }
342
343
    /**
344
     * Refreshes elasticsearch index.
345
     *
346
     * @param array $params
347
     *
348
     * @return array
349
     */
350
    public function refresh(array $params = [])
351
    {
352
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
353
    }
354
355
    /**
356
     * Inserts the current query container to the index, used for bulk queries execution.
357
     *
358
     * @param array $params Parameters that will be passed to the flush or refresh queries.
359
     *
360
     * @return null|array
361
     *
362
     * @throws BulkWithErrorsException
363
     */
364
    public function commit(array $params = [])
365
    {
366
        if (!empty($this->bulkQueries)) {
367
            $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
368
            $bulkQueries['index']['_index'] = $this->getIndexName();
369
            $this->eventDispatcher->dispatch(
370
                Events::PRE_COMMIT,
371
                new CommitEvent($this->getCommitMode(), $bulkQueries)
372
            );
373
374
            $this->stopwatch('start', 'bulk');
375
            $bulkResponse = $this->client->bulk($bulkQueries);
376
            $this->stopwatch('stop', 'bulk');
377
378
            if ($bulkResponse['errors']) {
379
                throw new BulkWithErrorsException(
380
                    json_encode($bulkResponse),
381
                    0,
382
                    null,
383
                    $bulkResponse
384
                );
385
            }
386
387
            $this->bulkQueries = [];
388
            $this->bulkCount = 0;
389
390
            $this->stopwatch('start', 'refresh');
391
392
            switch ($this->getCommitMode()) {
393
                case 'flush':
394
                    $this->flush($params);
395
                    break;
396
                case 'refresh':
397
                    $this->refresh($params);
398
                    break;
399
            }
400
401
            $this->eventDispatcher->dispatch(
402
                Events::POST_COMMIT,
403
                new CommitEvent($this->getCommitMode(), $bulkResponse)
404
            );
405
406
            $this->stopwatch('stop', 'refresh');
407
408
            return $bulkResponse;
409
        }
410
411
        return null;
412
    }
413
414
    /**
415
     * Adds query to bulk queries container.
416
     *
417
     * @param string       $operation One of: index, update, delete, create.
418
     * @param string|array $type      Elasticsearch type name.
419
     * @param array        $query     DSL to execute.
420
     *
421
     * @throws \InvalidArgumentException
422
     *
423
     * @return null|array
424
     */
425
    public function bulk($operation, $type, array $query)
426
    {
427
        if (!in_array($operation, ['index', 'create', 'update', 'delete'])) {
428
            throw new \InvalidArgumentException('Wrong bulk operation selected');
429
        }
430
431
        $this->eventDispatcher->dispatch(
432
            Events::BULK,
433
            new BulkEvent($operation, $type, $query)
434
        );
435
436
        $this->bulkQueries['body'][] = [
437
            $operation => array_filter(
438
                [
439
                    '_type' => $type,
440
                    '_id' => isset($query['_id']) ? $query['_id'] : null,
441
                    '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
442
                    '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
443
                    '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
444
                ]
445
            ),
446
        ];
447
        unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);
448
449
        switch ($operation) {
450
            case 'index':
451
            case 'create':
452
            case 'update':
453
                $this->bulkQueries['body'][] = $query;
454
                break;
455
            case 'delete':
456
                // Body for delete operation is not needed to apply.
457
            default:
458
                // Do nothing.
459
                break;
460
        }
461
462
        // We are using counter because there is to difficult to resolve this from bulkQueries array.
463
        $this->bulkCount++;
464
465
        $response = null;
466
467
        if ($this->bulkCommitSize === $this->bulkCount) {
468
            $response = $this->commit();
469
        }
470
471
        return $response;
472
    }
473
474
    /**
475
     * Optional setter to change bulk query params.
476
     *
477
     * @param array $params Possible keys:
478
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
479
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
480
     *                      ['replication'] = (enum) Explicitly set the replication type.
481
     */
482
    public function setBulkParams(array $params)
483
    {
484
        $this->bulkParams = $params;
485
    }
486
487
    /**
     * Creates fresh elasticsearch index.
     *
     * The stored index settings are left untouched: when mappings are skipped
     * they are removed from a local copy only, so getIndexMappings() and later
     * createIndex() calls still see the configured mappings.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // PHP arrays are copied by value, so this does not alter $this->indexSettings.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }
502
503
    /**
504
     * Drops elasticsearch index.
505
     */
506
    public function dropIndex()
507
    {
508
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
509
    }
510
511
    /**
512
     * Tries to drop and create fresh elasticsearch index.
513
     *
514
     * @param bool $noMapping Determines if mapping should be included.
515
     *
516
     * @return array
517
     */
518
    public function dropAndCreateIndex($noMapping = false)
519
    {
520
        try {
521
            $this->dropIndex();
522
        } catch (\Exception $e) {
523
            // Do nothing, our target is to create new index.
524
        }
525
526
        return $this->createIndex($noMapping);
527
    }
528
529
    /**
530
     * Checks if connection index is already created.
531
     *
532
     * @return bool
533
     */
534
    public function indexExists()
535
    {
536
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
537
    }
538
539
    /**
540
     * Returns index name this connection is attached to.
541
     *
542
     * @return string
543
     */
544
    public function getIndexName()
545
    {
546
        return $this->indexSettings['index'];
547
    }
548
549
    /**
550
     * Sets index name for this connection.
551
     *
552
     * @param string $name
553
     */
554
    public function setIndexName($name)
555
    {
556
        $this->indexSettings['index'] = $name;
557
    }
558
559
    /**
560
     * Returns mappings of the index for this connection.
561
     *
562
     * @return array
563
     */
564
    public function getIndexMappings()
565
    {
566
        return $this->indexSettings['body']['mappings'];
567
    }
568
569
    /**
570
     * Returns Elasticsearch version number.
571
     *
572
     * @return string
573
     */
574
    public function getVersionNumber()
575
    {
576
        return $this->client->info()['version']['number'];
577
    }
578
579
    /**
580
     * Clears elasticsearch client cache.
581
     */
582
    public function clearCache()
583
    {
584
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
585
    }
586
587
    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     * @param string $routing   Custom routing for the document
     *
     * @return object|null Converted document, or null when the client reports a 404.
     */
    public function find($className, $id, $routing = null)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        // Compare explicitly instead of relying on truthiness, so that a routing
        // value of "0" (or 0) is still sent with the request.
        if ($routing !== null && $routing !== false && $routing !== '') {
            $params['routing'] = $routing;
        }

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            // A missing document is an expected outcome, not an error.
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }
618
619
    /**
620
     * Executes given search.
621
     *
622
     * @deprecated use strict return type functions from Repository instead.
623
     *
624
     * @param array  $types
625
     * @param Search $search
626
     * @param string $resultsType
627
     *
628
     * @return DocumentIterator|RawIterator|array
629
     */
630
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
631
    {
632
        foreach ($types as &$type) {
633
            $type = $this->resolveTypeName($type);
634
        }
635
636
        $results = $this->search($types, $search->toArray(), $search->getQueryParams());
637
638
        return $this->parseResult($results, $resultsType, $search->getScroll());
0 ignored issues
show
$results is of type callable, but the function expects an array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest adding an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...\Manager::parseResult() has been deprecated with message: use strict return type functions from Repository class instead.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
639
    }
640
641
    /**
642
     * Parses raw result.
643
     *
644
     * @deprecated use strict return type functions from Repository class instead.
645
     *
646
     * @param array  $raw
647
     * @param string $resultsType
648
     * @param string $scrollDuration
649
     *
650
     * @return DocumentIterator|RawIterator|array
651
     *
652
     * @throws \Exception
653
     */
654
    private function parseResult($raw, $resultsType, $scrollDuration = null)
655
    {
656
        $scrollConfig = [];
657 View Code Duplication
        if (isset($raw['_scroll_id'])) {
0 ignored issues
show
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
658
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
659
            $scrollConfig['duration'] = $scrollDuration;
660
        }
661
662
        switch ($resultsType) {
663
            case Result::RESULTS_OBJECT:
664
                return new DocumentIterator($raw, $this, $scrollConfig);
665
            case Result::RESULTS_ARRAY:
666
                return $this->convertToNormalizedArray($raw);
0 ignored issues
show
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...vertToNormalizedArray() has been deprecated with message: Use ArrayIterator from Result namespace instead.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
667
            case Result::RESULTS_RAW:
668
                return $raw;
669
            case Result::RESULTS_RAW_ITERATOR:
670
                return new RawIterator($raw, $this, $scrollConfig);
671
            default:
672
                throw new \Exception('Wrong results type selected');
673
        }
674
    }
675
676
    /**
     * Normalizes response array.
     *
     * A single-document response (top-level "_source") is returned as its source
     * with "_id" added. A search response is returned as a list: one source array
     * (with "_id" added) per hit, or, when the first hit carries "fields" instead
     * of "_source", one array per hit holding the first value of each field.
     *
     * @deprecated Use ArrayIterator from Result namespace instead.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            $data['_source']['_id'] = $data['_id'];

            return $data['_source'];
        }

        $output = [];

        // The shape of the first hit decides how all hits are normalized.
        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $item['_source']['_id'] = $item['_id'];
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            // reset() takes its argument by reference, so it is wrapped in a closure
            // rather than passed to array_map() as the string callable 'reset'.
            $firstValue = function ($values) {
                // NOTE(review): scalar field values are passed through unchanged;
                // confirm whether the client can actually return them here.
                return is_array($values) ? reset($values) : $values;
            };

            foreach ($data['hits']['hits'] as $item) {
                $output[] = array_map($firstValue, $item['fields']);
            }
        }

        return $output;
    }
707
708
    /**
709
     * Fetches next set of results.
710
     *
711
     * @param string $scrollId
712
     * @param string $scrollDuration
713
     * @param string $resultsType
714
     *
715
     * @return mixed
716
     *
717
     * @throws \Exception
718
     */
719
    public function scroll(
720
        $scrollId,
721
        $scrollDuration = '5m',
722
        $resultsType = Result::RESULTS_OBJECT
723
    ) {
724
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
725
726
        if ($resultsType == Result::RESULTS_RAW) {
727
            return $results;
728
        } else {
729
            trigger_error(
730
                '$resultsType parameter was deprecated in scroll() fucntion. ' .
731
                'Use strict type findXXX functions from repository instead. Will be removed in 2.0',
732
                E_USER_DEPRECATED
733
            );
734
            return $this->parseResult($results, $resultsType, $scrollDuration);
0 ignored issues
show
$results is of type callable, but the function expects an array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest adding an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
Deprecated Code introduced by
The method ONGR\ElasticsearchBundle...\Manager::parseResult() has been deprecated with message: use strict return type functions from Repository class instead.

This method has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.

Loading history...
735
        }
736
    }
737
738
    /**
739
     * Clears scroll.
740
     *
741
     * @param string $scrollId
742
     */
743
    public function clearScroll($scrollId)
744
    {
745
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
746
    }
747
748
    /**
749
     * Resolves type name by class name.
750
     *
751
     * @param string $className
752
     *
753
     * @return string
754
     */
755
    private function resolveTypeName($className)
756
    {
757
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
758
            return $this->getMetadataCollector()->getDocumentType($className);
759
        }
760
761
        return $className;
762
    }
763
764
    /**
765
     * Starts and stops an event in the stopwatch
766
     *
767
     * @param string $action   only 'start' and 'stop'
768
     * @param string $name     name of the event
769
     */
770
    private function stopwatch($action, $name)
771
    {
772
        if (isset($this->stopwatch)) {
773
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
774
        }
775
    }
776
}
777