Completed
Push — v0.10 ( f42b97...309689 )
by Simonas
8s
created

Connection   D

Complexity

Total Complexity 80

Size/Duplication

Total Lines 760
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 6
Bugs 2 Features 1
Metric Value
wmc 80
c 6
b 2
f 1
lcom 1
cbo 5
dl 0
loc 760
rs 4.4444

42 Methods

Rating   Name   Duplication   Size   Complexity  
A getClient() 0 4 1
A setBulkParams() 0 4 1
A refresh() 0 4 1
A flush() 0 4 1
A indexExists() 0 4 1
A getIndexName() 0 4 1
A setIndexName() 0 4 1
A forceMapping() 0 4 1
A setMapping() 0 4 1
A updateMapping() 0 4 1
A getVersionNumber() 0 4 1
A addWarmer() 0 4 1
A putWarmers() 0 4 1
A deleteWarmers() 0 4 1
A setReadOnly() 0 4 1
A __construct() 0 8 1
D bulk() 0 36 9
A commit() 0 10 2
A delete() 0 6 1
A deleteByQuery() 0 9 1
A search() 0 13 2
A scroll() 0 8 1
A createIndex() 0 17 3
A dropIndex() 0 6 1
A createTypes() 0 18 3
A dropTypes() 0 14 2
B updateTypes() 0 29 3
A dropAndCreateIndex() 0 10 2
A getMapping() 0 8 2
A setMultipleMapping() 0 10 3
A close() 0 6 1
A isOpen() 0 10 2
A open() 0 6 1
A getMappingFromIndex() 0 13 2
A updateSettings() 0 8 2
A clearCache() 0 6 1
A isReadOnly() 0 6 2
B warmersAction() 0 38 6
A validateWarmers() 0 15 3
A loadMappingArray() 0 14 2
A unloadMappingArray() 0 11 2
B filterMapping() 0 12 5

How to fix   Complexity   

Complex Class

Complex classes like Connection often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Connection, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <info@nfq.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Client;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Forbidden403Exception;
16
use ONGR\ElasticsearchBundle\Cache\WarmerInterface;
17
use ONGR\ElasticsearchBundle\Cache\WarmersContainer;
18
use ONGR\ElasticsearchBundle\Mapping\MappingTool;
19
20
/**
21
 * This class interacts with elasticsearch using injected client.
22
 */
