EventStore::writeToStream()   B
last analyzed

Complexity

Conditions 3
Paths 4

Size

Total Lines 24
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 3

Importance

Changes 12
Bugs 2 Features 4
Metric Value
c 12
b 2
f 4
dl 0
loc 24
ccs 14
cts 14
cp 1
rs 8.9713
cc 3
eloc 13
nc 4
nop 3
crap 3
1
<?php
2
namespace EventStore;
3
4
use EventStore\Exception\ConnectionFailedException;
5
use EventStore\Exception\StreamDeletedException;
6
use EventStore\Exception\StreamNotFoundException;
7
use EventStore\Exception\UnauthorizedException;
8
use EventStore\Exception\WrongExpectedVersionException;
9
use EventStore\Http\Exception\ClientException;
10
use EventStore\Http\Exception\RequestException;
11
use EventStore\Http\GuzzleHttpClient;
12
use EventStore\Http\HttpClientInterface;
13
use EventStore\Http\ResponseCode;
14
use EventStore\StreamFeed\EntryEmbedMode;
15
use EventStore\StreamFeed\Event;
16
use EventStore\StreamFeed\LinkRelation;
17
use EventStore\StreamFeed\StreamFeed;
18
use EventStore\StreamFeed\StreamFeedIterator;
19
use GuzzleHttp\Psr7\Request;
20
use GuzzleHttp\Psr7\Uri;
21
use Psr\Http\Message\RequestInterface;
22
use Psr\Http\Message\ResponseInterface;
23
24
/**
25
 * Class EventStore
26
 * @package EventStore
27
 */
