1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace RayRutjes\GetEventStore\Client\Http; |
4
|
|
|
|
5
|
|
|
use GuzzleHttp\Client; |
6
|
|
|
use GuzzleHttp\Exception\TransferException; |
7
|
|
|
use Psr\Http\Message\RequestInterface; |
8
|
|
|
use Psr\Http\Message\ResponseInterface; |
9
|
|
|
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamViaPersistentSubscriptionFeedIterator; |
10
|
|
|
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamViaPersistentSubscriptionIterator; |
11
|
|
|
use RayRutjes\GetEventStore\ClientInterface; |
12
|
|
|
use RayRutjes\GetEventStore\Client\Exception\SystemException; |
13
|
|
|
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamFeedIterator; |
14
|
|
|
use RayRutjes\GetEventStore\Client\Http\Feed\EventStreamIterator; |
15
|
|
|
use RayRutjes\GetEventStore\EventDataCollection; |
16
|
|
|
use RayRutjes\GetEventStore\EventRecordCollection; |
17
|
|
|
use RayRutjes\GetEventStore\ExpectedVersion; |
18
|
|
|
use RayRutjes\GetEventStore\PersistentSubscriptionInfo; |
19
|
|
|
use RayRutjes\GetEventStore\PersistentSubscriptionSettings; |
20
|
|
|
use RayRutjes\GetEventStore\StreamId; |
21
|
|
|
use RayRutjes\GetEventStore\UserCredentials; |
22
|
|
|
|
23
|
|
|
final class HttpClient implements ClientInterface
{
    /**
     * Guzzle client through which every request of this class is sent.
     *
     * @var Client
     */
    private $httpClient;

    /**
     * Builds the underlying Guzzle client.
     *
     * The caller-supplied options are merged first, so the options this class
     * sets itself (base_uri, allow_redirects, connect_timeout, auth and
     * http_errors) always override conflicting keys in $httpClientOptions.
     *
     * @param string          $baseUri           Used as Guzzle's "base_uri" option.
     * @param UserCredentials $credentials       Login and password passed as Guzzle's "auth" option.
     * @param float           $connectTimeout    Used as Guzzle's "connect_timeout" option.
     * @param array           $httpClientOptions Extra Guzzle options; cannot override the keys listed above.
     */
    public function __construct(
        string $baseUri,
        UserCredentials $credentials,
        float $connectTimeout = 3,
        array $httpClientOptions = []
    ) {
        $enforcedOptions = [
            'base_uri'        => $baseUri,
            'allow_redirects' => false,
            'connect_timeout' => $connectTimeout,
            'auth'            => [$credentials->getLogin(), $credentials->getPassword()],
            'http_errors'     => false, // Let the client handle the status codes for now.
        ];

        // Enforced options come last so that they win on key collisions.
        $this->httpClient = new Client(array_merge($httpClientOptions, $enforcedOptions));
    }

    /**
     * Appends the given events to a stream.
     *
     * @param string $streamId
     * @param int    $expectedVersion Wrapped in an ExpectedVersion before being sent.
     * @param array  $events          Converted with EventDataCollection::fromArray(); must not be empty.
     *
     * @throws \InvalidArgumentException When no events are provided or the stream is a system stream.
     * @throws SystemException           When the HTTP transfer fails.
     */
    public function appendToStream(string $streamId, int $expectedVersion, array $events)
    {
        $events = EventDataCollection::fromArray($events);
        if (0 === $events->count()) {
            throw new \InvalidArgumentException('No events provided.');
        }

        $streamId = new StreamId($streamId);
        if ($streamId->isSystem()) {
            // Use toString() explicitly, like every other call site in this class,
            // instead of relying on an implicit string conversion of the object.
            throw new \InvalidArgumentException(
                sprintf('Can not append to system stream %s', $streamId->toString())
            );
        }

        $factory = new AppendToStreamRequestFactory($streamId, new ExpectedVersion($expectedVersion), $events);

        $this->send($factory->buildRequest(), new AppendToStreamResponseInspector());
    }

