Completed
Push — v4.0 ( ff495d...e0bacf )
by Masiukevich
01:44
created

EventStreamRepository::load()   A

Complexity

Conditions 3
Paths 1

Size

Total Lines 66
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 3.072

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 36
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 66
ccs 28
cts 35
cp 0.8
crap 3.072
rs 9.344

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use Psr\Log\LoggerInterface;
22
use Psr\Log\NullLogger;
23
use ServiceBus\EventSourcing\Aggregate;
24
use ServiceBus\EventSourcing\AggregateId;
25
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
26
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
27
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
28
use ServiceBus\EventSourcing\Snapshots\Snapshot;
29
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
30
31
/**
32
 * Repository for working with event streams.
33
 */
34
final class EventStreamRepository
35
{
36
    /** Revert mode: mark tail events as deleted (soft deletion). This is the default mode of revert(). */
    public const REVERT_MODE_SOFT_DELETE = 1;
37
38
    /** Revert mode: remove tail events from the database. */
    public const REVERT_MODE_DELETE = 2;
39
40
    /** Event stream storage used to load, save, append and revert streams. */
    private EventStreamStore $store;
41
42
    /** Serializer handed to the stream conversion functions (stored <-> domain representation). */
    private EventSerializer $serializer;
43
44
    /** Snapshot facade used to load and store aggregate snapshots. */
    private Snapshotter $snapshotter;
45
46
    /** Logger for debug traces; a NullLogger is substituted by the constructor when none is supplied. */
    private LoggerInterface $logger;
47
48 8
    /**
     * Wire the repository to its collaborators.
     *
     * When no logger is supplied, a NullLogger is used so that logging calls never need a null check.
     */
    public function __construct(
        EventStreamStore $store,
        Snapshotter $snapshotter,
        EventSerializer $serializer,
        ?LoggerInterface $logger = null
    ) {
        if (null === $logger)
        {
            $logger = new NullLogger();
        }

        $this->logger      = $logger;
        $this->serializer  = $serializer;
        $this->snapshotter = $snapshotter;
        $this->store       = $store;
    }
59
60
    /**
     * Load aggregate.
     *
     * The promise resolves to \ServiceBus\EventSourcing\Aggregate|null.
     *
     * The actual restore logic lives in doLoad(); this method only builds the coroutine and takes care of the
     * debug logging around it. Any failure is logged and re-thrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function load(AggregateId $id): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        return call(
            static function(AggregateId $id) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $idValue = $id->toString();
                $idClass = \get_class($id);

                /** Placeholders shared by every log record produced for this call. */
                $logContext = [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ];

                $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', $logContext);

                try
                {
                    return yield from self::doLoad($store, $serializer, $snapshotter, $logger, $id, $logContext);
                }
                catch (\Throwable $throwable)
                {
                    $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed', $logContext + [
                        'throwableMessage' => $throwable->getMessage(),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
            },
            $id
        );
    }

    /**
     * Restore an aggregate from its latest snapshot (if there is one) and the events stored in the stream.
     *
     * Without a snapshot the stream is requested from Aggregate::START_PLAYHEAD_INDEX; with a snapshot only the
     * versions following the snapshotted one are requested and applied on top of the snapshotted aggregate.
     *
     * NOTE(review): when the store resolves to null, restoreStream() returns null even if a snapshot was found.
     * Confirm that the store never resolves to null for an existing stream.
     *
     * @psalm-param array<string, string> $logContext
     *
     * @param array $logContext Log placeholders ("aggregateIdClass", "aggregateId") prepared by the caller
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    private static function doLoad(
        EventStreamStore $store,
        EventSerializer $serializer,
        Snapshotter $snapshotter,
        LoggerInterface $logger,
        AggregateId $id,
        array $logContext
    ): \Generator {
        $aggregate         = null;
        $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;

        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
        $loadedSnapshot = yield $snapshotter->load($id);

        if (null !== $loadedSnapshot)
        {
            $aggregate         = $loadedSnapshot->aggregate;
            $snapshotVersion   = $aggregate->version();
            $fromStreamVersion = $snapshotVersion + 1;

            $logger->debug(
                'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
                $logContext + ['aggregateVersion' => $snapshotVersion]
            );
        }

        /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
        $storedEventStream = yield $store->load($id, $fromStreamVersion);

        return self::restoreStream($serializer, $aggregate, $storedEventStream);
    }
139
140
    /**
     * Save a new event stream.
     *
     * The promise resolves to array<int, object>: the value returned by doStore() when it is invoked in
     * "new stream" mode. Any failure is logged at debug level and re-thrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    public function save(Aggregate $aggregate): Promise
    {
        $eventStore    = $this->store;
        $eventEncoder  = $this->serializer;
        $snapshotStore = $this->snapshotter;
        $log           = $this->logger;

        return call(
            static function(Aggregate $aggregate) use ($eventStore, $eventEncoder, $snapshotStore, $log): \Generator
            {
                $aggregateId = $aggregate->id();

                /** Placeholders shared by the "start" and "failed" log records. */
                $context = [
                    'aggregateIdClass' => \get_class($aggregateId),
                    'aggregateId'      => $aggregateId->toString(),
                ];

                $log->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"', $context);

                try
                {
                    /** @psalm-var array<int, object> */
                    return yield from self::doStore($eventEncoder, $eventStore, $snapshotStore, $aggregate, true);
                }
                catch (\Throwable $failure)
                {
                    $context['throwableMessage'] = $failure->getMessage();
                    $context['throwablePoint']   = \sprintf('%s:%d', $failure->getFile(), $failure->getLine());

                    $log->debug(
                        'Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $context
                    );

                    throw $failure;
                }
            },
            $aggregate
        );
    }
