Passed
Push — v3.0 ( 2d96b5...8256f3 )
by Masiukevich
02:53
created

EventStreamRepository::revert()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 25
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 13
nc 1
nop 3
dl 0
loc 25
ccs 14
cts 14
cp 1
crap 1
rs 9.8333
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing\EventStream;
14
15
use function Amp\call;
16
use function ServiceBus\Common\createWithoutConstructor;
17
use function ServiceBus\Common\invokeReflectionMethod;
18
use function ServiceBus\EventSourcing\EventStream\Store\streamToDomainRepresentation;
19
use function ServiceBus\EventSourcing\EventStream\Store\streamToStoredRepresentation;
20
use Amp\Promise;
21
use ServiceBus\EventSourcing\Aggregate;
22
use ServiceBus\EventSourcing\AggregateId;
23
use ServiceBus\EventSourcing\EventStream\Serializer\DefaultEventSerializer;
24
use ServiceBus\EventSourcing\EventStream\Serializer\EventSerializer;
25
use ServiceBus\EventSourcing\EventStream\Store\EventStreamStore;
26
use ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream;
27
use ServiceBus\EventSourcing\Snapshots\Snapshot;
28
use ServiceBus\EventSourcing\Snapshots\Snapshotter;
29
30
/**
31
 *
32
 */
