Completed
Push — master ( 5a4989...d650eb )
by Raymond
7s
created

HttpClient::readStreamUpToVersion()   B

Complexity

Conditions 6
Paths 6

Size

Total Lines 33
Code Lines 18

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 33
rs 8.439
cc 6
eloc 18
nc 6
nop 2
1
<?php
2
3
namespace RayRutjes\GetEventStore\Client\Http;
4
5
use GuzzleHttp\Client;
6
use GuzzleHttp\Exception\TransferException;
7
use Psr\Http\Message\RequestInterface;
8
use Psr\Http\Message\ResponseInterface;
9
use RayRutjes\GetEventStore\ClientInterface;
10
use RayRutjes\GetEventStore\Client\Exception\SystemException;
11
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamFeedIterator;
12
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamIterator;
13
use RayRutjes\GetEventStore\EventDataCollection;
14
use RayRutjes\GetEventStore\EventRecordCollection;
15
use RayRutjes\GetEventStore\ExpectedVersion;
16
use RayRutjes\GetEventStore\StreamId;
17
use RayRutjes\GetEventStore\UserCredentials;
18
19
/**
 * HTTP implementation of ClientInterface built on top of a Guzzle client.
 *
 * Every request goes through send(), which converts Guzzle transfer failures
 * into SystemException and hands the response to a ResponseInspector.
 */
final class HttpClient implements ClientInterface
{
    /**
     * Guzzle client configured in the constructor.
     *
     * @var Client
     */
    private $httpClient;

    /**
     * Builds the underlying Guzzle client.
     *
     * The options listed below are merged AFTER $httpClientOptions, so a key
     * supplied by the caller with the same name (base_uri, allow_redirects,
     * connect_timeout, auth, http_errors) is overridden by the value set here.
     *
     * @param string          $baseUri           Used as Guzzle's "base_uri" option.
     * @param UserCredentials $credentials       Login/password used for the "auth" option.
     * @param float           $connectTimeout    Used as Guzzle's "connect_timeout" option.
     * @param array           $httpClientOptions Additional Guzzle client options.
     */
    public function __construct(
        string $baseUri,
        UserCredentials $credentials,
        float $connectTimeout = 3,
        array $httpClientOptions = []
    ) {
        $options = array_merge(
            $httpClientOptions,
            [
                'base_uri'        => $baseUri,
                'allow_redirects' => false,
                'connect_timeout' => $connectTimeout,
                'auth'            => [$credentials->getLogin(), $credentials->getPassword()],
                'http_errors'     => false, // Let the client handle the status codes for now.
            ]
        );
        $this->httpClient = new Client($options);
    }

    /**
     * Appends one or more events to a non-system stream.
     *
     * @param string $streamId
     * @param int    $expectedVersion
     * @param array  $events          Converted with EventDataCollection::fromArray().
     *
     * @throws \InvalidArgumentException When no events are given or the stream is a system stream.
     * @throws SystemException           When the HTTP transfer fails.
     */
    public function appendToStream(string $streamId, int $expectedVersion, array $events)
    {
        $events = EventDataCollection::fromArray($events);
        if (0 === $events->count()) {
            throw new \InvalidArgumentException('No events provided.');
        }

        $streamId = new StreamId($streamId);
        if ($streamId->isSystem()) {
            // Use the explicit toString() accessor, as the other methods of this class do,
            // instead of relying on an implicit object-to-string conversion.
            throw new \InvalidArgumentException(
                sprintf('Can not append to system stream %s', $streamId->toString())
            );
        }

        $expectedVersion = new ExpectedVersion($expectedVersion);

        $request = new AppendMultipleToStreamRequestFactory($streamId, $expectedVersion, $events);

        $this->send($request->buildRequest(), new AppendToStreamResponseInspector());
    }

    /**
     * Sends a request and lets the given inspector examine the response.
     *
     * @param RequestInterface  $request
     * @param ResponseInspector $inspector
     *
     * @return ResponseInterface
     *
     * @throws SystemException When Guzzle raises a TransferException.
     *
     * @internal
     */
    public function send(RequestInterface $request, ResponseInspector $inspector): ResponseInterface
    {
        try {
            $response = $this->httpClient->send($request);
        } catch (TransferException $e) {
            // Throw from here so it is evident that $response is always set below.
            throw $this->toSystemException($e);
        }

        $inspector->inspect($response);

        return $response;
    }

