Test Failed
Push — master ( 6a2cd1...15b1c9 )
by Mr
01:29
created

CouchDbStorageAdapter   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 85
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 0
Metric Value
wmc 14
lcom 1
cbo 5
dl 0
loc 85
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
B read() 0 36 5
A write() 0 9 3
A delete() 0 4 1
A request() 0 10 2
A buildUri() 0 9 2
1
<?php
2
3
namespace Daikon\CouchDb\Storage;
4
5
use Daikon\CouchDb\Connector\CouchDbConnector;
6
use Daikon\Dbal\Exception\DbalException;
7
use Daikon\Dbal\Storage\StorageAdapterInterface;
8
use Daikon\EventSourcing\EventStore\CommitSequence;
9
use GuzzleHttp\Exception\RequestException;
10
use GuzzleHttp\Psr7\Request;
11
12
/**
 * Storage adapter that reads and writes documents of a CouchDB database
 * through the HTTP connection provided by a CouchDbConnector.
 *
 * Reads go through a CouchDB view (design document and view name are taken
 * from the adapter settings); writes PUT a JSON document directly under the
 * given identifier. Deleting is not implemented.
 */
final class CouchDbStorageAdapter implements StorageAdapterInterface
{
    /** @var CouchDbConnector Connector supplying the HTTP connection and the "database" setting. */
    private $connector;

    /** @var array Adapter settings; "design_doc" is required by read(), "view_name" is optional. */
    private $settings;

    /**
     * @param CouchDbConnector $connector Connector used to obtain the connection and database name.
     * @param array $settings Adapter settings ("design_doc", optional "view_name").
     */
    public function __construct(CouchDbConnector $connector, array $settings = [])
    {
        $this->connector = $connector;
        $this->settings = $settings;
    }

    /**
     * Loads the documents stored for the given identifier via the configured view.
     *
     * The view is queried in descending order with a limit of 1000 rows and the
     * resulting rows are reversed again before being handed to CommitSequence.
     *
     * @param string $identifier Value used as first element of the view's compound key.
     *
     * @return CommitSequence Sequence built from the "doc" of every returned row, or an
     *                        empty sequence when CouchDB answers with a 404.
     *
     * @throws DbalException If "design_doc" is not configured, the identifier cannot be
     *                       JSON-encoded, or the response lacks "total_rows"/"rows".
     * @throws RequestException For any failed request other than a 404 response.
     */
    public function read(string $identifier)
    {
        // Without a design document the view path would silently collapse to a
        // non-existent resource and the resulting 404 would look like "no data".
        if (!isset($this->settings['design_doc'])) {
            throw new DbalException('Missing required setting "design_doc" to read data for '.$identifier);
        }

        $viewPath = sprintf(
            '/_design/%s/_view/%s',
            $this->settings['design_doc'],
            $this->settings['view_name'] ?? 'commit_stream'
        );

        // Encode the identifier as a JSON string so quotes/backslashes cannot break
        // (or alter) the key range. The flags keep the output identical to the raw
        // identifier for values that need no escaping.
        $encodedIdentifier = json_encode($identifier, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
        if ($encodedIdentifier === false) {
            throw new DbalException('Failed to encode identifier for reading: '.json_last_error_msg());
        }

        $viewParams = [
            // descending query: start at the highest key ({}) and stop at 1
            'startkey' => sprintf('[%s, {}]', $encodedIdentifier),
            'endkey' => sprintf('[%s, 1]', $encodedIdentifier),
            'include_docs' => 'true',
            'reduce' => 'false',
            'descending' => 'true',
            'limit' => 1000 // @todo use snapshot size config setting as soon as available
        ];

        try {
            $response = $this->request($viewPath, 'GET', [], $viewParams);
        } catch (RequestException $error) {
            // a 404 is treated as "nothing stored yet"
            if ($error->hasResponse() && $error->getResponse()->getStatusCode() === 404) {
                return CommitSequence::makeEmpty();
            }
            throw $error;
        }

        // The body is a stream object; cast it explicitly before decoding.
        $rawResponse = json_decode((string)$response->getBody(), true);

        if (!isset($rawResponse['total_rows'], $rawResponse['rows']) || !is_array($rawResponse['rows'])) {
            throw new DbalException('Failed to read data for '.$identifier);
        }

        // Rows arrive in descending order; flip them back before building the sequence.
        return CommitSequence::fromArray(array_map(static function (array $commitData) {
            return $commitData['doc'];
        }, array_reverse($rawResponse['rows'])));
    }

    /**
     * Stores the given data as a JSON document under the given identifier (HTTP PUT).
     *
     * @param string $identifier Document path below the database.
     * @param array $body Document data; sent JSON-encoded.
     *
     * @throws DbalException If the response does not contain both "ok" and "rev".
     */
    public function write(string $identifier, array $body)
    {
        $response = $this->request($identifier, 'PUT', $body);
        $rawResponse = json_decode((string)$response->getBody(), true);

        if (!isset($rawResponse['ok'], $rawResponse['rev'])) {
            throw new DbalException('Failed to write data for '.$identifier);
        }
    }

    /**
     * Not supported by this adapter.
     *
     * @param string $identifier Unused.
     *
     * @throws DbalException Always.
     */
    public function delete(string $identifier)
    {
        throw new DbalException('Not yet implemented');
    }

    /**
     * Builds and sends a request against the connector's connection.
     *
     * @param string $identifier Path below the database.
     * @param string $method HTTP method.
     * @param array $body Payload; when non-empty it is sent JSON-encoded, otherwise no body is sent.
     * @param array $params Query string parameters.
     *
     * @return mixed Whatever the connection's send() returns (read()/write() call getBody() on it).
     *
     * @throws DbalException If the body cannot be JSON-encoded.
     */
    private function request(string $identifier, string $method, array $body = [], array $params = [])
    {
        $uri = $this->buildUri($identifier, $params);

        if (empty($body)) {
            $request = new Request($method, $uri);
        } else {
            $payload = json_encode($body);
            if ($payload === false) {
                throw new DbalException('Failed to encode request body: '.json_last_error_msg());
            }
            $request = new Request($method, $uri, [], $payload);
        }

        return $this->connector->getConnection()->send($request);
    }

    /**
     * Creates the request URI "/<database>/<identifier>[?<query>]".
     *
     * @param string $identifier Path below the database; may itself start with a slash.
     * @param array $params Query string parameters, encoded via http_build_query().
     *
     * @return string
     */
    private function buildUri(string $identifier, array $params = []): string
    {
        $settings = $this->connector->getSettings();

        // Collapse duplicate slashes in the path only (e.g. when the identifier has a
        // leading slash), before the query string gets attached.
        $uri = str_replace('//', '/', sprintf('/%s/%s', $settings['database'], $identifier));

        if (!empty($params)) {
            $uri .= '?'.http_build_query($params);
        }

        return $uri;
    }
}
97