Completed
Pull Request — 1.1 (#646)
by
unknown
18:44
created

Manager::resolveTypeName()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 8
rs 9.4285
cc 3
eloc 4
nc 2
nop 1
1
<?php
2
3
/*
4
 * This file is part of the ONGR package.
5
 *
6
 * (c) NFQ Technologies UAB <info@nfq.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace ONGR\ElasticsearchBundle\Service;
13
14
use Elasticsearch\Client;
15
use Elasticsearch\Common\Exceptions\Missing404Exception;
16
use ONGR\ElasticsearchBundle\Mapping\MetadataCollector;
17
use ONGR\ElasticsearchBundle\Result\AbstractResultsIterator;
18
use ONGR\ElasticsearchBundle\Result\Converter;
19
use ONGR\ElasticsearchBundle\Result\DocumentIterator;
20
use ONGR\ElasticsearchBundle\Result\RawIterator;
21
use ONGR\ElasticsearchBundle\Result\Result;
22
use ONGR\ElasticsearchDSL\Search;
23
24
/**
 * Manager class.
 *
 * Binds an Elasticsearch client to a single index and exposes index
 * management, document lookup, search execution and buffered bulk writes.
 */
class Manager
{
    /**
     * @var string Manager name
     */
    private $name;

    /**
     * @var array Manager configuration
     */
    private $config = [];

    /**
     * @var Client
     */
    private $client;

    /**
     * @var Converter
     */
    private $converter;

    /**
     * @var array Container for bulk queries
     */
    private $bulkQueries = [];

    /**
     * @var array Holder for consistency, refresh and replication parameters
     */
    private $bulkParams = [];

    /**
     * @var array Index settings; the 'index' key holds the index name
     */
    private $indexSettings;

    /**
     * @var MetadataCollector
     */
    private $metadataCollector;

    /**
     * After commit to make data available the refresh or flush operation is needed
     * so one of those methods has to be defined, the default is refresh.
     *
     * @var string
     */
    private $commitMode = 'refresh';

    /**
     * The number of buffered bulk operations after which commit() is called automatically.
     *
     * @var int
     */
    private $bulkCommitSize = 100;

    /**
     * Counts how many operations were passed to the bulk container since the last commit.
     *
     * @var int
     */
    private $bulkCount = 0;

    /**
     * @var Repository[] Repository local cache, keyed by resolved document class name
     */
    private $repositories = [];

    /**
     * @param string            $name              Manager name
     * @param array             $config            Manager configuration
     * @param Client            $client
     * @param array             $indexSettings
     * @param MetadataCollector $metadataCollector
     * @param Converter         $converter
     */
    public function __construct(
        $name,
        array $config,
        $client,
        array $indexSettings,
        $metadataCollector,
        $converter
    ) {
        $this->name = $name;
        $this->config = $config;
        $this->client = $client;
        $this->indexSettings = $indexSettings;
        $this->metadataCollector = $metadataCollector;
        $this->converter = $converter;
    }

    /**
     * Returns Elasticsearch connection.
     *
     * @return Client
     */
    public function getClient()
    {
        return $this->client;
    }

    /**
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * @return array
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Returns repository by document class.
     *
     * Repositories are created once per resolved class name and then served
     * from the local cache.
     *
     * @param string $className FQCN or string in Bundle:Document format
     *
     * @throws \InvalidArgumentException When $className is not a string.
     *
     * @return Repository
     */
    public function getRepository($className)
    {
        if (!is_string($className)) {
            throw new \InvalidArgumentException('Document class must be a string.');
        }

        $namespace = $this->getMetadataCollector()->getClassName($className);

        if (!isset($this->repositories[$namespace])) {
            $this->repositories[$namespace] = $this->createRepository($namespace);
        }

        return $this->repositories[$namespace];
    }

    /**
     * @return MetadataCollector
     */
    public function getMetadataCollector()
    {
        return $this->metadataCollector;
    }

    /**
     * @return Converter
     */
    public function getConverter()
    {
        return $this->converter;
    }

    /**
     * @return string
     */
    public function getCommitMode()
    {
        return $this->commitMode;
    }

    /**
     * Sets which index operation commit() runs after sending the bulk request.
     *
     * @param string $commitMode One of: refresh, flush, none.
     *
     * @throws \LogicException When an unsupported mode is given.
     */
    public function setCommitMode($commitMode)
    {
        if (!in_array($commitMode, ['refresh', 'flush', 'none'], true)) {
            throw new \LogicException('The commit method must be either refresh, flush or none.');
        }

        $this->commitMode = $commitMode;
    }

    /**
     * @return int
     */
    public function getBulkCommitSize()
    {
        return $this->bulkCommitSize;
    }

    /**
     * @param int $bulkCommitSize
     */
    public function setBulkCommitSize($bulkCommitSize)
    {
        $this->bulkCommitSize = $bulkCommitSize;
    }

    /**
     * Creates a repository.
     *
     * @param string $className
     *
     * @return Repository
     */
    private function createRepository($className)
    {
        return new Repository($this, $className);
    }

    /**
     * Executes search query in the index.
     *
     * @param array $types             List of types to search in.
     * @param array $query             Query to execute.
     * @param array $queryStringParams Query parameters.
     *
     * @return array
     */
    public function search(array $types, array $query, array $queryStringParams = [])
    {
        // The fixed keys are merged last so query string parameters can never
        // override the index, type or body of the request.
        $params = array_merge(
            $queryStringParams,
            [
                'index' => $this->getIndexName(),
                'type' => implode(',', $types),
                'body' => $query,
            ]
        );

        return $this->client->search($params);
    }

    /**
     * Adds document to next flush.
     *
     * @param object $document
     */
    public function persist($document)
    {
        $documentArray = $this->converter->convertToArray($document);
        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('index', $type, $documentArray);
    }

    /**
     * Adds document for removal.
     *
     * @param object $document
     *
     * @throws \LogicException When the converted document has no '_id' value.
     */
    public function remove($document)
    {
        $data = $this->converter->convertToArray($document, [], ['_id']);

        if (!isset($data['_id'])) {
            throw new \LogicException(
                'In order to use remove() method document class must have property with @Id annotation.'
            );
        }

        $type = $this->getMetadataCollector()->getDocumentType(get_class($document));

        $this->bulk('delete', $type, ['_id' => $data['_id']]);
    }

    /**
     * Flushes elasticsearch index.
     *
     * The request targets this manager's index unless $params provides its
     * own 'index' value.
     *
     * @param array $params
     *
     * @return array
     */
    public function flush(array $params = [])
    {
        return $this->client->indices()->flush(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Refreshes elasticsearch index.
     *
     * The request targets this manager's index unless $params provides its
     * own 'index' value.
     *
     * @param array $params
     *
     * @return array
     */
    public function refresh(array $params = [])
    {
        return $this->client->indices()->refresh(array_merge(['index' => $this->getIndexName()], $params));
    }

    /**
     * Inserts the current query container to the index, used for bulk queries execution.
     *
     * After the bulk request the container and counter are reset, then the
     * index is flushed or refreshed according to the commit mode ('none'
     * skips that step).
     *
     * @param array $params Parameters that will be passed to the flush or refresh queries.
     *
     * @return null|array Bulk response, or null when there was nothing to send.
     */
    public function commit(array $params = [])
    {
        if (empty($this->bulkQueries)) {
            return null;
        }

        $bulkResponse = $this->client->bulk(array_merge($this->bulkQueries, $this->bulkParams));
        $this->bulkQueries = [];
        $this->bulkCount = 0;

        switch ($this->getCommitMode()) {
            case 'flush':
                $this->flush($params);
                break;
            case 'refresh':
                $this->refresh($params);
                break;
        }

        return $bulkResponse;
    }

    /**
     * Adds query to bulk queries container.
     *
     * The '_id', '_ttl' and '_parent' keys are moved from $query into the
     * operation metadata; the rest of $query becomes the operation body
     * (delete operations carry no body).
     *
     * @param string       $operation One of: index, update, delete, create.
     * @param string|array $type      Elasticsearch type name.
     * @param array        $query     DSL to execute.
     *
     * @throws \InvalidArgumentException
     *
     * @return null|array Bulk response when this call triggered a commit, null otherwise.
     */
    public function bulk($operation, $type, array $query)
    {
        // Strict check: with loose comparison integer 0 equals any
        // non-numeric string on PHP < 8 and would pass validation.
        if (!in_array($operation, ['index', 'create', 'update', 'delete'], true)) {
            throw new \InvalidArgumentException('Wrong bulk operation selected');
        }

        $meta = [
            '_index' => $this->getIndexName(),
            '_type' => $type,
            '_id' => isset($query['_id']) ? $query['_id'] : null,
            '_ttl' => isset($query['_ttl']) ? $query['_ttl'] : null,
            '_parent' => isset($query['_parent']) ? $query['_parent'] : null,
        ];

        // Drop empty metadata but keep numeric zero values: a plain
        // array_filter() would discard a legitimate id such as 0 or '0'.
        $meta = array_filter(
            $meta,
            function ($value) {
                return !empty($value) || is_numeric($value);
            }
        );

        $this->bulkQueries['body'][] = [$operation => $meta];
        unset($query['_id'], $query['_ttl'], $query['_parent']);

        // Body for delete operation is not needed to apply.
        if ($operation !== 'delete') {
            $this->bulkQueries['body'][] = $query;
        }

        // A counter is used because the number of operations is difficult to
        // resolve from the bulkQueries array (delete adds one entry, others two).
        $this->bulkCount++;

        // ">=" instead of "===" so a numeric-string size or a size lowered
        // mid-batch still triggers; a non-positive size never auto-commits.
        if ($this->bulkCommitSize > 0 && $this->bulkCount >= $this->bulkCommitSize) {
            return $this->commit();
        }

        return null;
    }

    /**
     * Optional setter to change bulk query params.
     *
     * @param array $params Possible keys:
     *                      ['consistency'] = (enum) Explicit write consistency setting for the operation.
     *                      ['refresh']     = (boolean) Refresh the index after performing the operation.
     *                      ['replication'] = (enum) Explicitly set the replication type.
     */
    public function setBulkParams(array $params)
    {
        $this->bulkParams = $params;
    }

    /**
     * Creates fresh elasticsearch index.
     *
     * @param bool $noMapping When true the index is created without the configured mappings.
     *
     * @return array
     */
    public function createIndex($noMapping = false)
    {
        // Work on a copy so that skipping mappings once does not strip them
        // from the manager's settings for every later call.
        $settings = $this->indexSettings;

        if ($noMapping) {
            unset($settings['body']['mappings']);
        }

        return $this->getClient()->indices()->create($settings);
    }

    /**
     * Drops elasticsearch index.
     *
     * @return array
     */
    public function dropIndex()
    {
        return $this->getClient()->indices()->delete(['index' => $this->getIndexName()]);
    }

    /**
     * Tries to drop and create fresh elasticsearch index.
     *
     * @param bool $noMapping When true the index is created without the configured mappings.
     *
     * @return array
     */
    public function dropAndCreateIndex($noMapping = false)
    {
        try {
            $this->dropIndex();
        } catch (\Exception $e) {
            // Do nothing, our target is to create new index.
        }

        return $this->createIndex($noMapping);
    }

    /**
     * Checks if connection index is already created.
     *
     * @return bool
     */
    public function indexExists()
    {
        return $this->getClient()->indices()->exists(['index' => $this->getIndexName()]);
    }

    /**
     * Returns index name this connection is attached to.
     *
     * @return string
     */
    public function getIndexName()
    {
        return $this->indexSettings['index'];
    }

    /**
     * Sets index name for this connection.
     *
     * @param string $name
     */
    public function setIndexName($name)
    {
        $this->indexSettings['index'] = $name;
    }

    /**
     * Returns Elasticsearch version number.
     *
     * @return string
     */
    public function getVersionNumber()
    {
        return $this->client->info()['version']['number'];
    }

    /**
     * Clears elasticsearch client cache.
     */
    public function clearCache()
    {
        $this->getClient()->indices()->clearCache(['index' => $this->getIndexName()]);
    }

    /**
     * Returns a single document by ID. Returns NULL if document was not found.
     *
     * @param string $className Document class name or Elasticsearch type name
     * @param string $id        Document ID to find
     *
     * @return object|null
     */
    public function find($className, $id)
    {
        $params = [
            'index' => $this->getIndexName(),
            'type' => $this->resolveTypeName($className),
            'id' => $id,
        ];

        try {
            $result = $this->getClient()->get($params);
        } catch (Missing404Exception $e) {
            return null;
        }

        return $this->getConverter()->convertToDocument($result, $this);
    }

    /**
     * Executes given search.
     *
     * @param array  $types       Document class names or Elasticsearch type names.
     * @param Search $search
     * @param string $resultsType One of the Result::RESULTS_* constants.
     *
     * @return DocumentIterator|RawIterator|array
     */
    public function execute($types, Search $search, $resultsType = Result::RESULTS_OBJECT)
    {
        // Build a new list instead of iterating by reference, which would
        // leave a dangling reference to the last element behind.
        $resolvedTypes = [];
        foreach ($types as $type) {
            $resolvedTypes[] = $this->resolveTypeName($type);
        }

        $results = $this->search($resolvedTypes, $search->toArray(), $search->getQueryParams());

        return $this->parseResult($results, $resultsType, $search->getScroll());
    }

    /**
     * Parses raw result.
     *
     * @param array  $raw
     * @param string $resultsType
     * @param string $scrollDuration
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception When $resultsType is not a supported results type.
     */
    private function parseResult($raw, $resultsType, $scrollDuration = null)
    {
        $scrollConfig = [];
        if (isset($raw['_scroll_id'])) {
            $scrollConfig['_scroll_id'] = $raw['_scroll_id'];
            $scrollConfig['duration'] = $scrollDuration;
        }

        switch ($resultsType) {
            case Result::RESULTS_OBJECT:
                return new DocumentIterator($raw, $this, $scrollConfig);
            case Result::RESULTS_ARRAY:
                return $this->convertToNormalizedArray($raw);
            case Result::RESULTS_RAW:
                return $raw;
            case Result::RESULTS_RAW_ITERATOR:
                return new RawIterator($raw, $this, $scrollConfig);
            default:
                throw new \Exception('Wrong results type selected');
        }
    }

    /**
     * Normalizes response array.
     *
     * Returns the '_source' of a single-document response, otherwise a list
     * built from each hit's '_source' or, failing that, from the first value
     * of each entry in the hit's 'fields'.
     *
     * @param array $data
     *
     * @return array
     */
    private function convertToNormalizedArray($data)
    {
        if (array_key_exists('_source', $data)) {
            return $data['_source'];
        }

        $output = [];

        if (isset($data['hits']['hits'][0]['_source'])) {
            foreach ($data['hits']['hits'] as $item) {
                $output[] = $item['_source'];
            }
        } elseif (isset($data['hits']['hits'][0]['fields'])) {
            // reset() takes its argument by reference, so it is wrapped in a
            // closure rather than handed to array_map() as a string callable.
            $firstValue = function ($values) {
                return reset($values);
            };

            foreach ($data['hits']['hits'] as $item) {
                $output[] = array_map($firstValue, $item['fields']);
            }
        }

        return $output;
    }

    /**
     * Fetches next set of results.
     *
     * @param string $scrollId
     * @param string $scrollDuration
     * @param string $resultsType
     *
     * @return DocumentIterator|RawIterator|array
     *
     * @throws \Exception When $resultsType is not a supported results type.
     */
    public function scroll(
        $scrollId,
        $scrollDuration = '5m',
        $resultsType = Result::RESULTS_OBJECT
    ) {
        $results = $this->getClient()->scroll(['scroll_id' => $scrollId, 'scroll' => $scrollDuration]);

        return $this->parseResult($results, $resultsType, $scrollDuration);
    }

    /**
     * Clears scroll.
     *
     * @param string $scrollId
     */
    public function clearScroll($scrollId)
    {
        $this->getClient()->clearScroll(['scroll_id' => $scrollId]);
    }

    /**
     * Resolves type name by class name.
     *
     * A value containing ':' or a namespace separator is treated as a class
     * reference and looked up through the metadata collector; anything else
     * is returned unchanged as a type name.
     *
     * @param string $className
     *
     * @return string
     */
    private function resolveTypeName($className)
    {
        if (strpos($className, ':') !== false || strpos($className, '\\') !== false) {
            return $this->getMetadataCollector()->getDocumentType($className);
        }

        return $className;
    }
}
658