Passed
Push — v3.3 ( 1e1106...84677f )
by Masiukevich
03:38 queued 10s
created

EventStreamRepository::load()   A

Complexity

Conditions 3
Paths 1

Size

Total Lines 66
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 3.072

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 36
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 66
ccs 28
cts 35
cp 0.8
crap 3.072
rs 9.344

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use Psr\Log\LoggerInterface;
22
use Psr\Log\NullLogger;
23
use ServiceBus\EventSourcing\Aggregate;
24
use ServiceBus\EventSourcing\AggregateId;
25
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
26
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
27
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
28
use ServiceBus\EventSourcing\Snapshots\Snapshot;
29
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
30
31
/**
 * Repository for working with event streams.
 *
 * Combines the event stream store, the event serializer and the snapshotter to load, save, append to and
 * revert aggregates. Every public operation runs inside an Amp coroutine and returns a promise; the start of
 * each operation and any failure are reported to the logger at debug level.
 */
final class EventStreamRepository
{
    /**
     * Revert mode: mark tail events as deleted (soft deletion).
     */
    public const REVERT_MODE_SOFT_DELETE = 1;

    /**
     * Revert mode: remove tail events from the database.
     */
    public const REVERT_MODE_DELETE = 2;

    /**
     * @var EventStreamStore
     */
    private $store;

    /**
     * @var EventSerializer
     */
    private $serializer;

    /**
     * @var Snapshotter
     */
    private $snapshotter;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param EventStreamStore     $store
     * @param Snapshotter          $snapshotter
     * @param EventSerializer      $serializer
     * @param LoggerInterface|null $logger      Falls back to a NullLogger when omitted
     */
    public function __construct(
        EventStreamStore $store,
        Snapshotter $snapshotter,
        EventSerializer $serializer,
        ?LoggerInterface $logger = null
    ) {
        $this->store       = $store;
        $this->snapshotter = $snapshotter;
        $this->serializer  = $serializer;
        $this->logger      = $logger ?? new NullLogger();
    }

    /**
     * Load aggregate.
     *
     * A stored snapshot (if any) is used as the starting state, and only the events following the snapshot
     * version are requested from the store. The promise resolves to null when the store returns no stream.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param AggregateId $id
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate|null>
     */
    public function load(AggregateId $id): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(AggregateId $id) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $logContext = self::identifierContext($id);

                $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', $logContext);

