Completed
Push — master ( 643204...f070d3 )
by Nicolas
01:55
created

Reindex::setWaitForActiveShards()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
namespace Elastica;
4
5
use Elastica\Query\AbstractQuery;
6
use Elastica\Script\AbstractScript;
7
use Elastica\Script\Script;
8
9
/**
 * Builds and sends a reindex request that copies documents from one index
 * into another.
 *
 * Parameters set on this object (through the constructor or the setters) are
 * split three ways when the request is built:
 *  - keys listed in _resolveSourceOptions() go into the "source" body part,
 *  - keys listed in _resolveDestOptions() go into the "dest" body part,
 *  - keys listed in _resolveBodyOptions() go to the top level of the body,
 * and any key accepted by the endpoint's parameter whitelist is additionally
 * passed as an endpoint parameter (see run()).
 */
class Reindex extends Param
{
    const VERSION_TYPE = 'version_type';
    const VERSION_TYPE_INTERNAL = 'internal';
    const VERSION_TYPE_EXTERNAL = 'external';
    const OPERATION_TYPE = 'op_type';
    const OPERATION_TYPE_CREATE = 'create';
    const CONFLICTS = 'conflicts';
    const CONFLICTS_PROCEED = 'proceed';
    const TYPE = 'type';
    const SIZE = 'size';
    const QUERY = 'query';
    const SORT = 'sort';
    const SCRIPT = 'script';
    const SOURCE = '_source';
    const REMOTE = 'remote';
    const SLICE = 'slice';
    const REFRESH = 'refresh';
    const WAIT_FOR_COMPLETION = 'wait_for_completion';
    const WAIT_FOR_COMPLETION_FALSE = 'false';
    const WAIT_FOR_ACTIVE_SHARDS = 'wait_for_active_shards';
    const TIMEOUT = 'timeout';
    const SCROLL = 'scroll';
    const REQUESTS_PER_SECOND = 'requests_per_second';

    /**
     * Index the documents are read from; its client is used to send the request.
     *
     * @var Index
     */
    protected $_oldIndex;

    /**
     * Index the documents are written to.
     *
     * @var Index
     */
    protected $_newIndex;

    /**
     * Not read or written by any method of this class; kept because it is
     * protected and may be relied on by subclasses.
     *
     * @var array
     */
    protected $_options;

    /**
     * Response of the most recent run() call, or null if run() has not been called.
     *
     * @var Response|null
     */
    protected $_lastResponse;

    /**
     * @param Index $oldIndex index to read documents from
     * @param Index $newIndex index to write documents to
     * @param array $params   initial parameters, keyed by the constants of this class
     */
    public function __construct(Index $oldIndex, Index $newIndex, array $params = [])
    {
        $this->_oldIndex = $oldIndex;
        $this->_newIndex = $newIndex;

        $this->setParams($params);
    }

    /**
     * Builds the request body and endpoint parameters from the current
     * parameters, sends the request through the old index's client and
     * remembers the response for getTaskId().
     *
     * @return Response
     */
    public function run(): Response
    {
        $allParams = $this->getParams();
        $body = $this->_getBody($this->_oldIndex, $this->_newIndex, $allParams);

        $reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();

        // Only forward the parameters the endpoint declares in its whitelist.
        $allowedKeys = \array_fill_keys($reindexEndpoint->getParamWhitelist(), null);
        $reindexEndpoint->setParams(\array_intersect_key($allParams, $allowedKeys));
        $reindexEndpoint->setBody($body);

        $this->_lastResponse = $this->_oldIndex->getClient()->requestEndpoint($reindexEndpoint);

        return $this->_lastResponse;
    }

    /**
     * Assembles the full request body: "source" and "dest" parts, the
     * top-level body options and, when set, the script.
     *
     * @param Index $oldIndex
     * @param Index $newIndex
     * @param array $params
     *
     * @return array
     */
    protected function _getBody(Index $oldIndex, Index $newIndex, array $params): array
    {
        $parts = [
            'source' => $this->_getSourcePartBody($oldIndex, $params),
            'dest' => $this->_getDestPartBody($newIndex, $params),
        ];

        $body = \array_merge($parts, $this->_resolveBodyOptions($params));

        return $this->_setBodyScript($body);
    }

    /**
     * Builds the "source" part: the index name plus the source options taken
     * from $params, with the query and type entries normalised.
     *
     * @param Index $index
     * @param array $params
     *
     * @return array
     */
    protected function _getSourcePartBody(Index $index, array $params): array
    {
        $sourceBody = \array_merge(
            ['index' => $index->getName()],
            $this->_resolveSourceOptions($params)
        );

        return $this->_setSourceType($this->_setSourceQuery($sourceBody));
    }

    /**
     * Builds the "dest" part: the index name plus the destination options
     * taken from $params.
     *
     * @param Index $index
     * @param array $params
     *
     * @return array
     */
    protected function _getDestPartBody(Index $index, array $params): array
    {
        return \array_merge(
            ['index' => $index->getName()],
            $this->_resolveDestOptions($params)
        );
    }

