Completed
Pull Request — master (#615)
by
unknown
02:55
created

Manager::bulk()   C

Complexity

Conditions 10
Paths 11

Size

Total Lines 47
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 1
Metric Value
c 3
b 0
f 1
dl 0
loc 47
rs 5.1578
cc 10
eloc 28
nc 11
nop 3

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Event\OperationEvent;
17
use ONGR\ElasticsearchBundle\Event\PreCommitEvent;
18
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
19
use ONGR\ElasticsearchBundle\ONGRElasticsearchEvents;
20
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
21
use ONGR\ElasticsearchBundle\Result\Converter;
22
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
23
use ONGR\ElasticsearchBundle\Result\RawIterator;
24
use ONGR\ElasticsearchBundle\Result\Result;
25
use ONGR\ElasticsearchDSL\Search;
26
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
27
28
/**
 * Manager for a single Elasticsearch index.
 *
 * Queues document operations into bulk requests, manages the index lifecycle
 * (create, drop, refresh, flush) and executes searches and scrolls against it.
 */
class Manager
{
    /**
     * Operations accepted by bulk().
     *
     * @var string[]
     */
    private static $bulkOperations = ['index', 'create', 'update', 'delete'];

    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries; queued lines live under the 'body' key
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Index settings passed to index creation; the 'index' key holds the index name
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * Operation run after a commit so the data becomes available.
     *
     * One of 'refresh' (default), 'flush' or 'none'.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * Number of queued bulk operations after which commit() is called automatically.
     *
     * A value of 0 or less disables the automatic commit.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Number of operations queued in the current bulk request.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved document class name
     */
    private $repositories = [];

    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @param string                   $name              Manager name
     * @param array                    $config            Manager configuration
     * @param Client                   $client
     * @param array                    $indexSettings
     * @param MetadataCollector        $metadataCollector
     * @param Converter                $converter
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter,
        $eventDispatcher
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Returns repository by document class.
     *
     * Repositories are created lazily and cached per resolved class name.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (!isset($this->repositories[$namespace])) {
            $this->repositories[$namespace] = $this->createRepository($namespace);
        }

        return $this->repositories[$namespace];
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string One of 'refresh', 'flush' or 'none'.
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * @param string $commitMode One of 'refresh', 'flush' or 'none'.
     *
     * @throws \LogicException When an unsupported mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if (!in_array($commitMode, ['refresh', 'flush', 'none'], true)) {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * Sets the number of queued operations that triggers an automatic commit.
     *
     * @param int $bulkCommitSize Values of 0 or less disable the automatic commit.
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        // Cast so numeric strings (e.g. from configuration) still trigger auto-commits.
        $this->bulkCommitSize = (int) $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters; index/type/body always take precedence.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = [];
        $params['index'] = $this->getIndexName();
        $params['type'] = implode(',', $types);
        $params['body'] = $query;

        if (!empty($queryStringParams)) {
            $params = array_merge($queryStringParams, $params);
        }

        return $this->client->search($params);
    }

    /**
     * Queues a document for indexing on the next commit.
     *
     * @param object $document
     *
     * @return void
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Queues a document for removal on the next commit.
     *
     * @param object $document
     *
     * @return void
     *
     * @throws \LogicException When the document has no @Id property value.
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, ['_id' => $data['_id']]);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params Extra parameters; may override the default 'index'.
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params Extra parameters; may override the default 'index'.
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Sends the queued bulk operations to the index, then flushes or refreshes it per the commit mode.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array The bulk response, or null when nothing was queued.
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkResponse = $this->client->bulk(array_merge($this->bulkQueries, $this->bulkParams));
        $this->bulkQueries = [];
        $this->bulkCount = 0;

        // Dispatched after the bulk request was sent, before the flush/refresh step.
        $this->eventDispatcher->dispatch(
            ONGRElasticsearchEvents::PRE_COMMIT,
            new PreCommitEvent($this->getCommitMode(), $params)
        );

        switch ($this->getCommitMode()) {
            case 'flush':
                $this->flush($params);
                break;
            case 'refresh':
                $this->refresh($params);
                break;
        }

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * The '_id', '_ttl' and '_parent' keys of $query are moved into the action line.
     * The remaining fields form the document body (not sent for 'delete').
     * Commits automatically once the bulk commit size is reached.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException When the operation is not supported.
     *
     * @return null|array Commit response when an automatic commit happened, null otherwise.
     */
    public function bulk($operation, $type, array $query)
    {
        if (!in_array($operation, self::$bulkOperations, true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $this->bulkQueries['body'][] = [$operation => $this->buildBulkMetadata($type, $query)];
        unset($query['_id'], $query['_ttl'], $query['_parent']);

        $this->eventDispatcher->dispatch(
            constant('ONGR\ElasticsearchBundle\ONGRElasticsearchEvents::PRE_' . strtoupper($operation)),
            new OperationEvent($operation, $type, $query)
        );

        // A delete action consists of the action line only; other operations carry a body line.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // Counting operations is simpler than deriving the number from 'body',
        // where a single operation occupies one or two lines.
        $this->bulkCount++;

        return $this->shouldAutoCommit() ? $this->commit() : null;
    }

    /**
     * Builds the action-line metadata (index, type, id, ttl, parent) for a bulk operation.
     *
     * @param string|array $type
     * @param array        $query
     *
     * @return array
     */
    private function buildBulkMetadata($type, array $query)
    {
        $metadata = [
            '_index' => $this->getIndexName(),
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        return array_filter(
            $metadata,
            function ($value) {
                // A bare array_filter() would also drop 0 and "0", which are valid document IDs.
                return $value !== null && $value !== '' && $value !== [];
            }
        );
    }

    /**
     * Tells whether enough operations are queued to trigger an automatic commit.
     *
     * @return bool
     */
    private function shouldAutoCommit()
    {
        // ">=" (not "===") so lowering the commit size below the queued count still commits.
        return $this->bulkCommitSize > 0 && $this->bulkCount >= $this->bulkCommitSize;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // Work on a copy so that a later createIndex() call still includes the mappings.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Deliberately best-effort (presumably the index may not exist yet):
            // the goal is only to end up with a freshly created index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     *
     * @return object|null
     */
    public function find($className, $id)
    {
        $type = $this->resolveTypeName($className);

        $params = [
            'index' => $this->getIndexName(),
            'type' => $type,
            'id' => $id,
        ];

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Executes given search.
     *
     * @param array|string $types       Document class names or type names (a single one may be a string).
     * @param Search       $search
     * @param string       $resultsType
     *
     * @return DocumentIterator|RawIterator|array
     */
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
    {
        $resolvedTypes = [];
        foreach ((array) $types as $type) {
            $resolvedTypes[] = $this->resolveTypeName($type);
        }

        $results = $this->search($resolvedTypes, $search->toArray(), $search->getQueryParams());

        return $this->parseResult($results, $resultsType, $search->getScroll());
    }

    /**
     * Parses raw result.
     *
     * @param array  $raw
     * @param string $resultsType
     * @param string $scrollDuration
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception When an unknown results type is requested.
     */
    private function parseResult($raw, $resultsType, $scrollDuration = null)
    {
        $scrollConfig = [];
        if (isset($raw['_scroll_id'])) {
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
            $scrollConfig['duration'] = $scrollDuration;
        }

        switch ($resultsType) {
            case Result::RESULTS_OBJECT:
                return new DocumentIterator($raw, $this, $scrollConfig);
            case Result::RESULTS_ARRAY:
                return $this->convertToNormalizedArray($raw);
            case Result::RESULTS_RAW:
                return $raw;
            case Result::RESULTS_RAW_ITERATOR:
                return new RawIterator($raw, $this, $scrollConfig);
            default:
                throw new \Exception('Wrong results type selected');
        }
    }

    /**
     * Normalizes response array.
     *
     * A single-document response yields its '_source'. A search response yields
     * the list of hit sources or, when hits only carry 'fields', one array per hit
     * holding the first value of each field.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            return $data['_source'];
        }

        $output = [];

        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            foreach ($data['hits']['hits'] as $item) {
                // current() takes the first value without the by-reference warning reset() raises here.
                $output[] = array_map('current', $item['fields']);
            }
        }

        return $output;
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     * @param string $resultsType
     *
     * @return AbstractResultsIterator
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m',
        $resultsType = Result::RESULTS_OBJECT
    ) {
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);

        return $this->parseResult($results, $resultsType, $scrollDuration);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Resolves type name by class name.
     *
     * Values containing ':' (Bundle:Document) or '\' (FQCN) are looked up in the
     * metadata collector. Anything else is treated as a type name already.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }
}
680