                try
                {
                    $aggregate         = null;
                    $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;

                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
                    $loadedSnapshot = yield $snapshotter->load($id);

                    if (null !== $loadedSnapshot)
                    {
                        $aggregate       = $loadedSnapshot->aggregate;
                        $snapshotVersion = $aggregate->version();

                        /** Events up to the snapshot version are already applied; continue from the next one. */
                        $fromStreamVersion = $snapshotVersion + 1;

                        $logger->debug(
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
                            $logContext + ['aggregateVersion' => $snapshotVersion]
                        );
                    }

                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $store->load($id, $fromStreamVersion);

                    return self::restoreStream($serializer, $aggregate, $storedEventStream);
                }
                catch (\Throwable $throwable)
                {
                    self::logFailure(
                        $logger,
                        'Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed',
                        $logContext,
                        $throwable
                    );

                    throw $throwable;
                }
            },
            $id
        );
    }

    /**
     * Save a new event stream.
     *
     * The promise resolves to the events raised by the aggregate.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return Promise<array<int, object>>
     */
    public function save(Aggregate $aggregate): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(Aggregate $aggregate) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $logContext = self::identifierContext($aggregate->id());

                $logger->debug(
                    'Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"',
                    $logContext
                );

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from self::doStore($serializer, $store, $snapshotter, $aggregate, true);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    self::logFailure(
                        $logger,
                        'Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $logContext,
                        $throwable
                    );

                    throw $throwable;
                }
            },
            $aggregate
        );
    }

    /**
     * Update existent event stream (append events).
     *
     * The promise resolves to the events raised by the aggregate.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<array<int, object>>
     */
    public function update(Aggregate $aggregate): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(Aggregate $aggregate) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $logContext = self::identifierContext($aggregate->id());

                $logger->debug(
                    'Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"',
                    $logContext
                );

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from self::doStore($serializer, $store, $snapshotter, $aggregate, false);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    /** The failure message carries a "failed" suffix so it can be told apart from the start message. */
                    self::logFailure(
                        $logger,
                        'Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $logContext,
                        $throwable
                    );

                    throw $throwable;
                }
            },
            $aggregate
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * Mode options:
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
     *   conflicts in some situations
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * Any mode value other than self::REVERT_MODE_DELETE is treated as a soft deletion. After the store has
     * reverted the stream, the aggregate is rebuilt from the complete stream and a snapshot of it is stored.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     * @param int       $toVersion
     * @param int       $mode
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
     */
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            static function(Aggregate $aggregate, int $toVersion, int $mode) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id         = $aggregate->id();
                $logContext = self::identifierContext($id);

                $logger->debug(
                    'Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"',
                    $logContext + ['aggregateVersion' => $toVersion]
                );

                try
                {
                    yield $store->revert($id, $toVersion, self::REVERT_MODE_DELETE === $mode);

                    /** @var StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $store->load($id);

                    /**
                     * NOTE(review): restoreStream() returns null when the store yields no stream; the version()
                     * call below would then raise an \Error. Confirm the store always returns a stream here.
                     *
                     * @var Aggregate $aggregate
                     */
                    $aggregate = self::restoreStream($serializer, null, $storedEventStream);

                    yield $snapshotter->store(Snapshot::create($aggregate, $aggregate->version()));

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    self::logFailure(
                        $logger,
                        'Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}"',
                        $logContext,
                        $throwable
                    );

                    throw $throwable;
                }
            },
            $aggregate,
            $toVersion,
            $mode
        );
    }

    /**
     * Build the logger context that identifies an aggregate (identifier class and string value).
     *
     * @param AggregateId $id
     *
     * @return array<string, string>
     */
    private static function identifierContext(AggregateId $id): array
    {
        $idValue = $id->toString();
        $idClass = \get_class($id);

        return [
            'aggregateIdClass' => $idClass,
            'aggregateId'      => $idValue,
        ];
    }

    /**
     * Report a failed operation at debug level, adding the throwable message and its file:line origin.
     *
     * @param LoggerInterface       $logger
     * @param string                $message           Message template with {aggregateIdClass}/{aggregateId}
     * @param array<string, string> $identifierContext Context created by self::identifierContext()
     * @param \Throwable            $throwable
     *
     * @return void
     */
    private static function logFailure(
        LoggerInterface $logger,
        string $message,
        array $identifierContext,
        \Throwable $throwable
    ): void {
        $logger->debug($message, $identifierContext + [
            'throwableMessage' => $throwable->getMessage(),
            'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
        ]);
    }

    /**
     * Persist the event stream produced by the aggregate and store a snapshot when the snapshotter asks for it.
     *
     * The generator returns the origin events of the stream built by the aggregate.
     *
     * @param EventSerializer  $eventSerializer
     * @param EventStreamStore $eventStreamStore
     * @param Snapshotter      $snapshotter
     * @param Aggregate        $aggregate
     * @param bool             $isNew            True: save as a new stream; false: append to the stored stream
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator
     */
    private static function doStore(
        EventSerializer $eventSerializer,
        EventStreamStore $eventStreamStore,
        Snapshotter $snapshotter,
        Aggregate $aggregate,
        bool $isNew
    ): \Generator {
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream */
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
        $receivedEvents = $eventStream->originEvents;

        $storedEventStream = streamToStoredRepresentation($eventSerializer, $eventStream);

        yield true === $isNew
            ? $eventStreamStore->save($storedEventStream)
            : $eventStreamStore->append($storedEventStream);

        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
        $loadedSnapshot = yield $snapshotter->load($aggregate->id());

        if (true === $snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
        {
            yield $snapshotter->store(Snapshot::create($aggregate, $aggregate->version()));
        }

        return $receivedEvents;
    }

    /**
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
     *
     * When no aggregate is passed, an instance of the class recorded in the stored stream is created without
     * calling its constructor. Returns null when there is no stored stream, even if an aggregate was passed.
     *
     * @param EventSerializer                 $eventSerializer
     * @param Aggregate|null                  $aggregate
     * @param StoredAggregateEventStream|null $storedEventStream
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     *
     * @return Aggregate|null
     */
    private static function restoreStream(
        EventSerializer $eventSerializer,
        ?Aggregate $aggregate,
        ?StoredAggregateEventStream $storedEventStream
    ): ?Aggregate {
        if (null === $storedEventStream)
        {
            return null;
        }

        $eventStream = streamToDomainRepresentation($eventSerializer, $storedEventStream);

        if (null === $aggregate)
        {
            /**
             * @noinspection CallableParameterUseCaseInTypeContextInspection
             *
             * @var Aggregate $aggregate
             */
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
        }

        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);

        return $aggregate;
    }
}
457