Completed
Push — v4.0 ( 989be1...ff495d )
by Masiukevich
02:21
created

EventStreamRepository::restoreStream()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 25
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3.1406

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 7
c 1
b 0
f 0
nc 3
nop 3
dl 0
loc 25
ccs 6
cts 8
cp 0.75
crap 3.1406
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use Psr\Log\LoggerInterface;
22
use Psr\Log\NullLogger;
23
use ServiceBus\EventSourcing\Aggregate;
24
use ServiceBus\EventSourcing\AggregateId;
25
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
26
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
27
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
28
use ServiceBus\EventSourcing\Snapshots\Snapshot;
29
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
30
31
/**
 * Repository for working with event streams.
 *
 * Each public operation is wrapped in an Amp coroutine and returns a promise. The coroutines are static
 * closures, so the collaborators are copied into local variables and passed in via `use` instead of
 * being read from `$this`.
 *
 * Failures are logged at debug level with the throwable message and its origin, then rethrown unchanged.
 */
final class EventStreamRepository
{
    /**
     * Revert mode: mark tail events as deleted (soft deletion).
     */
    public const REVERT_MODE_SOFT_DELETE = 1;

    /**
     * Revert mode: remove tail events from the database.
     */
    public const REVERT_MODE_DELETE = 2;

    private EventStreamStore $store;

    private EventSerializer $serializer;

    private Snapshotter $snapshotter;

    private LoggerInterface $logger;

    /**
     * @param EventStreamStore     $store       Persistent storage of event streams
     * @param Snapshotter          $snapshotter Aggregate snapshots handler
     * @param EventSerializer      $serializer  Serializer used to convert events to/from the stored representation
     * @param LoggerInterface|null $logger      Optional logger; a NullLogger is used when omitted
     */
    public function __construct(
        EventStreamStore $store,
        Snapshotter $snapshotter,
        EventSerializer $serializer,
        ?LoggerInterface $logger = null
    ) {
        $this->store       = $store;
        $this->snapshotter = $snapshotter;
        $this->serializer  = $serializer;
        $this->logger      = $logger ?? new NullLogger();
    }