    /**
     * Sends a request through Guzzle and hands the response to the inspector.
     *
     * NOTE(review): the inspector presumably raises on unexpected status codes;
     * confirm against the ResponseInspector implementations.
     *
     * @param RequestInterface  $request
     * @param ResponseInspector $inspector
     *
     * @return ResponseInterface The response, returned after inspection.
     *
     * @throws SystemException Wraps any Guzzle TransferException (message, code and previous are preserved).
     *
     * @internal
     */
    public function send(RequestInterface $request, ResponseInspector $inspector): ResponseInterface
    {
        try {
            $response = $this->httpClient->send($request);
        } catch (TransferException $e) {
            // Thrown right here so it is obvious that $response is always set below.
            throw new SystemException($e->getMessage(), $e->getCode(), $e);
        }

        $inspector->inspect($response);

        return $response;
    }

    /**
     * Deletes a stream.
     *
     * @param string $streamId
     *
     * @throws \InvalidArgumentException When the stream is a system stream.
     * @throws SystemException           When the HTTP transfer fails.
     */
    public function deleteStream(string $streamId)
    {
        $streamId = new StreamId($streamId);
        if ($streamId->isSystem()) {
            throw new \InvalidArgumentException(
                sprintf('Can not delete system stream with id %s', $streamId->toString())
            );
        }

        $factory = new DeleteStreamRequestFactory($streamId);

        $this->send($factory->buildRequest(), new DeleteStreamResponseInspector());
    }

    /**
     * Reads every event of the StreamId::ALL stream.
     *
     * With great power comes great responsibility.
     *
     * @return EventRecordCollection
     */
    public function readAllEvents(): EventRecordCollection
    {
        $streamId = new StreamId(StreamId::ALL);

        return $this->readAllEventsFromStream($streamId->toString());
    }

    /**
     * Reads every event of the given stream into a collection.
     *
     * All events are loaded in memory before the collection is built.
     *
     * @param string $streamId
     *
     * @return EventRecordCollection
     */
    public function readAllEventsFromStream(string $streamId): EventRecordCollection
    {
        $feedsIterator = new EventStreamFeedIterator(new StreamId($streamId), $this, true);
        $eventsIterator = new EventStreamIterator($feedsIterator);

        // Keys are discarded so the result is a plain list, in iteration order.
        return EventRecordCollection::fromArray(iterator_to_array($eventsIterator, false));
    }

    /**
     * Retrieves events recorded since a given version of the stream.
     * Does not include the event with number corresponding to the given version.
     *
     * Events are collected until the event numbered $version is met, then the
     * collected list is reversed before being returned.
     * NOTE(review): this presumably means the feed is walked from the newest
     * event backwards; confirm against EventStreamFeedIterator.
     *
     * @param string $streamId
     * @param int    $version Must be >= 0.
     *
     * @return EventRecordCollection
     *
     * @throws \InvalidArgumentException When $version is negative, when the stream is StreamId::ALL,
     *                                   or when an event numbered below $version is met first.
     */
    public function readStreamUpToVersion(string $streamId, int $version): EventRecordCollection
    {
        // Version 0 is accepted, as the error message has always stated.
        if ($version < 0) {
            throw new \InvalidArgumentException(sprintf('version should be >= 0, got: %d', $version));
        }

        $streamId = new StreamId($streamId);
        // Todo: there are probably more streams to avoid. Thinking of system or metadata streams.
        if ($streamId->toString() === StreamId::ALL) {
            throw new \InvalidArgumentException(sprintf('Can not catch up %s stream.', StreamId::ALL));
        }

        $feedsIterator = new EventStreamFeedIterator($streamId, $this, false);
        $eventsIterator = new EventStreamIterator($feedsIterator);

        $events = [];
        foreach ($eventsIterator as $event) {
            $number = $event->getNumber();

            if ($number < $version) {
                throw new \InvalidArgumentException(
                    sprintf('Stream %s has not reached version %d.', $streamId->toString(), $version)
                );
            }

            if ($number === $version) {
                // The event at $version itself is excluded from the result.
                break;
            }

            $events[] = $event;
        }

        return EventRecordCollection::fromArray(array_reverse($events));
    }

