Completed
Push — 5.0 ( f8c541...d9528a )
by Simonas
17:40
created

Manager   C

Complexity

Total Complexity 71

Size/Duplication

Total Lines 695
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Importance

Changes 0
Metric Value
wmc 71
c 0
b 0
f 0
lcom 1
cbo 7
dl 0
loc 695
rs 5.0332

39 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 15 1
A getClient() 0 4 1
A getName() 0 4 1
A getConfig() 0 4 1
A setEventDispatcher() 0 4 1
A setStopwatch() 0 4 1
B getRepository() 0 27 5
A getMetadataCollector() 0 4 1
A getConverter() 0 4 1
A getCommitMode() 0 4 1
A setCommitMode() 0 8 4
A getBulkCommitSize() 0 4 1
A setBulkCommitSize() 0 4 1
A createRepository() 0 4 1
A search() 0 21 3
A indexExists() 0 4 1
A getIndexName() 0 4 1
A setIndexName() 0 4 1
A getIndexMappings() 0 4 1
A getVersionNumber() 0 4 1
A clearCache() 0 4 1
A find() 0 22 3
A scroll() 0 8 1
A clearScroll() 0 4 1
A getSettings() 0 4 1
A getAliases() 0 4 1
A resolveTypeName() 0 8 3
A stopwatch() 0 6 2
A msearch() 0 10 1
A persist() 0 7 1
A remove() 0 14 2
A flush() 0 4 1
A refresh() 0 4 1
B commit() 0 49 5
C bulk() 0 48 11
A setBulkParams() 0 4 1
A createIndex() 0 8 2
A dropIndex() 0 4 1
A dropAndCreateIndex() 0 12 3

How to fix   Complexity   

Complex Class

Complex classes like Manager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Manager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Event\Events;
17
use ONGR\ElasticsearchBundle\Event\BulkEvent;
18
use ONGR\ElasticsearchBundle\Event\CommitEvent;
19
use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException;
20
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
21
use ONGR\ElasticsearchBundle\Result\Converter;
22
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
23
use Symfony\Component\Stopwatch\Stopwatch;
24
25
/**
26
 * Manager class.
27
 */
