Completed
Push — v4.0 ( ff495d...e0bacf )
by Masiukevich
01:44
created

EventStreamRepository::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 4
c 1
b 0
f 0
nc 1
nop 4
dl 0
loc 10
ccs 5
cts 5
cp 1
crap 1
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use Psr\Log\LoggerInterface;
22
use Psr\Log\NullLogger;
23
use ServiceBus\EventSourcing\Aggregate;
24
use ServiceBus\EventSourcing\AggregateId;
25
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
26
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
27
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
28
use ServiceBus\EventSourcing\Snapshots\Snapshot;
29
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
30
31
/**
32
 * Repository for working with event streams.
33
 */
34
final class EventStreamRepository
35
{
36
    public const REVERT_MODE_SOFT_DELETE = 1;
37
38
    public const REVERT_MODE_DELETE = 2;
39
40
    private EventStreamStore $store;
41
42
    private EventSerializer $serializer;
43
44
    private Snapshotter $snapshotter;
45
46
    private LoggerInterface $logger;
47
48 8
    public function __construct(
49
        EventStreamStore $store,
50
        Snapshotter $snapshotter,
51
        EventSerializer $serializer,
52
        ?LoggerInterface $logger = null
53
    ) {
54 8
        $this->store       = $store;
55 8
        $this->snapshotter = $snapshotter;
56 8
        $this->serializer  = $serializer;
57 8
        $this->logger      = $logger ?? new NullLogger();
58 8
    }
59
60
    /**
61
     * Load aggregate.
62
     *
63
     * Returns \ServiceBus\EventSourcing\Aggregate|null
64
     *
65
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
66
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
67
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
68
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
69
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
70
     */
71 2
    public function load(AggregateId $id): Promise
72
    {
73 2
        $store      = $this->store;
74 2
        $serializer = $this->serializer;
75 2
        $snaphotter = $this->snapshotter;
76 2
        $logger     = $this->logger;
77
78 2
        return call(
79
            static function(AggregateId $id) use ($store, $serializer, $snaphotter, $logger): \Generator
80
            {
81 2
                $idValue = $id->toString();
82 2
                $idClass = \get_class($id);
83
84 2
                $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', [
85 2
                    'aggregateIdClass' => $idClass,
86 2
                    'aggregateId'      => $idValue,
87
                ]);
88
89
                try
90
                {
91 2
                    $aggregate         = null;
92 2
                    $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;
93
94
                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
95 2
                    $loadedSnapshot = yield $snaphotter->load($id);
96
97 2
                    if (null !== $loadedSnapshot)
98
                    {
99 1
                        $aggregate         = $loadedSnapshot->aggregate;
100 1
                        $fromStreamVersion = $aggregate->version() + 1;
101
102 1
                        $logger->debug(
103 1
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
104
                            [
105 1
                                'aggregateIdClass' => $idClass,
106 1
                                'aggregateId'      => $idValue,
107 1
                                'aggregateVersion' => $aggregate->version(),
108
                            ]
109
                        );
110
                    }
111
112
                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
113 2
                    $storedEventStream = yield $store->load($id, $fromStreamVersion);
114
115 2
                    $aggregate = self::restoreStream($serializer, $aggregate, $storedEventStream);
116
117 2
                    return $aggregate;
118
                }
119
                catch (\Throwable $throwable)
120
                {
121
                    $logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed', [
122
                        'aggregateIdClass' => $idClass,
123
                        'aggregateId'      => $idValue,
124
                        'throwableMessage' => $throwable->getMessage(),
125
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
126
                    ]);
127
128
                    throw $throwable;
129
                }
130
                finally
131
                {
132
                    /** @psalm-suppress PossiblyUndefinedVariable */
133 2
                    unset($storedEventStream, $loadedSnapshot, $fromStreamVersion);
134
                }
135 2
            },
136 2
            $id
137
        );
138
    }
139
140
    /**
141
     * Save a new event stream.
142
     *
143
     * Returns array<int, object>
144
     *
145
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
146
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
147
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
148
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
149
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
150
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
151
     */
152 7
    public function save(Aggregate $aggregate): Promise
153
    {
154 7
        $store      = $this->store;
155 7
        $serializer = $this->serializer;
156 7
        $snaphotter = $this->snapshotter;
157 7
        $logger     = $this->logger;
158
159 7
        return call(
160
            static function(Aggregate $aggregate) use ($store, $serializer, $snaphotter, $logger): \Generator
161
            {
162 7
                $id = $aggregate->id();
163
164 7
                $idValue = $id->toString();
165 7
                $idClass = \get_class($id);
166
167 7
                $logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"', [
168 7
                    'aggregateIdClass' => $idClass,
169 7
                    'aggregateId'      => $idValue,
170
                ]);
171
172
                try
173
                {
174
                    /**
175
                     * @psalm-var array<int, object> $raisedEvents
176
                     *
177
                     * @var object[] $raisedEvents
178
                     */
179 7
                    $raisedEvents = yield from self::doStore($serializer, $store, $snaphotter, $aggregate, true);
180
181 7
                    return $raisedEvents;
182
                }
183 1
                catch (\Throwable $throwable)
184
                {
185 1
                    $logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed', [
186 1
                        'aggregateIdClass' => $idClass,
187 1
                        'aggregateId'      => $idValue,
188 1
                        'throwableMessage' => $throwable->getMessage(),
189 1
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
190
                    ]);
191
192 1
                    throw $throwable;
193
                }
194 7
            },
195 7
            $aggregate
196
        );
197
    }
198
199
    /**
200
     * Update existent event stream (append events).
201
     *
202
     * Returns array<int, object>
203
     *
204
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
205
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
206
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
207
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
208
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
209
     */
