CouchDbStorageAdapter   A
last analyzed

Complexity

Total Complexity 15

Size/Duplication

Total Lines 93
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 7
Bugs 0 Features 0
Metric Value
eloc 45
c 7
b 0
f 0
dl 0
loc 93
ccs 0
cts 34
cp 0
rs 10
wmc 15

6 Methods

Rating   Name   Duplication   Size   Complexity  
A append() 0 13 4
A buildUri() 0 8 2
A load() 0 30 4
A __construct() 0 4 1
A purge() 0 3 1
A request() 0 15 3
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/couchdb-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\CouchDb\Storage;
10
11
use Daikon\CouchDb\Connector\CouchDbConnector;
12
use Daikon\Dbal\Exception\DbalException;
13
use Daikon\Dbal\Exception\DocumentConflict;
14
use Daikon\EventSourcing\EventStore\Commit\CommitSequence;
15
use Daikon\EventSourcing\EventStore\Commit\CommitSequenceInterface;
16
use Daikon\EventSourcing\EventStore\Storage\StorageAdapterInterface;
17
use GuzzleHttp\Exception\BadResponseException;
18
use GuzzleHttp\Psr7\Request;
19
use GuzzleHttp\Psr7\Response;
20
21
final class CouchDbStorageAdapter implements StorageAdapterInterface
22
{
23
    private CouchDbConnector $connector;
24
25
    private array $settings;
26
27
    public function __construct(CouchDbConnector $connector, array $settings = [])
28
    {
29
        $this->connector = $connector;
30
        $this->settings = $settings;
31
    }
32
33
    public function load(string $identifier, string $from = null, string $to = null): CommitSequenceInterface
34
    {
35
        $viewPath = sprintf(
36
            '/_design/%s/_view/%s',
37
            $this->settings['design_doc'],
38
            $this->settings['view_name'] ?? 'commit_stream'
39
        );
40
41
        $viewParams = [
42
            'startkey' => sprintf('["%s", %s]', $identifier, $from ?: '{}'),
43
            'endkey' => sprintf('["%s", %s]', $identifier, $to ?: 1),
44
            'include_docs' => 'true',
45
            'reduce' => 'false',
46
            'descending' => 'true',
47
            'limit' => 5000 //@todo use snapshot size config setting as soon as available
48
        ];
49
50
        /** @var Response $response */
51
        $response = $this->request($viewPath, 'GET', [], $viewParams);
52
        $rawResponse = json_decode((string)$response->getBody(), true);
53
54
        if (!isset($rawResponse['rows'])) {
55
            //@todo add error logging
56
            throw new DbalException("Failed to load data for '$identifier'.");
57
        }
58
59
        return CommitSequence::fromNative(
60
            array_map(
61
                fn(array $commit): array => $commit['doc'],
62
                array_reverse($rawResponse['rows'])
63
            )
64
        );
65
    }
66
67
    public function append(string $identifier, array $data): void
68
    {
69
        /** @var Response $response */
70
        $response = $this->request($identifier, 'PUT', $data);
71
72
        if ($response->getStatusCode() === 409) {
73
            throw new DocumentConflict;
74
        }
75
76
        $rawResponse = json_decode((string)$response->getBody(), true);
77
        if (!isset($rawResponse['ok']) || !isset($rawResponse['rev'])) {
78
            //@todo add error logging
79
            throw new DbalException("Failed to append data for '$identifier'.");
80
        }
81
    }
82
83
    public function purge(string $identifier): void
84
    {
85
        throw new DbalException('Not implemented.');
86
    }
87
88
    /** @return mixed */
89
    private function request(string $identifier, string $method, array $body = [], array $params = [])
90
    {
91
        $uri = $this->buildUri($identifier, $params);
92
93
        $request = empty($body)
94
            ? new Request($method, $uri)
95
            : new Request($method, $uri, [], json_encode($body));
96
97
        try {
98
            $response = $this->connector->getConnection()->send($request);
99
        } catch (BadResponseException $error) {
100
            $response = $error->getResponse();
101
        }
102
103
        return $response;
104
    }
105
106
    private function buildUri(string $identifier, array $params = []): string
107
    {
108
        $settings = $this->connector->getSettings();
109
        $uri = sprintf('/%s/%s', $settings['database'], $identifier);
110
        if (!empty($params)) {
111
            $uri .= '?'.http_build_query($params);
112
        }
113
        return str_replace('//', '/', $uri);
114
    }
115
}
116