EventStreamRepository::restoreStream()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 22
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 7
c 1
b 0
f 0
nc 3
nop 2
dl 0
loc 22
ccs 7
cts 8
cp 0.875
crap 3.0175
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\Common\throwableMessage;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
20
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
21
use Amp\Promise;
22
use Psr\Log\LoggerInterface;
23
use Psr\Log\NullLogger;
24
use ServiceBus\EventSourcing\Aggregate;
25
use ServiceBus\EventSourcing\AggregateId;
26
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
27
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
28
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
29
use ServiceBus\EventSourcing\Snapshots\Snapshot;
30
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
31
32
/**
33
 * Repository for working with event streams.
34
 */
35
final class EventStreamRepository
36
{
37
    public const REVERT_MODE_SOFT_DELETE = 1;
38
39
    public const REVERT_MODE_DELETE = 2;
40
41
    /** @var EventStreamStore */
42
    private $store;
43
44
    /** @var EventSerializer */
45
    private $serializer;
46
47
    /** @var Snapshotter */
48
    private $snapshotter;
49
50
    /** @var LoggerInterface */
51
    private $logger;
52
53 14
    public function __construct(
54
        EventStreamStore $store,
55
        Snapshotter $snapshotter,
56
        EventSerializer $serializer,
57
        ?LoggerInterface $logger = null
58
    ) {
59 14
        $this->store       = $store;
60 14
        $this->snapshotter = $snapshotter;
61 14
        $this->serializer  = $serializer;
62 14
        $this->logger      = $logger ?? new NullLogger();
63 14
    }
64
65
    /**
66
     * Load aggregate.
67
     *
68
     * Returns \ServiceBus\EventSourcing\Aggregate|null
69
     *
70
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
71
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
72
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
73
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
74
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
75
     */
76 2
    public function load(AggregateId $id): Promise
77
    {
78 2
        return call(
79
            function() use ($id): \Generator
80
            {
81 2
                $idValue = $id->toString();
82 2
                $idClass = \get_class($id);
83
84 2
                $this->logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', [
85 2
                    'aggregateIdClass' => $idClass,
86 2
                    'aggregateId'      => $idValue,
87
                ]);
88
89
                try
90
                {
91 2
                    $aggregate         = null;
92 2
                    $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;
93
94
                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
95 2
                    $loadedSnapshot = yield $this->snapshotter->load($id);
96
97 2
                    if (null !== $loadedSnapshot)
98
                    {
99 1
                        $aggregate         = $loadedSnapshot->aggregate;
100 1
                        $fromStreamVersion = $aggregate->version() + 1;
101
102 1
                        $this->logger->debug(
103 1
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
104
                            [
105 1
                                'aggregateIdClass' => $idClass,
106 1
                                'aggregateId'      => $idValue,
107 1
                                'aggregateVersion' => $aggregate->version(),
108
                            ]
109
                        );
110
                    }
111
112
                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
113 2
                    $storedEventStream = yield $this->store->load($id, $fromStreamVersion);
114
115 2
                    $aggregate = $this->restoreStream($aggregate, $storedEventStream);
116
117 2
                    return $aggregate;
118
                }
119
                catch (\Throwable $throwable)
120
                {
121
                    $this->logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed', [
122
                        'aggregateIdClass' => $idClass,
123
                        'aggregateId'      => $idValue,
124
                        'throwableMessage' => throwableMessage($throwable),
125
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
126
                    ]);
127
128
                    throw $throwable;
129
                }
130
                finally
131
                {
132
                    /** @psalm-suppress PossiblyUndefinedVariable */
133 2
                    unset($storedEventStream, $loadedSnapshot, $fromStreamVersion);
134
                }
135 2
            }
136
        );
137
    }
138
139
    /**
140
     * Save a new event stream.
141
     *
142
     * Returns array<int, object>
143
     *
144
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
145
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
146
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
147
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
148
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
149
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
150
     */
151 7
    public function save(Aggregate $aggregate): Promise
152
    {
153 7
        return call(
154
            function() use ($aggregate): \Generator
155
            {
156 7
                $id = $aggregate->id();
157
158 7
                $idValue = $id->toString();
159 7
                $idClass = \get_class($id);
160
161 7
                $this->logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"', [
162 7
                    'aggregateIdClass' => $idClass,
163 7
                    'aggregateId'      => $idValue,
164
                ]);
165
166
                try
167
                {
168
                    /**
169
                     * @psalm-var array<int, object> $raisedEvents
170
                     *
171
                     * @var object[] $raisedEvents
172
                     */
173 7
                    $raisedEvents = yield from $this->doStore($aggregate, true);
174
175 7
                    return $raisedEvents;
176
                }
177 1
                catch (\Throwable $throwable)
178
                {
179 1
                    $this->logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed', [
180 1
                        'aggregateIdClass' => $idClass,
181 1
                        'aggregateId'      => $idValue,
182 1
                        'throwableMessage' => throwableMessage($throwable),
183 1
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
184
                    ]);
185
186 1
                    throw $throwable;
187
                }
