Completed
Push — master ( d3ac62...0cb203 )
by Federico
02:08
created

lib/Elastica/Bulk.php (2 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace Elastica;
4
5
use Elastica\Bulk\Action;
6
use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
7
use Elastica\Bulk\Response as BulkResponse;
8
use Elastica\Bulk\ResponseSet;
9
use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
10
use Elastica\Exception\InvalidException;
11
use Elastica\Script\AbstractScript;
12
13
/**
 * Collects bulk actions and sends them to the `_bulk` endpoint in one request.
 *
 * Actions can be added as ready-made Action objects, as Documents or Scripts
 * (wrapped through AbstractDocumentAction::create()), or as raw bulk rows.
 * An optional default index and type are prepended to the request path.
 */
class Bulk
{
    /**
     * Newline delimiter exposed as a class constant (not referenced inside this class itself).
     */
    const DELIMITER = "\n";

    /**
     * @var \Elastica\Client client used to send the request and to read configuration
     */
    protected $_client;

    /**
     * @var \Elastica\Bulk\Action[] actions in the order they were added
     */
    protected $_actions = [];

    /**
     * @var string|null default index name used in the request path
     */
    protected $_index;

    /**
     * @var string|null default type name used in the request path
     */
    protected $_type;

    /**
     * @var array request parameters to the bulk api
     */
    protected $_requestParams = [];

    /**
     * @param \Elastica\Client $client
     */
    public function __construct(Client $client)
    {
        $this->_client = $client;
    }

    /**
     * Sets the default index, given either its name or an Index object.
     *
     * @param string|\Elastica\Index $index
     *
     * @return $this
     */
    public function setIndex($index)
    {
        $name = $index instanceof Index ? $index->getName() : $index;

        $this->_index = (string) $name;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getIndex()
    {
        return $this->_index;
    }

    /**
     * Tells whether a non-empty default index has been set.
     *
     * @return bool
     */
    public function hasIndex()
    {
        $index = $this->getIndex();

        return null !== $index && '' !== $index;
    }

    /**
     * Sets the default type, given either its name or a Type object.
     *
     * When a Type object is passed, the default index is also replaced by the
     * name of the index that type belongs to.
     *
     * @param string|\Elastica\Type $type
     *
     * @return $this
     */
    public function setType($type)
    {
        // NOTE(review): static analysis flagged this method as duplicated elsewhere in the project.
        if ($type instanceof Type) {
            $this->setIndex($type->getIndex()->getName());
            $type = $type->getName();
        }

        $this->_type = (string) $type;

        return $this;
    }

    /**
     * @return string|null
     */
    public function getType()
    {
        return $this->_type;
    }

    /**
     * Tells whether a non-empty default type has been set.
     *
     * @return bool
     */
    public function hasType()
    {
        $type = $this->getType();

        return null !== $type && '' !== $type;
    }

    /**
     * Builds the request path: `[index/[type/]]_bulk`.
     *
     * The type segment is only added when an index is set as well.
     *
     * @return string
     */
    public function getPath()
    {
        if (!$this->hasIndex()) {
            return '_bulk';
        }

        $path = $this->getIndex().'/';
        if ($this->hasType()) {
            $path .= $this->getType().'/';
        }

        return $path.'_bulk';
    }

    /**
     * Appends a single action to the bulk request.
     *
     * @param \Elastica\Bulk\Action $action
     *
     * @return $this
     */
    public function addAction(Action $action)
    {
        $this->_actions[] = $action;

        return $this;
    }

    /**
     * Appends several actions, preserving their order.
     *
     * @param \Elastica\Bulk\Action[] $actions
     *
     * @return $this
     */
    public function addActions(array $actions)
    {
        foreach ($actions as $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * @return \Elastica\Bulk\Action[]
     */
    public function getActions()
    {
        return $this->_actions;
    }

    /**
     * Wraps a document in a bulk action and appends it.
     *
     * @param \Elastica\Document $document
     * @param string|null        $opType   passed through to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addDocument(Document $document, $opType = null)
    {
        return $this->addAction(AbstractDocumentAction::create($document, $opType));
    }

    /**
     * Wraps each document in a bulk action and appends it.
     *
     * @param \Elastica\Document[] $documents
     * @param string|null          $opType
     *
     * @return $this
     */
    public function addDocuments(array $documents, $opType = null)
    {
        foreach ($documents as $document) {
            $this->addDocument($document, $opType);
        }

        return $this;
    }

    /**
     * Wraps a script in a bulk action and appends it.
     *
     * @param \Elastica\Script\AbstractScript $script
     * @param string|null                     $opType passed through to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addScript(AbstractScript $script, $opType = null)
    {
        return $this->addAction(AbstractDocumentAction::create($script, $opType));
    }

    /**
     * Wraps each script in a bulk action and appends it.
     *
     * @param \Elastica\Script\AbstractScript[] $scripts
     * @param string|null                       $opType
     *
     * @return $this
     */
    public function addScripts(array $scripts, $opType = null)
    {
        foreach ($scripts as $script) {
            $this->addScript($script, $opType);
        }

        return $this;
    }

    /**
     * Appends one item, or an array of items, each being a Script or a Document.
     *
     * @param \Elastica\Script\AbstractScript|\Elastica\Document|array $data
     * @param string|null                                              $opType
     *
     * @throws \InvalidArgumentException when an item is neither a Script nor a Document
     *
     * @return $this
     */
    public function addData($data, $opType = null)
    {
        $items = is_array($data) ? $data : [$data];

        foreach ($items as $item) {
            if ($item instanceof AbstractScript) {
                $this->addScript($item, $opType);
            } elseif ($item instanceof Document) {
                $this->addDocument($item, $opType);
            } else {
                throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
            }
        }

        return $this;
    }

    /**
     * Appends actions described by raw bulk rows.
     *
     * A row whose first key is a valid op type starts a new action with the
     * first value as metadata. Any other row is taken as the source of the
     * action started by the preceding row.
     *
     * @param array $data
     *
     * @throws \Elastica\Exception\InvalidException
     *
     * @return $this
     */
    public function addRawData(array $data)
    {
        $pending = null;

        foreach ($data as $row) {
            if (!is_array($row)) {
                throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
            }

            // Rewind first so key and value always come from the same (first)
            // element, even if the caller left the row's internal pointer advanced.
            $metadata = reset($row);
            $opType = key($row);

            if (Action::isValidOpType($opType)) {
                // A new action header: flush the previous action, which had no source row.
                if (null !== $pending) {
                    $this->addAction($pending);
                }
                $pending = new Action($opType, $metadata);
            } elseif (null !== $pending) {
                $pending->setSource($row);
                $this->addAction($pending);
                $pending = null;
            } else {
                throw new InvalidException('Invalid bulk data, source must follow action metadata');
            }
        }

        // Flush a trailing action that was not followed by a source row.
        if (null !== $pending) {
            $this->addAction($pending);
        }

        return $this;
    }

    /**
     * Set a url parameter on the bulk request.
     *
     * @param string $name  name of the parameter
     * @param string $value value of the parameter
     *
     * @return $this
     */
    public function setRequestParam($name, $value)
    {
        $this->_requestParams[$name] = $value;

        return $this;
    }

    /**
     * Set the amount of time that the request will wait for the shards to come online.
     * Requires Elasticsearch version >= 0.90.8.
     *
     * @param string $time timeout in Elasticsearch time format
     *
     * @return $this
     */
    public function setShardTimeout($time)
    {
        return $this->setRequestParam('timeout', $time);
    }

    /**
     * @return string
     */
    public function __toString()
    {
        return $this->toString();
    }

    /**
     * Concatenates the string form of every action, in insertion order.
     *
     * @return string
     */
    public function toString()
    {
        $payload = '';
        foreach ($this->getActions() as $action) {
            $payload .= $action->toString();
        }

        return $payload;
    }

    /**
     * Flattens the rows of every action into one sequential list.
     *
     * @return array
     */
    public function toArray()
    {
        $rows = [];
        foreach ($this->getActions() as $action) {
            foreach ($action->toArray() as $row) {
                $rows[] = $row;
            }
        }

        return $rows;
    }

    /**
     * Sends all collected actions in a single POST request and processes the response.
     *
     * @throws \Elastica\Exception\Bulk\ResponseException
     * @throws \Elastica\Exception\InvalidException
     *
     * @return \Elastica\Bulk\ResponseSet
     */
    public function send()
    {
        $response = $this->_client->request(
            $this->getPath(),
            Request::POST,
            $this->toString(),
            $this->_requestParams,
            Request::NDJSON_CONTENT_TYPE
        );

        return $this->_processResponse($response);
    }

    /**
     * Pairs every response item with the action at the same position and
     * builds the resulting response set.
     *
     * For document actions with auto-populate enabled, the id and version
     * reported in the response are written back to the action's data object.
     *
     * @param \Elastica\Response $response
     *
     * @throws \Elastica\Exception\Bulk\ResponseException when the response set reports an error
     * @throws \Elastica\Exception\InvalidException       when a response item has no action at its position
     *
     * @return \Elastica\Bulk\ResponseSet
     */
    protected function _processResponse(Response $response)
    {
        $responseData = $response->getData();
        $actions = $this->getActions();
        $bulkResponses = [];

        if (isset($responseData['items']) && is_array($responseData['items'])) {
            foreach ($responseData['items'] as $key => $item) {
                if (!isset($actions[$key])) {
                    throw new InvalidException('No response found for action #'.$key);
                }

                $action = $actions[$key];

                // The first key of an item is taken as the op type and its value
                // as the response data. Rewind first so both come from the same element.
                $bulkResponseData = reset($item);
                $opType = key($item);

                // NOTE(review): static analysis flagged this block as duplicated elsewhere in the project.
                if ($action instanceof AbstractDocumentAction) {
                    $data = $action->getData();

                    // Explicit grouping: either the document opts in, or the client
                    // configuration enables auto-populate globally.
                    // NOTE(review): when only the client-level flag is on, $data is used
                    // without the Document check; presumably getData() always returns an
                    // object exposing hasId()/setId()/setVersion() - confirm.
                    $autoPopulate = ($data instanceof Document && $data->isAutoPopulate())
                        || $this->_client->getConfigValue(['document', 'autoPopulate'], false);

                    if ($autoPopulate) {
                        if (!$data->hasId() && isset($bulkResponseData['_id'])) {
                            $data->setId($bulkResponseData['_id']);
                        }
                        if (isset($bulkResponseData['_version'])) {
                            $data->setVersion($bulkResponseData['_version']);
                        }
                    }
                }

                $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
            }
        }

        $bulkResponseSet = new ResponseSet($response, $bulkResponses);

        if ($bulkResponseSet->hasError()) {
            throw new BulkResponseException($bulkResponseSet);
        }

        return $bulkResponseSet;
    }
}
417