    /**
     * Creates a persistent subscription group on a stream.
     *
     * @param string                         $streamId
     * @param string                         $groupName
     * @param PersistentSubscriptionSettings $settings
     *
     * @throws SystemException When the HTTP transfer fails.
     */
    public function createPersistentSubscription(
        string $streamId,
        string $groupName,
        PersistentSubscriptionSettings $settings
    ) {
        $factory = new CreatePersistentSubscriptionRequestFactory(new StreamId($streamId), $groupName, $settings);

        $this->send($factory->buildRequest(), new CreatePersistentSubscriptionResponseInspector());
    }

    /**
     * Updates the settings of a persistent subscription group.
     *
     * @param string                         $streamId
     * @param string                         $groupName
     * @param PersistentSubscriptionSettings $settings
     *
     * @throws SystemException When the HTTP transfer fails.
     */
    public function updatePersistentSubscription(
        string $streamId,
        string $groupName,
        PersistentSubscriptionSettings $settings
    ) {
        $factory = new UpdatePersistentSubscriptionRequestFactory(new StreamId($streamId), $groupName, $settings);

        $this->send($factory->buildRequest(), new UpdatePersistentSubscriptionResponseInspector());
    }

    /**
     * Deletes a persistent subscription group.
     *
     * @param string $streamId
     * @param string $groupName
     *
     * @throws SystemException When the HTTP transfer fails.
     */
    public function deletePersistentSubscription(string $streamId, string $groupName)
    {
        $factory = new DeletePersistentSubscriptionRequestFactory(new StreamId($streamId), $groupName);

        $this->send($factory->buildRequest(), new DeletePersistentSubscriptionResponseInspector());
    }

    /**
     * Feeds every event delivered by a persistent subscription to a handler.
     *
     * Acknowledgement is not implemented yet (see the todos below), and
     * $autoAck is accepted but currently unused.
     *
     * @param string   $streamId
     * @param string   $groupName
     * @param callable $messageHandler Invoked once per event, with the event as sole argument.
     * @param int      $batchSize      Forwarded to the feed iterator.
     * @param bool     $autoAck        Currently unused.
     *
     * @throws \Exception Any exception raised by $messageHandler is rethrown unchanged.
     */
    public function readStreamViaPersistentSubscription(
        string $streamId,
        string $groupName,
        callable $messageHandler,
        int $batchSize = 1,
        bool $autoAck = true
    ) {
        $feedsIterator = new EventStreamViaPersistentSubscriptionFeedIterator(
            new StreamId($streamId),
            $groupName,
            $this,
            $batchSize
        );
        $eventsIterator = new EventStreamViaPersistentSubscriptionIterator($feedsIterator);

        // todo: missing a subscription here to ack/nack messages on a single basis or multiple basis.
        foreach ($eventsIterator as $event) {
            try {
                $messageHandler($event);
            } catch (\Exception $e) {
                // todo: notAck!
                throw $e;
            }
            // todo: ack!
        }

        // todo: ackAll ???
    }

    /**
     * Fetches information about a persistent subscription group.
     *
     * @param string $streamId
     * @param string $groupName
     *
     * @return PersistentSubscriptionInfo Taken from the feed exposed by the response inspector.
     *
     * @throws SystemException When the HTTP transfer fails.
     */
    public function getPersistentSubscriptionInfo(string $streamId, string $groupName): PersistentSubscriptionInfo
    {
        $factory = new GetPersistentSubscriptionInfoRequestFactory(new StreamId($streamId), $groupName);
        $inspector = new GetPersistentSubscriptionInfoResponseInspector();

        $this->send($factory->buildRequest(), $inspector);

        return $inspector->getFeed()->getPersistentSubscriptionInfo();
    }
}
283
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.