Passed
Push — v4.2 ( 50993c )
by Masiukevich
07:39
created

EventStreamRepository   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 340
Duplicated Lines 0 %

Test Coverage

Coverage 87.07%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 120
c 1
b 0
f 0
dl 0
loc 340
ccs 101
cts 116
cp 0.8707
rs 10
wmc 16

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 10 1
A load() 0 58 3
A save() 0 36 2
A update() 0 36 2
A restoreStream() 0 22 3
A revert() 0 45 2
A doStore() 0 26 3
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use Psr\Log\LoggerInterface;
22
use Psr\Log\NullLogger;
23
use ServiceBus\EventSourcing\Aggregate;
24
use ServiceBus\EventSourcing\AggregateId;
25
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
26
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
27
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
28
use ServiceBus\EventSourcing\Snapshots\Snapshot;
29
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
30
31
/**
32
 * Repository for working with event streams.
33
 */
34
final class EventStreamRepository
{
    /** Revert mode: tail events are marked as deleted (soft deletion). */
    public const REVERT_MODE_SOFT_DELETE = 1;

    /** Revert mode: tail events are removed from the store. */
    public const REVERT_MODE_DELETE = 2;

    /**
     * Backend used to save, append, load and revert stored event streams.
     *
     * @var EventStreamStore
     */
    private $store;

    /**
     * Serializer handed to the stream conversion functions (stored <-> domain representation).
     *
     * @var EventSerializer
     */
    private $serializer;

    /**
     * Loads and stores aggregate snapshots and decides when a new snapshot must be created.
     *
     * @var Snapshotter
     */
    private $snapshotter;

    /**
     * Receives debug records for every operation; a NullLogger is used when none is injected.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param EventStreamStore     $store       Event stream storage backend
     * @param Snapshotter          $snapshotter Snapshot loader/writer
     * @param EventSerializer      $serializer  Event serializer used for stream conversion
     * @param LoggerInterface|null $logger      Optional logger; falls back to NullLogger
     */
    public function __construct(
        EventStreamStore $store,
        Snapshotter $snapshotter,
        EventSerializer $serializer,
        ?LoggerInterface $logger = null
    ) {
        $this->store       = $store;
        $this->snapshotter = $snapshotter;
        $this->serializer  = $serializer;
        $this->logger      = $logger ?? new NullLogger();
    }

    /**
     * Load aggregate.
     *
     * If a snapshot is available, the aggregate is taken from it and only the events that follow
     * the snapshot version are requested from the store; otherwise the stream is requested
     * starting from Aggregate::START_PLAYHEAD_INDEX.
     *
     * Returns \ServiceBus\EventSourcing\Aggregate|null (null when the store returns no stream)
     *
     * @param AggregateId $id Identifier of the aggregate to load
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function load(AggregateId $id): Promise
    {
        return call(
            function() use ($id): \Generator
            {
                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    $aggregate         = null;
                    $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;

                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
                    $loadedSnapshot = yield $this->snapshotter->load($id);

                    if (null !== $loadedSnapshot)
                    {
                        $aggregate = $loadedSnapshot->aggregate;

                        /** The snapshot already contains everything up to its version; continue with the next one */
                        $fromStreamVersion = $aggregate->version() + 1;

