Completed
Push — master ( 2aeb83...bf4935 )
by Simonas
01:53
created

Manager::search()   B

Complexity

Conditions 4
Paths 8

Size

Total Lines 26
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 26
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 15
nc 8
nop 3
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <info@nfq.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Event\Events;
17
use ONGR\ElasticsearchBundle\Event\BulkEvent;
18
use ONGR\ElasticsearchBundle\Event\CommitEvent;
19
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
20
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
21
use ONGR\ElasticsearchBundle\Result\Converter;
22
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
23
use Symfony\Component\Stopwatch\Stopwatch;
24
25
/**
 * Manager class.
 *
 * Wraps an Elasticsearch client for a single index: it resolves repositories,
 * runs searches and lookups, queues bulk operations and commits them, and
 * exposes index maintenance calls (create, drop, refresh, flush, ...).
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Index settings; the 'index' key holds the index name and
     *            'body' => 'mappings' holds the type mappings
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * After a commit, a refresh or flush operation is needed to make the data
     * available, so one of those modes has to be selected. The default is refresh.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * Number of queued bulk operations after which commit() is called automatically.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Counts how many documents were passed to the bulk query.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache
     */
    private $repositories = [];

    /**
     * @var EventDispatcherInterface|null Optional, injected through setEventDispatcher()
     */
    private $eventDispatcher;

    /**
     * @var Stopwatch|null Optional, injected through setStopwatch()
     */
    private $stopwatch;

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * @param Stopwatch $stopwatch
     */
    public function setStopwatch(Stopwatch $stopwatch)
    {
        $this->stopwatch = $stopwatch;
    }

    /**
     * Returns repository by document class.
     *
     * Repositories are created on first request and cached per resolved class name.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $directory = null;

        if (strpos($className, ':')) {
            // Bundle:Document notation: the bundle may configure a custom document directory.
            $bundle = explode(':', $className)[0];

            if (isset($this->config['mappings'][$bundle]['document_dir'])) {
                $directory = $this->config['mappings'][$bundle]['document_dir'];
            }
        }

        $namespace = $this->getMetadataCollector()->getClassName($className, $directory);

        if (!isset($this->repositories[$namespace])) {
            $this->repositories[$namespace] = $this->createRepository($namespace);
        }

        return $this->repositories[$namespace];
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * @param string $commitMode One of: refresh, flush, none.
     *
     * @throws \LogicException When an unsupported mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if ($commitMode !== 'refresh' && $commitMode !== 'flush' && $commitMode !== 'none') {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * @param int $bulkCommitSize
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in. Each entry may be an
     *                                 Elasticsearch type name or a document class name.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = ['index' => $this->getIndexName()];

        $resolvedTypes = [];
        foreach ($types as $type) {
            $resolvedTypes[] = $this->resolveTypeName($type);
        }

        if (!empty($resolvedTypes)) {
            $params['type'] = implode(',', $resolvedTypes);
        }

        $params['body'] = $query;

        if (!empty($queryStringParams)) {
            // $params is merged last so index, type and body cannot be overridden
            // by the query string parameters.
            $params = array_merge($queryStringParams, $params);
        }

        $this->stopwatch('start', 'search');
        $result = $this->client->search($params);
        $this->stopwatch('stop', 'search');

        return $result;
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the converted document has no _id.
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id', '_routing']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, $data);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Inserts the current query container to the index, used for bulk queries execution.
     *
     * Returns null without contacting Elasticsearch when nothing is queued. When the
     * bulk response reports errors the exception is thrown before the queue is
     * cleared, so the queued operations stay in place.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array
     *
     * @throws BulkWithErrorsException
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
        $bulkQueries['index']['_index'] = $this->getIndexName();

        // The dispatcher is setter-injected, so it may be absent (same as the stopwatch).
        if ($this->eventDispatcher !== null) {
            $this->eventDispatcher->dispatch(
                Events::PRE_COMMIT,
                new CommitEvent($this->getCommitMode(), $bulkQueries)
            );
        }

        $this->stopwatch('start', 'bulk');
        $bulkResponse = $this->client->bulk($bulkQueries);
        $this->stopwatch('stop', 'bulk');

        if ($bulkResponse['errors']) {
            throw new BulkWithErrorsException(
                json_encode($bulkResponse),
                0,
                null,
                $bulkResponse
            );
        }

        $this->bulkQueries = [];
        $this->bulkCount = 0;

        $this->stopwatch('start', 'refresh');

        // Commit mode 'none' intentionally matches no case.
        switch ($this->getCommitMode()) {
            case 'flush':
                $this->flush($params);
                break;
            case 'refresh':
                $this->refresh($params);
                break;
        }

        if ($this->eventDispatcher !== null) {
            $this->eventDispatcher->dispatch(
                Events::POST_COMMIT,
                new CommitEvent($this->getCommitMode(), $bulkResponse)
            );
        }

        $this->stopwatch('stop', 'refresh');

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * The meta fields _id, _ttl, _routing and _parent are moved from $query into the
     * action header; the rest of $query becomes the action body (no body is queued
     * for delete). Once the number of queued operations equals the bulk commit size,
     * commit() is called and its response returned.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException
     *
     * @return null|array
     */
    public function bulk($operation, $type, array $query)
    {
        // Strict comparison: loosely, true (and 0 on PHP < 8) would equal 'index'.
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        if ($this->eventDispatcher !== null) {
            $this->eventDispatcher->dispatch(
                Events::BULK,
                new BulkEvent($operation, $type, $query)
            );
        }

        $header = [
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        // A bare array_filter() would also drop 0 / '0', losing e.g. a document ID of "0".
        $this->bulkQueries['body'][] = [
            $operation => array_filter(
                $header,
                static function ($value) {
                    return is_numeric($value) || !empty($value);
                }
            ),
        ];
        unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);

        // Body for delete operation is not needed.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // A counter is used because the number of operations is too difficult to
        // derive from the bulkQueries array (some operations queue two entries).
        $this->bulkCount++;

        $response = null;

        if ($this->bulkCommitSize === $this->bulkCount) {
            $response = $this->commit();
        }

        return $response;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping When true the index is created without the configured mappings.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // Work on a copy so that skipping the mappings for this call does not
        // remove them from the manager's own settings.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping When true the index is created without the configured mappings.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Do nothing, our target is to create new index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns mappings of the index for this connection.
     *
     * @return array
     */
    public function getIndexMappings()
    {
        return $this->indexSettings['body']['mappings'];
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     * @param string $routing   Custom routing for the document
     *
     * @return object|null
     */
    public function find($className, $id, $routing = null)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        // A plain truthiness check would silently discard the routing value "0".
        if (is_numeric($routing) || !empty($routing)) {
            $params['routing'] = $routing;
        }

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     *
     * @return mixed
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m'
    ) {
        return $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Calls "Get Settings API" in Elasticsearch and will return you the currently configured settings.
     *
     * @return array
     */
    public function getSettings()
    {
        return $this->getClient()->indices()->getSettings(['index' => $this->getIndexName()]);
    }

    /**
     * Gets Elasticsearch aliases information.
     *
     * @param array $params Extra request parameters; they override the default index.
     *
     * @return array
     */
    public function getAliases($params = [])
    {
        return $this->getClient()->indices()->getAliases(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Resolves type name by class name.
     *
     * Names containing ':' or a namespace separator are treated as document classes
     * and resolved through the metadata collector; anything else is returned as is.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }

    /**
     * Starts and stops an event in the stopwatch. Does nothing when no stopwatch is set.
     *
     * @param string $action Only 'start' and 'stop'; used as the stopwatch method name.
     * @param string $name   Name of the event
     */
    private function stopwatch($action, $name)
    {
        if (isset($this->stopwatch)) {
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
        }
    }
}
708