198
199
    /**
     * Update existent event stream (append events).
     *
     * The promise resolves to array<int, object>: the value returned by doStore() when it is invoked in
     * "append" mode. Any failure is logged at debug level and re-thrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function update(Aggregate $aggregate): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        return call(
            static function(Aggregate $aggregate) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from self::doStore($serializer, $store, $snapshotter, $aggregate, false);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    /** The failure record carries the closing quote and the " failed" suffix, like save() and load(). */
                    $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}" failed', [
                        'aggregateIdClass' => $idClass,
                        'aggregateId'      => $idValue,
                        'throwableMessage' => $throwable->getMessage(),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
            },
            $aggregate
        );
    }
256
257
    /**
     * Revert aggregate to specified version.
     *
     * The promise resolves to \ServiceBus\EventSourcing\Aggregate: a fresh instance rebuilt from the stream as it
     * is stored after the revert. A snapshot of that instance is stored before the promise resolves.
     *
     * Mode options:
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
     *   conflicts in some situations
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * Only self::REVERT_MODE_DELETE switches the store to hard deletion; every other value is passed on as
     * "not a hard delete".
     *
     * @throws \RuntimeException If the stream cannot be loaded back after it has been reverted
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        return call(
            static function(Aggregate $aggregate, int $toVersion, int $mode) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                    'aggregateVersion' => $toVersion,
                ]);

                try
                {
                    yield $store->revert($id, $toVersion, self::REVERT_MODE_DELETE === $mode);

                    /** @var StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $store->load($id);

                    $restoredAggregate = self::restoreStream($serializer, null, $storedEventStream);

                    /** restoreStream() returns null when no stream was loaded; fail with a clear message instead of a null dereference. */
                    if (null === $restoredAggregate)
                    {
                        throw new \RuntimeException(
                            \sprintf(
                                'Event stream of aggregate "%s:%s" was not found after reverting to version "%d"',
                                $idClass,
                                $idValue,
                                $toVersion
                            )
                        );
                    }

                    yield $snapshotter->store(new Snapshot($restoredAggregate, $restoredAggregate->version()));

                    return $restoredAggregate;
                }
                catch (\Throwable $throwable)
                {
                    $logger->debug('Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}"', [
                        'aggregateIdClass' => $idClass,
                        'aggregateId'      => $idValue,
                        'throwableMessage' => $throwable->getMessage(),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
            },
            $aggregate,
            $toVersion,
            $mode
        );
    }
330
331
    /**
     * Persist the stream produced by the aggregate and refresh its snapshot when the snapshotter asks for it.
     *
     * The stream is obtained by invoking the aggregate's "makeStream" method through reflection, converted to
     * its stored representation and then either saved ($isNew = true) or appended ($isNew = false). The
     * generator returns the "originEvents" of the produced stream.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private static function doStore(
        EventSerializer $eventSerializer,
        EventStreamStore $eventStreamStore,
        Snapshotter $snapshotter,
        Aggregate $aggregate,
        bool $isNew
    ): \Generator {
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $domainStream */
        $domainStream = invokeReflectionMethod($aggregate, 'makeStream');
        $raisedEvents = $domainStream->originEvents;
        $storedStream = streamToStoredRepresentation($eventSerializer, $domainStream);

        if ($isNew)
        {
            yield $eventStreamStore->save($storedStream);
        }
        else
        {
            yield $eventStreamStore->append($storedStream);
        }

        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $latestSnapshot */
        $latestSnapshot = yield $snapshotter->load($aggregate->id());

        if (true === $snapshotter->snapshotMustBeCreated($aggregate, $latestSnapshot))
        {
            yield $snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
        }

        /** Drop the references to the intermediate objects before handing the events back. */
        unset($domainStream, $latestSnapshot, $storedStream);

        return $raisedEvents;
    }
371
372
    /**
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
     *
     * Returns null when no stored stream is given. Otherwise the stored stream is converted to its domain
     * representation and applied to the given aggregate through its "appendStream" method. When no aggregate
     * is given, an instance of the class named by the stored stream is created without running its constructor.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     */
    private static function restoreStream(
        EventSerializer $eventSerializer,
        ?Aggregate $aggregate,
        ?StoredAggregateEventStream $storedEventStream
    ): ?Aggregate {
        if (null !== $storedEventStream)
        {
            $domainStream = streamToDomainRepresentation($eventSerializer, $storedEventStream);

            /**
             * @noinspection CallableParameterUseCaseInTypeContextInspection
             *
             * @var Aggregate $target
             */
            $target = $aggregate ?? createWithoutConstructor($storedEventStream->aggregateClass);

            invokeReflectionMethod($target, 'appendStream', $domainStream);

            return $target;
        }

        return null;
    }
405
}
406