Completed
Pull Request — master (#641)
by
unknown
02:26
created

Manager::bulk()   C

Complexity

Conditions 11
Paths 11

Size

Total Lines 43
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 1
Metric Value
c 3
b 0
f 1
dl 0
loc 43
rs 5.2653
cc 11
eloc 26
nc 11
nop 3

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

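Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named private method and call it from the original place.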

1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
17
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
18
use ONGR\ElasticsearchBundle\Result\Converter;
19
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
20
use ONGR\ElasticsearchBundle\Result\RawIterator;
21
use ONGR\ElasticsearchBundle\Result\Result;
22
use ONGR\ElasticsearchDSL\Search;
23
24
/**
 * Manager class.
 *
 * Wraps the Elasticsearch client for a single index: buffers bulk write operations (persist/remove),
 * looks up and searches documents, scrolls through results and administers the index itself.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Settings passed to the client when creating the index; the 'index' key holds the index name.
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * After commit to make data available the refresh or flush operation is needed
     * so one of those methods has to be defined, the default is refresh.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * The size that defines after how much document inserts call commit function.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Container to count how many documents was passed to the bulk query.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by document class name.
     */
    private $repositories = [];

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Returns repository by document class.
     *
     * Repositories are created lazily and cached per resolved class name.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (!isset($this->repositories[$namespace])) {
            $this->repositories[$namespace] = $this->createRepository($namespace);
        }

        return $this->repositories[$namespace];
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * Sets what happens after a bulk commit: 'refresh', 'flush' or 'none'.
     *
     * @param string $commitMode
     *
     * @throws \LogicException When the mode is not one of the supported values.
     */
    public function setCommitMode($commitMode)
    {
        if (!in_array($commitMode, ['refresh', 'flush', 'none'], true)) {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * Sets after how many buffered bulk operations commit() is triggered automatically.
     * A non-positive size disables the automatic commit.
     *
     * @param int $bulkCommitSize
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters; 'index', 'type' and 'body' are always overridden.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = [];
        $params['index'] = $this->getIndexName();
        $params['type'] = implode(',', $types);
        $params['body'] = $query;

        if (!empty($queryStringParams)) {
            $params = array_merge($queryStringParams, $params);
        }

        return $this->client->search($params);
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     *
     * @return null|array Bulk response when the buffered operations were committed, null otherwise.
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the document has no ID.
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id', '_routing']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, $data);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params Extra parameters; may override the index name.
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params Extra parameters; may override the index name.
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Inserts the current query container to the index, used for bulk queries execution.
     *
     * After a successful bulk request the buffer is cleared and, depending on the commit mode,
     * the index is flushed or refreshed.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array Bulk response, or null when nothing was buffered.
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkResponse = $this->client->bulk(array_merge($this->bulkQueries, $this->bulkParams));

        // Reached only when the client call did not throw; otherwise the buffer is kept intact.
        $this->bulkQueries = [];
        $this->bulkCount = 0;

        switch ($this->getCommitMode()) {
            case 'flush':
                $this->flush($params);
                break;
            case 'refresh':
                $this->refresh($params);
                break;
        }

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * Meta fields (_id, _ttl, _routing, _parent) are moved from $query into the action line.
     * For every operation except 'delete' the remaining $query is appended as the source line.
     * Once the configured bulk commit size is reached, the buffer is committed automatically.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException When the operation is not supported.
     *
     * @return null|array Bulk response when an automatic commit happened, null otherwise.
     */
    public function bulk($operation, $type, array $query)
    {
        // Strict comparison: on PHP < 8 a loose check would accept e.g. 0 as 'index'.
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $this->bulkQueries['body'][] = [$operation => $this->buildBulkMetadata($type, $query)];

        // A delete action consists of the action line only; the others carry a source line.
        if ($operation !== 'delete') {
            unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);
            $this->bulkQueries['body'][] = $query;
        }

        // We are using counter because there is to difficult to resolve this from bulkQueries array.
        $this->bulkCount++;

        return $this->isBulkCommitDue() ? $this->commit() : null;
    }

    /**
     * Builds the action/metadata part of a bulk operation.
     *
     * @param string|array $type  Elasticsearch type name.
     * @param array        $query Operation data that may contain _id, _ttl, _routing and _parent.
     *
     * @return array Metadata keyed in the order _index, _type, _id, _ttl, _routing, _parent,
     *               containing only the values that are present.
     */
    private function buildBulkMetadata($type, array $query)
    {
        $candidates = [
            '_index' => $this->getIndexName(),
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        $metadata = [];
        foreach ($candidates as $key => $value) {
            if ($this->isMetaValuePresent($value)) {
                $metadata[$key] = $value;
            }
        }

        return $metadata;
    }

    /**
     * Tells whether a meta value (id, routing, parent, ...) was actually provided.
     *
     * Only null, empty string, false and empty array count as missing. Unlike a plain truthiness
     * check (or array_filter() without a callback), legitimate values such as 0 or "0" are kept.
     *
     * @param mixed $value
     *
     * @return bool
     */
    private function isMetaValuePresent($value)
    {
        return !in_array($value, [null, '', false, []], true);
    }

    /**
     * Tells whether enough operations are buffered to trigger an automatic commit.
     *
     * @return bool
     */
    private function isBulkCommitDue()
    {
        $commitSize = (int) $this->bulkCommitSize;

        // ">=" rather than "===": a size given as a numeric string, or lowered below the current
        // buffer count, must still trigger a commit. A non-positive size never triggers one.
        return $commitSize > 0 && $this->bulkCount >= $commitSize;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // Work on a copy so that creating an index without mappings does not strip the mappings
        // from the manager's settings for later createIndex()/dropAndCreateIndex() calls.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Do nothing, our target is to create new index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     * @param string $routing   Custom routing for the document
     *
     * @return object|null
     */
    public function find($className, $id, $routing = null)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        // Explicit presence check instead of truthiness, so a routing value of "0" is honoured.
        if ($this->isMetaValuePresent($routing)) {
            $params['routing'] = $routing;
        }

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Executes given search.
     *
     * @param array|string $types       Type names or document class names (a single one may be given as a string).
     * @param Search       $search
     * @param string       $resultsType
     *
     * @return DocumentIterator|RawIterator|array
     */
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
    {
        // Build a new array instead of iterating by reference, which would leave a dangling reference.
        $resolvedTypes = [];
        foreach ((array) $types as $key => $type) {
            $resolvedTypes[$key] = $this->resolveTypeName($type);
        }

        $results = $this->search($resolvedTypes, $search->toArray(), $search->getQueryParams());

        return $this->parseResult($results, $resultsType, $search->getScroll());
    }

    /**
     * Parses raw result.
     *
     * @param array  $raw
     * @param string $resultsType
     * @param string $scrollDuration
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception When the results type is unknown.
     */
    private function parseResult($raw, $resultsType, $scrollDuration = null)
    {
        $scrollConfig = [];
        if (isset($raw['_scroll_id'])) {
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
            $scrollConfig['duration'] = $scrollDuration;
        }

        switch ($resultsType) {
            case Result::RESULTS_OBJECT:
                return new DocumentIterator($raw, $this, $scrollConfig);
            case Result::RESULTS_ARRAY:
                return $this->convertToNormalizedArray($raw);
            case Result::RESULTS_RAW:
                return $raw;
            case Result::RESULTS_RAW_ITERATOR:
                return new RawIterator($raw, $this, $scrollConfig);
            default:
                throw new \Exception('Wrong results type selected');
        }
    }

    /**
     * Normalizes response array.
     *
     * A single-document response yields its _source. A search response yields a list with each
     * hit's _source, or, when hits carry 'fields' instead, each field reduced to its first value.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            return $data['_source'];
        }

        $output = [];

        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            // Wrap reset() in a closure rather than passing the by-reference builtin as a callback.
            $firstValue = function ($values) {
                return reset($values);
            };

            foreach ($data['hits']['hits'] as $item) {
                $output[] = array_map($firstValue, $item['fields']);
            }
        }

        return $output;
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     * @param string $resultsType
     *
     * @return AbstractResultsIterator
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m',
        $resultsType = Result::RESULTS_OBJECT
    ) {
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);

        return $this->parseResult($results, $resultsType, $scrollDuration);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Resolves type name by class name.
     *
     * Names containing ':' (Bundle:Document) or '\' (FQCN) are resolved through the metadata
     * collector; anything else is assumed to already be a type name.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }
}
664