Completed
Pull Request — master (#1803)
by thomas
02:21
created

Bulk::addAction()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
namespace Elastica;
4
5
use Elastica\Bulk\Action;
6
use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
7
use Elastica\Bulk\Response as BulkResponse;
8
use Elastica\Bulk\ResponseSet;
9
use Elastica\Exception\Bulk\ResponseException;
10
use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
11
use Elastica\Exception\InvalidException;
12
use Elastica\Script\AbstractScript;
13
14
/**
 * Collects bulk actions and sends them to the `_bulk` endpoint in one request.
 *
 * Actions can be queued as ready-made Action objects, as Documents or Scripts
 * (wrapped via AbstractDocumentAction::create()), or as raw arrays in which a
 * metadata row is optionally followed by a source row.
 */
class Bulk
{
    /**
     * Newline delimiter. Not referenced by any method of this class.
     */
    public const DELIMITER = "\n";

    /**
     * @var Client client used to perform the bulk request
     */
    protected $_client;

    /**
     * @var Action[] queued actions, in the order they were added
     */
    protected $_actions = [];

    /**
     * @var string|null default index name, used as path prefix when non-empty
     */
    protected $_index;

    /**
     * @var string|null not read or written by any method of this class
     */
    protected $_type;

    /**
     * @var array request parameters to the bulk api
     */
    protected $_requestParams = [];

    /**
     * @param Client $client client the bulk request will be sent through
     */
    public function __construct(Client $client)
    {
        $this->_client = $client;
    }

    /**
     * String form of the bulk body; identical to toString().
     */
    public function __toString(): string
    {
        return $this->toString();
    }

    /**
     * Sets the default index for this bulk request.
     *
     * An Index object is reduced to its name; any other value is cast to string.
     *
     * @param Index|string $index
     *
     * @return $this
     */
    public function setIndex($index): self
    {
        if ($index instanceof Index) {
            $index = $index->getName();
        }

        $this->_index = (string) $index;

        return $this;
    }

    /**
     * @return string|null the index name set via setIndex(), or null if none was set
     */
    public function getIndex()
    {
        return $this->_index;
    }

    /**
     * Tells whether a non-empty default index has been set.
     */
    public function hasIndex(): bool
    {
        $index = $this->getIndex();

        return null !== $index && '' !== $index;
    }

    /**
     * Builds the request path: "<index>/_bulk" when an index is set, "_bulk" otherwise.
     */
    public function getPath(): string
    {
        if ($this->hasIndex()) {
            return $this->getIndex().'/_bulk';
        }

        return '_bulk';
    }

    /**
     * Appends a single action to the queue.
     *
     * @return $this
     */
    public function addAction(Action $action): self
    {
        $this->_actions[] = $action;

        return $this;
    }

    /**
     * Appends several actions to the queue, preserving their order.
     *
     * @param Action[] $actions
     *
     * @return $this
     */
    public function addActions(array $actions): self
    {
        foreach ($actions as $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * @return Action[] all queued actions, in insertion order
     */
    public function getActions(): array
    {
        return $this->_actions;
    }

    /**
     * Wraps a document in a bulk action and queues it.
     *
     * @param string|null $opType operation type handed to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addDocument(Document $document, ?string $opType = null): self
    {
        return $this->addAction(AbstractDocumentAction::create($document, $opType));
    }

    /**
     * Wraps each document in a bulk action and queues it.
     *
     * @param Document[]  $documents
     * @param string|null $opType    operation type applied to every document
     *
     * @return $this
     */
    public function addDocuments(array $documents, ?string $opType = null): self
    {
        foreach ($documents as $document) {
            $this->addDocument($document, $opType);
        }

        return $this;
    }

    /**
     * Wraps a script in a bulk action and queues it.
     *
     * @param string|null $opType operation type handed to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addScript(AbstractScript $script, ?string $opType = null): self
    {
        return $this->addAction(AbstractDocumentAction::create($script, $opType));
    }

    /**
     * Wraps each script in a bulk action and queues it.
     *
     * @param AbstractScript[] $scripts
     * @param string|null      $opType  operation type applied to every script
     *
     * @return $this
     */
    public function addScripts(array $scripts, $opType = null): self
    {
        foreach ($scripts as $script) {
            $this->addScript($script, $opType);
        }

        return $this;
    }

    /**
     * Queues a Document, a Script, or an array mixing both.
     *
     * @param array|\Elastica\Document|\Elastica\Script\AbstractScript $data
     * @param string|null                                              $opType operation type applied to every entry
     *
     * @throws \InvalidArgumentException if an entry is neither a Document nor a Script
     *
     * @return $this
     */
    public function addData($data, ?string $opType = null)
    {
        // Normalise a single item to a one-element list.
        if (!\is_array($data)) {
            $data = [$data];
        }

        foreach ($data as $actionData) {
            if ($actionData instanceof AbstractScript) {
                $this->addScript($actionData, $opType);
            } elseif ($actionData instanceof Document) {
                $this->addDocument($actionData, $opType);
            } else {
                throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
            }
        }

        return $this;
    }

    /**
     * Queues actions described by raw rows.
     *
     * Every row must be an array. A row whose first key is a valid op type
     * starts a new action, with the first value as its metadata. Any other row
     * is taken as the source of the action started by the preceding row.
     *
     * @param array[] $data list of metadata rows, each optionally followed by a source row
     *
     * @throws InvalidException if a row is not an array, or a source row has no preceding metadata row
     *
     * @return $this
     */
    public function addRawData(array $data): self
    {
        // Action created from a metadata row that may still receive a source row.
        $action = null;

        foreach ($data as $row) {
            if (!\is_array($row)) {
                throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
            }

            // reset() must run before key(): key() reads the array's internal
            // pointer, which the caller may have left past the first element.
            $metadata = \reset($row);
            $opType = \key($row);

            if (Action::isValidOpType($opType)) {
                // The previous metadata row had no source row; queue it as is.
                if (null !== $action) {
                    $this->addAction($action);
                }
                $action = new Action($opType, $metadata);
            } elseif (null !== $action) {
                $action->setSource($row);
                $this->addAction($action);
                $action = null;
            } else {
                throw new InvalidException('Invalid bulk data, source must follow action metadata');
            }
        }

        // Queue a trailing metadata row that was not followed by a source row.
        if (null !== $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * Set a url parameter on the bulk request.
     *
     * @param string $name  name of the parameter
     * @param mixed  $value value of the parameter
     *
     * @return $this
     */
    public function setRequestParam(string $name, $value): self
    {
        $this->_requestParams[$name] = $value;

        return $this;
    }

    /**
     * Set the amount of time that the request will wait for the shards to come online.
     * Requires Elasticsearch version >= 0.90.8.
     *
     * Stored as the "timeout" request parameter.
     *
     * @param string $time timeout in Elasticsearch time format
     *
     * @return $this
     */
    public function setShardTimeout(string $time): self
    {
        return $this->setRequestParam('timeout', $time);
    }

    /**
     * Concatenates the string form of every queued action, in order.
     *
     * Returns an empty string when no action is queued.
     */
    public function toString(): string
    {
        $data = '';
        foreach ($this->getActions() as $action) {
            $data .= $action->toString();
        }

        return $data;
    }

    /**
     * Flattens the rows of every queued action into a single list.
     *
     * @return array rows of all actions, in order; empty when no action is queued
     */
    public function toArray(): array
    {
        $data = [];
        foreach ($this->getActions() as $action) {
            foreach ($action->toArray() as $row) {
                $data[] = $row;
            }
        }

        return $data;
    }

    /**
     * Sends the queued actions as one POST request and returns the parsed result.
     *
     * @throws ResponseException if the response set reports an error
     * @throws InvalidException  if a response item cannot be matched to an action
     */
    public function send(): ResponseSet
    {
        $response = $this->_client->request(
            $this->getPath(),
            Request::POST,
            $this->toString(),
            $this->_requestParams,
            Request::NDJSON_CONTENT_TYPE
        );

        return $this->_processResponse($response);
    }

    /**
     * Pairs every item of the bulk response with the action at the same position.
     *
     * For document actions with auto-populate enabled, the id, version,
     * sequence number and primary term returned by the server are copied back
     * onto the action's data object.
     *
     * @throws ResponseException if the resulting response set reports an error
     * @throws InvalidException  if a response item has no action at the same key
     */
    protected function _processResponse(Response $response): ResponseSet
    {
        $responseData = $response->getData();
        $actions = $this->getActions();
        $bulkResponses = [];

        if (isset($responseData['items']) && \is_array($responseData['items'])) {
            foreach ($responseData['items'] as $key => $item) {
                if (!isset($actions[$key])) {
                    throw new InvalidException('No response found for action #'.$key);
                }

                $action = $actions[$key];

                // Each item is read as a single-entry map: op type => result data.
                $bulkResponseData = \reset($item);
                $opType = \key($item);

                if ($action instanceof AbstractDocumentAction) {
                    $data = $action->getData();

                    // Enabled per document, or globally through the client config.
                    // NOTE(review): the global flag applies to any data object, not
                    // only Documents - confirm every payload exposes these setters.
                    $autoPopulate = ($data instanceof Document && $data->isAutoPopulate())
                        || $this->_client->getConfigValue(['document', 'autoPopulate'], false);

                    if ($autoPopulate) {
                        // Never overwrite an id the caller already assigned.
                        if (!$data->hasId() && isset($bulkResponseData['_id'])) {
                            $data->setId($bulkResponseData['_id']);
                        }
                        if (isset($bulkResponseData['_version'])) {
                            $data->setVersion($bulkResponseData['_version']);
                        }
                        if (isset($bulkResponseData['_seq_no'])) {
                            $data->setSequenceNumber($bulkResponseData['_seq_no']);
                        }
                        if (isset($bulkResponseData['_primary_term'])) {
                            $data->setPrimaryTerm($bulkResponseData['_primary_term']);
                        }
                    }
                }

                $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
            }
        }

        $bulkResponseSet = new ResponseSet($response, $bulkResponses);

        if ($bulkResponseSet->hasError()) {
            throw new BulkResponseException($bulkResponseSet);
        }

        return $bulkResponseSet;
    }
}
351