23
class Connection
24
{
25
    /**
26
     * @var Client
27
     */
28
    private $client;
29
30
    /**
31
     * Holds index information. Similar structure to elasticsearch docs.
32
     *
33
     * Example:
34
     *
35
     * ```php
36
     * array(
37
     *      'index' => 'index name'
38
     *      'body' => [
39
     *          'settings' => [...],
40
     *          'mappings' => [...]
41
     *      ]
42
     * )
43
     * ```
44
     *
45
     * @var array
46
     */
47
    private $settings;
48
49
    /**
50
     * @var array Container for bulk queries.
51
     */
52
    private $bulkQueries;
53
54
    /**
55
     * @var array Holder for consistency, refresh and replication parameters.
56
     */
57
    private $bulkParams;
58
59
    /**
60
     * @var WarmersContainer
61
     */
62
    private $warmers;
63
64
    /**
65
     * @var bool
66
     */
67
    private $readOnly;
68
69
    /**
     * Builds a connection around an already configured Elasticsearch client.
     *
     * The connection starts with empty bulk buffers, an empty warmers
     * container and in writable (not read-only) mode.
     *
     * @param Client $client   Elasticsearch client.
     * @param array  $settings Settings array; the 'index' key is read as the index name and
     *                         'body' => 'mappings' as the configured mapping.
     */
    public function __construct($client, $settings)
    {
        $this->client = $client;
        $this->settings = $settings;
        $this->bulkQueries = [];
        $this->bulkParams = [];
        $this->warmers = new WarmersContainer();
        // Initialise explicitly so the property always holds a real bool instead of null.
        $this->readOnly = false;
    }
83
84
    /**
     * Gives access to the injected Elasticsearch client.
     *
     * @return Client
     */
    public function getClient()
    {
        $client = $this->client;

        return $client;
    }
91
92
    /**
     * Adds query to bulk queries container.
     *
     * The `_id`, `_ttl` and `_parent` keys are moved from `$query` into the
     * action metadata line; the rest of `$query` becomes the document body
     * ('index'/'create'), the partial document ('update'), or is ignored ('delete').
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException When the operation is not one of the four supported ones.
     * @throws Forbidden403Exception     When the connection is read only.
     */
    public function bulk($operation, $type, array $query)
    {
        $this->isReadOnly('Bulk');

        // Strict comparison: a loose check would let e.g. integer 0 pass on PHP < 8.
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $meta = [
            '_index' => $this->getIndexName(),
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        $this->bulkQueries['body'][] = [
            $operation => array_filter(
                $meta,
                function ($value) {
                    // Drop unset/empty entries but keep numeric values such as 0 or '0',
                    // which are legitimate document ids that a bare array_filter() would discard.
                    return is_numeric($value) || !empty($value);
                }
            ),
        ];
        unset($query['_id'], $query['_ttl'], $query['_parent']);

        if ($operation === 'update') {
            $this->bulkQueries['body'][] = ['doc' => $query];
        } elseif ($operation !== 'delete') {
            // 'index' and 'create' carry the document itself; 'delete' needs no body line.
            $this->bulkQueries['body'][] = $query;
        }
    }
137
138
    /**
     * Replaces the extra parameters that are merged into every bulk request on commit.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        // Overwrites, does not merge with, any previously set parameters.
        $this->bulkParams = $params;
    }
150
151
    /**
     * Sends the queued bulk queries to the index and empties the queue.
     *
     * @param bool $flush Flag for executing flush after the bulk request.
     */
    public function commit($flush = true)
    {
        // Bulk params are merged into the stored request so they travel with the queued body.
        $this->bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);

        $client = $this->getClient();
        $client->bulk($this->bulkQueries);

        if ($flush) {
            $this->flush();
        }

        // Only reached when neither the bulk call nor the flush threw.
        $this->bulkQueries = [];
    }
166
167
    /**
     * Sends a refresh call for this connection's index.
     *
     * Makes your documents available for search.
     */
    public function refresh()
    {
        $indices = $this->getClient()->indices();
        $indices->refresh(['index' => $this->getIndexName()]);
    }
176
177
    /**
     * Sends a flush call for this connection's index.
     *
     * Causes a Lucene commit to happen (more expensive than refresh).
     */
    public function flush()
    {
        $indices = $this->getClient()->indices();
        $indices->flush(['index' => $this->getIndexName()]);
    }
186
187
    /**
     * Removes a single document.
     *
     * The parameters are handed to the client unchanged, e.g.:
     *
     *     $params = [
     *         'index' => 'index_name',
     *         'type' => 'document_type',
     *         'id' => 'id',
     *     ];
     *
     * @param array $params Parameters.
     *
     * @return array
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function delete($params)
    {
        $this->isReadOnly('Delete');

        $response = $this->getClient()->delete($params);

        return $response;
    }
206
207
    /**
     * Deletes every document of the given types that matches the query.
     *
     * @param array $types List of types to search in.
     * @param array $query Query to execute.
     *
     * @return array
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function deleteByQuery(array $types, array $query)
    {
        // Destructive like delete() and bulk(), so it must honour read-only mode too.
        $this->isReadOnly('Delete by query');

        $params = [
            'index' => $this->getIndexName(),
            'type' => implode(',', $types),
            'body' => $query,
        ];

        return $this->getClient()->deleteByQuery($params);
    }
224
225
    /**
     * Runs a search query against this connection's index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $fixed = [
            'index' => $this->getIndexName(),
            'type' => implode(',', $types),
            'body' => $query,
        ];

        // Fixed keys are merged last, so query string params can never override index, type or body.
        $request = array_merge($queryStringParams, $fixed);

        return $this->getClient()->search($request);
    }
247
248
    /**
     * Fetches the next page of a scrolled search.
     *
     * @param string $scrollId       Scroll id.
     * @param string $scrollDuration Specify how long a consistent view of the index should be maintained
     *                               for scrolled search.
     *
     * @return array
     */
    public function scroll($scrollId, $scrollDuration)
    {
        return $this->getClient()->scroll(
            [
                'scroll_id' => $scrollId,
                'scroll' => $scrollDuration,
            ]
        );
    }
265
266
    /**
     * Creates the index from this connection's settings.
     *
     * @param bool $putWarmers Determines if warmers should be loaded.
     * @param bool $noMapping  Determines if mapping should be left out of the create request.
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function createIndex($putWarmers = false, $noMapping = false)
    {
        $this->isReadOnly('Create index');

        // Work on a copy so stripping the mapping does not touch the stored settings.
        $indexDefinition = $this->settings;
        if ($noMapping) {
            unset($indexDefinition['body']['mappings']);
        }

        $this->getClient()->indices()->create($indexDefinition);

        if (!$putWarmers) {
            return;
        }

        // Sometimes Elasticsearch gives service unavailable.
        usleep(200000);
        $this->putWarmers();
    }
289
290
    /**
     * Deletes this connection's index.
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function dropIndex()
    {
        $this->isReadOnly('Drop index');

        $indices = $this->getClient()->indices();
        $indices->delete(['index' => $this->getIndexName()]);
    }
299
300
    /**
     * Puts configured type mappings that are missing from the index.
     *
     * @param array $types Specific types to put.
     *
     * @return int 0 when no mapping is configured for the types, -1 when every configured
     *             type already exists in the index, 1 when mappings were put.
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function createTypes(array $types = [])
    {
        $this->isReadOnly('Create types');

        $configured = $this->getMapping($types);
        if (empty($configured)) {
            return 0;
        }

        // Keep only the types the index does not know about yet.
        $missing = array_diff_key($configured, $this->getMappingFromIndex($types));
        if (empty($missing)) {
            return -1;
        }

        $this->loadMappingArray($missing);

        return 1;
    }
325
326
    /**
     * Deletes the configured type mappings from the index.
     *
     * @param array $types Specific types to drop.
     *
     * @return int 0 when no mapping is configured for the types, 1 otherwise.
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function dropTypes(array $types = [])
    {
        $this->isReadOnly('Drop types');

        $configured = $this->getMapping($types);
        if (empty($configured)) {
            return 0;
        }

        $typeNames = array_keys($configured);
        $this->unloadMappingArray($typeNames);

        return 1;
    }
347
348
    /**
     * Updates elasticsearch client mapping.
     *
     * The configured mapping is put into a temporary index, read back, and
     * compared with the mapping of the real index. Types reported by the
     * mapping tool as removed are deleted and updated ones are put again.
     *
     * @param array $types Specific types to update.
     *
     * @return int -1 when no mapping is configured for the types, otherwise the
     *             result of the mapping check cast to int.
     *
     * @throws Forbidden403Exception When the connection is read only.
     * @throws \Exception            Any client error; the temporary index is dropped first.
     */
    public function updateTypes(array $types = [])
    {
        $this->isReadOnly('Update types');

        if (!$this->getMapping($types)) {
            return -1;
        }

        $tempSettings = $this->settings;
        $tempSettings['index'] = uniqid('mapping_check_');
        $mappingCheckConnection = new Connection($this->getClient(), $tempSettings);
        $mappingCheckConnection->createIndex();

        try {
            $mappingCheckConnection->createTypes($types);

            $newMapping = $mappingCheckConnection->getMappingFromIndex($types);
            $oldMapping = $this->getMappingFromIndex($types);
        } catch (\Exception $e) {
            // Never leave the temporary index behind when the comparison cannot be completed.
            try {
                $mappingCheckConnection->dropIndex();
            } catch (\Exception $cleanupError) {
                // Best-effort cleanup: the original failure is the one worth reporting.
            }

            throw $e;
        }

        $mappingCheckConnection->dropIndex();

        $tool = new MappingTool();
        $updated = (int)$tool->checkMapping($oldMapping, $newMapping);

        if ($updated) {
            $this->unloadMappingArray($tool->getRemovedTypes());
            $this->loadMappingArray($tool->getUpdatedTypes());
        }

        return $updated;
    }
384
385
    /**
     * Recreates the index: attempts a drop, ignoring any failure, then creates it.
     *
     * @param bool $putWarmers Determines if warmers should be loaded.
     * @param bool $noMapping  Determines if mapping should be included.
     */
    public function dropAndCreateIndex($putWarmers = false, $noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $ignored) {
            // Dropping is only an attempt; creation below proceeds regardless of its outcome.
        }

        $this->createIndex($putWarmers, $noMapping);
    }
