Completed
Push — v3.2 ( 440511...e1d649 )
by Masiukevich
04:34
created

EventSourcingProvider::revert()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 36
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 14
c 1
b 0
f 0
nc 1
nop 3
dl 0
loc 36
ccs 13
cts 13
cp 1
crap 2
rs 9.7998
1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
21
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
22
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
23
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
25
use ServiceBus\Mutex\InMemoryMutexFactory;
26
use ServiceBus\Mutex\Lock;
27
use ServiceBus\Mutex\MutexFactory;
28
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
29
30
/**
31
 *
32
 */
33
final class EventSourcingProvider
34
{
35
    /**
36
     * @var EventStreamRepository
37
     */
38
    private $repository;
39
40
    /**
41
     * List of loaded/added aggregates.
42
     *
43
     * @psalm-var array<string, string>
44
     *
45
     * @var array
46
     */
47
    private $aggregates = [];
48
49
    /**
50
     * Current locks collection.
51
     *
52
     * @psalm-var array<string, \ServiceBus\Mutex\Lock>
53
     *
54
     * @var Lock[]
55
     */
56
    private $locks = [];
57
58
    /**
59
     * Mutex creator.
60
     *
61
     * @var MutexFactory
62
     */
63
    private $mutexFactory;
64
65
    /**
66
     * EventSourcingProvider constructor.
67
     *
68
     * @param EventStreamRepository $repository
69
     * @param MutexFactory|null     $mutexFactory
70
     */
71 6
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
72
    {
73 6
        $this->repository   = $repository;
74 6
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
75 6
    }
76
77
    /**
78
     * Load aggregate.
79
     *
80
     * @noinspection PhpDocRedundantThrowsInspection
81
     *
82
     * @param AggregateId $id
83
     *
84
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
85
     *
86
     * @return Promise
87
     *
88
     */
89 1
    public function load(AggregateId $id): Promise
90
    {
91
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
92 1
        return call(
93
            function(AggregateId $id): \Generator
94
            {
95
                try
96
                {
97 1
                    yield from $this->setupMutex($id);
98
99
                    /**
100
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
101
                     *
102
                     * @var Aggregate|null $aggregate
103
                     */
104 1
                    $aggregate = yield $this->repository->load($id);
105
106 1
                    if (null !== $aggregate)
107
                    {
108 1
                        $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);
109
                    }
110
                    else
111
                    {
112
                        yield from $this->releaseMutex($id);
113
                    }
114
115 1
                    return $aggregate;
116
                }
117
                catch (\Throwable $throwable)
118
                {
119
                    yield from $this->releaseMutex($id);
120
121
                    throw LoadAggregateFailed::fromThrowable($throwable);
122
                }
123 1
            },
124 1
            $id
125
        );
126
    }
127
128
    /**
129
     * Save a new aggregate.
130
     *
131
     * @noinspection PhpDocRedundantThrowsInspection
132
     *
133
     * @param Aggregate         $aggregate
134
     * @param ServiceBusContext $context
135
     *
136
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
137
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
138
     *
139
     * @return Promise
140
     *
141
     */
142 4
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
143
    {
144
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
145 4
        return call(
146
            function(Aggregate $aggregate, ServiceBusContext $context): \Generator
147
            {
148
                try
149
                {
150
                    /** The aggregate hasn't been loaded before, which means it is new */
151 4
                    if (false === isset($this->aggregates[(string) $aggregate->id()]))
152
                    {
153
                        /**
154
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
155
                         * @psalm-var      array<int, object> $events
156
                         *
157
                         * @var object[] $events
158
                         */
159 4
                        $events = yield $this->repository->save($aggregate);
160
161 4
                        $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);
162
                    }
163
                    else
164
                    {
165
                        /**
166
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
167
                         * @psalm-var      array<int, object> $events
168
                         *
169
                         * @var object[] $events
170
                         */
171 3
                        $events = yield $this->repository->update($aggregate);
172
                    }
173
174 4
                    $promises = [];
175
176
                    /** @var object $event */
177 4
                    foreach ($events as $event)
178
                    {
179 4
                        $promises[] = $context->delivery($event);
180
                    }
181
182 4
                    yield $promises;
183
                }
184 1
                catch (UniqueConstraintViolationCheckFailed $exception)
185
                {
186 1
                    throw DuplicateAggregate::create($aggregate->id());
187
                }
188
                catch (\Throwable $throwable)
189
                {
190
                    throw SaveAggregateFailed::fromThrowable($throwable);
191
                }
192
                finally
193 4
                {
194 4
                    yield from $this->releaseMutex($aggregate->id());
195
                }
196 4
            },
197 4
            $aggregate,
198 4
            $context
199
        );
200
    }
201
202
    /**
203
     * Revert aggregate to specified version.
204
     *
205
     * Mode options:
206
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
207
     *   be version conflicts in some situations
208
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
209
     *
210
     * @noinspection   PhpDocRedundantThrowsInspection
211
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
212
     *
213
     * @param Aggregate $aggregate
214
     * @param int       $toVersion
215
     * @param int       $mode
216
     *
217
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
218
     *
219
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
220
     *
221
     */
222 3
    public function revert(
223
        Aggregate $aggregate,
224
        int $toVersion,
225
        ?int $mode = null
226
    ): Promise {
227 3
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;
228
229
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
230 3
        return call(
231
            function(Aggregate $aggregate, int $toVersion, int $mode): \Generator
232
            {
233 3
                yield from $this->setupMutex($aggregate->id());
234
235
                try
236
                {
237
                    /**
238
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
239
                     *
240
                     * @var Aggregate $aggregate
241
                     */
242 3
                    $aggregate = yield $this->repository->revert($aggregate, $toVersion, $mode);
243
244 2
                    return $aggregate;
245
                }
246 2
                catch (\Throwable $throwable)
247
                {
248 2
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
249
                }
250
                finally
251
                {
252 3
                    yield from $this->releaseMutex($aggregate->id());
253
                }
254 3
            },
255 3
            $aggregate,
256 3
            $toVersion,
257 3
            $mode
258
        );
259
    }
260
261
    /**
262
     * @param AggregateId $id
263
     *
264
     * @return \Generator
265
     */
266 4
    private function setupMutex(AggregateId $id): \Generator
267
    {
268 4
        $mutexKey = createAggregateMutexKey($id);
269
270 4
        if (false === isset($this->locks[$mutexKey]))
271
        {
272 4
            $mutex = $this->mutexFactory->create($mutexKey);
273
274
            /**
275
             * @psalm-suppress TooManyTemplateParams
276
             * @psalm-suppress InvalidPropertyAssignmentValue
277
             */
278 4
            $this->locks[$mutexKey] = yield $mutex->acquire();
279
        }
280 4
    }
281
282
    /**
283
     * @param AggregateId $id
284
     *
285
     * @return \Generator
286
     */
287 5
    private function releaseMutex(AggregateId $id): \Generator
288
    {
289 5
        $mutexKey = createAggregateMutexKey($id);
290
291 5
        if (true === isset($this->locks[$mutexKey]))
292
        {
293
            /** @var Lock $lock */
294 3
            $lock = $this->locks[$mutexKey];
295
296
            /** @psalm-suppress TooManyTemplateParams */
297 3
            yield $lock->release();
298
299 3
            unset($this->locks[$mutexKey]);
300
        }
301 5
    }
302
}
303