Completed
Push — 1.2 ( b13b4a )
by Simonas
10s
created

Manager::setCommitMode()   A

Complexity

Conditions 4
Paths 2

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 8
rs 9.2
cc 4
eloc 5
nc 2
nop 1
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Event\Events;
17
use ONGR\ElasticsearchBundle\Event\BulkEvent;
18
use ONGR\ElasticsearchBundle\Event\PersistEvent;
19
use ONGR\ElasticsearchBundle\Event\CommitEvent;
20
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
21
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
22
use ONGR\ElasticsearchBundle\Result\Converter;
23
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
24
use ONGR\ElasticsearchBundle\Result\RawIterator;
25
use ONGR\ElasticsearchBundle\Result\Result;
26
use ONGR\ElasticsearchDSL\Search;
27
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
28
use Symfony\Component\Stopwatch\Stopwatch;
29
30
/**
31
 * Manager class.
32
 */
33
class Manager
34
{
35
    /**
36
     * @var string Manager name
37
     */
38
    private $name;
39
40
    /**
41
     * @var array Manager configuration
42
     */
43
    private $config = [];
44
45
    /**
46
     * @var Client
47
     */
48
    private $client;
49
50
    /**
51
     * @var Converter
52
     */
53
    private $converter;
54
55
    /**
56
     * @var array Container for bulk queries
57
     */
58
    private $bulkQueries = [];
59
60
    /**
61
     * @var array Holder for consistency, refresh and replication parameters
62
     */
63
    private $bulkParams = [];
64
65
    /**
66
     * @var array
67
     */
68
    private $indexSettings;
69
70
    /**
71
     * @var MetadataCollector
72
     */
73
    private $metadataCollector;
74
75
    /**
76
     * After commit to make data available the refresh or flush operation is needed
77
     * so one of those methods has to be defined, the default is refresh.
78
     *
79
     * @var string
80
     */
81
    private $commitMode = 'refresh';
82
83
    /**
84
     * The size that defines after how much document inserts call commit function.
85
     *
86
     * @var int
87
     */
88
    private $bulkCommitSize = 100;
89
90
    /**
91
     * Container to count how many documents was passed to the bulk query.
92
     *
93
     * @var int
94
     */
95
    private $bulkCount = 0;
96
97
    /**
98
     * @var Repository[] Repository local cache
99
     */
100
    private $repositories;
101
102
    /**
103
     * @var EventDispatcherInterface
104
     */
105
    private $eventDispatcher;
106
107
    /**
108
     * @var Stopwatch
109
     */
110
    private $stopwatch;
111
112
    /**
113
     * @param string            $name              Manager name
114
     * @param array             $config            Manager configuration
115
     * @param Client            $client
116
     * @param array             $indexSettings
117
     * @param MetadataCollector $metadataCollector
118
     * @param Converter         $converter
119
     */
120
    public function __construct(
121
        $name,
122
        array $config,
123
        $client,
124
        array $indexSettings,
125
        $metadataCollector,
126
        $converter
127
    ) {
128
        $this->name = $name;
129
        $this->config = $config;
130
        $this->client = $client;
131
        $this->indexSettings = $indexSettings;
132
        $this->metadataCollector = $metadataCollector;
133
        $this->converter = $converter;
134
    }
135
136
    /**
137
     * Returns Elasticsearch connection.
138
     *
139
     * @return Client
140
     */
141
    public function getClient()
142
    {
143
        return $this->client;
144
    }
145
146
    /**
147
     * @return string
148
     */
149
    public function getName()
150
    {
151
        return $this->name;
152
    }
153
154
    /**
155
     * @return array
156
     */
157
    public function getConfig()
158
    {
159
        return $this->config;
160
    }
161
162
    /**
163
     * @param EventDispatcherInterface $eventDispatcher
164
     */
165
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
166
    {
167
        $this->eventDispatcher = $eventDispatcher;
168
    }
169
170
    /**
171
     * @param Stopwatch $stopwatch
172
     */
173
    public function setStopwatch(Stopwatch $stopwatch)
174
    {
175
        $this->stopwatch = $stopwatch;
176
    }
177
178
    /**
179
     * Returns repository by document class.
180
     *
181
     * @param string $className FQCN or string in Bundle:Document format
182
     *
183
     * @return Repository
184
     */
185
    public function getRepository($className)
186
    {
187
        if (!is_string($className)) {
188
            throw new \InvalidArgumentException('Document class must be a string.');
189
        }
190
191
        $namespace = $this->getMetadataCollector()->getClassName($className);
192
193
        if (isset($this->repositories[$namespace])) {
194
            return $this->repositories[$namespace];
195
        }
196
197
        $repository = $this->createRepository($namespace);
198
        $this->repositories[$namespace] = $repository;
199
200
        return $repository;
201
    }
202
203
    /**
204
     * @return MetadataCollector
205
     */
206
    public function getMetadataCollector()
207
    {
208
        return $this->metadataCollector;
209
    }
210
211
    /**
212
     * @return Converter
213
     */
214
    public function getConverter()
215
    {
216
        return $this->converter;
217
    }
218
219
    /**
220
     * @return string
221
     */
222
    public function getCommitMode()
223
    {
224
        return $this->commitMode;
225
    }
226
227
    /**
228
     * @param string $commitMode
229
     */
230
    public function setCommitMode($commitMode)
231
    {
232
        if ($commitMode === 'refresh' || $commitMode === 'flush' || $commitMode === 'none') {
233
            $this->commitMode = $commitMode;
234
        } else {
235
            throw new \LogicException('The commit method must be either refresh, flush or none.');
236
        }
237
    }
238
239
    /**
240
     * @return int
241
     */
242
    public function getBulkCommitSize()
243
    {
244
        return $this->bulkCommitSize;
245
    }
246
247
    /**
248
     * @param int $bulkCommitSize
249
     */
250
    public function setBulkCommitSize($bulkCommitSize)
251
    {
252
        $this->bulkCommitSize = $bulkCommitSize;
253
    }
254
255
    /**
256
     * Creates a repository.
257
     *
258
     * @param string $className
259
     *
260
     * @return Repository
261
     */
262
    private function createRepository($className)
263
    {
264
        return new Repository($this, $className);
265
    }
266
267
    /**
268
     * Executes search query in the index.
269
     *
270
     * @param array $types             List of types to search in.
271
     * @param array $query             Query to execute.
272
     * @param array $queryStringParams Query parameters.
273
     *
274
     * @return array
275
     */
276
    public function search(array $types, array $query, array $queryStringParams = [])
277
    {
278
        $params = [];
279
        $params['index'] = $this->getIndexName();
280
        $params['type'] = implode(',', $types);
281
        $params['body'] = $query;
282
283
        if (!empty($queryStringParams)) {
284
            $params = array_merge($queryStringParams, $params);
285
        }
286
287
        $this->stopwatch('start', 'search');
288
        $result = $this->client->search($params);
289
        $this->stopwatch('stop', 'search');
290
291
        return $result;
292
    }
293
294
    /**
295
     * Adds document to next flush.
296
     *
297
     * @param object $document
298
     */
299
    public function persist($document)
300
    {
301
        $documentArray = $this->converter->convertToArray($document);
302
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
303
304
        $this->bulk('index', $type, $documentArray);
305
    }
306
307
    /**
308
     * Adds document for removal.
309
     *
310
     * @param object $document
311
     */
312
    public function remove($document)
313
    {
314
        $data = $this->converter->convertToArray($document, [], ['_id', '_routing']);
315
316
        if (!isset($data['_id'])) {
317
            throw new \LogicException(
318
                'In order to use remove() method document class must have property with @Id annotation.'
319
            );
320
        }
321
322
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
323
324
        $this->bulk('delete', $type, $data);
325
    }
326
327
    /**
328
     * Flushes elasticsearch index.
329
     *
330
     * @param array $params
331
     *
332
     * @return array
333
     */
334
    public function flush(array $params = [])
335
    {
336
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
337
    }
338
339
    /**
340
     * Refreshes elasticsearch index.
341
     *
342
     * @param array $params
343
     *
344
     * @return array
345
     */
346
    public function refresh(array $params = [])
347
    {
348
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
349
    }
350
351
    /**
352
     * Inserts the current query container to the index, used for bulk queries execution.
353
     *
354
     * @param array $params Parameters that will be passed to the flush or refresh queries.
355
     *
356
     * @return null|array
357
     */
358
    public function commit(array $params = [])
359
    {
360
        if (!empty($this->bulkQueries)) {
361
            $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
362
363
            $this->eventDispatcher->dispatch(
364
                Events::PRE_COMMIT,
365
                new CommitEvent($this->getCommitMode(), $bulkQueries)
366
            );
367
368
            $this->stopwatch('start', 'bulk');
369
            $bulkResponse = $this->client->bulk($bulkQueries);
370
            $this->stopwatch('stop', 'bulk');
371
372
            $this->bulkQueries = [];
373
            $this->bulkCount = 0;
374
375
            $this->stopwatch('start', 'refresh');
376
377
            switch ($this->getCommitMode()) {
378
                case 'flush':
379
                    $this->flush($params);
380
                    break;
381
                case 'refresh':
382
                    $this->refresh($params);
383
                    break;
384
            }
385
386
            $this->eventDispatcher->dispatch(
387
                Events::POST_COMMIT,
388
                new CommitEvent($this->getCommitMode(), $bulkResponse)
0 ignored issues
show
Documentation introduced by
$bulkResponse is of type callable, but the function expects a array|null.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
389
            );
390
391
            $this->stopwatch('stop', 'refresh');
392
393
            return $bulkResponse;
394
        }
395
396
        return null;
397
    }
398
399
    /**
400
     * Adds query to bulk queries container.
401
     *
402
     * @param string       $operation One of: index, update, delete, create.
403
     * @param string|array $type      Elasticsearch type name.
404
     * @param array        $query     DSL to execute.
405
     *
406
     * @throws \InvalidArgumentException
407
     *
408
     * @return null|array
409
     */
410
    public function bulk($operation, $type, array $query)
411
    {
412
        if (!in_array($operation, ['index', 'create', 'update', 'delete'])) {
413
            throw new \InvalidArgumentException('Wrong bulk operation selected');
414
        }
415
416
        $this->eventDispatcher->dispatch(
417
            Events::BULK,
418
            new BulkEvent($operation, $type, $query)
419
        );
420
421
        $this->bulkQueries['body'][] = [
422
            $operation => array_filter(
423
                [
424
                    '_index' => $this->getIndexName(),
425
                    '_type' => $type,
426
                    '_id' => isset($query['_id']) ? $query['_id'] : null,
427
                    '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
428
                    '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
429
                    '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
430
                ]
431
            ),
432
        ];
433
        unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);
434
435
        switch ($operation) {
436
            case 'index':
437
            case 'create':
438
            case 'update':
439
                $this->bulkQueries['body'][] = $query;
440
                break;
441
            case 'delete':
442
                // Body for delete operation is not needed to apply.
443
            default:
444
                // Do nothing.
445
                break;
446
        }
447
448
        // We are using counter because there is to difficult to resolve this from bulkQueries array.
449
        $this->bulkCount++;
450
451
        $response = null;
452
453
        if ($this->bulkCommitSize === $this->bulkCount) {
454
            $response = $this->commit();
455
        }
456
457
        return $response;
458
    }
