Passed
Push — v3.3 ( 1e1106...84677f )
by Masiukevich
03:38 queued 10s
created

EventStreamRepository   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 421
Duplicated Lines 0 %

Test Coverage

Coverage 89.13%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 142
c 1
b 0
f 0
dl 0
loc 421
ccs 123
cts 138
cp 0.8913
rs 10
wmc 16

7 Methods

Rating   Name   Duplication   Size   Complexity  
A save() 0 45 2
A update() 0 45 2
A __construct() 0 10 1
A restoreStream() 0 25 3
A load() 0 66 3
A revert() 0 55 2
A doStore() 0 30 3
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use Psr\Log\LoggerInterface;
22
use Psr\Log\NullLogger;
23
use ServiceBus\EventSourcing\Aggregate;
24
use ServiceBus\EventSourcing\AggregateId;
25
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
26
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
27
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
28
use ServiceBus\EventSourcing\Snapshots\Snapshot;
29
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
30
31
/**
32
 * Repository for working with event streams.
33
 */
34
final class EventStreamRepository
{
    /**
     * Revert mode: tail events are only marked as deleted (soft deletion).
     */
    public const REVERT_MODE_SOFT_DELETE = 1;

    /**
     * Revert mode: tail events are removed from the storage.
     */
    public const REVERT_MODE_DELETE = 2;

    /**
     * Event stream storage.
     *
     * @var EventStreamStore
     */
    private $store;

    /**
     * Serializer used to convert events between stored and domain representation.
     *
     * @var EventSerializer
     */
    private $serializer;

    /**
     * Aggregate snapshots handler.
     *
     * @var Snapshotter
     */
    private $snapshotter;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param EventStreamStore     $store
     * @param Snapshotter          $snapshotter
     * @param EventSerializer      $serializer
     * @param LoggerInterface|null $logger      If omitted, NullLogger is used
     */
    public function __construct(
        EventStreamStore $store,
        Snapshotter $snapshotter,
        EventSerializer $serializer,
        ?LoggerInterface $logger = null
    ) {
        $this->store       = $store;
        $this->snapshotter = $snapshotter;
        $this->serializer  = $serializer;
        $this->logger      = $logger ?? new NullLogger();
    }

    /**
     * Load aggregate.
     *
     * If a snapshot exists, the aggregate is taken from it and only the events that follow the snapshot version
     * are requested from the store. The promise is resolved with null if the store returns no event stream.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param AggregateId $id
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate|null>
     */
    public function load(AggregateId $id): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(AggregateId $id) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    $aggregate         = null;
                    $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;

                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
                    $loadedSnapshot = yield $snapshotter->load($id);

                    if (null !== $loadedSnapshot)
                    {
                        $aggregate = $loadedSnapshot->aggregate;

                        /** The snapshot already contains everything up to its version; request only newer events */
                        $fromStreamVersion = $aggregate->version() + 1;

                        $logger->debug(
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
                            [
                                'aggregateIdClass' => $idClass,
                                'aggregateId'      => $idValue,
                                'aggregateVersion' => $aggregate->version(),
                            ]
                        );
                    }

                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $store->load($id, $fromStreamVersion);

