Completed
Pull Request — master (#530)
by Mantas
06:01
created

Manager::remove()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 14
rs 9.4286
cc 2
eloc 7
nc 2
nop 1
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Forbidden403Exception;
16
use Elasticsearch\Common\Exceptions\Missing404Exception;
17
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
18
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
19
use ONGR\ElasticsearchBundle\Result\Converter;
20
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
21
use ONGR\ElasticsearchBundle\Result\RawIterator;
22
use ONGR\ElasticsearchBundle\Result\Result;
23
use ONGR\ElasticsearchDSL\Search;
24
25
/**
 * Entry point for working with one Elasticsearch index: repository lookup,
 * searching/scrolling, queued bulk writes and index administration.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var bool
     */
    private $readOnly;

    /**
     * @var array Container for bulk queries (operations are appended under the 'body' key)
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Index creation parameters; the 'index' key holds the index name
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * Operation executed after a bulk commit to make the data available:
     * either "refresh" (default) or "flush".
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * Number of queued bulk operations after which commit() is called automatically.
     * A value of 0 (or less) disables automatic commits.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Number of bulk operations queued since the last commit.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved document class name
     */
    private $repositories = [];

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;

        // A missing "readonly" key means a writable manager rather than an undefined-index notice.
        $this->setReadOnly(isset($config['readonly']) ? $config['readonly'] : false);
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Returns repository by document class, creating and caching it on first use.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (!isset($this->repositories[$namespace])) {
            $this->repositories[$namespace] = $this->createRepository($namespace);
        }

        return $this->repositories[$namespace];
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * @param string $commitMode Either "refresh" or "flush".
     *
     * @throws \LogicException When any other mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if (!in_array($commitMode, ['refresh', 'flush'], true)) {
            throw new \LogicException('The commit method must be either refresh or flush.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * @param int $bulkCommitSize Auto-commit threshold; 0 disables automatic commits.
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters (index/type/body always take precedence).
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = [];
        $params['index'] = $this->getIndexName();
        $params['type'] = implode(',', $types);
        $params['body'] = $query;

        if (!empty($queryStringParams)) {
            $params = array_merge($queryStringParams, $params);
        }

        return $this->client->search($params);
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the converted document has no "_id".
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have @MetaField annotation for "_id" field.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, ['_id' => $data['_id']]);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush($params);
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh($params);
    }

    /**
     * Sends all queued bulk operations to the index, then runs the configured
     * commit mode (refresh or flush).
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array Bulk response, or null when nothing was queued.
     */
    public function commit(array $params = [])
    {
        $this->isReadOnly('Commit');

        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);

        // Reset the queue and the counter together so that a manual commit also
        // restarts the auto-commit countdown used by bulk().
        $this->bulkQueries = [];
        $this->bulkCount = 0;

        $bulkResponse = $this->client->bulk($bulkQueries);

        if ($this->getCommitMode() === 'flush') {
            $this->flush($params);
        } else {
            $this->refresh($params);
        }

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container and auto-commits once the configured
     * bulk commit size is reached.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException
     */
    public function bulk($operation, $type, array $query)
    {
        $this->isReadOnly('Bulk');

        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $header = [
            '_index' => $this->getIndexName(),
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        // Drop missing/empty metadata, but keep numeric zeros: a bare array_filter()
        // would discard a document ID of "0" and let Elasticsearch generate a new one.
        $this->bulkQueries['body'][] = [
            $operation => array_filter(
                $header,
                function ($value) {
                    return !empty($value) || is_numeric($value);
                }
            ),
        ];
        unset($query['_id'], $query['_ttl'], $query['_parent']);

        // Body for delete operation is not needed to apply.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // We are using counter because it is too difficult to resolve this from bulkQueries array.
        $this->bulkCount++;

        // ">=" (not "===") so that lowering the commit size mid-batch still triggers a commit;
        // commit() resets the counter.
        if ($this->bulkCommitSize > 0 && $this->bulkCount >= $this->bulkCommitSize) {
            $this->commit();
        }
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        $this->isReadOnly('Create index');

        // Work on a copy: stripping mappings must not leak into later createIndex() calls.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        $this->isReadOnly('Drop index');

        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Deliberately ignored (e.g. index does not exist yet); the goal is a fresh index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Puts mapping into elasticsearch client.
     *
     * When no types are given, they are collected from the manager's "mappings" config:
     * entries containing ":" are used as-is, other entries are treated as bundle names
     * and expanded through the metadata collector.
     *
     * @param array $types           Specific types to put.
     * @param bool  $ignoreConflicts Ignore elasticsearch merge conflicts.
     *
     * @throws \LogicException When a mapping is missing or the put-mapping request fails.
     */
    public function updateMapping(array $types = [], $ignoreConflicts = true)
    {
        $this->isReadOnly('Mapping update');

        if (empty($types)) {
            foreach ($this->getConfig()['mappings'] as $bundle) {
                if (strpos($bundle, ':') !== false) {
                    $types[] = $bundle;
                    continue;
                }

                foreach ($this->getMetadataCollector()->getMappings([$bundle]) as $document) {
                    $types[] = $document['bundle'].':'.$document['class'];
                }
            }
        }

        foreach ($types as $type) {
            $mapping = $this->getMetadataCollector()->getClientMapping([$type]);

            if ($mapping === null) {
                throw new \LogicException(sprintf('Mapping for type "%s" was not found.', $type));
            }

            try {
                // Built fresh per type, so no state leaks between iterations.
                $params = [
                    'index' => $this->getIndexName(),
                    'type' => $this->getMetadataCollector()->getDocumentType($type),
                    'body' => $mapping,
                    'ignore_conflicts' => $ignoreConflicts,
                ];
                $this->client->indices()->putMapping(array_filter($params));
            } catch (\Exception $e) {
                // Keep the original failure as "previous" so the real cause is not lost.
                throw new \LogicException(
                    'Only the documents[] can be passed to the type update command. ' .
                    'Maybe you added only a bundle. Please check if a document is mapped in the manager.',
                    0,
                    $e
                );
            }
        }
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->isReadOnly('Clear cache');

        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Set connection to read only state.
     *
     * @param bool $readOnly
     */
    public function setReadOnly($readOnly)
    {
        $this->readOnly = $readOnly;
    }

    /**
     * Guards write operations: throws when the manager is read only.
     *
     * @param string $message Name of the operation, used in the error message.
     *
     * @throws Forbidden403Exception
     */
    public function isReadOnly($message = '')
    {
        if ($this->readOnly) {
            throw new Forbidden403Exception("Manager is readonly! {$message} operation is not permitted.");
        }
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     *
     * @return object|null
     */
    public function find($className, $id)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        // NOTE(review): static analysis types the client response as callable|array (presumably
        // for future-mode responses); this code assumes a plain array response — confirm.
        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Executes given search.
     *
     * @param array|string $types       Type names, document class names or Bundle:Document references.
     * @param Search       $search
     * @param string       $resultsType
     *
     * @return DocumentIterator|RawIterator|array
     */
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
    {
        // Build a new list instead of rewriting $types through a foreach reference.
        $resolvedTypes = [];
        foreach ((array) $types as $type) {
            $resolvedTypes[] = $this->resolveTypeName($type);
        }

        $results = $this->search($resolvedTypes, $search->toArray(), $search->getQueryParams());

        // NOTE(review): the response is assumed to be a plain array (see find()).
        return $this->parseResult($results, $resultsType, $search->getScroll());
    }

    /**
     * Parses raw result.
     *
     * @param array  $raw
     * @param string $resultsType
     * @param string $scrollDuration
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception
     */
    private function parseResult($raw, $resultsType, $scrollDuration = null)
    {
        $scrollConfig = [];
        if (isset($raw['_scroll_id'])) {
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
            $scrollConfig['duration'] = $scrollDuration;
        }

        switch ($resultsType) {
            case Result::RESULTS_OBJECT:
                return new DocumentIterator($raw, $this, $scrollConfig);
            case Result::RESULTS_ARRAY:
                return $this->convertToNormalizedArray($raw);
            case Result::RESULTS_RAW:
                return $raw;
            case Result::RESULTS_RAW_ITERATOR:
                return new RawIterator($raw, $this, $scrollConfig);
            default:
                throw new \Exception('Wrong results type selected');
        }
    }

    /**
     * Normalizes response array.
     *
     * A single-document response yields its "_source"; a search response yields a list
     * of hit sources, or of hit fields reduced to their first value.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            return $data['_source'];
        }

        $output = [];

        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            foreach ($data['hits']['hits'] as $item) {
                // A closure, not array_map('reset', ...): reset() takes its argument by
                // reference, which newer PHP versions warn about when given a value.
                $output[] = array_map(
                    function ($values) {
                        return reset($values);
                    },
                    $item['fields']
                );
            }
        }

        return $output;
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     * @param string $resultsType
     *
     * @return AbstractResultsIterator
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m',
        $resultsType = Result::RESULTS_OBJECT
    ) {
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);

        // NOTE(review): the response is assumed to be a plain array (see find()).
        return $this->parseResult($results, $resultsType, $scrollDuration);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Resolves type name by class name.
     *
     * Values containing ":" or "\" are treated as class references and resolved through the
     * metadata collector; anything else is assumed to already be a type name.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }
}
743