Elasticsearch7StorageAdapter::scrollNext()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 10
c 1
b 0
f 0
nc 1
nop 2
dl 0
loc 15
ccs 0
cts 4
cp 0
crap 2
rs 9.9332
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/elasticsearch7-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\Elasticsearch7\Storage;
10
11
use Daikon\Dbal\Exception\DbalException;
12
use Daikon\Elasticsearch7\Connector\Elasticsearch7Connector;
13
use Daikon\Interop\Assert;
14
use Daikon\Metadata\Metadata;
15
use Daikon\ReadModel\Projection\ProjectionMap;
16
use Daikon\ReadModel\Query\QueryInterface;
17
use Daikon\ReadModel\Storage\ScrollAdapterInterface;
18
use Daikon\ReadModel\Storage\SearchAdapterInterface;
19
use Daikon\ReadModel\Storage\StorageAdapterInterface;
20
use Daikon\ReadModel\Storage\StorageResultInterface;
21
use Elasticsearch\Common\Exceptions\Missing404Exception;
22
23
final class Elasticsearch7StorageAdapter implements
24
    StorageAdapterInterface,
25
    SearchAdapterInterface,
26
    ScrollAdapterInterface
27
{
28
    private Elasticsearch7Connector $connector;
29
30
    private array $settings;
31
32
    public function __construct(Elasticsearch7Connector $connector, array $settings = [])
33
    {
34
        $this->connector = $connector;
35
        $this->settings = $settings;
36
    }
37
38
    public function read(string $identifier): StorageResultInterface
39
    {
40
        $projection = [];
41
42
        try {
43
            $document = $this->connector->getConnection()->get(
44
                array_merge($this->settings['read'] ?? [], [
45
                    'index' => $this->getIndex(),
46
                    'id' => $identifier
47
                ])
48
            );
49
            $projectionClass = $document['_source']['@type'];
50
            $projection = [$document['_id'] => $projectionClass::fromNative($document['_source'])];
51
        } catch (Missing404Exception $error) {
52
            // just return an empty result
53
        }
54
55
        return new Elasticsearch7StorageResult(
56
            new ProjectionMap($projection)
57
        );
58
    }
59
60
    public function write(string $identifier, array $data): bool
61
    {
62
        $document = array_merge($this->settings['write'] ?? [], [
63
            'index' => $this->getIndex(),
64
            'id' => $identifier,
65
            'body' => $data
66
        ]);
67
68
        $this->connector->getConnection()->index($document);
69
70
        return true;
71
    }
72
73
    public function delete(string $identifier): bool
74
    {
75
        throw new DbalException('Not implemented.');
76
    }
77
78
    public function search(QueryInterface $query, int $from = null, int $size = null): StorageResultInterface
79
    {
80
        $query = array_merge($this->settings['search'] ?? [], [
81
            'index' => $this->getIndex(),
82
            'from' => $from,
83
            'size' => $size,
84
            'body' => $query->toNative(),
85
            'rest_total_hits_as_int' => true
86
        ]);
87
88
        $results = $this->connector->getConnection()->search($query);
89
90
        return new Elasticsearch7StorageResult(
91
            $this->makeProjectionMap($results['hits']['hits']),
92
            Metadata::fromNative(['total' => $results['hits']['total']])
93
        );
94
    }
95
96
    public function scrollStart(QueryInterface $query, int $size = null, $cursor = null): StorageResultInterface
97
    {
98
        $query = array_merge($this->settings['search'] ?? [], [
99
            'index' => $this->getIndex(),
100
            'size' => $size,
101
            'body' => $query->toNative(),
102
            'scroll' => $this->settings['scroll_timeout'] ?? '1m',
103
            'sort' => ['_doc'],
104
            'rest_total_hits_as_int' => true
105
        ]);
106
107
        $results = $this->connector->getConnection()->search($query);
108
109
        return new Elasticsearch7StorageResult(
110
            $this->makeProjectionMap($results['hits']['hits']),
111
            Metadata::fromNative([
112
                'total' => $results['hits']['total'],
113
                'cursor' => $results['_scroll_id']
114
            ])
115
        );
116
    }
117
118
    public function scrollNext($cursor, int $size = null): StorageResultInterface
119
    {
120
        Assert::that($cursor, 'Invalid cursor.')->string()->notEmpty();
121
122
        $results = $this->connector->getConnection()->scroll([
123
            'scroll_id' => $cursor,
124
            'scroll' => $this->settings['scroll_timeout'] ?? '1m',
125
            'rest_total_hits_as_int' => true
126
        ]);
127
128
        return new Elasticsearch7StorageResult(
129
            $this->makeProjectionMap($results['hits']['hits']),
130
            Metadata::fromNative([
131
                'total' => $results['hits']['total'],
132
                'cursor' => $results['_scroll_id']
133
            ])
134
        );
135
    }
136
137
    public function scrollEnd($cursor): void
138
    {
139
        Assert::that($cursor, 'Invalid cursor.')->string()->notEmpty();
140
141
        $this->connector->getConnection()->clearScroll(['scroll_id' => $cursor]);
142
    }
143
144
    private function getIndex(): string
145
    {
146
        return $this->settings['index'] ?? $this->connector->getSettings()['index'];
147
    }
148
149
    private function makeProjectionMap(array $documents): ProjectionMap
150
    {
151
        $projections = [];
152
        foreach ($documents as $document) {
153
            $projectionClass = $document['_source']['@type'];
154
            $projections[$document['_id']] = $projectionClass::fromNative($document['_source']);
155
        }
156
        return new ProjectionMap($projections);
157
    }
158
}
159