Test Failed
Push — master ( d032ff...5c688f )
by Mr
02:06
created

CouchDbStorageAdapter::purge()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
namespace Daikon\CouchDb\Storage;
4
5
use Daikon\CouchDb\Connector\CouchDbConnector;
6
use Daikon\Dbal\Exception\DbalException;
7
use Daikon\EventSourcing\EventStore\Commit\CommitSequence;
8
use Daikon\EventSourcing\EventStore\Commit\CommitSequenceInterface;
9
use Daikon\EventSourcing\EventStore\Storage\StorageAdapterInterface;
10
use GuzzleHttp\Exception\RequestException;
11
use GuzzleHttp\Psr7\Request;
12
13
final class CouchDbStorageAdapter implements StorageAdapterInterface
14
{
15
    private $connector;
16
17
    private $settings;
18
19
    public function __construct(CouchDbConnector $connector, array $settings = [])
20
    {
21
        $this->connector = $connector;
22
        $this->settings = $settings;
23
    }
24
25
    public function load(string $identifier): CommitSequenceInterface
26
    {
27
        $viewPath = sprintf(
28
            '/_design/%s/_view/%s',
29
            $this->settings['design_doc'],
30
            $this->settings['view_name'] ?? 'commit_stream'
31
        );
32
33
        $viewParams = [
34
            'startkey' => sprintf('["%s", {}]', $identifier),
35
            'endkey' => sprintf('["%s", 1]', $identifier),
36
            'include_docs' => 'true',
37
            'reduce' => 'false',
38
            'descending' => 'true',
39
            'limit' => 1000 // @todo use snapshot size config setting as soon as available
40
        ];
41
42
        try {
43
            $response = $this->request($viewPath, 'GET', [], $viewParams);
44
            $rawResponse = json_decode($response->getBody(), true);
45
        } catch (RequestException $error) {
46
            if ($error->hasResponse() && $error->getResponse()->getStatusCode() === 404) {
47
                return CommitSequence::makeEmpty();
48
            } else {
49
                throw $error;
50
            }
51
        }
52
53
        if (!isset($rawResponse['total_rows'])) {
54
            throw new DbalException('Failed to read data for '.$identifier);
55
        }
56
57
        return CommitSequence::fromArray(array_map(function (array $commitData) {
58
            return $commitData['doc'];
59
        }, array_reverse($rawResponse['rows'])));
60
    }
61
62
    public function append(string $identifier, array $body): void
63
    {
64
        $response = $this->request($identifier, 'PUT', $body);
65
        $rawResponse = json_decode($response->getBody(), true);
66
67
        if (!isset($rawResponse['ok']) || !isset($rawResponse['rev'])) {
68
            throw new DbalException('Failed to write data for '.$identifier);
69
        }
70
    }
71
72
    public function purge(string $identifier): void
73
    {
74
        throw new DbalException('Not yet implemented');
75
    }
76
77
    private function request(string $identifier, string $method, array $body = [], array $params = [])
78
    {
79
        $uri = $this->buildUri($identifier, $params);
80
81
        $request = empty($body)
82
            ? new Request($method, $uri)
83
            : new Request($method, $uri, [], json_encode($body));
84
85
        return $this->connector->getConnection()->send($request);
86
    }
87
88
    private function buildUri(string $identifier, array $params = [])
89
    {
90
        $settings = $this->connector->getSettings();
91
        $uri = sprintf('/%s/%s', $settings['database'], $identifier);
92
        if (!empty($params)) {
93
            $uri .= '?'.http_build_query($params);
94
        }
95
        return str_replace('//', '/', $uri);
96
    }
97
}
98