Completed
Push — v4.0 ( 989be1...ff495d )
by Masiukevich
02:21
created

EventStreamRepository::revert()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 55
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 2.0011

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 31
c 1
b 0
f 0
nc 1
nop 3
dl 0
loc 55
ccs 28
cts 30
cp 0.9333
crap 2.0011
rs 9.424

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use Psr\Log\LoggerInterface;
22
use Psr\Log\NullLogger;
23
use ServiceBus\EventSourcing\Aggregate;
24
use ServiceBus\EventSourcing\AggregateId;
25
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
26
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
27
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
28
use ServiceBus\EventSourcing\Snapshots\Snapshot;
29
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
30
31
/**
32
 * Repository for working with event streams.
33
 */
34
final class EventStreamRepository
35
{
36
    public const REVERT_MODE_SOFT_DELETE = 1;
37
38
    public const REVERT_MODE_DELETE = 2;
39
40
    private EventStreamStore $store;
41
42
    private EventSerializer $serializer;
43
44
    private Snapshotter $snapshotter;
45
46
    private LoggerInterface $logger;
47
48 8
    public function __construct(
49
        EventStreamStore $store,
50
        Snapshotter $snapshotter,
51
        EventSerializer $serializer,
52
        ?LoggerInterface $logger = null
53
    ) {
54 8
        $this->store       = $store;
55 8
        $this->snapshotter = $snapshotter;
56 8
        $this->serializer  = $serializer;
57 8
        $this->logger      = $logger ?? new NullLogger();
58 8
    }
59
60
    /**
61
     * Load aggregate.
62
     *
63
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
64
     *
65
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
66
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
67
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
68
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
69
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
70
     *
71
     * @return Promise<\ServiceBus\EventSourcing\Aggregate|null>
72
     */
73 2
    public function load(AggregateId $id): Promise
74
    {
75 2
        $store      = $this->store;
76 2
        $serializer = $this->serializer;
77 2
        $snaphotter = $this->snapshotter;
78 2
        $logger     = $this->logger;
79
80 2
        return call(
81
            static function(AggregateId $id) use ($store, $serializer, $snaphotter, $logger): \Generator
82
            {
83 2
                $idValue = $id->toString();
84 2
                $idClass = \get_class($id);
85
86 2
                $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', [
87 2
                    'aggregateIdClass' => $idClass,
88 2
                    'aggregateId'      => $idValue,
89
                ]);
90
91
                try
92
                {
93 2
                    $aggregate         = null;
94 2
                    $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;
95
96
                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
97 2
                    $loadedSnapshot = yield $snaphotter->load($id);
98
99 2
                    if (null !== $loadedSnapshot)
100
                    {
101 1
                        $aggregate         = $loadedSnapshot->aggregate;
102 1
                        $fromStreamVersion = $aggregate->version() + 1;
103
104 1
                        $logger->debug(
105 1
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
106
                            [
107 1
                                'aggregateIdClass' => $idClass,
108 1
                                'aggregateId'      => $idValue,
109 1
                                'aggregateVersion' => $aggregate->version(),
110
                            ]
111
                        );
112
                    }
113
114
                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
115 2
                    $storedEventStream = yield $store->load($id, $fromStreamVersion);
116
117 2
                    $aggregate = self::restoreStream($serializer, $aggregate, $storedEventStream);
118
119 1
                    return $aggregate;
120
                }
121 1
                catch (\Throwable $throwable)
122
                {
123 1
                    $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed', [
124 1
                        'aggregateIdClass' => $idClass,
125 1
                        'aggregateId'      => $idValue,
126 1
                        'throwableMessage' => $throwable->getMessage(),
127 1
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
128
                    ]);
129
130 1
                    throw $throwable;
131
                }
132
                finally
133
                {
134
                    /** @psalm-suppress PossiblyUndefinedVariable */
135 2
                    unset($storedEventStream, $loadedSnapshot, $fromStreamVersion);
136
                }