28
class Manager
29
{
30
    /**
31
     * @var string Manager name
32
     */
33
    private $name;
34
35
    /**
36
     * @var array Manager configuration
37
     */
38
    private $config = [];
39
40
    /**
41
     * @var Client
42
     */
43
    private $client;
44
45
    /**
46
     * @var Converter
47
     */
48
    private $converter;
49
50
    /**
51
     * @var array Container for bulk queries
52
     */
53
    private $bulkQueries = [];
54
55
    /**
56
     * @var array Holder for consistency, refresh and replication parameters
57
     */
58
    private $bulkParams = [];
59
60
    /**
61
     * @var array
62
     */
63
    private $indexSettings;
64
65
    /**
66
     * @var MetadataCollector
67
     */
68
    private $metadataCollector;
69
70
    /**
71
     * After commit to make data available the refresh or flush operation is needed
72
     * so one of those methods has to be defined, the default is refresh.
73
     *
74
     * @var string
75
     */
76
    private $commitMode = 'refresh';
77
78
    /**
79
     * The size that defines after how much document inserts call commit function.
80
     *
81
     * @var int
82
     */
83
    private $bulkCommitSize = 100;
84
85
    /**
86
     * Container to count how many documents was passed to the bulk query.
87
     *
88
     * @var int
89
     */
90
    private $bulkCount = 0;
91
92
    /**
93
     * @var Repository[] Repository local cache
94
     */
95
    private $repositories;
96
97
    /**
98
     * @var EventDispatcherInterface
99
     */
100
    private $eventDispatcher;
101
102
    /**
103
     * @var Stopwatch
104
     */
105
    private $stopwatch;
106
107
    /**
108
     * @param string            $name              Manager name
109
     * @param array             $config            Manager configuration
110
     * @param Client            $client
111
     * @param array             $indexSettings
112
     * @param MetadataCollector $metadataCollector
113
     * @param Converter         $converter
114
     */
115
    public function __construct(
116
        $name,
117
        array $config,
118
        $client,
119
        array $indexSettings,
120
        $metadataCollector,
121
        $converter
122
    ) {
123
        $this->name = $name;
124
        $this->config = $config;
125
        $this->client = $client;
126
        $this->indexSettings = $indexSettings;
127
        $this->metadataCollector = $metadataCollector;
128
        $this->converter = $converter;
129
    }
130
131
    /**
132
     * Returns Elasticsearch connection.
133
     *
134
     * @return Client
135
     */
136
    public function getClient()
137
    {
138
        return $this->client;
139
    }
140
141
    /**
142
     * @return string
143
     */
144
    public function getName()
145
    {
146
        return $this->name;
147
    }
148
149
    /**
150
     * @return array
151
     */
152
    public function getConfig()
153
    {
154
        return $this->config;
155
    }
156
157
    /**
158
     * @param EventDispatcherInterface $eventDispatcher
159
     */
160
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
161
    {
162
        $this->eventDispatcher = $eventDispatcher;
163
    }
164
165
    /**
166
     * @param Stopwatch $stopwatch
167
     */
168
    public function setStopwatch(Stopwatch $stopwatch)
169
    {
170
        $this->stopwatch = $stopwatch;
171
    }
172
173
    /**
174
     * Returns repository by document class.
175
     *
176
     * @param string $className FQCN or string in Bundle:Document format
177
     *
178
     * @return Repository
179
     */
180
    public function getRepository($className)
181
    {
182
        if (!is_string($className)) {
183
            throw new \InvalidArgumentException('Document class must be a string.');
184
        }
185
186
        $directory = null;
187
188
        if (strpos($className, ':')) {
189
            $bundle = explode(':', $className)[0];
190
191
            if (isset($this->config['mappings'][$bundle]['document_dir'])) {
192
                $directory = $this->config['mappings'][$bundle]['document_dir'];
193
            }
194
        }
195
196
        $namespace = $this->getMetadataCollector()->getClassName($className, $directory);
197
198
        if (isset($this->repositories[$namespace])) {
199
            return $this->repositories[$namespace];
200
        }
201
202
        $repository = $this->createRepository($namespace);
203
        $this->repositories[$namespace] = $repository;
204
205
        return $repository;
206
    }
207
208
    /**
209
     * @return MetadataCollector
210
     */
211
    public function getMetadataCollector()
212
    {
213
        return $this->metadataCollector;
214
    }
215
216
    /**
217
     * @return Converter
218
     */
219
    public function getConverter()
220
    {
221
        return $this->converter;
222
    }
223
224
    /**
225
     * @return string
226
     */
227
    public function getCommitMode()
228
    {
229
        return $this->commitMode;
230
    }
231
232
    /**
233
     * @param string $commitMode
234
     */
235
    public function setCommitMode($commitMode)
236
    {
237
        if ($commitMode === 'refresh' || $commitMode === 'flush' || $commitMode === 'none') {
238
            $this->commitMode = $commitMode;
239
        } else {
240
            throw new \LogicException('The commit method must be either refresh, flush or none.');
241
        }
242
    }
243
244
    /**
245
     * @return int
246
     */
247
    public function getBulkCommitSize()
248
    {
249
        return $this->bulkCommitSize;
250
    }
251
252
    /**
253
     * @param int $bulkCommitSize
254
     */
255
    public function setBulkCommitSize($bulkCommitSize)
256
    {
257
        $this->bulkCommitSize = $bulkCommitSize;
258
    }
259
260
    /**
261
     * Creates a repository.
262
     *
263
     * @param string $className
264
     *
265
     * @return Repository
266
     */
267
    private function createRepository($className)
268
    {
269
        return new Repository($this, $className);
270
    }
271
272
    /**
273
     * Executes search query in the index.
274
     *
275
     * @param array $types             List of types to search in.
276
     * @param array $query             Query to execute.
277
     * @param array $queryStringParams Query parameters.
278
     *
279
     * @return array
280
     */
281
    public function search(array $types, array $query, array $queryStringParams = [])
282
    {
283
        $params = [];
284
        $params['index'] = $this->getIndexName();
285
        
286
        if (!empty($types)) {
287
            $params['type'] = implode(',', $types);
288
        }
289
        
290
        $params['body'] = $query;
291
292
        if (!empty($queryStringParams)) {
293
            $params = array_merge($queryStringParams, $params);
294
        }
295
296
        $this->stopwatch('start', 'search');
297
        $result = $this->client->search($params);
298
        $this->stopwatch('stop', 'search');
299
300
        return $result;
301
    }
302
303
    /**
304
     * Execute search queries using multisearch api
305
     * $body - is array of requests described in elastic Multi Search API
306
     *
307
     * @param $body
308
     * @return array
309
     */
310
    public function msearch(array $body)
311
    {
312
        $result = $this->client->msearch(
313
            [
314
                'index' => $this->getIndexName(), // set default index
315
                'body' => $body
316
            ]
317
        );
318
        return $result;
319
    }
320
321
    /**
322
     * Adds document to next flush.
323
     *
324
     * @param object $document
325
     */
326
    public function persist($document)
327
    {
328
        $documentArray = $this->converter->convertToArray($document);
329
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
330
331
        $this->bulk('index', $type, $documentArray);
332
    }
333
334
    /**
335
     * Adds document for removal.
336
     *
337
     * @param object $document
338
     */
339
    public function remove($document)
340
    {
341
        $data = $this->converter->convertToArray($document, [], ['_id', '_routing']);
342
343
        if (!isset($data['_id'])) {
344
            throw new \LogicException(
345
                'In order to use remove() method document class must have property with @Id annotation.'
346
            );
347
        }
348
349
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));
350
351
        $this->bulk('delete', $type, $data);
