Completed
Push — v3.3 ( 6c1cb0...8faaca )
by Masiukevich
48:12 queued 46:35
created

EventSourcingProvider::releaseMutex()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 5
c 0
b 0
f 0
nc 2
nop 1
dl 0
loc 12
ccs 6
cts 6
cp 1
crap 2
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
21
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
22
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
23
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
25
use ServiceBus\Mutex\InMemoryMutexFactory;
26
use ServiceBus\Mutex\Lock;
27
use ServiceBus\Mutex\MutexFactory;
28
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
29
30
/**
 * Provider for working with event sourced aggregates: load, save (create/update) and revert.
 *
 * The provider remembers which aggregates passed through this instance (to distinguish "new" from "already
 * stored" on save) and keeps the mutex locks it has acquired, indexed by the aggregate mutex key.
 * A lock taken in load() stays held until save() (or a failure) releases it.
 */
final class EventSourcingProvider
{
    /**
     * Repository used to load, save, update and revert aggregates.
     *
     * @var EventStreamRepository
     */
    private $repository;

    /**
     * List of loaded/added aggregates (aggregate id as string => aggregate class name).
     *
     * @psalm-var array<string, string>
     *
     * @var array
     */
    private $aggregates = [];

    /**
     * Current locks collection (mutex key => acquired lock).
     *
     * @psalm-var array<string, \ServiceBus\Mutex\Lock>
     *
     * @var Lock[]
     */
    private $locks = [];

    /**
     * Mutex creator.
     *
     * @var MutexFactory
     */
    private $mutexFactory;

    /**
     * EventSourcingProvider constructor.
     *
     * @param EventStreamRepository $repository
     * @param MutexFactory|null     $mutexFactory If omitted, InMemoryMutexFactory is used
     */
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
    {
        $this->repository   = $repository;
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
    }

    /**
     * Load aggregate.
     *
     * Acquires the aggregate mutex first. When the aggregate is found, it is registered as known and the lock
     * remains held (it is released by save()). When nothing is found, or loading fails, the lock is released
     * before the result/exception is passed to the caller.
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param AggregateId $id
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
     *
     * @return Promise Resolves with the aggregate, or null if the repository returned nothing
     */
    public function load(AggregateId $id): Promise
    {
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(AggregateId $id): \Generator
            {
                try
                {
                    yield from $this->setupMutex($id);

                    /** @var Aggregate|null $aggregate */
                    $aggregate = yield $this->repository->load($id);

                    if (null !== $aggregate)
                    {
                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
                    }
                    else
                    {
                        /** Nothing to protect: give the lock back right away */
                        yield from $this->releaseMutex($id);
                    }

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    yield from $this->releaseMutex($id);

                    throw LoadAggregateFailed::fromThrowable($throwable);
                }
            },
            $id
        );
    }

    /**
     * Save a new aggregate or update an existing one.
     *
     * An aggregate that is not registered in this provider (was not loaded/saved through it before) is stored
     * via repository->save(); otherwise repository->update() is used. The events returned by the repository are
     * then handed to the context for delivery. The aggregate mutex (if this provider holds one) is released in
     * any case.
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param Aggregate         $aggregate
     * @param ServiceBusContext $context
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
     *
     * @return Promise
     */
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
    {
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(Aggregate $aggregate, ServiceBusContext $context): \Generator
            {
                try
                {
                    $aggregateKey = $aggregate->id()->toString();

                    /** The aggregate hasn't been loaded before, which means it is new */
                    if (false === isset($this->aggregates[$aggregateKey]))
                    {
                        /**
                         * @psalm-var  array<int, object> $events
                         *
                         * @var object[] $events
                         */
                        $events = yield $this->repository->save($aggregate);

                        /** Registered only after a successful save, so a failed attempt is retried as "new" */
                        $this->aggregates[$aggregateKey] = \get_class($aggregate);
                    }
                    else
                    {
                        /**
                         * @psalm-var array<int, object> $events
                         *
                         * @var object[] $events
                         */
                        $events = yield $this->repository->update($aggregate);
                    }

                    $promises = [];

                    /** @var object $event */
                    foreach ($events as $event)
                    {
                        $promises[] = $context->delivery($event);
                    }

                    /** Yielding the whole list waits for every delivery at once */
                    yield $promises;
                }
                catch (UniqueConstraintViolationCheckFailed $exception)
                {
                    throw DuplicateAggregate::create($aggregate->id());
                }
                catch (\Throwable $throwable)
                {
                    throw SaveAggregateFailed::fromThrowable($throwable);
                }
                finally
                {
                    yield from $this->releaseMutex($aggregate->id());
                }
            },
            $aggregate,
            $context
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * Mode options:
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
     *   be version conflicts in some situations
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * When no mode is passed, EventStreamRepository::REVERT_MODE_SOFT_DELETE is used.
     *
     * @noinspection   PhpDocRedundantThrowsInspection
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     * @param int       $toVersion
     * @param int|null  $mode
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
     */
    public function revert(
        Aggregate $aggregate,
        int $toVersion,
        ?int $mode = null
    ): Promise {
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(Aggregate $aggregate, int $toVersion, int $mode): \Generator
            {
                /**
                 * Resolved once, so the lock released below is exactly the one acquired here, regardless of
                 * what the repository resolves with.
                 */
                $id = $aggregate->id();

                yield from $this->setupMutex($id);

                try
                {
                    /** @var Aggregate $reverted */
                    $reverted = yield $this->repository->revert($aggregate, $toVersion, $mode);

                    return $reverted;
                }
                catch (\Throwable $throwable)
                {
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
                }
                finally
                {
                    yield from $this->releaseMutex($id);
                }
            },
            $aggregate,
            $toVersion,
            $mode
        );
    }

    /**
     * Acquire the mutex for the aggregate, unless this provider already holds a lock for its key.
     *
     * @param AggregateId $id
     *
     * @return \Generator
     */
    private function setupMutex(AggregateId $id): \Generator
    {
        $mutexKey = createAggregateMutexKey($id);

        if (false === isset($this->locks[$mutexKey]))
        {
            $mutex = $this->mutexFactory->create($mutexKey);

            /** @psalm-suppress InvalidPropertyAssignmentValue */
            $this->locks[$mutexKey] = yield $mutex->acquire();
        }
    }

    /**
     * Release the lock held for the aggregate (no-op if this provider holds none for its key).
     *
     * @param AggregateId $id
     *
     * @return \Generator
     */
    private function releaseMutex(AggregateId $id): \Generator
    {
        $mutexKey = createAggregateMutexKey($id);

        if (false === isset($this->locks[$mutexKey]))
        {
            return;
        }

        /** @var Lock $lock */
        $lock = $this->locks[$mutexKey];

        /**
         * Forget the lock before yielding: while the release is pending (or if it fails) setupMutex() must not
         * find a stale entry and skip acquiring the mutex.
         */
        unset($this->locks[$mutexKey]);

        yield $lock->release();
    }
}
286