1 | <?php declare(strict_types=1); |
||
12 | use Daikon\Dbal\Exception\DbalException; |
||
13 | use Daikon\Dbal\Exception\DocumentConflict; |
||
14 | use Daikon\EventSourcing\EventStore\Commit\CommitSequence; |
||
15 | use Daikon\EventSourcing\EventStore\Commit\CommitSequenceInterface; |
||
16 | use Daikon\EventSourcing\EventStore\Storage\StorageAdapterInterface; |
||
17 | use GuzzleHttp\Exception\BadResponseException; |
||
18 | use GuzzleHttp\Psr7\Request; |
||
19 | use GuzzleHttp\Psr7\Response; |
||
20 | |||
21 | final class CouchDbStorageAdapter implements StorageAdapterInterface |
||
22 | { |
||
23 | private CouchDbConnector $connector; |
||
24 | |||
25 | private array $settings; |
||
26 | |||
27 | public function __construct(CouchDbConnector $connector, array $settings = []) |
||
28 | { |
||
29 | $this->connector = $connector; |
||
30 | $this->settings = $settings; |
||
31 | } |
||
32 | |||
33 | public function load(string $identifier, string $from = null, string $to = null): CommitSequenceInterface |
||
34 | { |
||
35 | $viewPath = sprintf( |
||
36 | '/_design/%s/_view/%s', |
||
37 | $this->settings['design_doc'], |
||
38 | $this->settings['view_name'] ?? 'commit_stream' |
||
39 | ); |
||
40 | |||
41 | $viewParams = [ |
||
42 | 'startkey' => sprintf('["%s", %s]', $identifier, $from ?: '{}'), |
||
43 | 'endkey' => sprintf('["%s", %s]', $identifier, $to ?: 1), |
||
44 | 'include_docs' => 'true', |
||
45 | 'reduce' => 'false', |
||
46 | 'descending' => 'true', |
||
47 | 'limit' => 5000 //@todo use snapshot size config setting as soon as available |
||
48 | ]; |
||
49 | |||
50 | /** @var Response $response */ |
||
51 | $response = $this->request($viewPath, 'GET', [], $viewParams); |
||
52 | $rawResponse = json_decode((string)$response->getBody(), true); |
||
53 | |||
54 | if (!isset($rawResponse['rows'])) { |
||
55 | //@todo add error logging |
||
56 | throw new DbalException('Failed to load data for '.$identifier); |
||
57 | } |
||
58 | |||
59 | return CommitSequence::fromNative( |
||
60 | array_map( |
||
61 | fn(array $commit): array => $commit['doc'], |
||
|
|||
62 | array_reverse($rawResponse['rows']) |
||
63 | ) |
||
64 | ); |
||
65 | } |
||
66 | |||
67 | public function append(string $identifier, array $body): void |
||
68 | { |
||
69 | /** @var Response $response */ |
||
70 | $response = $this->request($identifier, 'PUT', $body); |
||
71 | |||
72 | if ($response->getStatusCode() === 409) { |
||
73 | throw new DocumentConflict; |
||
74 | } |
||
75 | |||
76 | $rawResponse = json_decode((string)$response->getBody(), true); |
||
77 | if (!isset($rawResponse['ok']) || !isset($rawResponse['rev'])) { |
||
78 | //@todo add error logging |
||
79 | throw new DbalException('Failed to append data for '.$identifier); |
||
80 | } |
||
81 | } |
||
82 | |||
83 | public function purge(string $identifier): void |
||
84 | { |
||
85 | throw new DbalException('Not implemented'); |
||
86 | } |
||
87 | |||
88 | /** @return mixed */ |
||
89 | private function request(string $identifier, string $method, array $body = [], array $params = []) |
||
116 |