28
final class EventStore implements EventStoreInterface
29
{
30
    /**
     * Endpoint of the EventStore HTTP API, exactly as given to the constructor.
     *
     * @var string
     */
    private $url;

    /**
     * HTTP client every request is sent through.
     *
     * @var HttpClientInterface
     */
    private $httpClient;

    /**
     * Response of the most recent HTTP call; when the client raised a
     * ClientException this is the response carried by that exception.
     *
     * @var ResponseInterface
     */
    private $lastResponse;

    /**
     * Map of HTTP status code => callable taking the stream URL and throwing
     * the exception that corresponds to that status.
     *
     * @var array
     */
    private $badCodeHandlers = [];
49
50
    /**
     * Bind the store to an EventStore HTTP endpoint.
     *
     * The endpoint is probed right away (see checkConnection()) and the
     * status-code handlers are registered before the constructor returns.
     *
     * @param string                   $url        Endpoint of the EventStore HTTP API
     * @param HttpClientInterface|null $httpClient HTTP client to use; a GuzzleHttpClient is created when omitted
     * @throws Exception\ConnectionFailedException When the probe request fails
     */
    public function __construct($url, HttpClientInterface $httpClient = null)
    {
        if (null === $httpClient) {
            $httpClient = new GuzzleHttpClient();
        }

        $this->url = $url;
        $this->httpClient = $httpClient;

        $this->checkConnection();
        $this->initBadCodeHandlers();
    }
62
63
    /**
     * Delete a stream by issuing a DELETE request against its URL.
     *
     * @param string         $streamName Name of the stream
     * @param StreamDeletion $mode       Deletion mode (soft or hard)
     */
    public function deleteStream($streamName, StreamDeletion $mode)
    {
        $headers = [];

        // Loose comparison on purpose: $mode is an object while HARD is a class
        // constant. NOTE(review): presumably the enum compares equal to its
        // native value; confirm before tightening this to ===.
        if ($mode == StreamDeletion::HARD) {
            $headers['ES-HardDelete'] = 'true';
        }

        $this->sendRequest(
            new Request('DELETE', $this->getStreamUrl($streamName), $headers)
        );
    }
79
80
    /**
     * Expose the response stored by the most recent request to the API.
     *
     * @return ResponseInterface
     */
    public function getLastResponse()
    {
        return $this->lastResponse;
    }
89
90
    /**
     * Follow one of a feed's link relations to the adjacent feed page.
     *
     * The target page is read with the same entry embed mode as the given feed.
     *
     * @param  StreamFeed      $streamFeed The stream feed to navigate from
     * @param  LinkRelation    $relation   The "direction" expressed as link relation
     * @return null|StreamFeed Null when the feed has no URL for that relation
     */
    public function navigateStreamFeed(StreamFeed $streamFeed, LinkRelation $relation)
    {
        $targetUrl = $streamFeed->getLinkUrl($relation);

        if (!empty($targetUrl)) {
            return $this->readStreamFeed($targetUrl, $streamFeed->getEntryEmbedMode());
        }

        return null;
    }
107
108
    /**
     * Read the head feed page of a stream, ready for navigation.
     *
     * @param  string              $streamName The stream name
     * @param  EntryEmbedMode|null $embedMode  The event entries embed mode (none, rich or body)
     * @return StreamFeed
     */
    public function openStreamFeed($streamName, EntryEmbedMode $embedMode = null)
    {
        return $this->readStreamFeed($this->getStreamUrl($streamName), $embedMode);
    }
121
122
    /**
     * Fetch one event by URL and build an Event from the "content" member of
     * the JSON response.
     *
     * Status codes with a registered handler raise the matching exception
     * before the body is decoded.
     *
     * @param  string $eventUrl The url of the event
     * @return Event
     */
    public function readEvent($eventUrl)
    {
        $this->sendRequest($this->getJsonRequest($eventUrl));
        $this->ensureStatusCodeIsGood($eventUrl);

        $payload = $this->lastResponseAsJson();

        return $this->createEventFromResponseContent($payload['content']);
    }
139
140
    /**
     * Fetch several events with a single batch call to the HTTP client.
     *
     * One JSON request is built per URL and the whole set is handed to the
     * client's sendRequestBatch(). Each response body is decoded and its
     * "content" member turned into an Event. Array keys are carried through
     * from the requests and from the responses respectively. Unlike
     * readEvent(), response status codes are not inspected here.
     *
     * @param  array   $eventUrls The urls of the events
     * @return Event[]
     */
    public function readEventBatch(array $eventUrls)
    {
        $requests = [];
        foreach ($eventUrls as $key => $eventUrl) {
            $requests[$key] = $this->getJsonRequest($eventUrl);
        }

        $events = [];
        foreach ($this->httpClient->sendRequestBatch($requests) as $key => $response) {
            $decoded = json_decode($response->getBody(), true);
            $events[$key] = $this->createEventFromResponseContent($decoded['content']);
        }

        return $events;
    }
168
169
    /**
     * Build an Event from the decoded "content" section of an event response.
     *
     * Reads the eventType, eventNumber (cast to int) and data keys; metadata
     * is passed as null when the key is missing or empty.
     *
     * @param  array $content
     * @return Event
     */
    protected function createEventFromResponseContent(array $content)
    {
        $metadata = null;
        if (!empty($content['metadata'])) {
            $metadata = $content['metadata'];
        }

        return new Event(
            $content['eventType'],
            (int) $content['eventNumber'],
            $content['data'],
            $metadata
        );
    }
182
183
    /**
     * Append one or more events to a stream.
     *
     * A single WritableEvent is wrapped in a WritableEventCollection first.
     * The stream data is then JSON-encoded and POSTed to the stream URL with
     * the expected version sent in the ES-ExpectedVersion header.
     *
     * NOTE(review): only a 400 response is turned into an exception here; any
     * other status is left for the caller to inspect via getLastResponse().
     *
     * @param  string                                  $streamName      The stream name
     * @param  WritableToStream                        $events          Single event or a collection of events
     * @param  int                                     $expectedVersion The expected version of the stream
     * @throws Exception\WrongExpectedVersionException When the API answers with HTTP 400
     */
    public function writeToStream($streamName, WritableToStream $events, $expectedVersion = ExpectedVersion::ANY)
    {
        $collection = $events instanceof WritableEvent
            ? new WritableEventCollection([$events])
            : $events;

        $headers = [
            'ES-ExpectedVersion' => intval($expectedVersion),
            'Content-Type' => 'application/vnd.eventstore.events+json',
        ];
        $payload = json_encode($collection->toStreamData());

        $this->sendRequest(
            new Request('POST', $this->getStreamUrl($streamName), $headers, $payload)
        );

        if ($this->getLastResponse()->getStatusCode() == ResponseCode::HTTP_BAD_REQUEST) {
            throw new WrongExpectedVersionException();
        }
    }
215
216
    /**
     * Create a forward iterator over the named stream, backed by this store.
     *
     * @param  string             $streamName
     * @return StreamFeedIterator
     */
    public function forwardStreamFeedIterator($streamName)
    {
        return StreamFeedIterator::forward($this, $streamName);
    }
224
225
    /**
     * Create a backward iterator over the named stream, backed by this store.
     *
     * @param  string             $streamName
     * @return StreamFeedIterator
     */
    public function backwardStreamFeedIterator($streamName)
    {
        return StreamFeedIterator::backward($this, $streamName);
    }
233
234
    /**
     * Probe the configured endpoint with a plain GET request.
     *
     * A RequestException raised while sending is rethrown as a
     * ConnectionFailedException carrying the same message.
     *
     * @throws Exception\ConnectionFailedException
     */
    private function checkConnection()
    {
        try {
            $probe = new Request('GET', $this->url);
            $this->sendRequest($probe);
        } catch (RequestException $failure) {
            throw new ConnectionFailedException($failure->getMessage());
        }
    }
246
247
    /**
     * Build the URL of a stream from the configured endpoint.
     *
     * Trailing slashes on the endpoint are dropped before joining, so that
     * "http://host:2113/" and "http://host:2113" both produce
     * "http://host:2113/streams/<name>" instead of a path with a double slash.
     * The stream name is inserted verbatim (it is not URL-encoded here).
     *
     * @param  string $streamName
     * @return string
     */
    private function getStreamUrl($streamName)
    {
        return sprintf('%s/streams/%s', rtrim($this->url, '/'), $streamName);
    }
255
256
    /**
     * Request a feed page and wrap the decoded JSON in a StreamFeed.
     *
     * When an embed mode other than NONE is given, its native value is added
     * to the request URI as the "embed" query parameter. Status codes with a
     * registered handler raise the matching exception before decoding.
     *
     * @param  string                            $streamUrl
     * @param  EntryEmbedMode|null               $embedMode
     * @return StreamFeed
     * @throws Exception\StreamDeletedException
     * @throws Exception\StreamNotFoundException
     */
    private function readStreamFeed($streamUrl, EntryEmbedMode $embedMode = null)
    {
        $request = $this->getJsonRequest($streamUrl);

        // Loose comparisons are kept as-is: the mode is compared by value to
        // the object returned by EntryEmbedMode::NONE().
        $wantsEmbed = $embedMode != null && $embedMode != EntryEmbedMode::NONE();

        if ($wantsEmbed) {
            $request = $request->withUri(
                Uri::withQueryValue($request->getUri(), 'embed', $embedMode->toNative())
            );
        }

        $this->sendRequest($request);
        $this->ensureStatusCodeIsGood($streamUrl);

        $feedData = $this->lastResponseAsJson();

        return new StreamFeed($feedData, $embedMode);
    }
283
284
    /**
     * Build a GET request asking for the EventStore atom+json representation.
     *
     * @param  string           $uri
     * @return RequestInterface
     */
    private function getJsonRequest($uri)
    {
        $headers = ['Accept' => 'application/vnd.eventstore.atom+json'];

        return new Request('GET', $uri, $headers);
    }
298
299
    /**
     * Send a request and remember its response as the last response.
     *
     * A ClientException is not propagated: the response it carries is stored
     * instead, so callers can inspect the status code afterwards. Any other
     * exception bubbles up and leaves the stored response untouched.
     *
     * @param RequestInterface $request
     */
    private function sendRequest(RequestInterface $request)
    {
        try {
            $response = $this->httpClient->send($request);
        } catch (ClientException $clientError) {
            $response = $clientError->getResponse();
        }

        $this->lastResponse = $response;
    }
310
311
    /**
     * Run the handler registered for the last response's status code, if any.
     *
     * Status codes without a registered handler pass through silently.
     *
     * @param  string                            $streamUrl URL handed to the handler for its message
     * @throws Exception\StreamDeletedException
     * @throws Exception\StreamNotFoundException
     * @throws Exception\UnauthorizedException
     */
    private function ensureStatusCodeIsGood($streamUrl)
    {
        $code = $this->lastResponse->getStatusCode();

        if (!array_key_exists($code, $this->badCodeHandlers)) {
            return;
        }

        $handler = $this->badCodeHandlers[$code];
        $handler($streamUrl);
    }
325
326 26
    /**
     * Register the handlers that turn known error status codes into exceptions.
     *
     * Each handler receives the stream URL and throws: NOT_FOUND raises
     * StreamNotFoundException, GONE raises StreamDeletedException and
     * UNAUTHORIZED raises UnauthorizedException.
     */
    private function initBadCodeHandlers()
    {
        $notFound = function ($streamUrl) {
            throw new StreamNotFoundException(
                sprintf('No stream found at %s', $streamUrl)
            );
        };

        $gone = function ($streamUrl) {
            throw new StreamDeletedException(
                sprintf('Stream at %s has been permanently deleted', $streamUrl)
            );
        };

        $unauthorized = function ($streamUrl) {
            throw new UnauthorizedException(
                sprintf('Tried to open stream %s got 401', $streamUrl)
            );
        };

        $this->badCodeHandlers = [
            ResponseCode::HTTP_NOT_FOUND => $notFound,
            ResponseCode::HTTP_GONE => $gone,
            ResponseCode::HTTP_UNAUTHORIZED => $unauthorized,
        ];
    }
357
358 15
    /**
     * Decode the body of the last response as JSON into associative arrays.
     *
     * @return mixed Whatever json_decode() yields for the body (null when it cannot be decoded)
     */
    private function lastResponseAsJson()
    {
        $body = $this->lastResponse->getBody();

        return json_decode($body, true);
    }
362
}
363