352
    }
353
354
    /**
355
     * Flushes elasticsearch index.
356
     *
357
     * @param array $params
358
     *
359
     * @return array
360
     */
361
    public function flush(array $params = [])
362
    {
363
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
364
    }
365
366
    /**
367
     * Refreshes elasticsearch index.
368
     *
369
     * @param array $params
370
     *
371
     * @return array
372
     */
373
    public function refresh(array $params = [])
374
    {
375
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
376
    }
377
378
    /**
379
     * Inserts the current query container to the index, used for bulk queries execution.
380
     *
381
     * @param array $params Parameters that will be passed to the flush or refresh queries.
382
     *
383
     * @return null|array
384
     *
385
     * @throws BulkWithErrorsException
386
     */
387
    public function commit(array $params = [])
388
    {
389
        if (!empty($this->bulkQueries)) {
390
            $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);
391
            $bulkQueries['index']['_index'] = $this->getIndexName();
392
            $this->eventDispatcher->dispatch(
393
                Events::PRE_COMMIT,
394
                new CommitEvent($this->getCommitMode(), $bulkQueries)
395
            );
396
397
            $this->stopwatch('start', 'bulk');
398
            $bulkResponse = $this->client->bulk($bulkQueries);
399
            $this->stopwatch('stop', 'bulk');
400
401
            if ($bulkResponse['errors']) {
402
                throw new BulkWithErrorsException(
403
                    json_encode($bulkResponse),
404
                    0,
405
                    null,
406
                    $bulkResponse
407
                );
408
            }
409
410
            $this->bulkQueries = [];
411
            $this->bulkCount = 0;
412
413
            $this->stopwatch('start', 'refresh');
414
415
            switch ($this->getCommitMode()) {
416
                case 'flush':
417
                    $this->flush($params);
418
                    break;
419
                case 'refresh':
420
                    $this->refresh($params);
421
                    break;
422
            }
423
424
            $this->eventDispatcher->dispatch(
425
                Events::POST_COMMIT,
426
                new CommitEvent($this->getCommitMode(), $bulkResponse)
427
            );
428
429
            $this->stopwatch('stop', 'refresh');
430
431
            return $bulkResponse;
432
        }
