|
1
|
|
|
<?php |
|
2
|
|
|
|
|
3
|
|
|
namespace Elastica; |
|
4
|
|
|
|
|
5
|
|
|
use Elastica\Bulk\Action; |
|
6
|
|
|
use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction; |
|
7
|
|
|
use Elastica\Bulk\Response as BulkResponse; |
|
8
|
|
|
use Elastica\Bulk\ResponseSet; |
|
9
|
|
|
use Elastica\Exception\Bulk\ResponseException; |
|
10
|
|
|
use Elastica\Exception\Bulk\ResponseException as BulkResponseException; |
|
11
|
|
|
use Elastica\Exception\InvalidException; |
|
12
|
|
|
use Elastica\Script\AbstractScript; |
|
13
|
|
|
|
|
14
|
|
|
/**
 * Collects bulk actions (documents, scripts or raw rows) and sends them
 * to the `_bulk` endpoint through the given client in a single request.
 */
class Bulk
{
    const DELIMITER = "\n";

    /**
     * Client used to issue the bulk request.
     *
     * @var Client
     */
    protected $_client;

    /**
     * Actions queued for the next request, in insertion order.
     *
     * @var Action[]
     */
    protected $_actions = [];

    /**
     * Index name prepended to the request path; null until setIndex() is called.
     *
     * @var string|null
     */
    protected $_index;

    /**
     * Not referenced by any method of this class.
     *
     * @var string|null
     */
    protected $_type;

    /**
     * @var array request parameters to the bulk api
     */
    protected $_requestParams = [];

    /**
     * @param Client $client client the bulk request is sent through
     */
    public function __construct(Client $client)
    {
        $this->_client = $client;
    }

    /**
     * Set the index used as path prefix of the bulk request.
     *
     * An Index object is reduced to its name; any other value is cast to string.
     *
     * @param string|Index $index
     *
     * @return $this
     */
    public function setIndex($index): self
    {
        if ($index instanceof Index) {
            $index = $index->getName();
        }

        $this->_index = (string) $index;

        return $this;
    }

    /**
     * @return string|null the index name, or null if none has been set
     */
    public function getIndex()
    {
        return $this->_index;
    }

    /**
     * Tell whether a non-empty index name has been set.
     *
     * @return bool
     */
    public function hasIndex(): bool
    {
        $index = $this->getIndex();

        return null !== $index && '' !== $index;
    }

    /**
     * Build the request path: "<index>/_bulk" when an index is set, "_bulk" otherwise.
     *
     * @return string
     */
    public function getPath(): string
    {
        $path = '';
        if ($this->hasIndex()) {
            $path .= $this->getIndex().'/';
        }
        $path .= '_bulk';

        return $path;
    }

    /**
     * Append a single action to the queue.
     *
     * @param Action $action
     *
     * @return $this
     */
    public function addAction(Action $action): self
    {
        $this->_actions[] = $action;

        return $this;
    }