33
final class EventStreamRepository
34
{
35
    public const REVERT_MODE_SOFT_DELETE = 1;
36
37
    public const REVERT_MODE_DELETE      = 2;
38
39
    /**
40
     * @var EventStreamStore
41
     */
42
    private $store;
43
44
    /**
45
     * @var EventSerializer
46
     */
47
    private $serializer;
48
49
    /**
50
     * @var Snapshotter
51
     */
52
    private $snapshotter;
53
54
    /**
55
     * @param EventStreamStore     $store
56
     * @param Snapshotter          $snapshotter
57
     * @param EventSerializer|null $serializer
58
     */
59 8
    public function __construct(EventStreamStore $store, Snapshotter $snapshotter, ?EventSerializer $serializer = null)
60
    {
61 8
        $this->store       = $store;
62 8
        $this->snapshotter = $snapshotter;
63 8
        $this->serializer  = $serializer ?? new DefaultEventSerializer();
64 8
    }
65
66
    /**
67
     * Load aggregate.
68
     *
69
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
70
     *
71
     * @param AggregateId $id
72
     *
73
     * @return Promise<\ServiceBus\EventSourcing\Aggregate|null>
74
     */
75 2
    public function load(AggregateId $id): Promise
76
    {
77
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
78 2
        return call(
79
            function(AggregateId $id): \Generator
80
            {
81 2
                $aggregate         = null;
82 2
                $fromStreamVersion = Aggregate::START_PLAYHEAD_INDEX;
83
84
                /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
85 2
                $loadedSnapshot = yield $this->snapshotter->load($id);
86
87 2
                if (null !== $loadedSnapshot)
88
                {
89 1
                    $aggregate         = $loadedSnapshot->aggregate;
90 1
                    $fromStreamVersion = $aggregate->version() + 1;
91
                }
92
93
                /** @var \ServiceBus\EventSourcing\EventStream\Store\StoredAggregateEventStream|null $storedEventStream */
94 2
                $storedEventStream = yield $this->store->load($id, $fromStreamVersion);
95
96 2
                $aggregate = $this->restoreStream($aggregate, $storedEventStream);
97
98 2
                unset($storedEventStream, $loadedSnapshot, $fromStreamVersion);
99
100 2
                return $aggregate;
101 2
            },
102 2
            $id
103
        );
104
    }
105
106
    /**
107
     * Save a new event stream.
108
     *
109
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
110
     *
111
     * @param Aggregate $aggregate
112
     *
113
     * @return Promise<array<int, object>>
114
     */
115 7
    public function save(Aggregate $aggregate): Promise
116
    {
117
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
118 7
        return call(
119
            function(Aggregate $aggregate): \Generator
120
            {
121
                /**
122
                 * @psalm-var array<int, object> $raisedEvents
123
                 *
124
                 * @var object[] $raisedEvents
125
                 */
126 7
                $raisedEvents = yield from $this->doStore($aggregate, true);
127
128 7
                return $raisedEvents;
129 7
            },
130 7
            $aggregate
131
        );
132
    }
133
134
    /**
135
     * Update existent event stream (append events).
136
     *
137
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
138
     *
139
     * @param Aggregate $aggregate
140
     *
141
     * @return Promise<array<int, object>>
142
     */
143 5
    public function update(Aggregate $aggregate): Promise
144
    {
145
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
146 5
        return call(
147
            function(Aggregate $aggregate): \Generator
148
            {
149
                /**
150
                 * @psalm-var array<int, object> $raisedEvents
151
                 *
152
                 * @var object[] $raisedEvents
153
                 */
154 5
                $raisedEvents = yield from $this->doStore($aggregate, false);
155
156 5
                return $raisedEvents;
157 5
            },
158 5
            $aggregate
159
        );
160
    }
161
162
    /**
163
     * Revert aggregate to specified version.
164
     *
165
     * Mode options:
166
     *   - 1 (self::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version
167
     *   conflicts in some situations
168
     *   - 2 (self::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
169
     *
170
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
171
     *
172
     * @param Aggregate $aggregate
173
     * @param int       $toVersion
174
     * @param int       $mode
175
     *
176
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
177
     */
178 4
    public function revert(Aggregate $aggregate, int $toVersion, int $mode = self::REVERT_MODE_SOFT_DELETE): Promise
179
    {
180
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
181 4
        return call(
182
            function(Aggregate $aggregate, int $toVersion, int $mode): \Generator
183
            {
184 4
                yield $this->store->revert(
185 4
                    $aggregate->id(),
186 4
                    $toVersion,
187 4
                    self::REVERT_MODE_DELETE === $mode
188
                );
189
190
                /** @var StoredAggregateEventStream|null $storedEventStream */
191 3
                $storedEventStream = yield $this->store->load($aggregate->id());
192
193
                /** @var Aggregate $aggregate */
194 3
                $aggregate = $this->restoreStream(null, $storedEventStream);
195
196 3
                yield $this->snapshotter->store(Snapshot::create($aggregate, $aggregate->version()));
197
198 3
                return $aggregate;
199 4
            },
200 4
            $aggregate,
201 4
            $toVersion,
202 4
            $mode
203
        );
204
    }
205
206
    /**
207
     * @param Aggregate $aggregate
208
     * @param bool      $isNew
209
     *
210
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
211
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
212
     * @throws \ServiceBus\Storage\Common\Exceptions\ConnectionFailed
213
     * @throws \ServiceBus\Storage\Common\Exceptions\InvalidConfigurationOptions
214
     * @throws \ServiceBus\Storage\Common\Exceptions\StorageInteractingFailed
215
     * @throws \ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed
216
     *
217
     * @return \Generator
218
     */
219 7
    private function doStore(Aggregate $aggregate, bool $isNew): \Generator
220
    {
221
        /** @var \ServiceBus\EventSourcing\EventStream\AggregateEventStream $eventStream */
222 7
        $eventStream    = invokeReflectionMethod($aggregate, 'makeStream');
223 7
        $receivedEvents = $eventStream->originEvents;
224
225 7
        $storedEventStream = streamToStoredRepresentation($this->serializer, $eventStream);
226
227 7
        $promise = true === $isNew
228 7
            ? $this->store->save($storedEventStream)
229 7
            : $this->store->append($storedEventStream);
230
231 7
        yield $promise;
232
233
        /** @var \ServiceBus\EventSourcing\Snapshots\Snapshot|null $loadedSnapshot */
234 7
        $loadedSnapshot = yield $this->snapshotter->load($aggregate->id());
235
236 7
        if (true === $this->snapshotter->snapshotMustBeCreated($aggregate, $loadedSnapshot))
237
        {
238 7
            yield $this->snapshotter->store(Snapshot::create($aggregate, $aggregate->version()));
239
        }
240
241 7
        unset($eventStream, $loadedSnapshot, $storedEventStream);
242
243 7
        return $receivedEvents;
244
    }
245
246
    /**
247
     * Restore the aggregate from the event stream/Add missing events to the aggregate from the snapshot.
248
     *
249
     * @param Aggregate|null                  $aggregate
250
     * @param StoredAggregateEventStream|null $storedEventStream
251
     *
252
     * @throws \ServiceBus\Common\Exceptions\DateTimeException
253
     * @throws \ServiceBus\EventSourcing\EventStream\Serializer\Exceptions\SerializeEventFailed
254
     * @throws \ServiceBus\Common\Exceptions\ReflectionApiException
255
     *
256
     * @return Aggregate|null
257
     */
258 5
    private function restoreStream(?Aggregate $aggregate, ?StoredAggregateEventStream $storedEventStream): ?Aggregate
259
    {
260 5
        if (null === $storedEventStream)
261
        {
262
            return null;
263
        }
264
265 5
        $eventStream = streamToDomainRepresentation($this->serializer, $storedEventStream);
266
267 5
        if (null === $aggregate)
268
        {
269
            /** @noinspection CallableParameterUseCaseInTypeContextInspection */
270
            /** @var Aggregate $aggregate */
271 4
            $aggregate = createWithoutConstructor($storedEventStream->aggregateClass);
272
        }
273
274 5
        invokeReflectionMethod($aggregate, 'appendStream', $eventStream);
275
276 5
        return $aggregate;
277
    }
278
}
279