433
434
        return null;
435
    }
436
437
    /**
438
     * Adds query to bulk queries container.
439
     *
440
     * @param string       $operation One of: index, update, delete, create.
441
     * @param string|array $type      Elasticsearch type name.
442
     * @param array        $query     DSL to execute.
443
     *
444
     * @throws \InvalidArgumentException
445
     *
446
     * @return null|array
447
     */
448
    public function bulk($operation, $type, array $query)
449
    {
450
        if (!in_array($operation, ['index', 'create', 'update', 'delete'])) {
451
            throw new \InvalidArgumentException('Wrong bulk operation selected');
452
        }
453
454
        $this->eventDispatcher->dispatch(
455
            Events::BULK,
456
            new BulkEvent($operation, $type, $query)
457
        );
458
459
        $this->bulkQueries['body'][] = [
460
            $operation => array_filter(
461
                [
462
                    '_type' => $type,
463
                    '_id' => isset($query['_id']) ? $query['_id'] : null,
464
                    '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
465
                    '_routing' => isset($query['_routing']) ? $query['_routing'] : null,
466
                    '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
467
                ]
468
            ),
469
        ];
470
        unset($query['_id'], $query['_ttl'], $query['_parent'], $query['_routing']);
471
472
        switch ($operation) {
473
            case 'index':
474
            case 'create':
475
            case 'update':
476
                $this->bulkQueries['body'][] = $query;
477
                break;
478
            case 'delete':
479
                // Body for delete operation is not needed to apply.
480
            default:
481
                // Do nothing.
482
                break;
483
        }
484
485
        // We are using counter because there is to difficult to resolve this from bulkQueries array.
486
        $this->bulkCount++;
487
488
        $response = null;
489
490
        if ($this->bulkCommitSize === $this->bulkCount) {
491
            $response = $this->commit();
492
        }
493
494
        return $response;
495
    }
496
497
    /**
498
     * Optional setter to change bulk query params.
499
     *
500
     * @param array $params Possible keys:
501
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
502
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
503
     *                      ['replication'] = (enum) Explicitly set the replication type.
504
     */
505
    public function setBulkParams(array $params)
506
    {
507
        $this->bulkParams = $params;
508
    }
509
510
    /**
511
     * Creates fresh elasticsearch index.
512
     *
513
     * @param bool $noMapping Determines if mapping should be included.
514
     *
515
     * @return array
516
     */
517
    public function createIndex($noMapping = false)
518
    {
519
        if ($noMapping) {
520
            unset($this->indexSettings['body']['mappings']);
521
        }
522
523
        return $this->getClient()->indices()->create($this->indexSettings);
524
    }
525
526
    /**
527
     * Drops elasticsearch index.
528
     */
529
    public function dropIndex()
530
    {
531
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
532
    }
533
534
    /**
535
     * Tries to drop and create fresh elasticsearch index.
536
     *
537
     * @param bool $noMapping Determines if mapping should be included.
538
     *
539
     * @return array
540
     */
541
    public function dropAndCreateIndex($noMapping = false)
542
    {
543
        try {
544
            if ($this->indexExists()) {
545
                $this->dropIndex();
546
            }
547
        } catch (\Exception $e) {
548
            // Do nothing, our target is to create new index.
549
        }
550
551
        return $this->createIndex($noMapping);
552
    }
553
554
    /**
555
     * Checks if connection index is already created.
556
     *
557
     * @return bool
558
     */
559
    public function indexExists()
560
    {
561
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
562
    }