    /**
     * Append several actions to the queue, preserving their order.
     *
     * @param Action[] $actions
     *
     * @return $this
     */
    public function addActions(array $actions): self
    {
        foreach ($actions as $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * @return Action[] the queued actions
     */
    public function getActions(): array
    {
        return $this->_actions;
    }

    /**
     * Queue an action created from a document.
     *
     * @param Document    $document
     * @param string|null $opType   forwarded to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addDocument(Document $document, string $opType = null): self
    {
        $action = AbstractDocumentAction::create($document, $opType);

        return $this->addAction($action);
    }

    /**
     * Queue one action per document.
     *
     * @param Document[]  $documents
     * @param string|null $opType    applied to every document
     *
     * @return $this
     */
    public function addDocuments(array $documents, string $opType = null): self
    {
        foreach ($documents as $document) {
            $this->addDocument($document, $opType);
        }

        return $this;
    }

    /**
     * Queue an action created from a script.
     *
     * @param AbstractScript $script
     * @param string|null    $opType forwarded to AbstractDocumentAction::create()
     *
     * @return $this
     */
    public function addScript(AbstractScript $script, string $opType = null): self
    {
        $action = AbstractDocumentAction::create($script, $opType);

        return $this->addAction($action);
    }

    /**
     * Queue one action per script.
     *
     * @param AbstractScript[] $scripts each element is passed to addScript()
     * @param string|null      $opType  applied to every script
     *
     * @return $this
     */
    public function addScripts(array $scripts, $opType = null): self
    {
        foreach ($scripts as $script) {
            $this->addScript($script, $opType);
        }

        return $this;
    }

    /**
     * Queue a script, a document, or an array mixing both.
     *
     * @param \Elastica\Script\AbstractScript|\Elastica\Document|array $data
     * @param string|null                                              $opType applied to every element
     *
     * @throws \InvalidArgumentException if an element is neither a Document nor a Script
     *
     * @return $this
     */
    public function addData($data, string $opType = null)
    {
        if (!\is_array($data)) {
            $data = [$data];
        }

        foreach ($data as $actionData) {
            if ($actionData instanceof AbstractScript) {
                $this->addScript($actionData, $opType);
            } elseif ($actionData instanceof Document) {
                $this->addDocument($actionData, $opType);
            } else {
                throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
            }
        }

        return $this;
    }

    /**
     * Queue actions described by raw bulk rows.
     *
     * Each row must be an array. A row whose first key is a valid op type
     * starts a new action with the first value as metadata; any other row is
     * taken as the source of the action started by the preceding row.
     *
     * @param array $data list of metadata and source rows
     *
     * @throws InvalidException if a row is not an array, or a source row has no preceding metadata row
     *
     * @return $this
     */
    public function addRawData(array $data): self
    {
        $action = null;

        foreach ($data as $row) {
            if (!\is_array($row)) {
                throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
            }

            // Rewind before reading the key: key() reports the element at the
            // array's current internal pointer, which the caller may have
            // advanced. Reading it first could pair a later key with the
            // first element's value.
            $metadata = \reset($row);
            $opType = \key($row);

            if (Action::isValidOpType($opType)) {
                // a new metadata row closes the previous, source-less action
                if (null !== $action) {
                    $this->addAction($action);
                }
                $action = new Action($opType, $metadata);
            } elseif (null !== $action) {
                $action->setSource($row);
                $this->addAction($action);
                $action = null;
            } else {
                throw new InvalidException('Invalid bulk data, source must follow action metadata');
            }
        }

        // add last action if available
        if (null !== $action) {
            $this->addAction($action);
        }

        return $this;
    }

    /**
     * Set a url parameter on the bulk request.
     *
     * @param string $name  name of the parameter
     * @param mixed  $value value of the parameter
     *
     * @return $this
     */
    public function setRequestParam(string $name, $value): self
    {
        $this->_requestParams[$name] = $value;

        return $this;
    }

    /**
     * Set the amount of time the request will wait for the shards to come online.
     * Requires Elasticsearch version >= 0.90.8.
     *
     * Stored as the "timeout" request parameter.
     *
     * @param string $time timeout in Elasticsearch time format
     *
     * @return $this
     */
    public function setShardTimeout(string $time): self
    {
        return $this->setRequestParam('timeout', $time);
    }

    /**
     * @return string same value as toString()
     */
    public function __toString(): string
    {
        return $this->toString();
    }

    /**
     * Concatenate the string form of every queued action, in order.
     *
     * @return string the request body; empty string when no action is queued
     */
    public function toString(): string
    {
        $data = '';
        foreach ($this->getActions() as $action) {
            $data .= $action->toString();
        }

        return $data;
    }

    /**
     * Flatten the rows of every queued action into a single list.
     *
     * @return array
     */
    public function toArray(): array
    {
        $data = [];
        foreach ($this->getActions() as $action) {
            foreach ($action->toArray() as $row) {
                $data[] = $row;
            }
        }

        return $data;
    }

    /**
     * POST the queued actions to the bulk endpoint with the NDJSON content
     * type and return the processed result.
     *
     * @throws ResponseException if the response set reports an error
     * @throws InvalidException  if a response item has no matching queued action
     *
     * @return ResponseSet
     */
    public function send(): ResponseSet
    {
        $path = $this->getPath();
        $data = $this->toString();

        $response = $this->_client->request($path, Request::POST, $data, $this->_requestParams, Request::NDJSON_CONTENT_TYPE);

        return $this->_processResponse($response);
    }

    /**
     * Pair each response item with the queued action at the same position,
     * optionally write the returned id/version back into the action data, and
     * wrap everything in a ResponseSet.
     *
     * @param Response $response
     *
     * @throws ResponseException if the resulting response set reports an error
     * @throws InvalidException  if a response item has no queued action at its position
     *
     * @return ResponseSet
     */
    protected function _processResponse(Response $response): ResponseSet
    {
        $responseData = $response->getData();
        $actions = $this->getActions();
        $bulkResponses = [];

        if (isset($responseData['items']) && \is_array($responseData['items'])) {
            foreach ($responseData['items'] as $key => $item) {
                if (!isset($actions[$key])) {
                    throw new InvalidException('No response found for action #'.$key);
                }

                $action = $actions[$key];

                // Each item is keyed by its op type; rewind first so key()
                // is guaranteed to describe the first element.
                $bulkResponseData = \reset($item);
                $opType = \key($item);

                if ($action instanceof AbstractDocumentAction) {
                    $data = $action->getData();
                    // Parentheses only make the existing precedence explicit.
                    // NOTE(review): when the client-wide flag is on, $data is
                    // used without the Document check - confirm every payload
                    // type exposes hasId()/setId()/setVersion().
                    if (($data instanceof Document && $data->isAutoPopulate())
                        || $this->_client->getConfigValue(['document', 'autoPopulate'], false)
                    ) {
                        if (!$data->hasId() && isset($bulkResponseData['_id'])) {
                            $data->setId($bulkResponseData['_id']);
                        }
                        if (isset($bulkResponseData['_version'])) {
                            $data->setVersion($bulkResponseData['_version']);
                        }
                    }
                }

                $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
            }
        }

        $bulkResponseSet = new ResponseSet($response, $bulkResponses);

        if ($bulkResponseSet->hasError()) {
            throw new BulkResponseException($bulkResponseSet);
        }

        return $bulkResponseSet;
    }
}
|
382
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.