Completed
Push — master ( 5e8cea...8544a9 )
by Nicolas
01:42
created

Reindex::setPipeline()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
namespace Elastica;
4
5
use Elastica\Query\AbstractQuery;
6
use Elastica\Script\AbstractScript;
7
use Elastica\Script\Script;
8
9
class Reindex extends Param
10
{
11
    public const VERSION_TYPE = 'version_type';
12
    public const VERSION_TYPE_INTERNAL = 'internal';
13
    public const VERSION_TYPE_EXTERNAL = 'external';
14
    public const OPERATION_TYPE = 'op_type';
15
    public const OPERATION_TYPE_CREATE = 'create';
16
    public const CONFLICTS = 'conflicts';
17
    public const CONFLICTS_PROCEED = 'proceed';
18
    public const SIZE = 'size';
19
    public const QUERY = 'query';
20
    public const SORT = 'sort';
21
    public const SCRIPT = 'script';
22
    public const SOURCE = '_source';
23
    public const REMOTE = 'remote';
24
    public const SLICE = 'slice';
25
    public const REFRESH = 'refresh';
26
    public const REFRESH_TRUE = 'true';
27
    public const REFRESH_FALSE = 'false';
28
    public const REFRESH_WAIT_FOR = 'wait_for';
29
    public const WAIT_FOR_COMPLETION = 'wait_for_completion';
30
    public const WAIT_FOR_COMPLETION_FALSE = 'false';
31
    public const WAIT_FOR_ACTIVE_SHARDS = 'wait_for_active_shards';
32
    public const TIMEOUT = 'timeout';
33
    public const SCROLL = 'scroll';
34
    public const REQUESTS_PER_SECOND = 'requests_per_second';
35
    public const PIPELINE = 'pipeline';
36
    public const SLICES = 'slices';
37
    public const SLICES_AUTO = 'auto';
38
39
    /**
40
     * @var Index
41
     */
42
    protected $_oldIndex;
43
44
    /**
45
     * @var Index
46
     */
47
    protected $_newIndex;
48
49
    /**
50
     * @var Response|null
51
     */
52
    protected $_lastResponse;
53
54
    public function __construct(Index $oldIndex, Index $newIndex, array $params = [])
55
    {
56
        $this->_oldIndex = $oldIndex;
57
        $this->_newIndex = $newIndex;
58
59
        $this->setParams($params);
60
    }
61
62
    public function run(): Response
63
    {
64
        $body = $this->_getBody($this->_oldIndex, $this->_newIndex, $this->getParams());
65
66
        $reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();
67
        $params = \array_intersect_key($this->getParams(), \array_fill_keys($reindexEndpoint->getParamWhitelist(), null));
68
        $reindexEndpoint->setParams($params);
69
        $reindexEndpoint->setBody($body);
70
71
        $this->_lastResponse = $this->_oldIndex->getClient()->requestEndpoint($reindexEndpoint);
72
73
        return $this->_lastResponse;
74
    }
75
76
    protected function _getBody(Index $oldIndex, Index $newIndex, array $params): array
77
    {
78
        $body = \array_merge([
79
            'source' => $this->_getSourcePartBody($oldIndex, $params),
80
            'dest' => $this->_getDestPartBody($newIndex, $params),
81
        ], $this->_resolveBodyOptions($params));
82
83
        $body = $this->_setBodyScript($body);
84
85
        return $body;
86
    }
87
88
    protected function _getSourcePartBody(Index $index, array $params): array
89
    {
90
        $sourceBody = \array_merge([
91
            'index' => $index->getName(),
92
        ], $this->_resolveSourceOptions($params));
93
94
        $sourceBody = $this->_setSourceQuery($sourceBody);
95
96
        return $sourceBody;
97
    }
98
99
    protected function _getDestPartBody(Index $index, array $params): array
100
    {
101
        $destBody = \array_merge([
102
            'index' => $index->getName(),
103
        ], $this->_resolveDestOptions($params));
104
105
        // Resolves the pipeline name
106
        $pipeline = $destBody[self::PIPELINE] ?? null;
107
        if ($pipeline instanceof Pipeline) {
108
            $destBody[self::PIPELINE] = $pipeline->getId();
109
        }
110
111
        return $destBody;
112
    }
113
114
    private function _resolveSourceOptions(array $params): array
115
    {
116
        return \array_intersect_key($params, [
117
            self::QUERY => null,
118
            self::SORT => null,
119
            self::SOURCE => null,
120
            self::REMOTE => null,
121
            self::SLICE => null,
122
        ]);
123
    }
124
125
    private function _resolveDestOptions(array $params): array
126
    {
127
        return \array_intersect_key($params, [
128
            self::VERSION_TYPE => null,
129
            self::OPERATION_TYPE => null,
130
            self::PIPELINE => null,
131
        ]);
132
    }
133
134
    private function _resolveBodyOptions(array $params): array
135
    {
136
        return \array_intersect_key($params, [
137
            self::SIZE => null,
138
            self::CONFLICTS => null,
139
        ]);
140
    }
141
142
    private function _setSourceQuery(array $sourceBody): array
143
    {
144
        if (isset($sourceBody[self::QUERY]) && $sourceBody[self::QUERY] instanceof AbstractQuery) {
145
            $sourceBody[self::QUERY] = $sourceBody[self::QUERY]->toArray();
146
        }
147
148
        return $sourceBody;
149
    }
150
151
    private function _setBodyScript(array $body): array
152
    {
153
        if (!$this->hasParam(self::SCRIPT)) {
154
            return $body;
155
        }
156
157
        $script = $this->getParam(self::SCRIPT);
158
159
        if ($script instanceof AbstractScript) {
160
            $body = \array_merge($body, $script->toArray());
161
        } else {
162
            $body[self::SCRIPT] = $script;
163
        }
164
165
        return $body;
166
    }
167
168
    public function setWaitForCompletion($value)
169
    {
170
        \is_bool($value) && $value = $value ? 'true' : 'false';
171
172
        $this->setParam(self::WAIT_FOR_COMPLETION, $value);
173
    }
174
175
    public function setWaitForActiveShards($value)
176
    {
177
        $this->setParam(self::WAIT_FOR_ACTIVE_SHARDS, $value);
178
    }
179
180
    public function setTimeout($value)
181
    {
182
        $this->setParam(self::TIMEOUT, $value);
183
    }
184
185
    public function setScroll($value)
186
    {
187
        $this->setParam(self::SCROLL, $value);
188
    }
189
190
    public function setRequestsPerSecond($value)
191
    {
192
        $this->setParam(self::REQUESTS_PER_SECOND, $value);
193
    }
194
195
    public function setScript(Script $script)
196
    {
197
        $this->setParam(self::SCRIPT, $script);
198
    }
199
200
    public function setQuery(AbstractQuery $query): void
201
    {
202
        $this->setParam(self::QUERY, $query);
203
    }
204
205
    public function setPipeline(Pipeline $pipeline): void
206
    {
207
        $this->setParam(self::PIPELINE, $pipeline);
208
    }
209
210
    /**
211
     * @param bool|string $value
212
     */
213
    public function setRefresh($value): void
214
    {
215
        \is_bool($value) && $value = $value ? self::REFRESH_TRUE : self::REFRESH_FALSE;
216
217
        $this->setParam(self::REFRESH, $value);
218
    }
219
220
    public function getTaskId()
221
    {
222
        $taskId = null;
223
        if ($this->_lastResponse instanceof Response) {
224
            $taskId = $this->_lastResponse->getData()['task'] ? $this->_lastResponse->getData()['task'] : null;
225
        }
226
227
        return $taskId;
228
    }
229
}
230