210 5
    public function update(Aggregate $aggregate): Promise
211
    {
212 5
        $store      = $this->store;
213 5
        $serializer = $this->serializer;
214 5
        $snaphotter = $this->snapshotter;
215 5
        $logger     = $this->logger;
216
217 5
        return call(
218
            static function(Aggregate $aggregate) use ($store, $serializer, $snaphotter, $logger): \Generator
219
            {
220 5
                $id = $aggregate->id();
221
222 5
                $idValue = $id->toString();
223 5
                $idClass = \get_class($id);
224
225 5
                $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
226 5
                    'aggregateIdClass' => $idClass,
227 5
                    'aggregateId'      => $idValue,
228
                ]);
229
230
                try
231
                {
232
                    /**
233
                     * @psalm-var array<int, object> $raisedEvents
234
                     *
235
                     * @var object[] $raisedEvents
236
                     */
237 5
                    $raisedEvents = yield from self::doStore($serializer, $store, $snaphotter, $aggregate, false);
238
239 5
                    return $raisedEvents;
240
                }
241
                catch (\Throwable $throwable)
242
                {
243
                    $logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}', [
244
                        'aggregateIdClass' => $idClass,
245
                        'aggregateId'      => $idValue,
246
                        'throwableMessage' => $throwable->getMessage(),
247
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
248
                    ]);
249
250
                    throw $throwable;
251
                }
252 5
            },
253 5
            $aggregate
254
        );
255
    }
256
257
    /**
258
     * Revert aggregate to specified version.
259
     *
260
     * Returns \ServiceBus\EventSourcing\Aggregate
261
     *
262
     * Mode options:
263
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
264
     *   conflicts in some situations
265
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
266
     *
267
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
268
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
269
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
270
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
271
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
272
     */
273 4
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
274
    {
275 4
        $store      = $this->store;
276 4
        $serializer = $this->serializer;
277 4
        $snaphotter = $this->snapshotter;
278 4
        $logger     = $this->logger;
279
280 4
        return call(
281
            static function(Aggregate $aggregate, int $toVersion, int $mode) use ($store, $serializer, $snaphotter, $logger): \Generator
282
            {
283 4
                $id = $aggregate->id();
284
285 4
                $idValue = $id->toString();
286 4
                $idClass = \get_class($id);
287
288 4
                $logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
289 4
                    'aggregateIdClass' => $idClass,
290 4
                    'aggregateId'      => $idValue,
291 4
                    'aggregateVersion' => $toVersion,
292
                ]);
293
294
                try
295
                {
296 4
                    yield $store->revert($aggregate->id(), $toVersion, self::REVERT_MODE_DELETE === $mode);
297
298
                    /** @var StoredAggregateEventStream|null $storedEventStream */
299 3
                    $storedEventStream = yield $store->load($aggregate->id());
300
301
                    /** @var Aggregate $aggregate */
302 3
                    $aggregate = self::restoreStream($serializer, null, $storedEventStream);
303
304 3
                    yield $snaphotter->store(new Snapshot($aggregate, $aggregate->version()));
305
306 3
                    return $aggregate;
307
                }
308 2
                catch (\Throwable $throwable)
309
                {
310 2
                    $logger->debug('Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}', [
311 2
                        'aggregateIdClass' => $idClass,
312 2
                        'aggregateId'      => $idValue,
313 2
                        'throwableMessage' => $throwable->getMessage(),
314 2
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
315
                    ]);
316
317 2
                    throw $throwable;
318
                }
319
                finally
320
                {
321
                    /** @psalm-suppress PossiblyUndefinedVariable */
322 4
                    unset($storedEventStream);
323
                }
324 4
            },
325 4
            $aggregate,
326 4
            $toVersion,
327 4
            $mode
328
        );
329
    }
330
331
    /**
332
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
333
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
334
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
335
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
336
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
337
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
338
     */
339 7
    private static function doStore(
340
        EventSerializer $eventSerializer,
341
        EventStreamStore $eventStreamStore,
342
        Snapshotter $snapshotter,
343
        Aggregate $aggregate,
344
        bool $isNew
345
    ): \Generator {
346
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream */
347 7
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
348 7
        $receivedEvents = $eventStream->originEvents;
349
350 7
        $storedEventStream = streamToStoredRepresentation($eventSerializer, $eventStream);
351
352
        /** @noinspection PhpUnnecessaryLocalVariableInspection */
353 7
        $promise = true === $isNew
354 7
            ? $eventStreamStore->save($storedEventStream)
355 7
            : $eventStreamStore->append($storedEventStream);
356
357 7
        yield $promise;
358
359
        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
360 7
        $loadedSnapshot = yield $snapshotter->load($aggregate->id());
361
362 7
        if (true === $snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
363
        {
364 7
            yield $snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
365
        }
366
367 7
        unset($eventStream, $loadedSnapshot, $storedEventStream);
368
369 7
        return $receivedEvents;
370
    }
371
372
    /**
373
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
374
     *
375
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
376
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
377
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
378
     */
379 5
    private static function restoreStream(
380
        EventSerializer $eventSerializer,
381
        ?Aggregate $aggregate,
382
        ?StoredAggregateEventStream $storedEventStream
383
    ): ?Aggregate {
384 5
        if (null === $storedEventStream)
385
        {
386
            return null;
387
        }
388
389 5
        $eventStream = streamToDomainRepresentation($eventSerializer, $storedEventStream);
390
391 5
        if (null === $aggregate)
392
        {
393
            /**
394
             * @noinspection CallableParameterUseCaseInTypeContextInspection
395
             *
396
             * @var Aggregate $aggregate
397
             */
398 4
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
399
        }
400
401 5
        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);
402
403 5
        return $aggregate;
404
    }
405
}
406