Passed
Push — v4.2 ( 50993c )
by Masiukevich
07:39
created

EventStreamRepository::load()   A

Complexity

Conditions 3
Paths 1

Size

Total Lines 58
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 3.1266

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 31
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 58
ccs 22
cts 29
cp 0.7586
crap 3.1266
rs 9.424

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use Psr\Log\LoggerInterface;
22
use Psr\Log\NullLogger;
23
use ServiceBus\EventSourcing\Aggregate;
24
use ServiceBus\EventSourcing\AggregateId;
25
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
26
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
27
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
28
use ServiceBus\EventSourcing\Snapshots\Snapshot;
29
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
30
31
/**
 * Repository for working with event streams.
 *
 * Combines the event stream store, the snapshotter and the event serializer to load, save, update and
 * revert aggregates. Every public operation returns a promise created with Amp's call().
 */
final class EventStreamRepository
{
    /** Revert mode: mark tail events as deleted (soft deletion). */
    public const REVERT_MODE_SOFT_DELETE = 1;

    /** Revert mode: remove tail events from the database. */
    public const REVERT_MODE_DELETE = 2;

    /**
     * Storage of serialized event streams.
     *
     * @var EventStreamStore
     */
    private $store;

    /**
     * Converts events between their domain and stored representations.
     *
     * @var EventSerializer
     */
    private $serializer;

    /**
     * Loads and stores aggregate snapshots.
     *
     * @var Snapshotter
     */
    private $snapshotter;

    /**
     * Receives debug messages about every operation and its failure.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param EventStreamStore     $store       Event stream storage
     * @param Snapshotter          $snapshotter Snapshot loader/writer
     * @param EventSerializer      $serializer  Event serializer
     * @param LoggerInterface|null $logger      Optional logger; a NullLogger is used when omitted
     */
    public function __construct(
        EventStreamStore $store,
        Snapshotter $snapshotter,
        EventSerializer $serializer,
        ?LoggerInterface $logger = null
    ) {
        $this->store       = $store;
        $this->snapshotter = $snapshotter;
        $this->serializer  = $serializer;
        $this->logger      = $logger ?? new NullLogger();
    }

    /**
     * Load aggregate.
     *
     * The snapshotter is queried first. When a snapshot is found, its aggregate becomes the starting point and
     * the stream is requested from the version following the snapshot; otherwise the stream is requested from
     * Aggregate::START_PLAYHEAD_INDEX. The loaded stream is then applied via restoreStream().
     *
     * The promise resolves with \ServiceBus\EventSourcing\Aggregate|null. Null is the result whenever the store
     * returns no stream, even if a snapshot was found.
     *
     * Any failure is logged (debug level) and rethrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function load(AggregateId $id): Promise
    {
        return call(
            function() use ($id): \Generator
            {
                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    $aggregate         = null;
                    $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;

                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
                    $loadedSnapshot = yield $this->snapshotter->load($id);

                    if (null !== $loadedSnapshot)
                    {
                        $aggregate = $loadedSnapshot->aggregate;

                        /** Only events recorded after the snapshot have to be replayed. */
                        $fromStreamVersion = $aggregate->version() + 1;

