Completed
Pull Request — master (#1)
by Raymond
02:20
created

HttpClient::readAllEventsFromStream()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 13
rs 9.4285
cc 2
eloc 8
nc 2
nop 1
1
<?php
2
3
namespace RayRutjes\GetEventStore\Client\Http;
4
5
use GuzzleHttp\Client;
6
use GuzzleHttp\Exception\TransferException;
7
use Psr\Http\Message\RequestInterface;
8
use Psr\Http\Message\ResponseInterface;
9
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamViaPersistentSubscriptionFeedIterator;
10
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamViaPersistentSubscriptionIterator;
11
use RayRutjes\GetEventStore\ClientInterface;
12
use RayRutjes\GetEventStore\Client\Exception\SystemException;
13
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamFeedIterator;
14
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamIterator;
15
use RayRutjes\GetEventStore\EventDataCollection;
16
use RayRutjes\GetEventStore\EventRecordCollection;
17
use RayRutjes\GetEventStore\ExpectedVersion;
18
use RayRutjes\GetEventStore\PersistentSubscriptionInfo;
19
use RayRutjes\GetEventStore\PersistentSubscriptionSettings;
20
use RayRutjes\GetEventStore\StreamId;
21
use RayRutjes\GetEventStore\UserCredentials;
22
23
final class HttpClient implements ClientInterface
24
{
25
    /**
26
     * Guzzle client built in the constructor; send() dispatches requests through it.
     *
     * @var Client
27
     */
28
    private $httpClient;
29
30
    /**
     * Builds the underlying Guzzle client.
     *
     * Caller-supplied options are merged first, so the settings defined here
     * always win over a same-named key in $httpClientOptions.
     *
     * @param string          $baseUri           Passed to Guzzle as `base_uri`.
     * @param UserCredentials $credentials       Login and password passed as Guzzle's `auth` option.
     * @param float           $connectTimeout    Passed as Guzzle's `connect_timeout`.
     * @param array           $httpClientOptions Additional Guzzle options; cannot override the keys set here.
     */
    public function __construct(
        string $baseUri,
        UserCredentials $credentials,
        float $connectTimeout = 3,
        array $httpClientOptions = []
    ) {
        $enforcedOptions = [
            'base_uri'        => $baseUri,
            'allow_redirects' => false,
            'connect_timeout' => $connectTimeout,
            'auth'            => [$credentials->getLogin(), $credentials->getPassword()],
            // Guzzle must not throw on error statuses; this client handles status codes for now.
            'http_errors'     => false,
        ];

        $this->httpClient = new Client(array_merge($httpClientOptions, $enforcedOptions));
    }
54
55
    /**
     * Appends the given events to a stream.
     *
     * @param string $streamId        Identifier of the target stream; system streams are refused.
     * @param int    $expectedVersion Wrapped in an ExpectedVersion and forwarded with the request.
     * @param array  $events          Converted with EventDataCollection::fromArray(); must not be empty.
     *
     * @throws \InvalidArgumentException When no events are provided or the stream is a system stream.
     */
    public function appendToStream(string $streamId, int $expectedVersion, array $events)
    {
        $events = EventDataCollection::fromArray($events);
        if (0 === $events->count()) {
            throw new \InvalidArgumentException('No events provided.');
        }

        $streamId = new StreamId($streamId);
        if ($streamId->isSystem()) {
            // Format the id through toString() rather than relying on an implicit
            // object-to-string conversion of the StreamId instance.
            throw new \InvalidArgumentException(
                sprintf('Can not append to system stream %s', $streamId->toString())
            );
        }

        $expectedVersion = new ExpectedVersion($expectedVersion);

        $factory = new AppendToStreamRequestFactory($streamId, $expectedVersion, $events);

        $this->send($factory->buildRequest(), new AppendToStreamResponseInspector());
    }
78
79
    /**
     * Sends a request through the Guzzle client and hands the response to the inspector.
     *
     * Guzzle transfer failures are passed to handleTransferException().
     *
     * @param RequestInterface  $request   Request to dispatch.
     * @param ResponseInspector $inspector Receives the response before it is returned.
     *
     * @return ResponseInterface The response, after inspection.
     *
     * @internal
     */
    public function send(RequestInterface $request, ResponseInspector $inspector): ResponseInterface
    {
        try {
            $httpResponse = $this->httpClient->send($request);
        } catch (TransferException $transferFailure) {
            // The handler always throws, so execution never continues past this point.
            $this->handleTransferException($transferFailure);
        }

        $inspector->inspect($httpResponse);

        return $httpResponse;
    }
99
100
    /**
     * Converts a Guzzle transfer failure into a SystemException.
     *
     * The message and code are copied and the original exception is kept as "previous".
     *
     * @param TransferException $e
     *
     * @throws SystemException Always.
     */
    private function handleTransferException(TransferException $e)
    {
        $systemException = new SystemException($e->getMessage(), $e->getCode(), $e);

        throw $systemException;
    }
107
108
    /**
     * Deletes a stream, refusing system streams.
     *
     * @param string $streamId
     *
     * @throws \InvalidArgumentException When the id designates a system stream.
     */
    public function deleteStream(string $streamId)
    {
        $stream = new StreamId($streamId);

        if ($stream->isSystem()) {
            $message = sprintf('Can not delete system stream with id %s', $stream->toString());

            throw new \InvalidArgumentException($message);
        }

        $request = (new DeleteStreamRequestFactory($stream))->buildRequest();
        $this->send($request, new DeleteStreamResponseInspector());
    }
123
124
    /**
     * Reads every event of the StreamId::ALL stream.
     *
     * Delegates to readAllEventsFromStream(), so use with care.
     *
     * @return EventRecordCollection
     */
    public function readAllEvents(): EventRecordCollection
    {
        $allStreamId = (new StreamId(StreamId::ALL))->toString();

        return $this->readAllEventsFromStream($allStreamId);
    }
135
136
    /**
     * Reads all events of a stream into a collection.
     *
     * @param string $streamId
     *
     * @return EventRecordCollection Events in the order the iterator yields them.
     */
    public function readAllEventsFromStream(string $streamId): EventRecordCollection
    {
        $feeds = new EventStreamFeedIterator(new StreamId($streamId), $this, true);

        // Keys are discarded so the result is a plain list in iteration order.
        $records = iterator_to_array(new EventStreamIterator($feeds), false);

        return EventRecordCollection::fromArray($records);
    }
154
155
    /**
     * Retrieves events recorded since a given version of the stream.
     * Does not include the event with number corresponding to the given version.
     *
     * Events are collected until the event numbered $version is met, then the
     * collected list is reversed before being returned.
     *
     * @param string $streamId Stream to read; the StreamId::ALL stream is refused.
     * @param int    $version  Event number to stop at (exclusive); must be >= 0.
     *
     * @return EventRecordCollection
     *
     * @throws \InvalidArgumentException When $version is negative, when the stream is StreamId::ALL,
     *                                   or when an event numbered below $version is met first.
     */
    public function readStreamUpToVersion(string $streamId, int $version): EventRecordCollection
    {
        // Only negative versions are invalid: the guard now agrees with its own
        // ">= 0" message, so version 0 is accepted.
        if ($version < 0) {
            throw new \InvalidArgumentException(sprintf('version should be >= 0, got: %d', $version));
        }

        $streamId = new StreamId($streamId);
        // Todo: there are probably more streams to avoid. Thinking of system or metadata streams.
        if ($streamId->toString() === StreamId::ALL) {
            throw new \InvalidArgumentException(sprintf('Can not catch up %s stream.', StreamId::ALL));
        }

        // NOTE(review): the `false` flag presumably makes the feed iterate from the
        // newest event backwards - confirm against EventStreamFeedIterator.
        $feedsIterator = new EventStreamFeedIterator($streamId, $this, false);
        $eventsIterator = new EventStreamIterator($feedsIterator);

        $events = [];
        foreach ($eventsIterator as $event) {
            // Meeting an event below the requested version before the version
            // itself is reported as the stream not having reached that version.
            if ($event->getNumber() < $version) {
                throw new \InvalidArgumentException(
                    sprintf('Stream %s has not reached version %d.', $streamId->toString(), $version)
                );
            }

            // The event at $version itself is excluded from the result.
            if ($event->getNumber() === $version) {
                break;
            }
            $events[] = $event;
        }

        // Return the events in the opposite order to the one they were read in.
        $events = array_reverse($events);

        return EventRecordCollection::fromArray($events);
    }
197
198
    /**
     * Creates a persistent subscription group on a stream.
     *
     * @param string                         $streamId
     * @param string                         $groupName
     * @param PersistentSubscriptionSettings $settings
     */
    public function createPersistentSubscription(
        string $streamId,
        string $groupName,
        PersistentSubscriptionSettings $settings
    ) {
        $requestFactory = new CreatePersistentSubscriptionRequestFactory(
            new StreamId($streamId),
            $groupName,
            $settings
        );
        $request = $requestFactory->buildRequest();

        $this->send($request, new CreatePersistentSubscriptionResponseInspector());
    }
212
213
    /**
     * Updates the settings of a persistent subscription group.
     *
     * @param string                         $streamId
     * @param string                         $groupName
     * @param PersistentSubscriptionSettings $settings
     */
    public function updatePersistentSubscription(
        string $streamId,
        string $groupName,
        PersistentSubscriptionSettings $settings
    ) {
        $requestFactory = new UpdatePersistentSubscriptionRequestFactory(
            new StreamId($streamId),
            $groupName,
            $settings
        );
        $request = $requestFactory->buildRequest();

        $this->send($request, new UpdatePersistentSubscriptionResponseInspector());
    }
224
225
    /**
     * Deletes a persistent subscription group from a stream.
     *
     * @param string $streamId
     * @param string $groupName
     */
    public function deletePersistentSubscription(string $streamId, string $groupName)
    {
        $requestFactory = new DeletePersistentSubscriptionRequestFactory(new StreamId($streamId), $groupName);
        $request = $requestFactory->buildRequest();

        $this->send($request, new DeletePersistentSubscriptionResponseInspector());
    }
235
236
    /**
     * Feeds each event read through a persistent subscription to a handler.
     *
     * An exception thrown by the handler propagates to the caller unchanged.
     * Acknowledgement of messages is not implemented yet, and $autoAck is
     * currently not read by this method.
     *
     * @param string   $streamId
     * @param string   $groupName
     * @param callable $messageHandler Called with each event as its only argument.
     * @param int      $batchSize      Forwarded to the feed iterator.
     * @param bool     $autoAck        Currently unused.
     */
    public function readStreamViaPersistentSubscription(
        string $streamId,
        string $groupName,
        callable $messageHandler,
        int $batchSize = 1,
        bool $autoAck = true
    ) {
        $feeds = new EventStreamViaPersistentSubscriptionFeedIterator(
            new StreamId($streamId),
            $groupName,
            $this,
            $batchSize
        );
        $subscriptionEvents = new EventStreamViaPersistentSubscriptionIterator($feeds);

        // Still to do: a subscription object is needed here so that messages
        // can be acknowledged or rejected, one at a time or in batches.
        foreach ($subscriptionEvents as $event) {
            // Still to do: reject the message when the handler throws and
            // acknowledge it when the handler succeeds.
            call_user_func($messageHandler, $event);
        }

        // Still to do: decide whether a final acknowledgement of all messages is needed.
    }
266
267
    /**
     * Fetches information about a persistent subscription group.
     *
     * The response inspector holds the feed once the request has been sent;
     * the subscription info is read from that feed.
     *
     * @param string $streamId
     * @param string $groupName
     *
     * @return PersistentSubscriptionInfo
     */
    public function getPersistentSubscriptionInfo(string $streamId, string $groupName): PersistentSubscriptionInfo
    {
        $requestFactory = new GetPersistentSubscriptionInfoRequestFactory(new StreamId($streamId), $groupName);
        $infoInspector = new GetPersistentSubscriptionInfoResponseInspector();

        $this->send($requestFactory->buildRequest(), $infoInspector);

        $feed = $infoInspector->getFeed();

        return $feed->getPersistentSubscriptionInfo();
    }
282
}
283