Completed
Pull Request — 1.0 (#638)
by
unknown
03:04
created

Manager::scroll()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 9
rs 9.6666
cc 1
eloc 6
nc 1
nop 3
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Forbidden403Exception;
16
use Elasticsearch\Common\Exceptions\Missing404Exception;
17
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
18
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
19
use ONGR\ElasticsearchBundle\Result\Converter;
20
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
21
use ONGR\ElasticsearchBundle\Result\RawIterator;
22
use ONGR\ElasticsearchBundle\Result\Result;
23
use ONGR\ElasticsearchDSL\Search;
24
25
/**
 * Manager class.
 *
 * Wraps an Elasticsearch client for a single index: queues and commits bulk
 * operations, runs searches and scrolls, and creates/drops the index.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries ('body' holds action/document lines)
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Index settings. The 'index' key holds the index name; the whole
     *            array is passed to indices()->create() by createIndex().
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * After commit to make data available the refresh or flush operation is needed
     * so one of those methods has to be defined, the default is refresh.
     * The value 'none' skips both.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * Number of queued bulk operations after which commit() is called
     * automatically. A value lower than 1 disables auto-commit.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Number of operations queued since the last commit.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved document class name
     */
    private $repositories = [];

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Returns repository by document class, creating and caching it on first use.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @return Repository
     *
     * @throws \InvalidArgumentException When $className is not a string.
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (!isset($this->repositories[$namespace])) {
            $this->repositories[$namespace] = $this->createRepository($namespace);
        }

        return $this->repositories[$namespace];
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string One of: refresh, flush, none.
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * Sets which index operation commit() runs after sending the bulk request.
     *
     * @param string $commitMode One of: refresh, flush, none.
     *
     * @throws \LogicException When an unknown mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if (!in_array($commitMode, ['refresh', 'flush', 'none'], true)) {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * @param int $bulkCommitSize Auto-commit threshold; values below 1 disable auto-commit.
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        $params = [];
        $params['index'] = $this->getIndexName();
        $params['type'] = implode(',', $types);
        $params['body'] = $query;

        if (!empty($queryStringParams)) {
            // index/type/body built above take precedence over query string params.
            $params = array_merge($queryStringParams, $params);
        }

        return $this->client->search($params);
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the document has no identifier.
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, ['_id' => $data['_id']]);
    }

    /**
     * Flushes elasticsearch index.
     *
     * @param array $params Extra parameters; the 'index' key is always overridden.
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        $params = array_merge($params, ['index' => $this->getIndexName()]);

        return $this->client->indices()->flush($params);
    }

    /**
     * Refreshes elasticsearch index.
     *
     * @param array $params Extra parameters; the 'index' key is always overridden.
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        $params = array_merge($params, ['index' => $this->getIndexName()]);

        return $this->client->indices()->refresh($params);
    }

    /**
     * Inserts the current query container to the index, used for bulk queries execution.
     *
     * Resets the queue after the bulk request returns, then runs flush or
     * refresh according to the commit mode.
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array Bulk response, or null when nothing was queued.
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkQueries = array_merge($this->bulkQueries, $this->bulkParams);

        $bulkResponse = $this->client->bulk($bulkQueries);
        $this->bulkQueries = [];
        $this->bulkCount = 0;

        switch ($this->getCommitMode()) {
            case 'flush':
                $this->flush($params);
                break;
            case 'refresh':
                $this->refresh($params);
                break;
        }

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * The '_id', '_ttl' and '_parent' keys of $query are moved into the action
     * metadata line. Other keys form the document line, which is omitted for delete.
     * Commits automatically once the bulk commit size is reached.
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException
     *
     * @return null|array Bulk response when an automatic commit happened, null otherwise.
     */
    public function bulk($operation, $type, array $query)
    {
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $metadata = [
            '_index' => $this->getIndexName(),
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        // Drop unset metadata, but keep legitimate zero values: a plain
        // array_filter() would also remove an _id of 0 or '0'.
        $this->bulkQueries['body'][] = [
            $operation => array_filter(
                $metadata,
                function ($value) {
                    return $value === 0 || $value === '0' || !empty($value);
                }
            ),
        ];
        unset($query['_id'], $query['_ttl'], $query['_parent']);

        // Body for delete operation is not needed to apply.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // We are using counter because there is to difficult to resolve this from bulkQueries array.
        $this->bulkCount++;

        // Cast so a commit size given as a numeric string (e.g. from config)
        // still triggers, and use >= so lowering the size mid-batch still commits.
        $commitSize = (int) $this->bulkCommitSize;
        if ($commitSize > 0 && $this->bulkCount >= $commitSize) {
            return $this->commit();
        }

        return null;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // Work on a copy: unsetting on $this->indexSettings would strip the
        // mappings from every later createIndex() call on this manager.
        $settings = $this->indexSettings;
        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping Determines if mapping should be included.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Do nothing, our target is to create new index (it may not exist yet).
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     *
     * @return object|null
     */
    public function find($className, $id)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Executes given search.
     *
     * @param array|string $types  Type names or document class names (a single string is accepted).
     * @param Search       $search
     * @param string       $resultsType
     *
     * @return DocumentIterator|RawIterator|array
     */
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
    {
        // array_map instead of foreach-by-reference avoids a dangling reference.
        $resolvedTypes = array_map(
            function ($type) {
                return $this->resolveTypeName($type);
            },
            (array) $types
        );

        $results = $this->search($resolvedTypes, $search->toArray(), $search->getQueryParams());

        return $this->parseResult($results, $resultsType, $search->getScroll());
    }

    /**
     * Parses raw result.
     *
     * @param array  $raw
     * @param string $resultsType
     * @param string $scrollDuration
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception
     */
    private function parseResult($raw, $resultsType, $scrollDuration = null)
    {
        $scrollConfig = [];
        if (isset($raw['_scroll_id'])) {
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
            $scrollConfig['duration'] = $scrollDuration;
        }

        switch ($resultsType) {
            case Result::RESULTS_OBJECT:
                return new DocumentIterator($raw, $this, $scrollConfig);
            case Result::RESULTS_ARRAY:
                return $this->convertToNormalizedArray($raw);
            case Result::RESULTS_RAW:
                return $raw;
            case Result::RESULTS_RAW_ITERATOR:
                return new RawIterator($raw, $this, $scrollConfig);
            default:
                throw new \Exception('Wrong results type selected');
        }
    }

    /**
     * Normalizes response array.
     *
     * A single-document response yields its '_source'. A search response yields
     * a list of each hit's '_source', or of each hit's fields reduced to their
     * first value.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            return $data['_source'];
        }

        $output = [];

        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            foreach ($data['hits']['hits'] as $item) {
                // reset() takes its argument by reference, which array_map()
                // cannot supply for a string callable, so wrap it in a closure.
                $output[] = array_map(
                    function ($values) {
                        return reset($values);
                    },
                    $item['fields']
                );
            }
        }

        return $output;
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     * @param string $resultsType
     *
     * @return AbstractResultsIterator
     *
     * @throws \Exception
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m',
        $resultsType = Result::RESULTS_OBJECT
    ) {
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);

        return $this->parseResult($results, $resultsType, $scrollDuration);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Resolves type name by class name.
     *
     * Strings containing ':' (Bundle:Document) or '\' (FQCN) are treated as class
     * names and resolved through the metadata collector; anything else is
     * returned unchanged as an Elasticsearch type name.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }
}
661