401
402
    /**
     * Asks the client whether this connection's index exists.
     *
     * @return bool
     */
    public function indexExists()
    {
        $indices = $this->getClient()->indices();

        return $indices->exists(['index' => $this->getIndexName()]);
    }
411
412
    /**
     * Reads the index name from the 'index' key of the settings.
     *
     * @return string
     */
    public function getIndexName()
    {
        $name = $this->settings['index'];

        return $name;
    }
421
422
    /**
     * Stores a new index name under the 'index' key of the settings.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        // Only the local settings change; nothing is sent to Elasticsearch here.
        $this->settings['index'] = $name;
    }
431
432
    /**
     * Returns the configured mapping, optionally narrowed to the given type(s).
     *
     * @param string|array $type Type names.
     *
     * @return array|null Null when the settings contain no mappings at all.
     */
    public function getMapping($type = [])
    {
        if (!isset($this->settings['body']['mappings'])) {
            return null;
        }

        return $this->filterMapping($type, $this->settings['body']['mappings']);
    }
447
448
    /**
     * Replaces the whole configured mapping, discarding types not present in the new one.
     *
     * @param array $mapping Mapping structure to force.
     */
    public function forceMapping(array $mapping)
    {
        // Plain overwrite of the settings entry; previously configured types are lost.
        $this->settings['body']['mappings'] = $mapping;
    }
