This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | /* |
||
4 | * This file is part of the ONGR package. |
||
5 | * |
||
6 | * (c) NFQ Technologies UAB <[email protected]> |
||
7 | * |
||
8 | * For the full copyright and license information, please view the LICENSE |
||
9 | * file that was distributed with this source code. |
||
10 | */ |
||
11 | |||
12 | namespace ONGR\ElasticsearchBundle\Service; |
||
13 | |||
14 | use Elasticsearch\Client; |
||
15 | use Elasticsearch\ClientBuilder; |
||
16 | use ONGR\ElasticsearchBundle\Event\BulkEvent; |
||
17 | use ONGR\ElasticsearchBundle\Event\CommitEvent; |
||
18 | use ONGR\ElasticsearchBundle\Event\Events; |
||
19 | use ONGR\ElasticsearchBundle\Event\PostCreateClientEvent; |
||
20 | use ONGR\ElasticsearchBundle\Exception\BulkWithErrorsException; |
||
21 | use ONGR\ElasticsearchBundle\Mapping\Converter; |
||
22 | use ONGR\ElasticsearchBundle\Mapping\IndexSettings; |
||
23 | use ONGR\ElasticsearchBundle\Result\ArrayIterator; |
||
24 | use ONGR\ElasticsearchBundle\Result\RawIterator; |
||
25 | use ONGR\ElasticsearchDSL\Query\TermLevel\IdsQuery; |
||
26 | use ONGR\ElasticsearchDSL\Query\TermLevel\TermQuery; |
||
27 | use ONGR\ElasticsearchDSL\Search; |
||
28 | use ONGR\ElasticsearchDSL\Sort\FieldSort; |
||
29 | use ONGR\ElasticsearchBundle\Result\DocumentIterator; |
||
30 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
||
31 | |||
32 | class IndexService |
||
33 | { |
||
34 | private $client; |
||
35 | private $namespace; |
||
36 | private $converter; |
||
37 | private $eventDispatcher; |
||
38 | |||
39 | private $stopwatch; |
||
40 | private $bulkCommitSize = 100; |
||
41 | private $bulkQueries = []; |
||
42 | private $indexSettings = []; |
||
43 | private $tracer; |
||
44 | |||
45 | public function __construct( |
||
46 | string $namespace, |
||
47 | Converter $converter, |
||
48 | EventDispatcherInterface $eventDispatcher, |
||
49 | IndexSettings $indexSettings, |
||
50 | $tracer = null |
||
51 | ) { |
||
52 | $this->namespace = $namespace; |
||
53 | $this->converter = $converter; |
||
54 | $this->eventDispatcher = $eventDispatcher; |
||
55 | $this->indexSettings = $indexSettings; |
||
56 | $this->tracer = $tracer; |
||
57 | $this->getClient(); |
||
58 | } |
||
59 | |||
60 | public function getNamespace(): string |
||
61 | { |
||
62 | return $this->namespace; |
||
63 | } |
||
64 | |||
65 | public function getIndexSettings() |
||
66 | { |
||
67 | return $this->indexSettings; |
||
68 | } |
||
69 | |||
70 | public function setIndexSettings($indexSettings): self |
||
71 | { |
||
72 | $this->indexSettings = $indexSettings; |
||
73 | return $this; |
||
74 | } |
||
75 | |||
76 | public function getClient(): Client |
||
77 | { |
||
78 | if (!$this->client) { |
||
79 | $client = ClientBuilder::create(); |
||
80 | $client->setHosts($this->indexSettings->getHosts()); |
||
81 | $this->tracer && $client->setTracer($this->tracer); |
||
82 | // $client->setLogger() |
||
83 | |||
84 | $this->eventDispatcher->dispatch( |
||
85 | new PostCreateClientEvent($this->namespace, $client), |
||
86 | Events::POST_CLIENT_CREATE |
||
87 | ); |
||
88 | $this->client = $client->build(); |
||
89 | } |
||
90 | return $this->client; |
||
91 | } |
||
92 | |||
93 | public function getIndexName(): string |
||
94 | { |
||
95 | return $this->indexSettings->getIndexName(); |
||
96 | } |
||
97 | |||
98 | public function getEventDispatcher(): EventDispatcherInterface |
||
99 | { |
||
100 | return $this->eventDispatcher; |
||
101 | } |
||
102 | |||
103 | public function getConverter(): Converter |
||
104 | { |
||
105 | return $this->converter; |
||
106 | } |
||
107 | |||
108 | public function getBulkCommitSize(): int |
||
109 | { |
||
110 | return $this->bulkCommitSize; |
||
111 | } |
||
112 | |||
113 | public function setBulkCommitSize(int $bulkCommitSize) |
||
114 | { |
||
115 | $this->bulkCommitSize = $bulkCommitSize; |
||
116 | return $this; |
||
117 | } |
||
118 | |||
119 | public function getStopwatch() |
||
120 | { |
||
121 | return $this->stopwatch; |
||
122 | } |
||
123 | |||
124 | public function setStopwatch($stopwatch) |
||
125 | { |
||
126 | $this->stopwatch = $stopwatch; |
||
127 | return $this; |
||
128 | } |
||
129 | |||
130 | public function createIndex($noMapping = false, $params = []): array |
||
131 | { |
||
132 | $params = array_merge([ |
||
133 | 'index' => $this->getIndexName(), |
||
134 | 'body' => $noMapping ? [] : $this->indexSettings->getIndexMetadata(), |
||
135 | ], $params); |
||
136 | |||
137 | #TODO Add event here. |
||
138 | |||
139 | return $this->getClient()->indices()->create(array_filter($params)); |
||
140 | } |
||
141 | |||
142 | public function dropIndex(): array |
||
143 | { |
||
144 | $indexName = $this->getIndexName(); |
||
145 | |||
146 | if ($this->getClient()->indices()->existsAlias(['name' => $this->getIndexName()])) { |
||
147 | $aliases = $this->getClient()->indices()->getAlias(['name' => $this->getIndexName()]); |
||
148 | $indexName = array_keys($aliases); |
||
149 | } |
||
150 | |||
151 | return $this->getClient()->indices()->delete(['index' => $indexName]); |
||
152 | } |
||
153 | |||
154 | public function dropAndCreateIndex($noMapping = false, $params = []): array |
||
155 | { |
||
156 | try { |
||
157 | if ($this->indexExists()) { |
||
158 | $this->dropIndex(); |
||
159 | } |
||
160 | } catch (\Exception $e) { |
||
161 | // Do nothing, our target is to create the new index. |
||
162 | } |
||
163 | |||
164 | return $this->createIndex($noMapping, $params); |
||
165 | } |
||
166 | |||
167 | public function indexExists(): bool |
||
168 | { |
||
169 | return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]); |
||
170 | } |
||
171 | |||
172 | public function clearCache(): array |
||
173 | { |
||
174 | return $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]); |
||
175 | } |
||
176 | |||
177 | /** |
||
178 | * Returns a single document by provided ID or null if a document was not found. |
||
179 | */ |
||
180 | public function find($id, $params = []) |
||
181 | { |
||
182 | $requestParams = [ |
||
183 | 'index' => $this->getIndexName(), |
||
184 | 'id' => $id, |
||
185 | ]; |
||
186 | |||
187 | $requestParams = array_merge($requestParams, $params); |
||
188 | |||
189 | $result = $this->getClient()->get($requestParams); |
||
190 | |||
191 | if (!$result['found']) { |
||
192 | return null; |
||
193 | } |
||
194 | |||
195 | $result['_source']['_id'] = $result['_id']; |
||
196 | |||
197 | return $this->converter->convertArrayToDocument($this->namespace, $result['_source']); |
||
198 | } |
||
199 | |||
200 | public function findByIds(array $ids): DocumentIterator |
||
201 | { |
||
202 | $search = $this->createSearch(); |
||
203 | $search->addQuery(new IdsQuery($ids)); |
||
204 | |||
205 | return $this->findDocuments($search); |
||
206 | } |
||
207 | |||
208 | /** |
||
209 | * Finds documents by a set of criteria. |
||
210 | * |
||
211 | * @param array $criteria Example: ['group' => ['best', 'worst'], 'job' => 'medic']. |
||
212 | * @param array|null $orderBy Example: ['name' => 'ASC', 'surname' => 'DESC']. |
||
213 | * @param int|null $limit Default is 10. |
||
214 | * @param int|null $offset Default is 0. |
||
215 | * |
||
216 | * @return array|DocumentIterator The objects. |
||
217 | */ |
||
218 | public function findBy( |
||
219 | array $criteria, |
||
220 | array $orderBy = [], |
||
221 | int $limit = 10, |
||
222 | int $offset = 0 |
||
223 | ) { |
||
224 | $search = $this->createSearch(); |
||
225 | $search->setSize($limit); |
||
226 | $search->setFrom($offset); |
||
227 | |||
228 | foreach ($criteria as $field => $value) { |
||
229 | $search->addQuery(new TermQuery($field, $value)); |
||
230 | } |
||
231 | |||
232 | foreach ($orderBy as $field => $direction) { |
||
233 | $search->addSort(new FieldSort($field, $direction)); |
||
234 | } |
||
235 | |||
236 | return $this->findDocuments($search); |
||
237 | } |
||
238 | |||
239 | /** |
||
240 | * Finds a single document by a set of criteria. |
||
241 | * |
||
242 | * @param array $criteria Example: ['group' => ['best', 'worst'], 'job' => 'medic']. |
||
243 | * @param array|null $orderBy Example: ['name' => 'ASC', 'surname' => 'DESC']. |
||
244 | * |
||
245 | * @return object|null The object. |
||
246 | */ |
||
247 | public function findOneBy(array $criteria, array $orderBy = []) |
||
248 | { |
||
249 | return $this->findBy($criteria, $orderBy, 1)->current(); |
||
250 | } |
||
251 | |||
252 | public function createSearch(): Search |
||
253 | { |
||
254 | return new Search(); |
||
255 | } |
||
256 | |||
257 | public function getScrollConfiguration($raw, $scrollDuration): array |
||
258 | { |
||
259 | $scrollConfig = []; |
||
260 | if (isset($raw['_scroll_id'])) { |
||
261 | $scrollConfig['_scroll_id'] = $raw['_scroll_id']; |
||
262 | $scrollConfig['duration'] = $scrollDuration; |
||
263 | } |
||
264 | |||
265 | return $scrollConfig; |
||
266 | } |
||
267 | |||
268 | View Code Duplication | public function findDocuments(Search $search): DocumentIterator |
|
0 ignored issues
–
show
|
|||
269 | { |
||
270 | $results = $this->executeSearch($search); |
||
271 | |||
272 | return new DocumentIterator( |
||
273 | $results, |
||
274 | $this, |
||
275 | $this->converter, |
||
276 | $this->getScrollConfiguration($results, $search->getScroll()) |
||
277 | ); |
||
278 | } |
||
279 | |||
280 | View Code Duplication | public function findArray(Search $search): ArrayIterator |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository. ![]() |
|||
281 | { |
||
282 | $results = $this->executeSearch($search); |
||
283 | |||
284 | return new ArrayIterator( |
||
285 | $results, |
||
286 | $this, |
||
287 | $this->converter, |
||
288 | $this->getScrollConfiguration($results, $search->getScroll()) |
||
289 | ); |
||
290 | } |
||
291 | |||
292 | View Code Duplication | public function findRaw(Search $search): RawIterator |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository. ![]() |
|||
293 | { |
||
294 | $results = $this->executeSearch($search); |
||
295 | |||
296 | return new RawIterator( |
||
297 | $results, |
||
298 | $this, |
||
299 | $this->converter, |
||
300 | $this->getScrollConfiguration($results, $search->getScroll()) |
||
301 | ); |
||
302 | } |
||
303 | |||
304 | private function executeSearch(Search $search): array |
||
305 | { |
||
306 | return $this->search($search->toArray(), $search->getUriParams()); |
||
307 | } |
||
308 | |||
309 | public function getIndexDocumentCount(): int |
||
310 | { |
||
311 | $body = [ |
||
312 | 'index' => $this->getIndexName(), |
||
313 | 'body' => [], |
||
314 | ]; |
||
315 | |||
316 | $results = $this->getClient()->count($body); |
||
317 | |||
318 | return $results['count']; |
||
319 | } |
||
320 | |||
321 | public function remove($id, $routing = null) |
||
322 | { |
||
323 | $params = [ |
||
324 | 'index' => $this->getIndexName(), |
||
325 | 'id' => $id, |
||
326 | ]; |
||
327 | |||
328 | if ($routing) { |
||
329 | $params['routing'] = $routing; |
||
330 | } |
||
331 | |||
332 | $response = $this->getClient()->delete($params); |
||
333 | |||
334 | return $response; |
||
335 | } |
||
336 | |||
337 | public function update($id, array $fields = [], $script = null, array $params = []): array |
||
338 | { |
||
339 | $body = array_filter( |
||
340 | [ |
||
341 | 'doc' => $fields, |
||
342 | 'script' => $script, |
||
343 | ] |
||
344 | ); |
||
345 | |||
346 | $params = array_merge( |
||
347 | [ |
||
348 | 'id' => $id, |
||
349 | 'index' => $this->getIndexName(), |
||
350 | 'body' => $body, |
||
351 | ], |
||
352 | $params |
||
353 | ); |
||
354 | |||
355 | return $this->getClient()->update($params); |
||
356 | } |
||
357 | |||
358 | public function search(array $query, array $params = []): array |
||
359 | { |
||
360 | $requestParams = [ |
||
361 | 'index' => $this->getIndexName(), |
||
362 | 'body' => $query, |
||
363 | ]; |
||
364 | |||
365 | |||
366 | if (!empty($params)) { |
||
367 | $requestParams = array_merge($requestParams, $params); |
||
368 | } |
||
369 | |||
370 | // $this->stopwatch('start', 'search'); |
||
371 | $result = $this->getClient()->search($requestParams); |
||
372 | // $this->stopwatch('stop', 'search'); |
||
373 | |||
374 | return $result; |
||
375 | } |
||
376 | |||
377 | /** |
||
378 | * Usage example |
||
379 | * |
||
380 | * $im->bulk('index', ['_id' => 1, 'title' => 'foo']); |
||
381 | * $im->bulk('delete', ['_id' => 2]); |
||
382 | * $im->bulk('create', ['title' => 'foo']); |
||
383 | */ |
||
384 | public function bulk(string $operation, array $data = [], $autoCommit = true): array |
||
385 | { |
||
386 | $bulkParams = [ |
||
387 | '_id' => $data['_id'] ?? null, |
||
388 | ]; |
||
389 | |||
390 | unset($data['_index'], $data['_id']); |
||
391 | |||
392 | $this->eventDispatcher->dispatch( |
||
393 | new BulkEvent($operation, $bulkParams, $data), |
||
394 | Events::BULK |
||
395 | ); |
||
396 | |||
397 | $this->bulkQueries[] = [ $operation => $bulkParams]; |
||
398 | |||
399 | if (!empty($data)) { |
||
400 | $this->bulkQueries[] = $data; |
||
401 | } |
||
402 | |||
403 | $response = []; |
||
404 | |||
405 | // %X is not very accurate, but might be better than use counter. This place is experimental for now. |
||
406 | if ($autoCommit && $this->getBulkCommitSize() <= count($this->bulkQueries) % $this->getBulkCommitSize() / 2) { |
||
407 | $response = $this->commit(); |
||
408 | } |
||
409 | |||
410 | return $response; |
||
411 | } |
||
412 | |||
413 | /** |
||
414 | * Adds document to next flush. |
||
415 | * |
||
416 | * @param object $document |
||
417 | */ |
||
418 | public function persist($document): void |
||
419 | { |
||
420 | $documentArray = array_filter($this->converter->convertDocumentToArray($document), function ($val) { |
||
421 | // remove unset properties but keep other falsy values |
||
422 | return !($val === null); |
||
423 | }); |
||
424 | |||
425 | $this->bulk('index', $documentArray); |
||
426 | } |
||
427 | |||
428 | public function commit($commitMode = 'refresh', array $params = []): array |
||
429 | { |
||
430 | $bulkResponse = []; |
||
431 | if (!empty($this->bulkQueries)) { |
||
432 | $this->eventDispatcher->dispatch( |
||
433 | new CommitEvent($commitMode, $this->bulkQueries, []), |
||
434 | Events::PRE_COMMIT |
||
435 | ); |
||
436 | |||
437 | // $this->stopwatch('start', 'bulk'); |
||
438 | $bulkResponse = $this->client->bulk( |
||
439 | array_merge( |
||
440 | [ |
||
441 | 'index' => $this->getIndexName(), |
||
442 | 'body' => $this->bulkQueries, |
||
443 | ], |
||
444 | $params |
||
445 | ) |
||
446 | ); |
||
447 | // $this->stopwatch('stop', 'bulk'); |
||
448 | |||
449 | if ($bulkResponse['errors']) { |
||
450 | throw new BulkWithErrorsException( |
||
451 | json_encode($bulkResponse), |
||
452 | 0, |
||
453 | null, |
||
454 | $bulkResponse |
||
455 | ); |
||
456 | } |
||
457 | |||
458 | // $this->stopwatch('start', 'refresh'); |
||
459 | switch ($commitMode) { |
||
460 | case 'flush': |
||
461 | $this->getClient()->indices()->flush(); |
||
462 | break; |
||
463 | case 'flush_synced': |
||
464 | $this->getClient()->indices()->flushSynced(); |
||
465 | break; |
||
466 | case 'refresh': |
||
467 | $this->getClient()->indices()->refresh(); |
||
468 | break; |
||
469 | } |
||
470 | |||
471 | $this->eventDispatcher->dispatch( |
||
472 | new CommitEvent($commitMode, $this->bulkQueries, $bulkResponse), |
||
473 | Events::POST_COMMIT |
||
474 | ); |
||
475 | |||
476 | $this->bulkQueries = []; |
||
477 | |||
478 | // $this->stopwatch('stop', $this->getCommitMode()); |
||
479 | } |
||
480 | |||
481 | return $bulkResponse; |
||
482 | } |
||
483 | |||
484 | public function flush(array $params = []): array |
||
485 | { |
||
486 | return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params)); |
||
487 | } |
||
488 | |||
489 | public function refresh(array $params = []): array |
||
490 | { |
||
491 | return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params)); |
||
492 | } |
||
493 | |||
494 | public function scroll($scrollId, $scrollDuration = '5m'): array |
||
495 | { |
||
496 | $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]); |
||
497 | |||
498 | return $results; |
||
499 | } |
||
500 | |||
501 | public function clearScroll($scrollId): array |
||
502 | { |
||
503 | return $this->getClient()->clearScroll(['scroll_id' => $scrollId]); |
||
504 | } |
||
505 | |||
506 | public function resetClient(): void |
||
507 | { |
||
508 | $this->client = null; |
||
509 | } |
||
510 | |||
511 | public function clearElasticIndexCache(): array |
||
512 | { |
||
513 | return $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]); |
||
514 | } |
||
515 | |||
516 | private function stopwatch($action, $name): void |
||
517 | { |
||
518 | if ($this->stopwatch && ($action == 'start' || $action == 'stop')) { |
||
519 | $this->stopwatch->$action('ongr_es: '.$name, 'ongr_es'); |
||
520 | } |
||
521 | } |
||
522 | } |
||
523 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.