UpdateIndexAliasStage   A
last analyzed

Complexity

Total Complexity 5

Size/Duplication

Total Lines 72
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 5
eloc 27
c 2
b 0
f 0
dl 0
loc 72
rs 10

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
A __invoke() 0 51 4
1
<?php
2
3
namespace Nord\Lumen\Elasticsearch\Pipelines\Stages;
4
5
use Elasticsearch\Common\Exceptions\Missing404Exception;
6
use League\Pipeline\StageInterface;
7
use Nord\Lumen\Elasticsearch\Contracts\ElasticsearchServiceContract;
8
use Nord\Lumen\Elasticsearch\Pipelines\Payloads\ApplyMigrationPayload;
9
10
/**
 * Class UpdateIndexAliasStage
 *
 * Pipeline stage that points the index alias at the target version index of a migration and
 * deletes the indices the alias previously pointed to.
 *
 * @package Nord\Lumen\Elasticsearch\Pipelines\Stages
 */
class UpdateIndexAliasStage implements StageInterface
{

    /**
     * @var ElasticsearchServiceContract
     */
    private $elasticsearchService;

    /**
     * UpdateIndexAliasStage constructor.
     *
     * @param ElasticsearchServiceContract $elasticsearchService
     */
    public function __construct(ElasticsearchServiceContract $elasticsearchService)
    {
        $this->elasticsearchService = $elasticsearchService;
    }

    /**
     * Switches the alias over to the target version index.
     *
     * Steps, in order:
     *  1. Look up the indices the alias currently points to. If the alias does not exist but a
     *     concrete index with the same name does, that index is deleted so the name is free.
     *  2. Set "refresh_interval" and "number_of_replicas" on the target index.
     *  3. Add the target index to the alias.
     *  4. Delete the indices the alias pointed to before, except the target index itself.
     *
     * Only Missing404Exception from the alias lookup is handled here; any other exception
     * raised by the Elasticsearch client propagates to the caller.
     *
     * @param ApplyMigrationPayload $payload
     *
     * @return ApplyMigrationPayload the payload that was passed in
     */
    public function __invoke($payload)
    {
        /** @var ApplyMigrationPayload $payload */

        $indices         = $this->elasticsearchService->indices();
        $alias           = $payload->getPrefixedIndexName();
        $targetIndex     = $payload->getPrefixedTargetVersionName();
        $orphanedIndices = [];

        // If we already have an alias in place we store the indices it points to right now
        try {
            $aliasDefinition = $indices->getAlias([
                'name' => $alias,
            ]);

            // The response is keyed by index name. The target index must never be treated as
            // orphaned: if the alias already points to it, deleting it below would destroy the
            // index we are about to serve from.
            $orphanedIndices = array_diff(array_keys($aliasDefinition), [$targetIndex]);
        } catch (Missing404Exception $e) {
            // If the alias doesn't exist we need to remove the index that has its name,
            // otherwise the alias could not be created under that name
            if ($indices->exists(['index' => $alias])) {
                $indices->delete(['index' => $alias]);
            }
        }

        // Revert temporary index settings (presumably relaxed by an earlier stage - verify)
        $indices->putSettings([
            'index' => $targetIndex,
            'body'  => [
                'refresh_interval'   => '1s',
                'number_of_replicas' => $payload->getNumberOfReplicas(),
            ],
        ]);

        // Update the alias definition
        $indices->updateAliases([
            'body' => [
                'actions' => [
                    [
                        'add' => [
                            'index' => $targetIndex,
                            'alias' => $alias,
                        ],
                    ],
                ],
            ],
        ]);

        // Remove orphaned indices; this happens after the alias update so the alias always
        // resolves to at least one index
        foreach ($orphanedIndices as $orphanedIndex) {
            $indices->delete(['index' => $orphanedIndex]);
        }

        return $payload;
    }
}
88