457
458
    /**
     * Stores the mapping of a single type in the settings.
     *
     * @param string $type    Type name.
     * @param array  $mapping Mapping structure.
     */
    public function setMapping($type, array $mapping)
    {
        // Replaces any mapping already stored for this type; other types are untouched.
        $this->settings['body']['mappings'][$type] = $mapping;
    }
468
469
    /**
     * Stores mappings for several types at once.
     *
     * @param array $mapping Mapping to set, keyed by type name.
     * @param bool  $cleanUp When exactly true, the current mapping is removed first.
     */
    public function setMultipleMapping(array $mapping, $cleanUp = false)
    {
        if ($cleanUp === true) {
            unset($this->settings['body']['mappings']);
        }

        foreach (array_keys($mapping) as $typeName) {
            // Delegates so each entry goes through setMapping()'s array type hint.
            $this->setMapping($typeName, $mapping[$typeName]);
        }
    }
485
486
    /**
     * Deprecated alias that forwards to updateTypes().
     *
     * @param array $types Types to update.
     *
     * @return int Whatever updateTypes() returns.
     *
     * @throws \LogicException
     *
     * @deprecated Will be removed in 1.0. Please now use Connection#updateTypes().
     */
    public function updateMapping(array $types = [])
    {
        $result = $this->updateTypes($types);

        return $result;
    }
501
502
    /**
     * Closes this connection's index.
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function close()
    {
        $this->isReadOnly('Close index');

        $indices = $this->getClient()->indices();
        $indices->close(['index' => $this->getIndexName()]);
    }
511
512
    /**
     * Returns whether the index is opened.
     *
     * Issues a recovery request for the index and treats a Forbidden403Exception
     * as the sign that the index is closed.
     *
     * @return bool
     */
    public function isOpen()
    {
        $opened = true;

        try {
            $this->getClient()->indices()->recovery(['index' => $this->getIndexName()]);
        } catch (Forbidden403Exception $ex) {
            $opened = false;
        }

        return $opened;
    }
527
528
    /**
     * Opens this connection's index.
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function open()
    {
        $this->isReadOnly('Open index');

        $indices = $this->getClient()->indices();
        $indices->open(['index' => $this->getIndexName()]);
    }
537
538
    /**
     * Reads the mapping currently stored in the index.
     *
     * @param array|string $types Returns only certain set of types if set.
     *
     * @return array Empty when the response has no entry keyed by the index name.
     */
    public function getMappingFromIndex($types = [])
    {
        $indexName = $this->getIndexName();
        $response = $this->getClient()->indices()->getMapping(['index' => $indexName]);

        if (!array_key_exists($indexName, $response)) {
            return [];
        }

        return $this->filterMapping($types, $response[$indexName]['mappings']);
    }
558
559
    /**
     * Returns the version number reported by the client's info call.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        $info = $this->getClient()->info();

        return $info['version']['number'];
    }
568
569
    /**
     * Updates the locally held index settings.
     *
     * @param array $settings Settings.
     * @param bool  $force    Replace the whole structure instead of merging recursively.
     */
    public function updateSettings(array $settings, $force = false)
    {
        $this->settings = $force
            ? $settings
            : array_replace_recursive($this->settings, $settings);
    }
583
584
    /**
     * Clears the cache of this connection's index.
     *
     * @throws Forbidden403Exception When the connection is read only.
     */
    public function clearCache()
    {
        $this->isReadOnly('Clear cache');

        $indices = $this->getClient()->indices();
        $indices->clearCache(['index' => $this->getIndexName()]);
    }
593
594
    /**
     * Registers a warmer in this connection's warmers container.
     *
     * @param WarmerInterface $warmer
     */
    public function addWarmer(WarmerInterface $warmer)
    {
        $container = $this->warmers;
        $container->addWarmer($warmer);
    }
603
604
    /**
     * Loads warmers into elasticsearch.
     *
     * @param array $names Warmers names to put; an empty list means all registered warmers.
     *
     * @return bool
     */
    public function putWarmers(array $names = [])
    {
        $done = $this->warmersAction('put', $names);

        return $done;
    }