459
460
    /**
461
     * Optional setter to change bulk query params.
462
     *
463
     * @param array $params Possible keys:
464
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
465
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
466
     *                      ['replication'] = (enum) Explicitly set the replication type.
467
     */
468
    public function setBulkParams(array $params)
469
    {
470
        $this->bulkParams = $params;
471
    }
472
473
    /**
474
     * Creates fresh elasticsearch index.
475
     *
476
     * @param bool $noMapping Determines if mapping should be included.
477
     *
478
     * @return array
479
     */
480
    public function createIndex($noMapping = false)
481
    {
482
        if ($noMapping) {
483
            unset($this->indexSettings['body']['mappings']);
484
        }
485
486
        return $this->getClient()->indices()->create($this->indexSettings);
487
    }
488
489
    /**
490
     * Drops elasticsearch index.
491
     */
492
    public function dropIndex()
493
    {
494
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
495
    }
496
497
    /**
498
     * Tries to drop and create fresh elasticsearch index.
499
     *
500
     * @param bool $noMapping Determines if mapping should be included.
501
     *
502
     * @return array
503
     */
504
    public function dropAndCreateIndex($noMapping = false)
505
    {
506
        try {
507
            $this->dropIndex();
508
        } catch (\Exception $e) {
509
            // Do nothing, our target is to create new index.
510
        }
511
512
        return $this->createIndex($noMapping);
513
    }