                        $this->logger->debug(
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
                            [
                                'aggregateIdClass' => $idClass,
                                'aggregateId'      => $idValue,
                                'aggregateVersion' => $aggregate->version(),
                            ]
                        );
                    }

                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $this->store->load($id, $fromStreamVersion);

                    $aggregate = $this->restoreStream($aggregate, $storedEventStream);

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    $this->logFailure(
                        'Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed',
                        $idClass,
                        $idValue,
                        $throwable
                    );

                    throw $throwable;
                }
                finally
                {
                    /** @psalm-suppress PossiblyUndefinedVariable */
                    unset($storedEventStream, $loadedSnapshot, $fromStreamVersion);
                }
            }
        );
    }

    /**
     * Save a new event stream.
     *
     * Delegates to doStore() with the "new stream" flag set.
     *
     * The promise resolves with array<int, object>: the events returned by doStore().
     *
     * Any failure is logged (debug level) and rethrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    public function save(Aggregate $aggregate): Promise
    {
        return call(
            function() use ($aggregate): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from $this->doStore($aggregate, true);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    $this->logFailure(
                        'Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $idClass,
                        $idValue,
                        $throwable
                    );

                    throw $throwable;
                }
            }
        );
    }

    /**
     * Update existent event stream (append events).
     *
     * Delegates to doStore() with the "new stream" flag cleared.
     *
     * The promise resolves with array<int, object>: the events returned by doStore().
     *
     * Any failure is logged (debug level) and rethrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function update(Aggregate $aggregate): Promise
    {
        return call(
            function() use ($aggregate): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from $this->doStore($aggregate, false);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    /** The failure message is distinguishable from the start message and has balanced quotes. */
                    $this->logFailure(
                        'Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $idClass,
                        $idValue,
                        $throwable
                    );

                    throw $throwable;
                }
            }
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * Asks the store to revert the stream, reloads the stream, rebuilds the aggregate from it and stores a fresh
     * snapshot at the rebuilt aggregate's version.
     *
     * The promise resolves with \ServiceBus\EventSourcing\Aggregate (the rebuilt instance, not the one passed in).
     *
     * Mode options:
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
     *   conflicts in some situations
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * Only self::REVERT_MODE_DELETE requests deletion from the store; every other value is passed on as "do not
     * delete".
     *
     * Any failure is logged (debug level) and rethrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
    {
        return call(
            function() use ($aggregate, $toVersion, $mode): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                    'aggregateVersion' => $toVersion,
                ]);

                try
                {
                    yield $this->store->revert($aggregate->id(), $toVersion, self::REVERT_MODE_DELETE === $mode);

                    /** @var StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $this->store->load($aggregate->id());

                    /**
                     * NOTE(review): restoreStream() returns null when the store yields no stream; the version()
                     * call below would then fail. Confirm the store always returns a stream after a revert.
                     *
                     * @var Aggregate $aggregate
                     */
                    $aggregate = $this->restoreStream(null, $storedEventStream);

                    yield $this->snapshotter->store(new Snapshot($aggregate, $aggregate->version()));

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    $this->logFailure(
                        'Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}"',
                        $idClass,
                        $idValue,
                        $throwable
                    );

                    throw $throwable;
                }
                finally
                {
                    /** @psalm-suppress PossiblyUndefinedVariable */
                    unset($storedEventStream);
                }
            }
        );
    }

    /**
     * Persist the aggregate's event stream and refresh its snapshot when required.
     *
     * The stream is obtained from the aggregate through invokeReflectionMethod($aggregate, 'makeStream'),
     * converted to its stored representation and passed to the store: save() when $isNew is true, append()
     * otherwise. Afterwards the current snapshot is loaded and, if the snapshotter reports that one must be
     * created, a new snapshot at the aggregate's current version is stored.
     *
     * Returns (as the generator result) the stream's originEvents.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private function doStore(Aggregate $aggregate, bool $isNew): \Generator
    {
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream */
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
        $receivedEvents = $eventStream->originEvents;

        $storedEventStream = streamToStoredRepresentation($this->serializer, $eventStream);

        /** @noinspection PhpUnnecessaryLocalVariableInspection */
        $promise = true === $isNew
            ? $this->store->save($storedEventStream)
            : $this->store->append($storedEventStream);

        yield $promise;

        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
        $loadedSnapshot = yield $this->snapshotter->load($aggregate->id());

        if (true === $this->snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
        {
            yield $this->snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
        }

        unset($eventStream, $loadedSnapshot, $storedEventStream);

        return $receivedEvents;
    }

    /**
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
     *
     * Returns null when no stored stream is given, regardless of the passed aggregate. When no aggregate is
     * given, an instance of the stream's aggregate class is created without calling its constructor. The
     * converted stream is then handed to the aggregate through invokeReflectionMethod($aggregate, 'appendStream').
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     */
    private function restoreStream(?Aggregate $aggregate, ?StoredAggregateEventStream $storedEventStream): ?Aggregate
    {
        if (null === $storedEventStream)
        {
            return null;
        }

        $eventStream = streamToDomainRepresentation($this->serializer, $storedEventStream);

        if (null === $aggregate)
        {
            /**
             * @noinspection CallableParameterUseCaseInTypeContextInspection
             *
             * @var Aggregate $aggregate
             */
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
        }

        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);

        return $aggregate;
    }

    /**
     * Write a debug record describing a failed repository operation.
     *
     * The context carries the identifier class and value plus the throwable's message and the file:line where
     * it was created. The caller is responsible for rethrowing the throwable.
     *
     * @param string     $message   Log message template (may use the {aggregateIdClass}/{aggregateId} placeholders)
     * @param string     $idClass   Class name of the aggregate identifier
     * @param string     $idValue   String value of the aggregate identifier
     * @param \Throwable $throwable Failure being reported
     */
    private function logFailure(string $message, string $idClass, string $idValue, \Throwable $throwable): void
    {
        $this->logger->debug($message, [
            'aggregateIdClass' => $idClass,
            'aggregateId'      => $idValue,
            'throwableMessage' => $throwable->getMessage(),
            'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
        ]);
    }
}
376