563
564
    /**
565
     * Returns index name this connection is attached to.
566
     *
567
     * @return string
568
     */
569
    public function getIndexName()
570
    {
571
        return $this->indexSettings['index'];
572
    }
573
574
    /**
575
     * Sets index name for this connection.
576
     *
577
     * @param string $name
578
     */
579
    public function setIndexName($name)
580
    {
581
        $this->indexSettings['index'] = $name;
582
    }
583
584
    /**
585
     * Returns mappings of the index for this connection.
586
     *
587
     * @return array
588
     */
589
    public function getIndexMappings()
590
    {
591
        return $this->indexSettings['body']['mappings'];
592
    }
593
594
    /**
595
     * Returns Elasticsearch version number.
596
     *
597
     * @return string
598
     */
599
    public function getVersionNumber()
600
    {
601
        return $this->client->info()['version']['number'];
602
    }
603
604
    /**
605
     * Clears elasticsearch client cache.
606
     */
607
    public function clearCache()
608
    {
609
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
610
    }
611
612
    /**
613
     * Returns a single document by ID. Returns NULL if document was not found.
614
     *
615
     * @param string $className Document class name or Elasticsearch type name
616
     * @param string $id        Document ID to find
617
     * @param string $routing   Custom routing for the document
618
     *
619
     * @return object
620
     */
621
    public function find($className, $id, $routing = null)
622
    {
623
        $type = $this->resolveTypeName($className);
624
625
        $params = [
626
            'index' => $this->getIndexName(),
627
            'type' => $type,
628
            'id' => $id,
629
        ];
630
631
        if ($routing) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $routing of type string|null is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
632
            $params['routing'] = $routing;
633
        }
634
635
        try {
636
            $result = $this->getClient()->get($params);
637
        } catch (Missing404Exception $e) {
0 ignored issues
show
Bug introduced by
The class Elasticsearch\Common\Exc...ons\Missing404Exception does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

Loading history...
638
            return null;
639
        }
640
641
        return $this->getConverter()->convertToDocument($result, $this);
642
    }
643
644
    /**
645
     * Fetches next set of results.
646
     *
647
     * @param string $scrollId
648
     * @param string $scrollDuration
649
     *
650
     * @return mixed
651
     *
652
     * @throws \Exception
653
     */
654
    public function scroll(
655
        $scrollId,
656
        $scrollDuration = '5m'
657
    ) {
658
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);
659
660
        return $results;
661
    }
662
663
    /**
664
     * Clears scroll.
665
     *
666
     * @param string $scrollId
667
     */
668
    public function clearScroll($scrollId)
669
    {
670
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
671
    }
672
673
    /**
674
     * Calls "Get Settings API" in Elasticsearch and will return you the currently configured settings.
675
     *
676
     * return array
677
     */
678
    public function getSettings()
679
    {
680
        return $this->getClient()->indices()->getSettings(['index' => $this->getIndexName()]);
681
    }
682
683
    /**
684
     * Gets Elasticsearch aliases information.
685
     * @param $params
686
     *
687
     * @return array
688
     */
689
    public function getAliases($params = [])
690
    {
691
        return $this->getClient()->indices()->getAliases(array_merge(['index' => $this->getIndexName()], $params));
692
    }
693
694
    /**
695
     * Resolves type name by class name.
696
     *
697
     * @param string $className
698
     *
699
     * @return string
700
     */
701
    private function resolveTypeName($className)
702
    {
703
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
704
            return $this->getMetadataCollector()->getDocumentType($className);
705
        }
706
707
        return $className;
708
    }
709
710
    /**
711
     * Starts and stops an event in the stopwatch
712
     *
713
     * @param string $action   only 'start' and 'stop'
714
     * @param string $name     name of the event
715
     */
716
    private function stopwatch($action, $name)
717
    {
718
        if (isset($this->stopwatch)) {
719
            $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es');
720
        }
721
    }
722
}
723