    /**
     * Returns the entries of $params that belong in the "source" part.
     *
     * @param array $params
     *
     * @return array
     */
    private function _resolveSourceOptions(array $params): array
    {
        $sourceKeys = [
            self::TYPE => null,
            self::QUERY => null,
            self::SORT => null,
            self::SOURCE => null,
            self::REMOTE => null,
            self::SLICE => null,
        ];

        return \array_intersect_key($params, $sourceKeys);
    }

    /**
     * Returns the entries of $params that belong in the "dest" part.
     *
     * @param array $params
     *
     * @return array
     */
    private function _resolveDestOptions(array $params): array
    {
        $destKeys = [
            self::VERSION_TYPE => null,
            self::OPERATION_TYPE => null,
        ];

        return \array_intersect_key($params, $destKeys);
    }

    /**
     * Returns the entries of $params that belong at the top level of the body.
     *
     * @param array $params
     *
     * @return array
     */
    private function _resolveBodyOptions(array $params): array
    {
        $bodyKeys = [
            self::SIZE => null,
            self::CONFLICTS => null,
        ];

        return \array_intersect_key($params, $bodyKeys);
    }

    /**
     * Converts a query given as an AbstractQuery object into its array form;
     * any other value is left untouched.
     *
     * @param array $sourceBody
     *
     * @return array
     */
    private function _setSourceQuery(array $sourceBody): array
    {
        if (!isset($sourceBody[self::QUERY])) {
            return $sourceBody;
        }

        $query = $sourceBody[self::QUERY];
        if ($query instanceof AbstractQuery) {
            $sourceBody[self::QUERY] = $query->toArray();
        }

        return $sourceBody;
    }

    /**
     * Normalises the "type" entry to an array in which every Type object is
     * replaced by its name; other values are kept as given.
     *
     * @param array $sourceBody
     *
     * @return array
     */
    private function _setSourceType(array $sourceBody): array
    {
        if (!isset($sourceBody[self::TYPE])) {
            return $sourceBody;
        }

        $types = $sourceBody[self::TYPE];
        if (!\is_array($types)) {
            // A single type is accepted as shorthand for a one-element list.
            $types = [$types];
        }

        foreach ($types as $key => $type) {
            if ($type instanceof Type) {
                $types[$key] = $type->getName();
            }
        }

        $sourceBody[self::TYPE] = $types;

        return $sourceBody;
    }

    /**
     * Adds the script to the body when a script parameter is set.
     *
     * The script is read from this object's own parameters, not from the
     * array that was passed down to _getBody().
     *
     * @param array $body
     *
     * @return array
     */
    private function _setBodyScript(array $body): array
    {
        if (!$this->hasParam(self::SCRIPT)) {
            return $body;
        }

        $script = $this->getParam(self::SCRIPT);

        if ($script instanceof AbstractScript) {
            // The script object's array form is merged into the body as-is.
            return \array_merge($body, $script->toArray());
        }

        $body[self::SCRIPT] = $script;

        return $body;
    }

    /**
     * Sets the wait_for_completion parameter.
     *
     * @param bool|mixed $value booleans are stored as the strings 'true' / 'false';
     *                          any other value is stored unchanged
     */
    public function setWaitForCompletion($value)
    {
        if (\is_bool($value)) {
            $value = $value ? 'true' : 'false';
        }

        $this->setParam(self::WAIT_FOR_COMPLETION, $value);
    }

    /**
     * Sets the wait_for_active_shards parameter.
     *
     * @param mixed $value stored unchanged
     */
    public function setWaitForActiveShards($value)
    {
        $this->setParam(self::WAIT_FOR_ACTIVE_SHARDS, $value);
    }

    /**
     * Sets the timeout parameter.
     *
     * @param mixed $value stored unchanged
     */
    public function setTimeout($value)
    {
        $this->setParam(self::TIMEOUT, $value);
    }

    /**
     * Sets the scroll parameter.
     *
     * @param mixed $value stored unchanged
     */
    public function setScroll($value)
    {
        $this->setParam(self::SCROLL, $value);
    }

    /**
     * Sets the requests_per_second parameter.
     *
     * @param mixed $value stored unchanged
     */
    public function setRequestsPerSecond($value)
    {
        $this->setParam(self::REQUESTS_PER_SECOND, $value);
    }

    /**
     * Sets the script that is added to the request body.
     *
     * @param Script $script
     */
    public function setScript(Script $script)
    {
        $this->setParam(self::SCRIPT, $script);
    }

    /**
     * Returns the "task" entry of the last response.
     *
     * @return mixed|null the task value, or null when run() has not been called yet,
     *                    the response has no "task" entry, or the entry is empty
     */
    public function getTaskId()
    {
        if (!$this->_lastResponse instanceof Response) {
            return null;
        }

        $data = $this->_lastResponse->getData();

        // "??" avoids an undefined-index notice when the response carries no
        // task; "?:" keeps the original rule that an empty value becomes null.
        $taskId = $data['task'] ?? null;

        return $taskId ?: null;
    }
}
221