    /**
     * Load aggregate.
     *
     * If a snapshot exists, the aggregate is taken from it and only the events that follow the snapshot
     * version are requested from the store; otherwise the stream is read from the start playhead index.
     * The promise resolves to null when the store returns no stream.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate|null>
     */
    public function load(AggregateId $id): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        return call(
            static function(AggregateId $id) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    $aggregate         = null;
                    $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;

                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
                    $loadedSnapshot = yield $snapshotter->load($id);

                    if (null !== $loadedSnapshot)
                    {
                        $aggregate = $loadedSnapshot->aggregate;

                        /** Only the events recorded after the snapshot have to be replayed */
                        $fromStreamVersion = $aggregate->version() + 1;

                        $logger->debug(
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
                            [
                                'aggregateIdClass' => $idClass,
                                'aggregateId'      => $idValue,
                                'aggregateVersion' => $aggregate->version(),
                            ]
                        );
                    }

                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $store->load($id, $fromStreamVersion);

                    return self::restoreStream($serializer, $aggregate, $storedEventStream);
                }
                catch (\Throwable $throwable)
                {
                    $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed', [
                        'aggregateIdClass' => $idClass,
                        'aggregateId'      => $idValue,
                        'throwableMessage' => $throwable->getMessage(),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
                finally
                {
                    /** @psalm-suppress PossiblyUndefinedVariable */
                    unset($storedEventStream, $loadedSnapshot, $fromStreamVersion);
                }
            },
            $id
        );
    }

    /**
     * Save a new event stream.
     *
     * The promise resolves to the events raised by the aggregate.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return Promise<array<int, object>>
     */
    public function save(Aggregate $aggregate): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        return call(
            static function(Aggregate $aggregate) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from self::doStore($serializer, $store, $snapshotter, $aggregate, true);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    $logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed', [
                        'aggregateIdClass' => $idClass,
                        'aggregateId'      => $idValue,
                        'throwableMessage' => $throwable->getMessage(),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
            },
            $aggregate
        );
    }

    /**
     * Update existent event stream (append events).
     *
     * The promise resolves to the events raised by the aggregate.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<array<int, object>>
     */
    public function update(Aggregate $aggregate): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        return call(
            static function(Aggregate $aggregate) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from self::doStore($serializer, $store, $snapshotter, $aggregate, false);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    /** The message must differ from the "start" message above, otherwise failures are indistinguishable in logs */
                    $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}" failed', [
                        'aggregateIdClass' => $idClass,
                        'aggregateId'      => $idValue,
                        'throwableMessage' => $throwable->getMessage(),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
            },
            $aggregate
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * Mode options:
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
     *   conflicts in some situations
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * After the store has reverted the stream, the aggregate is rebuilt from the stored stream (the snapshot is
     * not consulted) and a fresh snapshot of the rebuilt aggregate is stored. The promise resolves to the
     * rebuilt aggregate.
     *
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @throws \RuntimeException The event stream could not be loaded after the revert
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
     */
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
    {
        $store       = $this->store;
        $serializer  = $this->serializer;
        $snapshotter = $this->snapshotter;
        $logger      = $this->logger;

        return call(
            static function(Aggregate $aggregate, int $toVersion, int $mode) use ($store, $serializer, $snapshotter, $logger): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                    'aggregateVersion' => $toVersion,
                ]);

                try
                {
                    /** The third argument tells the store whether tail events are removed (true) or only marked as deleted */
                    yield $store->revert($id, $toVersion, self::REVERT_MODE_DELETE === $mode);

                    /** @var StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $store->load($id);

                    $restoredAggregate = self::restoreStream($serializer, null, $storedEventStream);

                    /** restoreStream() yields null when the store returned no stream; fail with a clear message */
                    if (null === $restoredAggregate)
                    {
                        throw new \RuntimeException(
                            \sprintf(
                                'Unable to restore aggregate "%s:%s" after revert: event stream not found',
                                $idClass,
                                $idValue
                            )
                        );
                    }

                    yield $snapshotter->store(new Snapshot($restoredAggregate, $restoredAggregate->version()));

                    return $restoredAggregate;
                }
                catch (\Throwable $throwable)
                {
                    $logger->debug('Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}"', [
                        'aggregateIdClass' => $idClass,
                        'aggregateId'      => $idValue,
                        'throwableMessage' => $throwable->getMessage(),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
                finally
                {
                    /** @psalm-suppress PossiblyUndefinedVariable */
                    unset($storedEventStream);
                }
            },
            $aggregate,
            $toVersion,
            $mode
        );
    }

    /**
     * Persist the events of the aggregate and refresh its snapshot when the snapshotter requires it.
     *
     * The stream is obtained by invoking the aggregate's `makeStream` method through reflection. It is then
     * converted to the stored representation and either saved as a new stream or appended to an existing one.
     * Afterwards the current snapshot is loaded and a new one is stored if the snapshotter decides so.
     *
     * @param bool $isNew True to save a new stream, false to append to an existing one
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     *
     * @return \Generator Coroutine whose return value is the `originEvents` of the produced stream
     */
    private static function doStore(
        EventSerializer $eventSerializer,
        EventStreamStore $eventStreamStore,
        Snapshotter $snapshotter,
        Aggregate $aggregate,
        bool $isNew
    ): \Generator {
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream */
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
        $receivedEvents = $eventStream->originEvents;

        $storedEventStream = streamToStoredRepresentation($eventSerializer, $eventStream);

        yield true === $isNew
            ? $eventStreamStore->save($storedEventStream)
            : $eventStreamStore->append($storedEventStream);

        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
        $loadedSnapshot = yield $snapshotter->load($aggregate->id());

        if (true === $snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
        {
            yield $snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
        }

        unset($eventStream, $loadedSnapshot, $storedEventStream);

        return $receivedEvents;
    }

    /**
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
     *
     * Returns null when no stored stream is given, even if an aggregate was passed. When no aggregate is
     * passed, an instance of the class named by the stored stream is created without calling its constructor.
     * The events are applied by invoking the aggregate's `appendStream` method through reflection.
     *
     * @param Aggregate|null                  $aggregate         Aggregate taken from a snapshot, if any
     * @param StoredAggregateEventStream|null $storedEventStream Stream loaded from the store, if any
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     */
    private static function restoreStream(
        EventSerializer $eventSerializer,
        ?Aggregate $aggregate,
        ?StoredAggregateEventStream $storedEventStream
    ): ?Aggregate {
        if (null === $storedEventStream)
        {
            return null;
        }

        $eventStream = streamToDomainRepresentation($eventSerializer, $storedEventStream);

        if (null === $aggregate)
        {
            /**
             * @noinspection CallableParameterUseCaseInTypeContextInspection
             *
             * @var Aggregate $aggregate
             */
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
        }

        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);

        return $aggregate;
    }
}
414