188 7
            }
189
        );
190
    }
191
192
    /**
193
     * Update existent event stream (append events).
194
     *
195
     * Returns array<int, object>
196
     *
197
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
198
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
199
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
200
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
201
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
202
     */
203 5
    public function update(Aggregate $aggregate): Promise
204
    {
205 5
        return call(
206
            function() use ($aggregate): \Generator
207
            {
208 5
                $id = $aggregate->id();
209
210 5
                $idValue = $id->toString();
211 5
                $idClass = \get_class($id);
212
213 5
                $this->logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
214 5
                    'aggregateIdClass' => $idClass,
215 5
                    'aggregateId'      => $idValue,
216
                ]);
217
218
                try
219
                {
220
                    /**
221
                     * @psalm-var array<int, object> $raisedEvents
222
                     *
223
                     * @var object[] $raisedEvents
224
                     */
225 5
                    $raisedEvents = yield from $this->doStore($aggregate, false);
226
227 5
                    return $raisedEvents;
228
                }
229
                catch (\Throwable $throwable)
230
                {
231
                    $this->logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}', [
232
                        'aggregateIdClass' => $idClass,
233
                        'aggregateId'      => $idValue,
234
                        'throwableMessage' => throwableMessage($throwable),
235
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
236
                    ]);
237
238
                    throw $throwable;
239
                }
240 5
            }
241
        );
242
    }
243
244
    /**
245
     * Revert aggregate to specified version.
246
     *
247
     * Returns \ServiceBus\EventSourcing\Aggregate
248
     *
249
     * Mode options:
250
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
251
     *   conflicts in some situations
252
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
253
     *
254
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
255
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
256
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
257
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
258
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
259
     */
260 4
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
261
    {
262 4
        return call(
263
            function() use ($aggregate, $toVersion, $mode): \Generator
264
            {
265 4
                $id = $aggregate->id();
266
267 4
                $idValue = $id->toString();
268 4
                $idClass = \get_class($id);
269
270 4
                $this->logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
271 4
                    'aggregateIdClass' => $idClass,
272 4
                    'aggregateId'      => $idValue,
273 4
                    'aggregateVersion' => $toVersion,
274
                ]);
275
276
                try
277
                {
278 4
                    yield $this->store->revert($aggregate->id(), $toVersion, self::REVERT_MODE_DELETE === $mode);
279
280
                    /** @var StoredAggregateEventStream|null $storedEventStream */
281 3
                    $storedEventStream = yield $this->store->load($aggregate->id());
282
283
                    /** @var Aggregate $aggregate */
284 3
                    $aggregate = $this->restoreStream(null, $storedEventStream);
285
286 3
                    yield $this->snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
287
288 3
                    return $aggregate;
289
                }
290 2
                catch (\Throwable $throwable)
291
                {
292 2
                    $this->logger->debug('Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}', [
293 2
                        'aggregateIdClass' => $idClass,
294 2
                        'aggregateId'      => $idValue,
295 2
                        'throwableMessage' => throwableMessage($throwable),
296 2
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
297
                    ]);
298
299 2
                    throw $throwable;
300
                }
301
                finally
302
                {
303
                    /** @psalm-suppress PossiblyUndefinedVariable */
304 4
                    unset($storedEventStream);
305
                }
306 4
            }
307
        );
308
    }
309
310
    /**
311
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
312
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
313
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
314
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
315
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
316
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
317
     */
318 7
    private function doStore(Aggregate $aggregate, bool $isNew): \Generator
319
    {
320
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream */
321 7
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
322 7
        $receivedEvents = $eventStream->originEvents;
323
324 7
        $storedEventStream = streamToStoredRepresentation($this->serializer, $eventStream);
325
326
        /** @noinspection PhpUnnecessaryLocalVariableInspection */
327 7
        $promise = $isNew
328 7
            ? $this->store->save($storedEventStream)
329 7
            : $this->store->append($storedEventStream);
330
331 7
        yield $promise;
332
333
        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
334 7
        $loadedSnapshot = yield $this->snapshotter->load($aggregate->id());
335
336 7
        if ($this->snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
337
        {
338 7
            yield $this->snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
339
        }
340
341 7
        unset($eventStream, $loadedSnapshot, $storedEventStream);
342
343 7
        return $receivedEvents;
344
    }
345
346
    /**
347
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
348
     *
349
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
350
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
351
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
352
     */
353 5
    private function restoreStream(?Aggregate $aggregate, ?StoredAggregateEventStream $storedEventStream): ?Aggregate
354
    {
355 5
        if (null === $storedEventStream)
356
        {
357
            return null;
358
        }
359
360 5
        $eventStream = streamToDomainRepresentation($this->serializer, $storedEventStream);
361
362 5
        if (null === $aggregate)
363
        {
364
            /**
365
             * @noinspection CallableParameterUseCaseInTypeContextInspection
366
             *
367
             * @var Aggregate $aggregate
368
             */
369 4
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
370
        }
371
372 5
        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);
373
374 5
        return $aggregate;
375
    }
376
}
377