    /**
     * Wraps a Guzzle transfer failure, keeping its message, code and the
     * original exception as "previous".
     *
     * @param TransferException $e
     *
     * @return SystemException
     */
    private function toSystemException(TransferException $e): SystemException
    {
        return new SystemException($e->getMessage(), $e->getCode(), $e);
    }

    /**
     * Deletes a non-system stream.
     *
     * @param string $streamId
     *
     * @throws \InvalidArgumentException When the stream is a system stream.
     * @throws SystemException           When the HTTP transfer fails.
     */
    public function deleteStream(string $streamId)
    {
        $streamId = new StreamId($streamId);
        if ($streamId->isSystem()) {
            throw new \InvalidArgumentException(
                sprintf('Can not delete system stream with id %s', $streamId->toString())
            );
        }

        $factory = new DeleteStreamRequestFactory($streamId);
        $this->send($factory->buildRequest(), new DeleteStreamResponseInspector());
    }

    /**
     * Reads every event of the StreamId::ALL stream.
     *
     * With great power comes great responsibility.
     *
     * @return EventRecordCollection
     */
    public function readAllEvents(): EventRecordCollection
    {
        $streamId = new StreamId(StreamId::ALL);

        return $this->readAllEventsFromStream($streamId->toString());
    }

    /**
     * Reads every event of the given stream into a collection.
     *
     * @param string $streamId
     *
     * @return EventRecordCollection
     */
    public function readAllEventsFromStream(string $streamId): EventRecordCollection
    {
        $streamId = new StreamId($streamId);
        // NOTE(review): the third argument presumably selects the reading direction
        // (true here, false in readStreamUpToVersion) - confirm against EventStreamFeedIterator.
        $feedsIterator = new EventStreamFeedIterator($streamId, $this, true);
        $eventsIterator = new EventStreamIterator($feedsIterator);

        $events = [];
        foreach ($eventsIterator as $event) {
            $events[] = $event;
        }

        return EventRecordCollection::fromArray($events);
    }

    /**
     * Retrieves events recorded since a given version of the stream.
     * Does not include the event with number corresponding to the given version.
     *
     * @param string $streamId
     * @param int    $version  Must be >= 0.
     *
     * @return EventRecordCollection
     *
     * @throws \InvalidArgumentException When $version is negative, when the stream is
     *                                   StreamId::ALL, or when an event numbered below
     *                                   $version is met before $version itself.
     */
    public function readStreamUpToVersion(string $streamId, int $version): EventRecordCollection
    {
        // Version 0 is accepted, as the error message states; only negative values are invalid.
        if ($version < 0) {
            throw new \InvalidArgumentException(sprintf('version should be >= 0, got: %d', $version));
        }

        $streamId = new StreamId($streamId);
        // Todo: there are probably more streams to avoid. Thinking of system or metadata streams.
        if ($streamId->toString() === StreamId::ALL) {
            throw new \InvalidArgumentException(sprintf('Can not catch up %s stream.', StreamId::ALL));
        }

        // NOTE(review): the loop below presumably receives events newest-first (it stops at
        // $version and reverses the result afterwards) - confirm against EventStreamFeedIterator.
        $feedsIterator = new EventStreamFeedIterator($streamId, $this, false);
        $eventsIterator = new EventStreamIterator($feedsIterator);

        $events = [];
        foreach ($eventsIterator as $event) {
            if ($event->getNumber() < $version) {
                throw new \InvalidArgumentException(
                    sprintf('Stream %s has not reached version %d.', $streamId->toString(), $version)
                );
            }

            // The event at $version itself is excluded from the result.
            if ($event->getNumber() === $version) {
                break;
            }
            $events[] = $event;
        }

        // Return the collected events in the opposite order of iteration.
        $events = array_reverse($events);

        return EventRecordCollection::fromArray($events);
    }
}
194