These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace Elastica; |
||
4 | |||
5 | use Elastica\Bulk\Action; |
||
6 | use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction; |
||
7 | use Elastica\Bulk\Response as BulkResponse; |
||
8 | use Elastica\Bulk\ResponseSet; |
||
9 | use Elastica\Exception\Bulk\ResponseException as BulkResponseException; |
||
10 | use Elastica\Exception\InvalidException; |
||
11 | use Elastica\Script\AbstractScript; |
||
12 | |||
13 | class Bulk |
||
14 | { |
||
15 | const DELIMITER = "\n"; |
||
16 | |||
17 | /** |
||
18 | * @var \Elastica\Client |
||
19 | */ |
||
20 | protected $_client; |
||
21 | |||
22 | /** |
||
23 | * @var \Elastica\Bulk\Action[] |
||
24 | */ |
||
25 | protected $_actions = []; |
||
26 | |||
27 | /** |
||
28 | * @var string|null |
||
29 | */ |
||
30 | protected $_index; |
||
31 | |||
32 | /** |
||
33 | * @var string|null |
||
34 | */ |
||
35 | protected $_type; |
||
36 | |||
37 | /** |
||
38 | * @var array request parameters to the bulk api |
||
39 | */ |
||
40 | protected $_requestParams = []; |
||
41 | |||
42 | /** |
||
43 | * @param \Elastica\Client $client |
||
44 | */ |
||
45 | public function __construct(Client $client) |
||
46 | { |
||
47 | $this->_client = $client; |
||
48 | } |
||
49 | |||
50 | /** |
||
51 | * @param string|\Elastica\Index $index |
||
52 | * |
||
53 | * @return $this |
||
54 | */ |
||
55 | public function setIndex($index) |
||
56 | { |
||
57 | if ($index instanceof Index) { |
||
58 | $index = $index->getName(); |
||
59 | } |
||
60 | |||
61 | $this->_index = (string) $index; |
||
62 | |||
63 | return $this; |
||
64 | } |
||
65 | |||
66 | /** |
||
67 | * @return string|null |
||
68 | */ |
||
69 | public function getIndex() |
||
70 | { |
||
71 | return $this->_index; |
||
72 | } |
||
73 | |||
74 | /** |
||
75 | * @return bool |
||
76 | */ |
||
77 | public function hasIndex() |
||
78 | { |
||
79 | return null !== $this->getIndex() && '' !== $this->getIndex(); |
||
80 | } |
||
81 | |||
82 | /** |
||
83 | * @param string|\Elastica\Type $type |
||
84 | * |
||
85 | * @return $this |
||
86 | */ |
||
87 | View Code Duplication | public function setType($type) |
|
0 ignored issues
–
show
|
|||
88 | { |
||
89 | if ($type instanceof Type) { |
||
90 | $this->setIndex($type->getIndex()->getName()); |
||
91 | $type = $type->getName(); |
||
92 | } |
||
93 | |||
94 | $this->_type = (string) $type; |
||
95 | |||
96 | return $this; |
||
97 | } |
||
98 | |||
99 | /** |
||
100 | * @return string|null |
||
101 | */ |
||
102 | public function getType() |
||
103 | { |
||
104 | return $this->_type; |
||
105 | } |
||
106 | |||
107 | /** |
||
108 | * @return bool |
||
109 | */ |
||
110 | public function hasType() |
||
111 | { |
||
112 | return null !== $this->getType() && '' !== $this->getType(); |
||
113 | } |
||
114 | |||
115 | /** |
||
116 | * @return string |
||
117 | */ |
||
118 | public function getPath() |
||
119 | { |
||
120 | $path = ''; |
||
121 | if ($this->hasIndex()) { |
||
122 | $path .= $this->getIndex().'/'; |
||
123 | if ($this->hasType()) { |
||
124 | $path .= $this->getType().'/'; |
||
125 | } |
||
126 | } |
||
127 | $path .= '_bulk'; |
||
128 | |||
129 | return $path; |
||
130 | } |
||
131 | |||
132 | /** |
||
133 | * @param \Elastica\Bulk\Action $action |
||
134 | * |
||
135 | * @return $this |
||
136 | */ |
||
137 | public function addAction(Action $action) |
||
138 | { |
||
139 | $this->_actions[] = $action; |
||
140 | |||
141 | return $this; |
||
142 | } |
||
143 | |||
144 | /** |
||
145 | * @param \Elastica\Bulk\Action[] $actions |
||
146 | * |
||
147 | * @return $this |
||
148 | */ |
||
149 | public function addActions(array $actions) |
||
150 | { |
||
151 | foreach ($actions as $action) { |
||
152 | $this->addAction($action); |
||
153 | } |
||
154 | |||
155 | return $this; |
||
156 | } |
||
157 | |||
158 | /** |
||
159 | * @return \Elastica\Bulk\Action[] |
||
160 | */ |
||
161 | public function getActions() |
||
162 | { |
||
163 | return $this->_actions; |
||
164 | } |
||
165 | |||
166 | /** |
||
167 | * @param \Elastica\Document $document |
||
168 | * @param string $opType |
||
169 | * |
||
170 | * @return $this |
||
171 | */ |
||
172 | public function addDocument(Document $document, $opType = null) |
||
173 | { |
||
174 | $action = AbstractDocumentAction::create($document, $opType); |
||
175 | |||
176 | return $this->addAction($action); |
||
177 | } |
||
178 | |||
179 | /** |
||
180 | * @param \Elastica\Document[] $documents |
||
181 | * @param string $opType |
||
182 | * |
||
183 | * @return $this |
||
184 | */ |
||
185 | public function addDocuments(array $documents, $opType = null) |
||
186 | { |
||
187 | foreach ($documents as $document) { |
||
188 | $this->addDocument($document, $opType); |
||
189 | } |
||
190 | |||
191 | return $this; |
||
192 | } |
||
193 | |||
194 | /** |
||
195 | * @param \Elastica\Script\AbstractScript $script |
||
196 | * @param string $opType |
||
197 | * |
||
198 | * @return $this |
||
199 | */ |
||
200 | public function addScript(AbstractScript $script, $opType = null) |
||
201 | { |
||
202 | $action = AbstractDocumentAction::create($script, $opType); |
||
203 | |||
204 | return $this->addAction($action); |
||
205 | } |
||
206 | |||
207 | /** |
||
208 | * @param \Elastica\Document[] $scripts |
||
209 | * @param string $opType |
||
210 | * |
||
211 | * @return $this |
||
212 | */ |
||
213 | public function addScripts(array $scripts, $opType = null) |
||
214 | { |
||
215 | foreach ($scripts as $document) { |
||
216 | $this->addScript($document, $opType); |
||
217 | } |
||
218 | |||
219 | return $this; |
||
220 | } |
||
221 | |||
222 | /** |
||
223 | * @param \Elastica\Script\AbstractScript|\Elastica\Document|array $data |
||
224 | * @param string $opType |
||
225 | * |
||
226 | * @return $this |
||
227 | */ |
||
228 | public function addData($data, $opType = null) |
||
229 | { |
||
230 | if (!is_array($data)) { |
||
231 | $data = [$data]; |
||
232 | } |
||
233 | |||
234 | foreach ($data as $actionData) { |
||
235 | if ($actionData instanceof AbstractScript) { |
||
236 | $this->addScript($actionData, $opType); |
||
237 | } elseif ($actionData instanceof Document) { |
||
238 | $this->addDocument($actionData, $opType); |
||
239 | } else { |
||
240 | throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts'); |
||
241 | } |
||
242 | } |
||
243 | |||
244 | return $this; |
||
245 | } |
||
246 | |||
247 | /** |
||
248 | * @param array $data |
||
249 | * |
||
250 | * @throws \Elastica\Exception\InvalidException |
||
251 | * |
||
252 | * @return $this |
||
253 | */ |
||
254 | public function addRawData(array $data) |
||
255 | { |
||
256 | foreach ($data as $row) { |
||
257 | if (is_array($row)) { |
||
258 | $opType = key($row); |
||
259 | $metadata = reset($row); |
||
260 | if (Action::isValidOpType($opType)) { |
||
261 | // add previous action |
||
262 | if (isset($action)) { |
||
263 | $this->addAction($action); |
||
264 | } |
||
265 | $action = new Action($opType, $metadata); |
||
266 | } elseif (isset($action)) { |
||
267 | $action->setSource($row); |
||
268 | $this->addAction($action); |
||
269 | $action = null; |
||
270 | } else { |
||
271 | throw new InvalidException('Invalid bulk data, source must follow action metadata'); |
||
272 | } |
||
273 | } else { |
||
274 | throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action'); |
||
275 | } |
||
276 | } |
||
277 | |||
278 | // add last action if available |
||
279 | if (isset($action)) { |
||
280 | $this->addAction($action); |
||
281 | } |
||
282 | |||
283 | return $this; |
||
284 | } |
||
285 | |||
286 | /** |
||
287 | * Set a url parameter on the request bulk request. |
||
288 | * |
||
289 | * @param string $name name of the parameter |
||
290 | * @param string $value value of the parameter |
||
291 | * |
||
292 | * @return $this |
||
293 | */ |
||
294 | public function setRequestParam($name, $value) |
||
295 | { |
||
296 | $this->_requestParams[$name] = $value; |
||
297 | |||
298 | return $this; |
||
299 | } |
||
300 | |||
301 | /** |
||
302 | * Set the amount of time that the request will wait the shards to come on line. |
||
303 | * Requires Elasticsearch version >= 0.90.8. |
||
304 | * |
||
305 | * @param string $time timeout in Elasticsearch time format |
||
306 | * |
||
307 | * @return $this |
||
308 | */ |
||
309 | public function setShardTimeout($time) |
||
310 | { |
||
311 | return $this->setRequestParam('timeout', $time); |
||
312 | } |
||
313 | |||
314 | /** |
||
315 | * @return string |
||
316 | */ |
||
317 | public function __toString() |
||
318 | { |
||
319 | return $this->toString(); |
||
320 | } |
||
321 | |||
322 | /** |
||
323 | * @return string |
||
324 | */ |
||
325 | public function toString() |
||
326 | { |
||
327 | $data = ''; |
||
328 | foreach ($this->getActions() as $action) { |
||
329 | $data .= $action->toString(); |
||
330 | } |
||
331 | |||
332 | return $data; |
||
333 | } |
||
334 | |||
335 | /** |
||
336 | * @return array |
||
337 | */ |
||
338 | public function toArray() |
||
339 | { |
||
340 | $data = []; |
||
341 | foreach ($this->getActions() as $action) { |
||
342 | foreach ($action->toArray() as $row) { |
||
343 | $data[] = $row; |
||
344 | } |
||
345 | } |
||
346 | |||
347 | return $data; |
||
348 | } |
||
349 | |||
350 | /** |
||
351 | * @return \Elastica\Bulk\ResponseSet |
||
352 | */ |
||
353 | public function send() |
||
354 | { |
||
355 | $path = $this->getPath(); |
||
356 | $data = $this->toString(); |
||
357 | |||
358 | $response = $this->_client->request($path, Request::POST, $data, $this->_requestParams, Request::NDJSON_CONTENT_TYPE); |
||
359 | |||
360 | return $this->_processResponse($response); |
||
361 | } |
||
362 | |||
363 | /** |
||
364 | * @param \Elastica\Response $response |
||
365 | * |
||
366 | * @throws \Elastica\Exception\Bulk\ResponseException |
||
367 | * @throws \Elastica\Exception\InvalidException |
||
368 | * |
||
369 | * @return \Elastica\Bulk\ResponseSet |
||
370 | */ |
||
371 | protected function _processResponse(Response $response) |
||
372 | { |
||
373 | $responseData = $response->getData(); |
||
374 | |||
375 | $actions = $this->getActions(); |
||
376 | |||
377 | $bulkResponses = []; |
||
378 | |||
379 | if (isset($responseData['items']) && is_array($responseData['items'])) { |
||
380 | foreach ($responseData['items'] as $key => $item) { |
||
381 | if (!isset($actions[$key])) { |
||
382 | throw new InvalidException('No response found for action #'.$key); |
||
383 | } |
||
384 | |||
385 | $action = $actions[$key]; |
||
386 | |||
387 | $opType = key($item); |
||
388 | $bulkResponseData = reset($item); |
||
389 | |||
390 | View Code Duplication | if ($action instanceof AbstractDocumentAction) { |
|
0 ignored issues
–
show
This code seems to be duplicated across your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
391 | $data = $action->getData(); |
||
392 | if ($data instanceof Document && $data->isAutoPopulate() |
||
393 | || $this->_client->getConfigValue(['document', 'autoPopulate'], false) |
||
394 | ) { |
||
395 | if (!$data->hasId() && isset($bulkResponseData['_id'])) { |
||
396 | $data->setId($bulkResponseData['_id']); |
||
397 | } |
||
398 | if (isset($bulkResponseData['_version'])) { |
||
399 | $data->setVersion($bulkResponseData['_version']); |
||
400 | } |
||
401 | } |
||
402 | } |
||
403 | |||
404 | $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType); |
||
405 | } |
||
406 | } |
||
407 | |||
408 | $bulkResponseSet = new ResponseSet($response, $bulkResponses); |
||
409 | |||
410 | if ($bulkResponseSet->hasError()) { |
||
411 | throw new BulkResponseException($bulkResponseSet); |
||
412 | } |
||
413 | |||
414 | return $bulkResponseSet; |
||
415 | } |
||
416 | } |
||
417 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.