514
515
    /**
516
     * Checks if connection index is already created.
517
     *
518
     * @return bool
519
     */
520
    public function indexExists()
521
    {
522
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
523
    }
524
525
    /**
526
     * Returns index name this connection is attached to.
527
     *
528
     * @return string
529
     */
530
    public function getIndexName()
531
    {
532
        return $this->indexSettings['index'];
533
    }
534
535
    /**
536
     * Sets index name for this connection.
537
     *
538
     * @param string $name
539
     */
540
    public function setIndexName($name)
541
    {
542
        $this->indexSettings['index'] = $name;
543
    }
544
545
    /**
546
     * Returns mappings of the index for this connection.
547
     *
548
     * @return array
549
     */
550
    public function getIndexMappings()
551
    {
552
        return $this->indexSettings['body']['mappings'];
553
    }
554
555
    /**
556
     * Returns Elasticsearch version number.
557
     *
558
     * @return string
559
     */
560
    public function getVersionNumber()
561
    {
562
        return $this->client->info()['version']['number'];
563
    }
564
565
    /**
566
     * Clears elasticsearch client cache.
567
     */
568
    public function clearCache()
569
    {
570
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
571
    }
572
573
    /**
574
     * Returns a single document by ID. Returns NULL if document was not found.
575
     *
576
     * @param string $className Document class name or Elasticsearch type name
577
     * @param string $id        Document ID to find
578
     * @param string $routing   Custom routing for the document
579
     *
580
     * @return object
581
     */
