Completed
Pull Request — master (#751)
by Darius
02:29
created

Manager::msearch()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 5
nc 1
nop 1
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <info@nfq.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Event\Events;
17
use ONGR\ElasticsearchBundle\Event\BulkEvent;
18
use ONGR\ElasticsearchBundle\Event\CommitEvent;
19
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
20
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
21
use ONGR\ElasticsearchBundle\Result\Converter;
22
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
23
use Symfony\Component\Stopwatch\Stopwatch;
24
25
/**
 * Manager class.
 *
 * Binds an Elasticsearch client to a single index and exposes search, bulk
 * write (persist/remove/commit), index management and lookup helpers for it.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Index settings; 'index' holds the index name, 'body' the creation body
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * After commit to make data available the refresh or flush operation is needed
     * so one of those methods has to be defined, the default is refresh.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * The size that defines after how much document inserts call commit function.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Container to count how many documents was passed to the bulk query.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved class name
     */
    private $repositories = [];

    /**
     * @var EventDispatcherInterface|null Optional, injected through setEventDispatcher()
     */
    private $eventDispatcher;

    /**
     * @var Stopwatch|null Optional, injected through setStopwatch()
     */
    private $stopwatch;

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * @param Stopwatch $stopwatch
     */
    public function setStopwatch(Stopwatch $stopwatch)
    {
        $this->stopwatch = $stopwatch;
    }

    /**
     * Returns repository by document class.
     *
     * Repositories are created once per resolved class name and then served
     * from the local cache.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (!isset($this->repositories[$namespace])) {
            $this->repositories[$namespace] = $this->createRepository($namespace);
        }

        return $this->repositories[$namespace];
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * @param string $commitMode One of: refresh, flush, none.
     *
     * @throws \LogicException When an unsupported mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if ($commitMode !== 'refresh' && $commitMode !== 'flush' && $commitMode !== 'none') {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * @param int $bulkCommitSize
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        // Normalise to int: a numeric string (e.g. a console option value)
        // would otherwise never match the integer bulk counter.
        $this->bulkCommitSize = (int) $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = ['index' => $this->getIndexName()];

        if (!empty($types)) {
            $params['type'] = implode(',', $types);
        }

        $params['body'] = $query;

        if (!empty($queryStringParams)) {
            // $params is merged last, so index/type/body always win over
            // same-named keys in the query string parameters.
            $params = array_merge($queryStringParams, $params);
        }

        $this->stopwatch('start', 'search');
        $result = $this->client->search($params);
        $this->stopwatch('stop', 'search');

        return $result;
    }

    /**
     * Execute search queries using multisearch api
     * $body - is array of requests described in elastic Multi Search API
     *
     * @param array $body
     *
     * @return array
     */
    public function msearch(array $body)
    {
        return $this->client->msearch(
            [
                'index' => $this->getIndexName(), // set default index
                'body' => $body,
            ]
        );
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the converted document carries no _id.
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id', '_routing']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, $data);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Inserts the current query container to the index, used for bulk queries execution.
     *
     * Does nothing and returns null when no bulk queries are queued. The
     * PRE_COMMIT / POST_COMMIT events are dispatched only when an event
     * dispatcher has been set.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array
     *
     * @throws BulkWithErrorsException
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
        $bulkQueries['index']['_index'] = $this->getIndexName();

        if ($this->eventDispatcher !== null) {
            $this->eventDispatcher->dispatch(
                Events::PRE_COMMIT,
                new CommitEvent($this->getCommitMode(), $bulkQueries)
            );
        }

        $this->stopwatch('start', 'bulk');
        $bulkResponse = $this->client->bulk($bulkQueries);
        $this->stopwatch('stop', 'bulk');

        if ($bulkResponse['errors']) {
            // The queue is intentionally left untouched here, as before.
            throw new BulkWithErrorsException(
                json_encode($bulkResponse),
                0,
                null,
                $bulkResponse
            );
        }

        $this->bulkQueries = [];
        $this->bulkCount = 0;

        $this->stopwatch('start', 'refresh');

        // Commit mode 'none' matches no case: the data is left as-is.
        switch ($this->getCommitMode()) {
            case 'flush':
                $this->flush($params);
                break;
            case 'refresh':
                $this->refresh($params);
                break;
        }

        if ($this->eventDispatcher !== null) {
            $this->eventDispatcher->dispatch(
                Events::POST_COMMIT,
                new CommitEvent($this->getCommitMode(), $bulkResponse)
            );
        }

        $this->stopwatch('stop', 'refresh');

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * The meta fields (_id, _ttl, _routing, _parent) are moved from $query
     * into the action line; the rest of $query becomes the action body for
     * every operation except delete. When the number of queued operations
     * reaches the bulk commit size, commit() is triggered automatically.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException
     *
     * @return null|array Commit response when an automatic commit happened, null otherwise.
     */
    public function bulk($operation, $type, array $query)
    {
        // Strict comparison: loose matching would accept e.g. boolean true.
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        if ($this->eventDispatcher !== null) {
            $this->eventDispatcher->dispatch(
                Events::BULK,
                new BulkEvent($operation, $type, $query)
            );
        }

        $meta = [
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        $this->bulkQueries['body'][] = [
            // Drop unset/empty meta fields but keep numeric zeros: a plain
            // array_filter() would also discard a legitimate id or routing
            // value of 0 / '0'.
            $operation => array_filter(
                $meta,
                static function ($value) {
                    return !empty($value) || is_numeric($value);
                }
            ),
        ];
        unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);

        // Body for delete operation is not needed to apply.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // We are using counter because there is to difficult to resolve this from bulkQueries array.
        $this->bulkCount++;

        $response = null;

        // ">=" instead of "===" so a commit size lowered below the current
        // count still triggers; a non-positive size never auto-commits.
        if ($this->bulkCommitSize > 0 && $this->bulkCount >= $this->bulkCommitSize) {
            $response = $this->commit();
        }

        return $response;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping When true, the index is created without the configured mappings.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // Work on a copy so that skipping the mappings for this call does not
        // strip them from the manager's own settings.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping When true, the index is created without the configured mappings.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Do nothing, our target is to create new index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns mappings of the index for this connection.
     *
     * @return array
     */
    public function getIndexMappings()
    {
        return $this->indexSettings['body']['mappings'];
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string      $className Document class name or Elasticsearch type name
     * @param string      $id        Document ID to find
     * @param string|null $routing   Custom routing for the document
     *
     * @return object|null
     */
    public function find($className, $id, $routing = null)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        // Explicit checks instead of truthiness so that a routing value of
        // '0' is still passed on; null and '' mean "no custom routing".
        if ($routing !== null && $routing !== '') {
            $params['routing'] = $routing;
        }

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     *
     * @return mixed
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m'
    ) {
        return $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Calls "Get Settings API" in Elasticsearch and will return you the currently configured settings.
     *
     * @return array
     */
    public function getSettings()
    {
        return $this->getClient()->indices()->getSettings(['index' => $this->getIndexName()]);
    }

    /**
     * Gets Elasticsearch aliases information.
     *
     * @param array $params Extra request parameters; they override the default index.
     *
     * @return array
     */
    public function getAliases($params = [])
    {
        return $this->getClient()->indices()->getAliases(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Resolves type name by class name.
     *
     * A value containing ':' or '\' is treated as a class reference and is
     * resolved through the metadata collector; anything else is returned
     * unchanged as a type name.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }

    /**
     * Starts and stops an event in the stopwatch
     *
     * No-op when no stopwatch has been set.
     *
     * @param string $action   only 'start' and 'stop'
     * @param string $name     name of the event
     */
    private function stopwatch($action, $name)
    {
        if (isset($this->stopwatch)) {
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
        }
    }
}
711