                    return self::restoreStream($serializer, $aggregate, $storedEventStream);
                }
                catch (\Throwable $throwable)
                {
                    self::logFailure(
                        $logger,
                        'Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed',
                        $idClass,
                        $idValue,
                        $throwable
                    );

                    throw $throwable;
                }
            },
            $id
        );
    }

    /**
     * Save a new event stream.
     *
     * The promise is resolved with the events raised by the aggregate.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return Promise<array<int, object>>
     */
    public function save(Aggregate $aggregate): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(Aggregate $aggregate) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from self::doStore($serializer, $store, $snapshotter, $aggregate, true);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    self::logFailure(
                        $logger,
                        'Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $idClass,
                        $idValue,
                        $throwable
                    );

                    throw $throwable;
                }
            },
            $aggregate
        );
    }

    /**
     * Update existent event stream (append events).
     *
     * The promise is resolved with the events raised by the aggregate.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<array<int, object>>
     */
    public function update(Aggregate $aggregate): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(Aggregate $aggregate) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from self::doStore($serializer, $store, $snapshotter, $aggregate, false);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    /** The failure record must be distinguishable from the "start" record written above */
                    self::logFailure(
                        $logger,
                        'Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $idClass,
                        $idValue,
                        $throwable
                    );

                    throw $throwable;
                }
            },
            $aggregate
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * Mode options:
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
     *   conflicts in some situations
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * Any value other than self::REVERT_MODE_DELETE is handled as soft deletion.
     *
     * After the store has reverted the stream, the aggregate is restored from the full stream and a new snapshot
     * is stored for it.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     * @param int       $toVersion
     * @param int       $mode
     *
     * @throws \RuntimeException The event stream could not be loaded after the revert
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
     */
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(Aggregate $aggregate, int $toVersion, int $mode) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                    'aggregateVersion' => $toVersion,
                ]);

                try
                {
                    yield $store->revert($id, $toVersion, self::REVERT_MODE_DELETE === $mode);

                    /** @var StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $store->load($id);

                    $restoredAggregate = self::restoreStream($serializer, null, $storedEventStream);

                    /** restoreStream() yields null when there is no stream; fail explicitly instead of on a null call */
                    if (null === $restoredAggregate)
                    {
                        throw new \RuntimeException(
                            \sprintf(
                                'Unable to restore aggregate "%s:%s" after revert: event stream was not found',
                                $idClass,
                                $idValue
                            )
                        );
                    }

                    yield $snapshotter->store(Snapshot::create($restoredAggregate, $restoredAggregate->version()));

                    return $restoredAggregate;
                }
                catch (\Throwable $throwable)
                {
                    self::logFailure(
                        $logger,
                        'Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}"',
                        $idClass,
                        $idValue,
                        $throwable
                    );

                    throw $throwable;
                }
            },
            $aggregate,
            $toVersion,
            $mode
        );
    }

    /**
     * Store the events of the aggregate and refresh its snapshot when the snapshotter requires it.
     *
     * The generator returns the origin events of the stream produced by the aggregate.
     *
     * @param EventSerializer  $eventSerializer
     * @param EventStreamStore $eventStreamStore
     * @param Snapshotter      $snapshotter
     * @param Aggregate        $aggregate
     * @param bool             $isNew            true - save a new stream; false - append to an existing one
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator
     */
    private static function doStore(
        EventSerializer $eventSerializer,
        EventStreamStore $eventStreamStore,
        Snapshotter $snapshotter,
        Aggregate $aggregate,
        bool $isNew
    ): \Generator {
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream */
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
        $receivedEvents = $eventStream->originEvents;

        $storedEventStream = streamToStoredRepresentation($eventSerializer, $eventStream);

        if ($isNew)
        {
            yield $eventStreamStore->save($storedEventStream);
        }
        else
        {
            yield $eventStreamStore->append($storedEventStream);
        }

        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
        $loadedSnapshot = yield $snapshotter->load($aggregate->id());

        if (true === $snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
        {
            yield $snapshotter->store(Snapshot::create($aggregate, $aggregate->version()));
        }

        return $receivedEvents;
    }

    /**
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
     *
     * Returns null if no stored stream was received. If no aggregate was passed, an instance of the class
     * specified in the stored stream is created without calling its constructor.
     *
     * @param EventSerializer                 $eventSerializer
     * @param Aggregate|null                  $aggregate
     * @param StoredAggregateEventStream|null $storedEventStream
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     *
     * @return Aggregate|null
     */
    private static function restoreStream(
        EventSerializer $eventSerializer,
        ?Aggregate $aggregate,
        ?StoredAggregateEventStream $storedEventStream
    ): ?Aggregate {
        if (null === $storedEventStream)
        {
            return null;
        }

        $eventStream = streamToDomainRepresentation($eventSerializer, $storedEventStream);

        if (null === $aggregate)
        {
            /**
             * @noinspection CallableParameterUseCaseInTypeContextInspection
             *
             * @var Aggregate $aggregate
             */
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
        }

        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);

        return $aggregate;
    }

    /**
     * Write a debug record about a failed repository operation.
     *
     * Keeps the logging context identical for every operation of the repository.
     *
     * @param LoggerInterface $logger
     * @param string          $message   Message template with {aggregateIdClass}/{aggregateId} placeholders
     * @param string          $idClass   Class of the aggregate identifier
     * @param string          $idValue   String representation of the aggregate identifier
     * @param \Throwable      $throwable Reason of the failure
     *
     * @return void
     */
    private static function logFailure(
        LoggerInterface $logger,
        string $message,
        string $idClass,
        string $idValue,
        \Throwable $throwable
    ): void {
        $logger->debug($message, [
            'aggregateIdClass' => $idClass,
            'aggregateId'      => $idValue,
            'throwableMessage' => $throwable->getMessage(),
            'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
        ]);
    }
}
457