582
    public function find($className, $id, $routing = null)
583
    {
584
        $type = $this->resolveTypeName($className);
585
586
        $params = [
587
            'index' => $this->getIndexName(),
588
            'type' => $type,
589
            'id' => $id,
590
        ];
591
592
        if ($routing) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $routing of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
593
            $params['routing'] = $routing;
594
        }
595
596
        try {
597
            $result = $this->getClient()->get($params);
598
        } catch (Missing404Exception $e) {
599
            return null;
600
        }
601
602
        return $this->getConverter()->convertToDocument($result, $this);
0 ignored issues
show
Documentation introduced by
$result is of type callable, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
603
    }
604
605
    /**
606
     * Executes given search.
607
     *
608
     * @param array  $types
609
     * @param Search $search
610
     * @param string $resultsType
611
     *
612
     * @return DocumentIterator|RawIterator|array
613
     */
614
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
615
    {
616
        foreach ($types as &$type) {
617
            $type = $this->resolveTypeName($type);
618
        }
619
620
        $results = $this->search($types, $search->toArray(), $search->getQueryParams());
621
622
        return $this->parseResult($results, $resultsType, $search->getScroll());
0 ignored issues
show
Documentation introduced by
$results is of type callable, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
623
    }
624
625
    /**
626
     * Parses raw result.
627
     *
628
     * @param array  $raw
629
     * @param string $resultsType
630
     * @param string $scrollDuration
631
     *
632
     * @return DocumentIterator|RawIterator|array
633
     *
634
     * @throws \Exception
635
     */
636
    private function parseResult($raw, $resultsType, $scrollDuration = null)
637
    {
638
        $scrollConfig = [];
639
        if (isset($raw['_scroll_id'])) {
640
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
641
            $scrollConfig['duration'] = $scrollDuration;
642
        }
643
644
        switch ($resultsType) {
645
            case Result::RESULTS_OBJECT:
646
                return new DocumentIterator($raw, $this, $scrollConfig);
647
            case Result::RESULTS_ARRAY:
648
                return $this->convertToNormalizedArray($raw);
649
            case Result::RESULTS_RAW:
650
                return $raw;
651
            case Result::RESULTS_RAW_ITERATOR:
652
                return new RawIterator($raw, $this, $scrollConfig);
653
            default:
654
                throw new \Exception('Wrong results type selected');
655
        }
656
    }
657
658
    /**
659
     * Normalizes response array.
660
     *
661
     * @param array $data
662
     *
663
     * @return array
664
     */
665
    private function convertToNormalizedArray($data)
666
    {
667
        if (array_key_exists('_source', $data)) {
668
            return $data['_source'];
669
        }
670
671
        $output = [];
672
673
        if (isset($data['hits']['hits'][0]['_source'])) {
674
            foreach ($data['hits']['hits'] as $item) {
675
                $output[] = $item['_source'];
676
            }
677
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
678
            foreach ($data['hits']['hits'] as $item) {
679
                $output[] = array_map('reset', $item['fields']);
680
            }
681
        }
682
683
        return $output;
684
    }
685
686
    /**
687
     * Fetches next set of results.
688
     *
689
     * @param string $scrollId
690
     * @param string $scrollDuration
691
     * @param string $resultsType
692
     *
693
     * @return AbstractResultsIterator
694
     *
695
     * @throws \Exception
696
     */
697
    public function scroll(
698
        $scrollId,
699
        $scrollDuration = '5m',
700
        $resultsType = Result::RESULTS_OBJECT
701
    ) {
702
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
703
704
        return $this->parseResult($results, $resultsType, $scrollDuration);
0 ignored issues
show
Documentation introduced by
$results is of type callable, but the function expects a array.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
705
    }
706
707
    /**
708
     * Clears scroll.
709
     *
710
     * @param string $scrollId
711
     */
712
    public function clearScroll($scrollId)
713
    {
714
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
715
    }
716
717
    /**
718
     * Resolves type name by class name.
719
     *
720
     * @param string $className
721
     *
722
     * @return string
723
     */
724
    private function resolveTypeName($className)
725
    {
726
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
727
            return $this->getMetadataCollector()->getDocumentType($className);
728
        }
729
730
        return $className;
731
    }
732
733
    /**
734
     * Starts and stops an event in the stopwatch
735
     *
736
     * @param string $action   only 'start' and 'stop'
737
     * @param string $name     name of the event
738
     */
739
    private function stopwatch($action, $name)
740
    {
741
        if (isset($this->stopwatch)) {
742
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
743
        }
744
    }
745
}
746