Completed
Pull Request — master (#514)
by Mantas
03:31
created

Manager::updateMapping()   C

Complexity

Conditions 8
Paths 19

Size

Total Lines 40
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 1 Features 1
Metric Value
c 3
b 1
f 1
dl 0
loc 40
rs 5.3846
cc 8
eloc 26
nc 19
nop 2
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Forbidden403Exception;
16
use Elasticsearch\Common\Exceptions\Missing404Exception;
17
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
18
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
19
use ONGR\ElasticsearchBundle\Result\Converter;
20
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
21
use ONGR\ElasticsearchBundle\Result\RawIterator;
22
use ONGR\ElasticsearchBundle\Result\Result;
23
use ONGR\ElasticsearchDSL\Search;
24
25
/**
 * Manager class.
 *
 * Binds an Elasticsearch client to a single index and exposes repositories,
 * bulk persistence (persist/bulk/commit), index and mapping administration,
 * and search/scroll execution with result conversion.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var bool
     */
    private $readOnly;

    /**
     * @var array Container for bulk queries
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Index creation parameters; the 'index' key holds the index name.
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * After commit to make data available the refresh or flush operation is needed
     * so one of those methods has to be defined, the default is refresh.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * Number of bulk operations after which commit() is called automatically.
     * A value of 0 or less disables automatic commits.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Number of operations added to the bulk container since the last commit.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved document class name
     */
    private $repositories = [];

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;

        // A missing "readonly" key means the manager is writable (avoids an undefined-index notice).
        $this->setReadOnly(!empty($config['readonly']));
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Returns repository by document class.
     *
     * Repositories are cached per resolved class name, so repeated calls return the same instance.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (isset($this->repositories[$namespace])) {
            return $this->repositories[$namespace];
        }

        $repository = $this->createRepository($namespace);
        $this->repositories[$namespace] = $repository;

        return $repository;
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * @param string $commitMode Either 'refresh' or 'flush'.
     *
     * @throws \LogicException When any other mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if ($commitMode === 'refresh' || $commitMode === 'flush') {
            $this->commitMode = $commitMode;
        } else {
            throw new \LogicException('The commit method must be either refresh or flush.');
        }
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * @param int $bulkCommitSize Operations per automatic commit; 0 or less disables auto-commit.
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters; index/type/body always take precedence over them.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = [];
        $params['index'] = $this->getIndexName();
        $params['type'] = implode(',', $types);
        $params['body'] = $query;

        if (!empty($queryStringParams)) {
            $params = array_merge($queryStringParams, $params);
        }

        return $this->client->search($params);
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $typeMapping = $this->getMetadataCollector()->getDocumentMapping($document);

        $this->bulk('index', $typeMapping['type'], $documentArray);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush($params);
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh($params);
    }

    /**
     * Inserts the current query container to the index, used for bulk queries execution.
     *
     * The container and the operation counter are cleared before the request is sent,
     * then the index is flushed or refreshed according to the commit mode.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array Bulk response, or null when there was nothing to commit.
     */
    public function commit(array $params = [])
    {
        $this->isReadOnly('Commit');

        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
        $this->bulkQueries = [];
        // Reset here (not only in bulk()) so a manual commit restarts the auto-commit window.
        $this->bulkCount = 0;

        $bulkResponse = $this->client->bulk($bulkQueries);

        if ($this->getCommitMode() === 'flush') {
            $this->flush($params);
        } else {
            $this->refresh($params);
        }

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * Automatically commits once the number of queued operations reaches the bulk commit size.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException
     */
    public function bulk($operation, $type, array $query)
    {
        $this->isReadOnly('Bulk');

        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $this->bulkQueries['body'][] = [
            $operation => array_filter(
                [
                    '_index' => $this->getIndexName(),
                    '_type' => $type,
                    '_id' => isset($query['_id']) ? $query['_id'] : null,
                    '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
                    '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
                ],
                function ($value) {
                    // Drop only absent metadata; a falsy-but-valid value such as an "_id" of "0" must be kept.
                    return $value !== null && $value !== '';
                }
            ),
        ];
        unset($query['_id'], $query['_ttl'], $query['_parent']);

        // Delete needs only the action/metadata line; every other operation also sends a source line.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // A counter is kept because operations contribute one or two lines to the body.
        $this->bulkCount++;

        // ">=" (not "===") so the threshold still fires after the size is lowered below the current count.
        if ($this->bulkCommitSize > 0 && $this->bulkCount >= $this->bulkCommitSize) {
            $this->commit();
        }
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        $this->isReadOnly('Create index');

        if ($noMapping) {
            unset($this->indexSettings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($this->indexSettings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        $this->isReadOnly('Drop index');

        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Do nothing, our target is to create new index (it may simply not exist yet).
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Puts mapping into elasticsearch client.
     *
     * When no types are given, every entry of the manager's "mappings" configuration is used:
     * "Bundle:Document" entries as-is, plain bundle names expanded to all their documents.
     *
     * @param array $types           Specific types to put.
     * @param bool  $ignoreConflicts Ignore elasticsearch merge conflicts.
     *
     * @throws \LogicException When a mapping is missing or the client rejects the update
     *                         (the client exception is attached as the previous exception).
     */
    public function updateMapping(array $types = [], $ignoreConflicts = true)
    {
        $this->isReadOnly('Mapping update');

        if (empty($types)) {
            $types = $this->collectConfiguredTypes();
        }

        $params = ['index' => $this->getIndexName()];

        foreach ($types as $type) {
            $mapping = $this->getMetadataCollector()->getClientMapping([$type]);

            if ($mapping === null) {
                throw new \LogicException(sprintf('Mapping for type "%s" was not found.', $type));
            }

            try {
                $params['type'] = $this->getMetadataCollector()->getDocumentType($type);
                $params['body'] = $mapping;
                $params['ignore_conflicts'] = $ignoreConflicts;
                // array_filter() drops "ignore_conflicts" when it is false.
                $this->client->indices()->putMapping(array_filter($params));
            } catch (\Exception $e) {
                throw new \LogicException(
                    'Only the documents[] can be passed to the type update command. ' .
                    'Maybe you added only a bundle. Please check if a document is mapped in the manager.',
                    0,
                    $e
                );
            }
        }
    }

    /**
     * Expands the configured "mappings" entries into a list of "Bundle:Document" names.
     *
     * @return string[]
     */
    private function collectConfiguredTypes()
    {
        $config = $this->getConfig();
        $mappings = isset($config['mappings']) ? $config['mappings'] : [];
        $types = [];

        foreach ($mappings as $bundle) {
            // strpos() may return 0, so compare against false explicitly.
            if (strpos($bundle, ':') !== false) {
                $types[] = $bundle;
                continue;
            }

            foreach ($this->getMetadataCollector()->getMappings([$bundle]) as $document) {
                $types[] = $document['bundle'] . ':' . $document['class'];
            }
        }

        return $types;
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->isReadOnly('Clear cache');

        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Set connection to read only state.
     *
     * @param bool $readOnly
     */
    public function setReadOnly($readOnly)
    {
        $this->readOnly = $readOnly;
    }

    /**
     * Guards write operations: throws when the connection is read only.
     *
     * Despite its name this does not return a value; it either returns silently or throws.
     *
     * @param string $message Name of the attempted operation, used in the error message.
     *
     * @throws Forbidden403Exception
     */
    public function isReadOnly($message = '')
    {
        if ($this->readOnly) {
            throw new Forbidden403Exception("Manager is readonly! {$message} operation is not permitted.");
        }
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     *
     * @return object|null
     */
    public function find($className, $id)
    {
        $type = $this->resolveTypeName($className);

        $params = [
            'index' => $this->getIndexName(),
            'type' => $type,
            'id' => $id,
        ];

        try {
            /** @var array $result */
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Executes given search.
     *
     * @param array  $types       Type names, "Bundle:Document" names or FQCNs.
     * @param Search $search
     * @param string $resultsType One of the Result::RESULTS_* constants.
     *
     * @return DocumentIterator|RawIterator|array
     */
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
    {
        foreach ($types as &$type) {
            $type = $this->resolveTypeName($type);
        }
        // Break the reference so the last element cannot be overwritten through $type later.
        unset($type);

        /** @var array $results */
        $results = $this->search($types, $search->toArray(), $search->getQueryParams());

        return $this->parseResult($results, $resultsType, $search->getScroll());
    }

    /**
     * Parses raw result.
     *
     * @param array  $raw
     * @param string $resultsType
     * @param string $scrollDuration
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception
     */
    private function parseResult($raw, $resultsType, $scrollDuration = null)
    {
        $scrollConfig = [];
        if (isset($raw['_scroll_id'])) {
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
            $scrollConfig['duration'] = $scrollDuration;
        }

        switch ($resultsType) {
            case Result::RESULTS_OBJECT:
                return new DocumentIterator($raw, $this, $scrollConfig);
            case Result::RESULTS_ARRAY:
                return $this->convertToNormalizedArray($raw);
            case Result::RESULTS_RAW:
                return $raw;
            case Result::RESULTS_RAW_ITERATOR:
                return new RawIterator($raw, $this, $scrollConfig);
            default:
                throw new \Exception('Wrong results type selected');
        }
    }

    /**
     * Normalizes response array.
     *
     * Returns "_source" of a single-document response, the "_source" of every hit of a search
     * response, or, for "fields" responses, the first value of every requested field per hit.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            return $data['_source'];
        }

        $output = [];

        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            foreach ($data['hits']['hits'] as $item) {
                // reset() takes its argument by reference, so it cannot be passed to array_map()
                // directly without a warning; the closure gives it a local variable instead.
                $output[] = array_map(
                    function ($values) {
                        return reset($values);
                    },
                    $item['fields']
                );
            }
        }

        return $output;
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     * @param string $resultsType
     *
     * @return AbstractResultsIterator|array
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m',
        $resultsType = Result::RESULTS_OBJECT
    ) {
        /** @var array $results */
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);

        return $this->parseResult($results, $resultsType, $scrollDuration);
    }

    /**
     * Resolves type name by class name.
     *
     * Names containing ":" (Bundle:Document) or "\" (FQCN) are resolved through the metadata
     * collector; anything else is assumed to already be an Elasticsearch type name.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }
}
713