137 2
            },
138 2
            $id
139
        );
140
    }
141
142
    /**
143
     * Save a new event stream.
144
     *
145
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
146
     *
147
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
148
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
149
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
150
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
151
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
152
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
153
     *
154
     * @return Promise<array<int, object>>
155
     */
156 7
    public function save(Aggregate $aggregate): Promise
157
    {
158 7
        $store      = $this->store;
159 7
        $serializer = $this->serializer;
160 7
        $snaphotter = $this->snapshotter;
161 7
        $logger     = $this->logger;
162
163 7
        return call(
164
            static function(Aggregate $aggregate) use ($store, $serializer, $snaphotter, $logger): \Generator
165
            {
166 7
                $id = $aggregate->id();
167
168 7
                $idValue = $id->toString();
169 7
                $idClass = \get_class($id);
170
171 7
                $logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"', [
172 7
                    'aggregateIdClass' => $idClass,
173 7
                    'aggregateId'      => $idValue,
174
                ]);
175
176
                try
177
                {
178
                    /**
179
                     * @psalm-var array<int, object> $raisedEvents
180
                     *
181
                     * @var object[] $raisedEvents
182
                     */
183 7
                    $raisedEvents = yield from self::doStore($serializer, $store, $snaphotter, $aggregate, true);
184
185 7
                    return $raisedEvents;
186
                }
187 1
                catch (\Throwable $throwable)
188
                {
189 1
                    $logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed', [
190 1
                        'aggregateIdClass' => $idClass,
191 1
                        'aggregateId'      => $idValue,
192 1
                        'throwableMessage' => $throwable->getMessage(),
193 1
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
194
                    ]);
195
196 1
                    throw $throwable;
197
                }
198 7
            },
199 7
            $aggregate
200
        );
201
    }
202
203
    /**
204
     * Update existent event stream (append events).
205
     *
206
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
207
     *
208
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
209
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
210
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
211
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
212
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
213
     *
214
     * @return Promise<array<int, object>>
215
     */
216 4
    public function update(Aggregate $aggregate): Promise
217
    {
218 4
        $store      = $this->store;
219 4
        $serializer = $this->serializer;
220 4
        $snaphotter = $this->snapshotter;
221 4
        $logger     = $this->logger;
222
223 4
        return call(
224
            static function(Aggregate $aggregate) use ($store, $serializer, $snaphotter, $logger): \Generator
225
            {
226 4
                $id = $aggregate->id();
227
228 4
                $idValue = $id->toString();
229 4
                $idClass = \get_class($id);
230
231 4
                $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
232 4
                    'aggregateIdClass' => $idClass,
233 4
                    'aggregateId'      => $idValue,
234
                ]);
235
236
                try
237
                {
238
                    /**
239
                     * @psalm-var array<int, object> $raisedEvents
240
                     *
241
                     * @var object[] $raisedEvents
242
                     */
243 4
                    $raisedEvents = yield from self::doStore($serializer, $store, $snaphotter, $aggregate, false);
244
245 4
                    return $raisedEvents;
246
                }
247
                catch (\Throwable $throwable)
248
                {
249
                    $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}', [
250
                        'aggregateIdClass' => $idClass,
251
                        'aggregateId'      => $idValue,
252
                        'throwableMessage' => $throwable->getMessage(),
253
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
254
                    ]);
255
256
                    throw $throwable;
257
                }
258 4
            },
259 4
            $aggregate
260
        );
261
    }
262
263
    /**
264
     * Revert aggregate to specified version.
265
     *
266
     * Mode options:
267
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
268
     *   conflicts in some situations
269
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
270
     *
271
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
272
     *
273
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
274
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
275
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
276
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
277
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
278
     *
279
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
280
     */