                        $this->logger->debug(
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
                            [
                                'aggregateIdClass' => $idClass,
                                'aggregateId'      => $idValue,
                                'aggregateVersion' => $aggregate->version(),
                            ]
                        );
                    }

                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $this->store->load($id, $fromStreamVersion);

                    return $this->restoreStream($aggregate, $storedEventStream);
                }
                catch (\Throwable $throwable)
                {
                    $this->logger->debug(
                        'Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed',
                        $this->failureContext($idClass, $idValue, $throwable)
                    );

                    throw $throwable;
                }
                finally
                {
                    /** @psalm-suppress PossiblyUndefinedVariable */
                    unset($storedEventStream, $loadedSnapshot, $fromStreamVersion);
                }
            }
        );
    }

    /**
     * Save a new event stream.
     *
     * Returns array<int, object> (the events taken from the aggregate's stream)
     *
     * @param Aggregate $aggregate Aggregate whose stream is stored for the first time
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    public function save(Aggregate $aggregate): Promise
    {
        return call(
            function() use ($aggregate): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from $this->doStore($aggregate, true);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    $this->logger->debug(
                        'Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $this->failureContext($idClass, $idValue, $throwable)
                    );

                    throw $throwable;
                }
            }
        );
    }

    /**
     * Update existent event stream (append events).
     *
     * Returns array<int, object> (the events taken from the aggregate's stream)
     *
     * @param Aggregate $aggregate Aggregate whose new events are appended to its stream
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function update(Aggregate $aggregate): Promise
    {
        return call(
            function() use ($aggregate): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from $this->doStore($aggregate, false);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    /** The message must be distinguishable from the "start" record logged above */
                    $this->logger->debug(
                        'Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $this->failureContext($idClass, $idValue, $throwable)
                    );

                    throw $throwable;
                }
            }
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * After the store has reverted the stream, the aggregate is rebuilt from the complete stored
     * stream and a new snapshot of the rebuilt aggregate is stored.
     *
     * Returns \ServiceBus\EventSourcing\Aggregate
     *
     * Mode options:
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
     *   conflicts in some situations
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * @param Aggregate $aggregate Aggregate to roll back
     * @param int       $toVersion Version the stream is reverted to
     * @param int       $mode      One of the self::REVERT_MODE_* constants
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
    {
        return call(
            function() use ($aggregate, $toVersion, $mode): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                    'aggregateVersion' => $toVersion,
                ]);

                try
                {
                    /** The third argument tells the store whether tail events are to be physically removed */
                    yield $this->store->revert($id, $toVersion, self::REVERT_MODE_DELETE === $mode);

                    /** @var StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $this->store->load($id);

                    /**
                     * NOTE(review): restoreStream() returns null when the store yields no stream; in that case
                     * the version() call below fails with an \Error. Presumably unreachable after a successful
                     * revert - confirm against the store implementation.
                     *
                     * @var Aggregate $aggregate
                     */
                    $aggregate = $this->restoreStream(null, $storedEventStream);

                    yield $this->snapshotter->store(new Snapshot($aggregate, $aggregate->version()));

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    $this->logger->debug(
                        'Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}"',
                        $this->failureContext($idClass, $idValue, $throwable)
                    );

                    throw $throwable;
                }
                finally
                {
                    /** @psalm-suppress PossiblyUndefinedVariable */
                    unset($storedEventStream);
                }
            }
        );
    }

    /**
     * Persist the aggregate's event stream and refresh its snapshot when the snapshotter requires it.
     *
     * Generator result: the events taken from the aggregate's stream (`originEvents`).
     *
     * @param Aggregate $aggregate Aggregate whose stream is persisted
     * @param bool      $isNew     true - save as a new stream; false - append to the existing one
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private function doStore(Aggregate $aggregate, bool $isNew): \Generator
    {
        /**
         * The stream is obtained by invoking the aggregate's `makeStream` method through reflection.
         *
         * @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream
         */
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
        $receivedEvents = $eventStream->originEvents;

        $storedEventStream = streamToStoredRepresentation($this->serializer, $eventStream);

        if (true === $isNew)
        {
            yield $this->store->save($storedEventStream);
        }
        else
        {
            yield $this->store->append($storedEventStream);
        }

        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
        $loadedSnapshot = yield $this->snapshotter->load($aggregate->id());

        if (true === $this->snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
        {
            yield $this->snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
        }

        unset($eventStream, $loadedSnapshot, $storedEventStream);

        return $receivedEvents;
    }

    /**
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
     *
     * Returns null when no stored stream was supplied, regardless of the aggregate passed in.
     *
     * @param Aggregate|null                  $aggregate         Aggregate to extend, or null to create a fresh
     *                                                           instance of the class recorded in the stream
     * @param StoredAggregateEventStream|null $storedEventStream Stream as returned by the store
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     */
    private function restoreStream(?Aggregate $aggregate, ?StoredAggregateEventStream $storedEventStream): ?Aggregate
    {
        if (null === $storedEventStream)
        {
            return null;
        }

        $eventStream = streamToDomainRepresentation($this->serializer, $storedEventStream);

        if (null === $aggregate)
        {
            /**
             * The instance is created without running the constructor; its state comes from the stream below.
             *
             * @noinspection CallableParameterUseCaseInTypeContextInspection
             *
             * @var Aggregate $aggregate
             */
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
        }

        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);

        return $aggregate;
    }

    /**
     * Build the logger context shared by all failure records.
     *
     * @param string     $idClass   Class name of the aggregate identifier
     * @param string     $idValue   String value of the aggregate identifier
     * @param \Throwable $throwable Error that interrupted the operation
     *
     * @return array<string, string>
     */
    private function failureContext(string $idClass, string $idValue, \Throwable $throwable): array
    {
        return [
            'aggregateIdClass' => $idClass,
            'aggregateId'      => $idValue,
            'throwableMessage' => $throwable->getMessage(),
            'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
        ];
    }
}
376