Completed
Pull Request — master (#1)
by Raymond
02:16
created

HttpClient::readStreamUpToVersion()   B

Complexity

Conditions 6
Paths 6

Size

Total Lines 33
Code Lines 18

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 33
rs 8.439
cc 6
eloc 18
nc 6
nop 2
1
<?php
2
3
namespace RayRutjes\GetEventStore\Client\Http;
4
5
use GuzzleHttp\Client;
6
use GuzzleHttp\Exception\TransferException;
7
use Psr\Http\Message\RequestInterface;
8
use Psr\Http\Message\ResponseInterface;
9
use RayRutjes\GetEventStore\ClientInterface;
10
use RayRutjes\GetEventStore\Client\Exception\SystemException;
11
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamFeedIterator;
12
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamIterator;
13
use RayRutjes\GetEventStore\EventDataCollection;
14
use RayRutjes\GetEventStore\EventRecordCollection;
15
use RayRutjes\GetEventStore\ExpectedVersion;
16
use RayRutjes\GetEventStore\PersistentSubscriptionSettings;
17
use RayRutjes\GetEventStore\StreamId;
18
use RayRutjes\GetEventStore\UserCredentials;
19
20
final class HttpClient implements ClientInterface
21
{
22
    /**
23
     * @var Client
24
     */
25
    private $httpClient;
26
27
    /**
28
     * @param string          $baseUri
29
     * @param UserCredentials $credentials
30
     * @param float           $connectTimeout
31
     * @param array           $httpClientOptions
32
     */
33
    public function __construct(
34
        string $baseUri,
35
        UserCredentials $credentials,
36
        float $connectTimeout = 3,
37
        array $httpClientOptions = []
38
    ) {
39
        $options = array_merge(
40
            $httpClientOptions,
41
            [
42
                'base_uri'        => $baseUri,
43
                'allow_redirects' => false,
44
                'connect_timeout' => $connectTimeout,
45
                'auth'            => [$credentials->getLogin(), $credentials->getPassword()],
46
                'http_errors'     => false, // Let the client handle the status codes for now.
47
            ]
48
        );
49
        $this->httpClient = new Client($options);
50
    }
51
52
    /**
53
     * @param string $streamId
54
     * @param int    $expectedVersion
55
     * @param array  $events
56
     */
57
    public function appendToStream(string $streamId, int $expectedVersion, array $events)
58
    {
59
        $events = EventDataCollection::fromArray($events);
60
        if (0 === $events->count()) {
61
            throw new \InvalidArgumentException('No events provided.');
62
        }
63
64
        $streamId = new StreamId($streamId);
65
        if ($streamId->isSystem()) {
66
            throw new \InvalidArgumentException(sprintf('Can not append to system stream %s', $streamId));
67
        }
68
69
        $expectedVersion = new ExpectedVersion($expectedVersion);
70
71
        $request = new AppendToStreamRequestFactory($streamId, $expectedVersion, $events);
72
73
        $this->send($request->buildRequest(), new AppendToStreamResponseInspector());
74
    }
75
76
    /**
77
     * @param RequestInterface  $request
78
     * @param ResponseInspector $inspector
79
     *
80
     * @return ResponseInterface
81
     *
82
     * @internal
83
     */
84
    public function send(RequestInterface $request, ResponseInspector $inspector): ResponseInterface
85
    {
86
        try {
87
            $response = $this->httpClient->send($request);
88
        } catch (TransferException $e) {
89
            $this->handleTransferException($e);
90
        }
91
92
        $inspector->inspect($response);
93
94
        return $response;
95
    }
96
97
    /**
98
     * @param $e
99
     */
100
    private function handleTransferException(TransferException $e)
101
    {
102
        throw new SystemException($e->getMessage(), $e->getCode(), $e);
103
    }
104
105
    /**
106
     * @param string $streamId
107
     */
108
    public function deleteStream(string $streamId)
109
    {
110
        $streamId = new StreamId($streamId);
111
        if ($streamId->isSystem()) {
112
            throw new \InvalidArgumentException(
113
                sprintf('Can not delete system stream with id %s', $streamId->toString())
114
            );
115
        }
116
117
        $factory = new DeleteStreamRequestFactory($streamId);
118
        $this->send($factory->buildRequest(), new DeleteStreamResponseInspector());
119
    }
120
121
    /**
122
     * With great power comes great responsibility.
123
     *
124
     * @return EventRecordCollection
125
     */
126
    public function readAllEvents(): EventRecordCollection
127
    {
128
        $streamId = new StreamId(StreamId::ALL);
129
130
        return $this->readAllEventsFromStream($streamId->toString());
131
    }
132
133
    /**
134
     * @param string $streamId
135
     *
136
     * @return EventRecordCollection
137
     */
138
    public function readAllEventsFromStream(string $streamId): EventRecordCollection
139
    {
140
        $streamId = new StreamId($streamId);
141
        $feedsIterator = new EventStreamFeedIterator($streamId, $this, true);
142
        $eventsIterator = new EventStreamIterator($feedsIterator);
143
144
        $events = [];
145
        foreach ($eventsIterator as $event) {
146
            $events[] = $event;
147
        }
148
149
        return EventRecordCollection::fromArray($events);
150
    }
151
152
    /**
153
     * Retrieves events recorded since a given version of the stream.
154
     * Does not include the event with number corresponding to the given version.
155
     *
156
     * @param string $streamId
157
     * @param int    $version
158
     *
159
     * @return EventRecordCollection
160
     */
161
    public function readStreamUpToVersion(string $streamId, int $version): EventRecordCollection
162
    {
163
        if ($version <= 0) {
164
            throw new \InvalidArgumentException(sprintf('version should be >= 0, got: %d', $version));
165
        }
166
167
        $streamId = new StreamId($streamId);
168
        // Todo: there are probably more streams to avoid. Thinking of system or metadata streams.
169
        if ($streamId->toString() === StreamId::ALL) {
170
            throw new \InvalidArgumentException(sprintf('Can not catch up %s stream.', StreamId::ALL));
171
        }
172
173
        $feedsIterator = new EventStreamFeedIterator($streamId, $this, false);
174
        $eventsIterator = new EventStreamIterator($feedsIterator);
175
176
        $events = [];
177
        foreach ($eventsIterator as $event) {
178
            if ($event->getNumber() < $version) {
179
                throw new \InvalidArgumentException(
180
                    sprintf('Stream %s has not reached version %d.', $streamId->toString(), $version)
181
                );
182
            }
183
184
            if ($event->getNumber() === $version) {
185
                break;
186
            }
187
            $events[] = $event;
188
        }
189
190
        $events = array_reverse($events);
191
192
        return EventRecordCollection::fromArray($events);
193
    }
194
195
    /**
196
     * @param string                         $streamId
197
     * @param string                         $groupName
198
     * @param PersistentSubscriptionSettings $settings
199
     */
200
    public function createPersistentSubscription(
201
        string $streamId,
202
        string $groupName,
203
        PersistentSubscriptionSettings $settings
204
    ) {
205
        $streamId = new StreamId($streamId);
206
        $factory = new CreatePersistentSubscriptionRequestFactory($streamId, $groupName, $settings);
207
        $this->send($factory->buildRequest(), new CreatePersistentSubscriptionResponseInspector());
208
    }
209
210
    /**
211
     * @param string                         $streamId
212
     * @param string                         $groupName
213
     * @param PersistentSubscriptionSettings $settings
214
     */
215
    public function updatePersistentSubscription(string $streamId, string $groupName, PersistentSubscriptionSettings $settings)
216
    {
217
        $streamId = new StreamId($streamId);
218
        $factory = new UpdatePersistentSubscriptionRequestFactory($streamId, $groupName, $settings);
219
        $this->send($factory->buildRequest(), new UpdatePersistentSubscriptionResponseInspector());
220
    }
221
222
    /**
223
     * @param string $streamId
224
     * @param string $groupName
225
     */
226
    public function deletePersistentSubscription(string $streamId, string $groupName)
227
    {
228
        $streamId = new StreamId($streamId);
229
        $factory = new DeletePersistentSubscriptionRequestFactory($streamId, $groupName);
230
        $this->send($factory->buildRequest(), new DeletePersistentSubscriptionResponseInspector());
231
    }
232
233
    /**
234
     * @param string   $streamId
235
     * @param string   $groupName
236
     * @param callable $messageHandler
237
     * @param int      $bufferSize
238
     * @param bool     $autoAck
239
     */
240
    public function readStreamViaPersistentSubscription(
241
        string $streamId,
242
        string $groupName,
243
        callable $messageHandler,
244
        int $bufferSize = 1,
245
        bool $autoAck = true
246
    ) {
247
        // TODO: Implement readStreamViaPersistentSubscription() method.
248
    }
249
}
250