Passed
Push — v4.2 ( c387ce...05d5da )
by Masiukevich
11:55 queued 10:19
created

EventStreamRepository::load()   A

Complexity

Conditions 3
Paths 1

Size

Total Lines 58
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 3.1266

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 31
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 58
ccs 22
cts 29
cp 0.7586
crap 3.1266
rs 9.424

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

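Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.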

1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\Common\throwableMessage;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
20
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
21
use Amp\Promise;
22
use Psr\Log\LoggerInterface;
23
use Psr\Log\NullLogger;
24
use ServiceBus\EventSourcing\Aggregate;
25
use ServiceBus\EventSourcing\AggregateId;
26
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
27
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
28
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
29
use ServiceBus\EventSourcing\Snapshots\Snapshot;
30
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
31
32
/**
33
 * Repository for working with event streams.
34
 */
35
final class EventStreamRepository
36
{
37
    /** Revert mode: mark tail events as deleted (soft deletion). */
    public const REVERT_MODE_SOFT_DELETE = 1;
38
39
    /** Revert mode: remove tail events from the database. */
    public const REVERT_MODE_DELETE = 2;
40
41
    /**
     * Event stream storage; used here to load, save, append to and revert streams.
     *
     * @var EventStreamStore
     */
42
    private $store;
43
44
    /**
     * Serializer handed to the stream conversion helpers
     * (streamToStoredRepresentation / streamToDomainRepresentation).
     *
     * @var EventSerializer
     */
45
    private $serializer;
46
47
    /**
     * Loads and stores aggregate snapshots and decides whether a new snapshot must be created.
     *
     * @var Snapshotter
     */
48
    private $snapshotter;
49
50
    /**
     * Receives the debug messages written by this repository; a NullLogger is used when none is injected.
     *
     * @var LoggerInterface
     */
51
    private $logger;
52
53 14
    /**
     * Wire the repository with its collaborators.
     *
     * @param EventStreamStore     $store       Event stream storage.
     * @param Snapshotter          $snapshotter Snapshot loader/writer.
     * @param EventSerializer      $serializer  Serializer passed to the stream conversion helpers.
     * @param LoggerInterface|null $logger      Optional logger; a NullLogger is used when omitted.
     */
    public function __construct(
        EventStreamStore $store,
        Snapshotter $snapshotter,
        EventSerializer $serializer,
        ?LoggerInterface $logger = null
    ) {
        if (null === $logger)
        {
            $logger = new NullLogger();
        }

        $this->logger      = $logger;
        $this->serializer  = $serializer;
        $this->snapshotter = $snapshotter;
        $this->store       = $store;
    }
64
65
    /**
     * Restore an aggregate by its identifier.
     *
     * A stored snapshot (if any) is taken as the starting state; the store is then asked for the stream starting
     * from the version that follows the snapshot, and the result is applied through restoreStream().
     *
     * The promise resolves to \ServiceBus\EventSourcing\Aggregate|null
     *
     * NOTE(review): when the store yields no stream, null is returned even if a snapshot was found — confirm that
     * this is the intended behaviour.
     *
     * Any failure is written to the debug log and then rethrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function load(AggregateId $id): Promise
    {
        return call(
            function() use ($id): \Generator
            {
                /** Placeholders shared by every log record written below. */
                $logContext = [
                    'aggregateIdClass' => \get_class($id),
                    'aggregateId'      => $id->toString(),
                ];

                $this->logger->debug('Load aggregate with id "{aggregateIdClass}:{aggregateId}"', $logContext);

                try
                {
                    $restored  = null;
                    $startFrom = Aggregate::START_PLAYHEAD_INDEX;

                    /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $snapshot */
                    $snapshot = yield $this->snapshotter->load($id);

                    if (null !== $snapshot)
                    {
                        /** Continue from the version right after the one captured in the snapshot. */
                        $restored  = $snapshot->aggregate;
                        $startFrom = $restored->version() + 1;

                        $this->logger->debug(
                            'Found a snapshot of the state of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}" on version "{aggregateVersion}"',
                            $logContext + ['aggregateVersion' => $restored->version()]
                        );
                    }

                    /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $stream */
                    $stream = yield $this->store->load($id, $startFrom);

                    return $this->restoreStream($restored, $stream);
                }
                catch (\Throwable $throwable)
                {
                    $this->logger->debug(
                        'Load aggregate with id "{aggregateIdClass}:{aggregateId}" failed',
                        $logContext + [
                            'throwableMessage' => throwableMessage($throwable),
                            'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                        ]
                    );

                    throw $throwable;
                }
                finally
                {
                    /** @psalm-suppress PossiblyUndefinedVariable */
                    unset($stream, $snapshot, $startFrom);
                }
            }
        );
    }
138
139
    /**
     * Persist the event stream of a new aggregate.
     *
     * Delegates to doStore() in "new stream" mode. Any failure is written to the debug log and then rethrown
     * unchanged.
     *
     * The promise resolves to array<int, object> (the events returned by doStore()).
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    public function save(Aggregate $aggregate): Promise
    {
        return call(
            function() use ($aggregate): \Generator
            {
                $aggregateId = $aggregate->id();

                /** Placeholders shared by both log records written below. */
                $logContext = [
                    'aggregateIdClass' => \get_class($aggregateId),
                    'aggregateId'      => $aggregateId->toString(),
                ];

                $this->logger->debug(
                    'Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}"',
                    $logContext
                );

                try
                {
                    /** @psalm-var array<int, object> */
                    return yield from $this->doStore($aggregate, true);
                }
                catch (\Throwable $throwable)
                {
                    $this->logger->debug(
                        'Save new aggregate with identifier "{aggregateIdClass}:{aggregateId}" failed',
                        $logContext + [
                            'throwableMessage' => throwableMessage($throwable),
                            'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                        ]
                    );

                    throw $throwable;
                }
            }
        );
    }
191
192
    /**
     * Update existent event stream (append events).
     *
     * Delegates to doStore() in "append" mode. Any failure is written to the debug log and then rethrown
     * unchanged.
     *
     * The promise resolves to array<int, object> (the events returned by doStore()).
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function update(Aggregate $aggregate): Promise
    {
        return call(
            function() use ($aggregate): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                ]);

                try
                {
                    /**
                     * @psalm-var array<int, object> $raisedEvents
                     *
                     * @var object[] $raisedEvents
                     */
                    $raisedEvents = yield from $this->doStore($aggregate, false);

                    return $raisedEvents;
                }
                catch (\Throwable $throwable)
                {
                    /**
                     * The failure record must be distinguishable from the start record above: the message is
                     * properly quoted and ends with "failed", like the failure messages of load() and save().
                     */
                    $this->logger->debug('Adding events to an existing stream with identifier "{aggregateIdClass}:{aggregateId}" failed', [
                        'aggregateIdClass' => $idClass,
                        'aggregateId'      => $idValue,
                        'throwableMessage' => throwableMessage($throwable),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
            }
        );
    }
243
244
    /**
     * Revert aggregate to specified version.
     *
     * The store is asked to revert the stream, the stream is then loaded again and replayed into a fresh
     * aggregate instance, and a snapshot of that instance is stored.
     *
     * The promise resolves to \ServiceBus\EventSourcing\Aggregate
     *
     * Mode options:
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
     *   conflicts in some situations
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * Any value of $mode other than self::REVERT_MODE_DELETE is passed to the store as "no hard delete".
     *
     * Any failure is written to the debug log and then rethrown unchanged.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     */
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
    {
        return call(
            function() use ($aggregate, $toVersion, $mode): \Generator
            {
                $id = $aggregate->id();

                $idValue = $id->toString();
                $idClass = \get_class($id);

                $this->logger->debug('Rollback of aggregate with identifier "{aggregateIdClass}:{aggregateId}" to version "{aggregateVersion}"', [
                    'aggregateIdClass' => $idClass,
                    'aggregateId'      => $idValue,
                    'aggregateVersion' => $toVersion,
                ]);

                try
                {
                    yield $this->store->revert($id, $toVersion, self::REVERT_MODE_DELETE === $mode);

                    /** @var StoredAggregateEventStream|null $storedEventStream */
                    $storedEventStream = yield $this->store->load($id);

                    /**
                     * NOTE(review): restoreStream() returns null when the store yields no stream; the version()
                     * call below would then fail. Confirm the store always returns a stream after a successful
                     * revert.
                     *
                     * @var Aggregate $restoredAggregate
                     */
                    $restoredAggregate = $this->restoreStream(null, $storedEventStream);

                    yield $this->snapshotter->store(new Snapshot($restoredAggregate, $restoredAggregate->version()));

                    return $restoredAggregate;
                }
                catch (\Throwable $throwable)
                {
                    /** The identifier placeholder is closed with a quote so the rendered message is well-formed. */
                    $this->logger->debug('Error when rolling back the version of the aggregate with the identifier "{aggregateIdClass}:{aggregateId}"', [
                        'aggregateIdClass' => $idClass,
                        'aggregateId'      => $idValue,
                        'throwableMessage' => throwableMessage($throwable),
                        'throwablePoint'   => \sprintf('%s:%d', $throwable->getFile(), $throwable->getLine()),
                    ]);

                    throw $throwable;
                }
                finally
                {
                    /** @psalm-suppress PossiblyUndefinedVariable */
                    unset($storedEventStream);
                }
            }
        );
    }
309
310
    /**
     * Write the aggregate's event stream to the store and refresh its snapshot when required.
     *
     * The stream is obtained by invoking the aggregate's `makeStream` method through invokeReflectionMethod(),
     * converted with streamToStoredRepresentation() and then either saved ($isNew === true) or appended
     * ($isNew === false). Afterwards the current snapshot is loaded and a new one is stored if the snapshotter
     * reports that one must be created.
     *
     * The generator returns the stream's `originEvents`.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
     */
    private function doStore(Aggregate $aggregate, bool $isNew): \Generator
    {
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $stream */
        $stream       = invokeReflectionMethod($aggregate, 'makeStream');
        $raisedEvents = $stream->originEvents;
        $storedStream = streamToStoredRepresentation($this->serializer, $stream);

        if ($isNew)
        {
            yield $this->store->save($storedStream);
        }
        else
        {
            yield $this->store->append($storedStream);
        }

        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $currentSnapshot */
        $currentSnapshot = yield $this->snapshotter->load($aggregate->id());

        if ($this->snapshotter->snapshotMustBeCreated($aggregate, $currentSnapshot))
        {
            yield $this->snapshotter->store(new Snapshot($aggregate, $aggregate->version()));
        }

        unset($stream, $currentSnapshot, $storedStream);

        return $raisedEvents;
    }
345
346
    /**
     * Apply a stored event stream to an aggregate.
     *
     * When no stream is given, null is returned regardless of $aggregate. Otherwise the stream is converted with
     * streamToDomainRepresentation() and handed to the aggregate's `appendStream` method through
     * invokeReflectionMethod(). If no aggregate was passed in (no snapshot), an instance of the class named by
     * the stored stream is created without running its constructor.
     *
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
     */
    private function restoreStream(?Aggregate $aggregate, ?StoredAggregateEventStream $storedEventStream): ?Aggregate
    {
        if (null !== $storedEventStream)
        {
            $domainStream = streamToDomainRepresentation($this->serializer, $storedEventStream);

            /**
             * @noinspection CallableParameterUseCaseInTypeContextInspection
             *
             * @var Aggregate $target
             */
            $target = $aggregate ?? createWithoutConstructor($storedEventStream->aggregateClass);

            invokeReflectionMethod($target, 'appendStream', $domainStream);

            return $target;
        }

        return null;
    }
376
}
377