281 4
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
282
    {
283 4
        $store      = $this->store;
284 4
        $serializer = $this->serializer;
285 4
        $snaphotter = $this->snapshotter;
286 4
        $logger     = $this->logger;
287
288 4
        return call(
289
            static function(Aggregate $aggregate, int $toVersion, int $mode) use ($store, $serializer, $snaphotter, $logger): \Generator
290
            {
291 4
                $id = $aggregate->id();
292
293 4
                $idValue = $id->toString();
294 4
                $idClass = \get_class($id);
295
296 4
                $logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
297 4
                    'aggregateIdClass' => $idClass,
298 4
                    'aggregateId'      => $idValue,
299 4
                    'aggregateVersion' => $toVersion,
300
                ]);
301
302
                try
303
                {
304 4
                    yield $store->revert($aggregate->id(), $toVersion, self::REVERT_MODE_DELETE === $mode);
305
306
                    /** @var StoredAggregateEventStream|null $storedEventStream */
307 3
                    $storedEventStream = yield $store->load($aggregate->id());
308
309
                    /** @var Aggregate $aggregate */
310 3
                    $aggregate = self::restoreStream($serializer, null, $storedEventStream);
311
312
                    yield $snaphotter->store(new Snapshot($aggregate, $aggregate->version()));
313
314
                    return $aggregate;
315
                }
316 4
                catch (\Throwable $throwable)
317
                {
318 4
                    $logger->debug('Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}', [
319 4
                        'aggregateIdClass' => $idClass,
320 4
                        'aggregateId'      => $idValue,
321 4
                        'throwableMessage' => $throwable->getMessage(),
322 4
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
323
                    ]);
324
325 4
                    throw $throwable;
326
                }
327
                finally
328
                {
329
                    /** @psalm-suppress PossiblyUndefinedVariable */
330 4
                    unset($storedEventStream);
331
                }
332 4
            },
333 4
            $aggregate,
334 4
            $toVersion,
335 4
            $mode
336
        );
337
    }
338
339
    /**
340
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
341
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
342
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
343
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
344
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
345
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
346
     */
347 7
    private static function doStore(
348
        EventSerializer $eventSerializer,
349
        EventStreamStore $eventStreamStore,
350
        Snapshotter $snapshotter,
351
        Aggregate $aggregate,
352
        bool $isNew
353
    ): \Generator {
354
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream */
355 7
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
356 7
        $receivedEvents = $eventStream->originEvents;
357
358 7
        $storedEventStream = streamToStoredRepresentation($eventSerializer, $eventStream);
359
360
        /** @noinspection PhpUnnecessaryLocalVariableInspection */
361 7
        $promise = true === $isNew
362 7
            ? $eventStreamStore->save($storedEventStream)
363 7
            : $eventStreamStore->append($storedEventStream);
364
365 7
        yield $promise;
366
367
        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
368 7
        $loadedSnapshot = yield $snapshotter->load($aggregate->id());
369
370 7
        if (true === $snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
371
        {
372 7
            yield $snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
373
        }
374
375 7
        unset($eventStream, $loadedSnapshot, $storedEventStream);
376
377 7
        return $receivedEvents;
378
    }
379
380
    /**
381
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
382
     *
383
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
384
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
385
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
386
     */
387 5
    private static function restoreStream(
388
        EventSerializer $eventSerializer,
389
        ?Aggregate $aggregate,
390
        ?StoredAggregateEventStream $storedEventStream
391
    ): ?Aggregate {
392 5
        if (null === $storedEventStream)
393
        {
394
            return null;
395
        }
396
397 5
        $eventStream = streamToDomainRepresentation($eventSerializer, $storedEventStream);
398
399 1
        if (null === $aggregate)
400
        {
401
            /**
402
             * @noinspection CallableParameterUseCaseInTypeContextInspection
403
             *
404
             * @var Aggregate $aggregate
405
             */
406
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
407
        }
408
409 1
        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);
410
411 1
        return $aggregate;
412
    }
413
}
414