Completed
Pull Request — master (#1)
by Raymond
03:07
created

HttpClient::updatePersistentSubscription()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 6
rs 9.4285
cc 1
eloc 4
nc 1
nop 3
1
<?php
2
3
namespace RayRutjes\GetEventStore\Client\Http;
4
5
use GuzzleHttp\Client;
6
use GuzzleHttp\Exception\TransferException;
7
use Psr\Http\Message\RequestInterface;
8
use Psr\Http\Message\ResponseInterface;
9
use RayRutjes\GetEventStore\ClientInterface;
10
use RayRutjes\GetEventStore\Client\Exception\SystemException;
11
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamFeedIterator;
12
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamIterator;
13
use RayRutjes\GetEventStore\EventDataCollection;
14
use RayRutjes\GetEventStore\EventRecordCollection;
15
use RayRutjes\GetEventStore\ExpectedVersion;
16
use RayRutjes\GetEventStore\PersistentSubscriptionInfo;
17
use RayRutjes\GetEventStore\PersistentSubscriptionSettings;
18
use RayRutjes\GetEventStore\StreamId;
19
use RayRutjes\GetEventStore\UserCredentials;
20
21
/**
 * HTTP implementation of ClientInterface.
 *
 * Every operation builds a PSR-7 request through a dedicated request factory,
 * sends it with Guzzle and hands the response to a matching response inspector.
 */
final class HttpClient implements ClientInterface
{
    /**
     * Guzzle client configured once in the constructor and reused for every request.
     *
     * @var Client
     */
    private $httpClient;

    /**
     * @param string          $baseUri           Base URI passed to Guzzle as 'base_uri'.
     * @param UserCredentials $credentials       Login and password sent through Guzzle's 'auth' option.
     * @param float           $connectTimeout    Value of Guzzle's 'connect_timeout' option.
     * @param array           $httpClientOptions Extra Guzzle options; the keys enforced below cannot be overridden.
     */
    public function __construct(
        string $baseUri,
        UserCredentials $credentials,
        float $connectTimeout = 3,
        array $httpClientOptions = []
    ) {
        // The enforced options are merged last: on a key collision array_merge() keeps
        // the later value, so callers can add options but not replace these ones.
        $options = array_merge(
            $httpClientOptions,
            [
                'base_uri'        => $baseUri,
                'allow_redirects' => false,
                'connect_timeout' => $connectTimeout,
                'auth'            => [$credentials->getLogin(), $credentials->getPassword()],
                'http_errors'     => false, // Let the client handle the status codes for now.
            ]
        );
        $this->httpClient = new Client($options);
    }

    /**
     * Appends the given events to a stream.
     *
     * @param string $streamId
     * @param int    $expectedVersion
     * @param array  $events          Converted with EventDataCollection::fromArray().
     *
     * @throws \InvalidArgumentException When no events are provided or the stream is a system stream.
     * @throws SystemException           When the HTTP transfer fails.
     */
    public function appendToStream(string $streamId, int $expectedVersion, array $events)
    {
        $events = EventDataCollection::fromArray($events);
        if (0 === $events->count()) {
            throw new \InvalidArgumentException('No events provided.');
        }

        $streamId = new StreamId($streamId);
        if ($streamId->isSystem()) {
            // Use the explicit toString() like the rest of the class instead of
            // relying on an implicit string conversion of the StreamId object.
            throw new \InvalidArgumentException(
                sprintf('Can not append to system stream %s', $streamId->toString())
            );
        }

        $expectedVersion = new ExpectedVersion($expectedVersion);

        $request = new AppendToStreamRequestFactory($streamId, $expectedVersion, $events);

        $this->send($request->buildRequest(), new AppendToStreamResponseInspector());
    }

    /**
     * Sends a request and lets the given inspector examine the response.
     *
     * @param RequestInterface  $request
     * @param ResponseInspector $inspector
     *
     * @return ResponseInterface The response, after it has been passed to the inspector.
     *
     * @throws SystemException When Guzzle raises a TransferException; the original
     *                         exception is attached as the previous one.
     *
     * @internal
     */
    public function send(RequestInterface $request, ResponseInspector $inspector): ResponseInterface
    {
        try {
            $response = $this->httpClient->send($request);
        } catch (TransferException $e) {
            // Thrown directly from the catch block so that $response is provably
            // assigned on every path that reaches the code below.
            throw new SystemException($e->getMessage(), $e->getCode(), $e);
        }

        $inspector->inspect($response);

        return $response;
    }

    /**
     * Deletes a stream.
     *
     * @param string $streamId
     *
     * @throws \InvalidArgumentException When the stream is a system stream.
     * @throws SystemException           When the HTTP transfer fails.
     */
    public function deleteStream(string $streamId)
    {
        $streamId = new StreamId($streamId);
        if ($streamId->isSystem()) {
            throw new \InvalidArgumentException(
                sprintf('Can not delete system stream with id %s', $streamId->toString())
            );
        }

        $factory = new DeleteStreamRequestFactory($streamId);
        $this->send($factory->buildRequest(), new DeleteStreamResponseInspector());
    }

    /**
     * Reads every event of the StreamId::ALL stream.
     *
     * With great power comes great responsibility.
     *
     * @return EventRecordCollection
     */
    public function readAllEvents(): EventRecordCollection
    {
        $streamId = new StreamId(StreamId::ALL);

        return $this->readAllEventsFromStream($streamId->toString());
    }

    /**
     * Reads every event of a stream into a single in-memory collection.
     *
     * @param string $streamId
     *
     * @return EventRecordCollection Events in the order the iterator yields them.
     */
    public function readAllEventsFromStream(string $streamId): EventRecordCollection
    {
        $streamId = new StreamId($streamId);
        // NOTE(review): the boolean presumably selects the reading direction
        // (true here, false when catching up) - confirm in EventStreamFeedIterator.
        $feedsIterator = new EventStreamFeedIterator($streamId, $this, true);
        $eventsIterator = new EventStreamIterator($feedsIterator);

        $events = [];
        foreach ($eventsIterator as $event) {
            $events[] = $event;
        }

        return EventRecordCollection::fromArray($events);
    }

    /**
     * Retrieves events recorded since a given version of the stream.
     * Does not include the event with number corresponding to the given version.
     *
     * @param string $streamId
     * @param int    $version  Event number to catch up from; must be >= 0.
     *
     * @return EventRecordCollection Collected events, in reverse iteration order.
     *
     * @throws \InvalidArgumentException When the version is negative, when the stream is
     *                                   StreamId::ALL, or when an event older than the
     *                                   requested version is met before the version itself.
     */
    public function readStreamUpToVersion(string $streamId, int $version): EventRecordCollection
    {
        // Version 0 is accepted, as the error message states: rejecting it would
        // make it impossible to catch up from event number 0.
        if ($version < 0) {
            throw new \InvalidArgumentException(sprintf('version should be >= 0, got: %d', $version));
        }

        $streamId = new StreamId($streamId);
        // Todo: there are probably more streams to avoid. Thinking of system or metadata streams.
        if ($streamId->toString() === StreamId::ALL) {
            throw new \InvalidArgumentException(sprintf('Can not catch up %s stream.', StreamId::ALL));
        }

        // NOTE(review): with `false` the events appear to be yielded newest first,
        // which is why the result is reversed below - confirm in EventStreamFeedIterator.
        $feedsIterator = new EventStreamFeedIterator($streamId, $this, false);
        $eventsIterator = new EventStreamIterator($feedsIterator);

        $events = [];
        foreach ($eventsIterator as $event) {
            if ($event->getNumber() < $version) {
                throw new \InvalidArgumentException(
                    sprintf('Stream %s has not reached version %d.', $streamId->toString(), $version)
                );
            }

            // Stop at the requested version; that event itself is not part of the result.
            if ($event->getNumber() === $version) {
                break;
            }
            $events[] = $event;
        }

        $events = array_reverse($events);

        return EventRecordCollection::fromArray($events);
    }

    /**
     * Creates a persistent subscription group on a stream.
     *
     * @param string                         $streamId
     * @param string                         $groupName
     * @param PersistentSubscriptionSettings $settings
     *
     * @throws SystemException When the HTTP transfer fails.
     */
    public function createPersistentSubscription(
        string $streamId,
        string $groupName,
        PersistentSubscriptionSettings $settings
    ) {
        $streamId = new StreamId($streamId);
        $factory = new CreatePersistentSubscriptionRequestFactory($streamId, $groupName, $settings);
        $this->send($factory->buildRequest(), new CreatePersistentSubscriptionResponseInspector());
    }

    /**
     * Updates the settings of a persistent subscription group.
     *
     * @param string                         $streamId
     * @param string                         $groupName
     * @param PersistentSubscriptionSettings $settings
     *
     * @throws SystemException When the HTTP transfer fails.
     */
    public function updatePersistentSubscription(
        string $streamId,
        string $groupName,
        PersistentSubscriptionSettings $settings
    ) {
        $streamId = new StreamId($streamId);
        $factory = new UpdatePersistentSubscriptionRequestFactory($streamId, $groupName, $settings);
        $this->send($factory->buildRequest(), new UpdatePersistentSubscriptionResponseInspector());
    }

    /**
     * Deletes a persistent subscription group.
     *
     * @param string $streamId
     * @param string $groupName
     *
     * @throws SystemException When the HTTP transfer fails.
     */
    public function deletePersistentSubscription(string $streamId, string $groupName)
    {
        $streamId = new StreamId($streamId);
        $factory = new DeletePersistentSubscriptionRequestFactory($streamId, $groupName);
        $this->send($factory->buildRequest(), new DeletePersistentSubscriptionResponseInspector());
    }

    /**
     * Not implemented yet: this method is currently a no-op. It ignores all of
     * its arguments and never invokes $messageHandler.
     *
     * @param string   $streamId
     * @param string   $groupName
     * @param callable $messageHandler
     * @param int      $bufferSize
     * @param bool     $autoAck
     */
    public function readStreamViaPersistentSubscription(
        string $streamId,
        string $groupName,
        callable $messageHandler,
        int $bufferSize = 1,
        bool $autoAck = true
    ) {
        // TODO: Implement readStreamViaPersistentSubscription() method.
    }

    /**
     * Fetches information about a persistent subscription group.
     *
     * @param string $streamId
     * @param string $groupName
     *
     * @return PersistentSubscriptionInfo Taken from the feed exposed by the response inspector.
     *
     * @throws SystemException When the HTTP transfer fails.
     */
    public function getPersistentSubscriptionInfo(string $streamId, string $groupName): PersistentSubscriptionInfo
    {
        $streamId = new StreamId($streamId);
        $factory = new GetPersistentSubscriptionInfoRequestFactory($streamId, $groupName);
        $inspector = new GetPersistentSubscriptionInfoResponseInspector();
        $this->send($factory->buildRequest(), $inspector);

        return $inspector->getFeed()->getPersistentSubscriptionInfo();
    }
}
267