Completed
Pull Request — 1.1 (#636)
by
unknown
02:42
created

Manager::search()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 17
rs 9.4285
cc 2
eloc 11
nc 2
nop 3
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
17
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
18
use ONGR\ElasticsearchBundle\Result\Converter;
19
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
20
use ONGR\ElasticsearchBundle\Result\RawIterator;
21
use ONGR\ElasticsearchBundle\Result\Result;
22
use ONGR\ElasticsearchDSL\Search;
23
use Symfony\Component\Stopwatch\Stopwatch;
24
25
/**
 * Entry point for working with a single Elasticsearch index.
 *
 * A manager holds the client connection and index settings, hands out cached
 * repositories, buffers bulk operations until commit() and wraps search,
 * scroll and index lifecycle calls.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Index creation settings; the 'index' key holds the index name.
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * Operation run after a bulk commit to make data available: 'refresh' (default), 'flush' or 'none'.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * Number of bulk operations after which commit() is called automatically.
     * Values lower than 1 disable automatic commits.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Number of operations passed to bulk() since the last commit.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved document class name
     */
    private $repositories = [];

    /**
     * @var Stopwatch|null Optional profiler; timing is skipped when not set.
     */
    private $stopwatch;

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * @param Stopwatch $stopwatch
     */
    public function setStopwatch(Stopwatch $stopwatch)
    {
        $this->stopwatch = $stopwatch;
    }

    /**
     * Returns repository by document class, creating and caching it on first use.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (isset($this->repositories[$namespace])) {
            return $this->repositories[$namespace];
        }

        $repository = $this->createRepository($namespace);
        $this->repositories[$namespace] = $repository;

        return $repository;
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * @param string $commitMode One of 'refresh', 'flush' or 'none'.
     *
     * @throws \LogicException When another value is given.
     */
    public function setCommitMode($commitMode)
    {
        if (!in_array($commitMode, ['refresh', 'flush', 'none'], true)) {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * @param int $bulkCommitSize Numeric strings (e.g. from configuration) are cast to int.
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        // Cast so a value like '100' still triggers automatic commits in bulk().
        $this->bulkCommitSize = (int) $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters; they cannot override index, type or body.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => implode(',', $types),
            'body' => $query,
        ];

        if (!empty($queryStringParams)) {
            // Later arguments win in array_merge(), so index/type/body take precedence.
            $params = array_merge($queryStringParams, $params);
        }

        $client = $this->client;

        return $this->measure('search', function () use ($client, $params) {
            return $client->search($params);
        });
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the document has no @Id property value.
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, ['_id' => $data['_id']]);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush($params);
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh($params);
    }

    /**
     * Sends the buffered bulk queries to the index, then runs the configured commit mode.
     *
     * The buffer and counter are cleared only after the bulk request returns, so a
     * failed request leaves the queued operations in place.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array Bulk response, or null when nothing was queued.
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
        $client = $this->client;

        $bulkResponse = $this->measure('bulk', function () use ($client, $bulkQueries) {
            return $client->bulk($bulkQueries);
        });

        $this->bulkQueries = [];
        $this->bulkCount = 0;

        $manager = $this;
        $commitMode = $this->getCommitMode();

        $this->measure('refresh', function () use ($manager, $commitMode, $params) {
            // 'none' intentionally performs no follow-up operation.
            switch ($commitMode) {
                case 'flush':
                    $manager->flush($params);
                    break;
                case 'refresh':
                    $manager->refresh($params);
                    break;
            }
        });

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container, committing automatically once
     * the configured bulk commit size is reached.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute; _id, _ttl and _parent keys are moved to the action metadata.
     *
     * @throws \InvalidArgumentException
     *
     * @return null|array Commit response when an automatic commit happened, null otherwise.
     */
    public function bulk($operation, $type, array $query)
    {
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        // Drop only absent metadata. A bare array_filter() would also drop
        // legitimate falsy values such as an _id or _parent of '0'.
        $metadata = array_filter(
            [
                '_index' => $this->getIndexName(),
                '_type' => $type,
                '_id' => isset($query['_id']) ? $query['_id'] : null,
                '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
                '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
            ],
            function ($value) {
                return $value !== null && $value !== '' && $value !== [];
            }
        );

        $this->bulkQueries['body'][] = [$operation => $metadata];
        unset($query['_id'], $query['_ttl'], $query['_parent']);

        // Delete operations carry no document body; the others do.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // A counter is used because the body length differs per operation
        // (one entry for delete, two for the rest).
        $this->bulkCount++;

        // ">=" also commits when the size was lowered below the current count.
        if ($this->bulkCommitSize > 0 && $this->bulkCount >= $this->bulkCommitSize) {
            return $this->commit();
        }

        return null;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // Work on a copy so omitting mappings once does not strip them from
        // the manager's settings for every later createIndex() call.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Best effort: the index may not exist yet; the goal is a fresh index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     *
     * @return object|null
     */
    public function find($className, $id)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        try {
            /** @var array $result */
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Executes given search.
     *
     * @param string|string[] $types       Type names or document class names (a single value is accepted).
     * @param Search          $search
     * @param string          $resultsType
     *
     * @return DocumentIterator|RawIterator|array
     */
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
    {
        $resolvedTypes = [];
        foreach ((is_array($types) ? $types : [$types]) as $key => $type) {
            $resolvedTypes[$key] = $this->resolveTypeName($type);
        }

        /** @var array $results */
        $results = $this->search($resolvedTypes, $search->toArray(), $search->getQueryParams());

        return $this->parseResult($results, $resultsType, $search->getScroll());
    }

    /**
     * Parses raw result.
     *
     * @param array  $raw
     * @param string $resultsType
     * @param string $scrollDuration
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception
     */
    private function parseResult($raw, $resultsType, $scrollDuration = null)
    {
        $scrollConfig = [];
        if (isset($raw['_scroll_id'])) {
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
            $scrollConfig['duration'] = $scrollDuration;
        }

        switch ($resultsType) {
            case Result::RESULTS_OBJECT:
                return new DocumentIterator($raw, $this, $scrollConfig);
            case Result::RESULTS_ARRAY:
                return $this->convertToNormalizedArray($raw);
            case Result::RESULTS_RAW:
                return $raw;
            case Result::RESULTS_RAW_ITERATOR:
                return new RawIterator($raw, $this, $scrollConfig);
            default:
                throw new \Exception('Wrong results type selected');
        }
    }

    /**
     * Normalizes response array.
     *
     * A single-document response yields its _source. A search response yields
     * either every hit's _source or, for field requests, the first value of
     * each field per hit.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            return $data['_source'];
        }

        $output = [];

        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            foreach ($data['hits']['hits'] as $item) {
                $row = [];
                $fields = isset($item['fields']) ? $item['fields'] : [];
                foreach ($fields as $field => $values) {
                    // reset() on a local copy; passing 'reset' to array_map() triggers
                    // a by-reference warning on PHP 8.
                    $row[$field] = reset($values);
                }
                $output[] = $row;
            }
        }

        return $output;
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     * @param string $resultsType
     *
     * @return AbstractResultsIterator|array
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m',
        $resultsType = Result::RESULTS_OBJECT
    ) {
        /** @var array $results */
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);

        return $this->parseResult($results, $resultsType, $scrollDuration);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Resolves type name by class name.
     *
     * Values containing ':' or '\' are treated as class names and looked up in
     * the metadata collector; anything else is returned as a type name.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }

    /**
     * Runs a callback inside a stopwatch event.
     *
     * The event is stopped even when the callback throws, so a failed request
     * does not leave an unbalanced start behind.
     *
     * @param string   $name     Event name passed to stopwatch()
     * @param \Closure $callback Work to time
     *
     * @return mixed Whatever the callback returns
     *
     * @throws \Exception Rethrows anything the callback throws
     */
    private function measure($name, $callback)
    {
        $this->stopwatch('start', $name);

        try {
            $result = $callback();
        } catch (\Exception $e) {
            $this->stopwatch('stop', $name);
            throw $e;
        }

        $this->stopwatch('stop', $name);

        return $result;
    }

    /**
     * Starts or stops an event in the stopwatch, if one was set.
     *
     * @param string $action Either 'start' or 'stop'
     * @param string $name   Name of the event (prefixed with 'ongr_es: ')
     */
    private function stopwatch($action, $name)
    {
        if (isset($this->stopwatch)) {
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
        }
    }
}
}
695