615
616
    /**
     * Deletes warmers from elasticsearch index.
     *
     * @param array $names Warmers names to delete; an empty list means all registered warmers.
     *
     * @return bool
     */
    public function deleteWarmers(array $names = [])
    {
        $done = $this->warmersAction('delete', $names);

        return $done;
    }
627
628
    /**
     * Switches the connection's read-only flag.
     *
     * @param bool $readOnly
     */
    public function setReadOnly($readOnly)
    {
        // Stored as given; isReadOnly() evaluates it for truthiness.
        $this->readOnly = $readOnly;
    }
637
638
    /**
     * Guards write operations: throws when the connection is read only.
     *
     * Despite the name nothing is returned; the method either passes silently
     * or raises the exception.
     *
     * @param string $message Operation name inserted into the error message.
     *
     * @throws Forbidden403Exception
     */
    public function isReadOnly($message = '')
    {
        if (!$this->readOnly) {
            return;
        }

        throw new Forbidden403Exception("Manager is readonly! {$message} operation not permitted.");
    }
651
652
    /**
     * Puts or deletes the selected warmers.
     *
     * @param string $action Action name: 'put' or 'delete'.
     * @param array  $names  Warmers names; an empty list selects every registered warmer.
     *
     * @return bool True when at least one warmer is registered in the container.
     *
     * @throws \LogicException When a selected warmer is processed with an unknown action.
     */
    private function warmersAction($action, $names = [])
    {
        $this->isReadOnly('Warmer edit');

        $available = $this->warmers->getWarmers();
        $this->validateWarmers($names, array_keys($available));

        foreach ($available as $warmerName => $warmerBody) {
            $selected = empty($names) || in_array($warmerName, $names);
            if (!$selected) {
                continue;
            }

            // Loose comparison on purpose, mirroring switch semantics.
            if ($action == 'put') {
                $this->getClient()->indices()->putWarmer(
                    [
                        'index' => $this->getIndexName(),
                        'name' => $warmerName,
                        'body' => $warmerBody,
                    ]
                );
            } elseif ($action == 'delete') {
                $this->getClient()->indices()->deleteWarmer(
                    [
                        'index' => $this->getIndexName(),
                        'name' => $warmerName,
                    ]
                );
            } else {
                throw new \LogicException('Unknown warmer action');
            }
        }

        // The status reflects whether the loop ran at all, not whether any name matched.
        return count($available) > 0;
    }
700
701
    /**
     * Ensures every requested warmer name is known.
     *
     * @param array $names       Names to check.
     * @param array $warmerNames Warmer names loaded; read from the container when empty.
     *
     * @throws \RuntimeException When at least one requested name is unknown.
     */
    private function validateWarmers($names, $warmerNames = [])
    {
        $known = empty($warmerNames)
            ? array_keys($this->warmers->getWarmers())
            : $warmerNames;

        $missing = array_diff($names, $known);
        if (empty($missing)) {
            return;
        }

        throw new \RuntimeException(
            'Warmer(s) named ' . implode(', ', $missing)
            . ' do not exist. Available: ' . implode(', ', $known)
        );
    }
724
725
    /**
     * Puts each given type mapping into the index, one request per type.
     *
     * @param array $mapping Mapping to put into client, keyed by type name.
     */
    private function loadMappingArray(array $mapping)
    {
        foreach ($mapping as $typeName => $definition) {
            $request = [
                'index' => $this->getIndexName(),
                'type' => $typeName,
                // The body repeats the type name as its only key.
                'body' => [
                    $typeName => $definition,
                ],
            ];

            $this->getClient()->indices()->putMapping($request);
        }
    }
744
745
    /**
     * Deletes the mapping of each given type from the index, one request per type.
     *
     * @param array $mapping List of type names to drop from client.
     */
    private function unloadMappingArray(array $mapping)
    {
        foreach ($mapping as $typeName) {
            $request = [
                'index' => $this->getIndexName(),
                'type' => $typeName,
            ];

            $this->getClient()->indices()->deleteMapping($request);
        }
    }
761
762
    /**
     * Narrows a mapping down to the requested type(s).
     *
     * - empty `$type`: the mapping is returned untouched;
     * - array of names: the entries whose keys are in the list;
     * - string present in the mapping: that single type's mapping;
     * - anything else: an empty array.
     *
     * @param string|array $type    Types to filter from mapping.
     * @param array        $mapping Mapping array.
     *
     * @return array
     */
    private function filterMapping($type, $mapping)
    {
        if (empty($type)) {
            return $mapping;
        }

        if (is_array($type)) {
            return array_intersect_key($mapping, array_flip($type));
        }

        if (is_string($type) && array_key_exists($type, $mapping)) {
            return $mapping[$type];
        }

        return [];
    }
782
}
783