Completed
Push — v4.0 ( d88c1e...d1a754 )
by Masiukevich
03:36
created

EventSourcingProvider   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 198
Duplicated Lines 0 %

Test Coverage

Coverage 7.14%

Importance

Changes 3
Bugs 2 Features 0
Metric Value
eloc 58
c 3
b 2
f 0
dl 0
loc 198
ccs 4
cts 56
cp 0.0714
rs 10
wmc 15

6 Methods

Rating   Name   Duplication   Size   Complexity  
A save() 0 50 5
A releaseMutex() 0 12 2
A setupMutex() 0 12 2
A load() 0 28 3
A revert() 0 27 2
A __construct() 0 4 1
1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use ServiceBus\Mutex\InMemory\InMemoryMutexFactory;
16
use function Amp\call;
17
use Amp\Promise;
18
use ServiceBus\Common\Context\ServiceBusContext;
19
use ServiceBus\EventSourcing\Aggregate;
20
use ServiceBus\EventSourcing\AggregateId;
21
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
22
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
23
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
25
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
26
use ServiceBus\Mutex\Lock;
27
use ServiceBus\Mutex\MutexFactory;
28
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
29
30
/**
 * Entry point for working with event sourced aggregates: loading, saving and reverting them
 * through the event stream repository.
 *
 * Access to an aggregate is serialized with a mutex keyed by the aggregate id. A lock acquired
 * in load() stays held while the aggregate is in use and is released by save() (or immediately,
 * if nothing was loaded or loading failed).
 */
final class EventSourcingProvider
{
    /** @var EventStreamRepository */
    private $repository;

    /**
     * List of loaded/added aggregates: aggregate id (string representation) => aggregate class name.
     *
     * Used by save() to decide whether the aggregate must be inserted or updated.
     *
     * @psalm-var array<string, string>
     *
     * @var string[]
     */
    private $aggregates = [];

    /** @var MutexFactory */
    private $mutexFactory;

    /**
     * Locks currently held by this provider, indexed by mutex key.
     *
     * @var Lock[]
     */
    private $lockCollection = [];

    /**
     * @param EventStreamRepository $repository   Storage of event streams
     * @param MutexFactory|null     $mutexFactory Lock factory; an in-memory implementation is used when omitted
     */
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
    {
        $this->repository   = $repository;
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
    }

    /**
     * Load aggregate.
     *
     * Returns \ServiceBus\EventSourcing\Aggregate|null
     *
     * The aggregate lock is acquired before loading. If the aggregate was found, the lock is kept
     * (it is released in save()); if nothing was found or loading failed, the lock is released right away.
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
     */
    public function load(AggregateId $id): Promise
    {
        return call(
            function () use ($id): \Generator
            {
                try
                {
                    yield from $this->setupMutex($id);

                    /** @var Aggregate|null $aggregate */
                    $aggregate = yield $this->repository->load($id);

                    if ($aggregate !== null)
                    {
                        /** Remember the aggregate so that save() performs an update instead of an insert */
                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
                    }
                    else
                    {
                        /** Nothing to protect: there will be no subsequent save() for this id */
                        yield from $this->releaseMutex($id);
                    }

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    yield from $this->releaseMutex($id);

                    throw LoadAggregateFailed::fromThrowable($throwable);
                }
            }
        );
    }

    /**
     * Save a new aggregate (or update a previously loaded/saved one) and deliver its events.
     *
     * Every event returned by the repository is passed to the context for delivery. The aggregate
     * lock (if it was acquired earlier) is always released when the operation finishes.
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
     */
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
    {
        return call(
            function () use ($aggregate, $context): \Generator
            {
                try
                {
                    /** The aggregate hasn't been loaded before, which means it is new */
                    if (isset($this->aggregates[$aggregate->id()->toString()]) === false)
                    {
                        /**
                         * @psalm-var  array<int, object> $events
                         *
                         * @var object[] $events
                         */
                        $events = yield $this->repository->save($aggregate);

                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
                    }
                    else
                    {
                        /**
                         * @psalm-var array<int, object> $events
                         *
                         * @var object[] $events
                         */
                        $events = yield $this->repository->update($aggregate);
                    }

                    $promises = [];

                    /** @var object $event */
                    foreach ($events as $event)
                    {
                        $promises[] = $context->delivery($event);
                    }

                    /** Wait for the delivery of all events */
                    yield $promises;
                }
                catch (UniqueConstraintViolationCheckFailed $exception)
                {
                    throw DuplicateAggregate::create($aggregate->id());
                }
                catch (\Throwable $throwable)
                {
                    throw SaveAggregateFailed::fromThrowable($throwable);
                }
                finally
                {
                    yield from $this->releaseMutex($aggregate->id());
                }
            }
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * Returns \ServiceBus\EventSourcing\Aggregate
     *
     * Mode options:
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
     *   be version conflicts in some situations
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * When no mode is specified, EventStreamRepository::REVERT_MODE_SOFT_DELETE is used.
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
     */
    public function revert(
        Aggregate $aggregate,
        int $toVersion,
        ?int $mode = null
    ): Promise {
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function () use ($aggregate, $toVersion, $mode): \Generator
            {
                /**
                 * The identifier is captured once: $aggregate is reassigned below, and the lock must be
                 * released for exactly the id it was acquired for.
                 */
                $id = $aggregate->id();

                try
                {
                    /**
                     * Acquired inside the try block (same as in load()), so that a lock failure is reported
                     * as RevertAggregateVersionFailed too.
                     */
                    yield from $this->setupMutex($id);

                    /** @var Aggregate $aggregate */
                    $aggregate = yield $this->repository->revert($aggregate, $toVersion, $mode);

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
                }
                finally
                {
                    yield from $this->releaseMutex($id);
                }
            }
        );
    }

    /**
     * Acquire the lock for the specified aggregate, unless this provider already holds it.
     *
     * The acquired lock is stored in the lock collection under the aggregate mutex key.
     */
    private function setupMutex(AggregateId $id): \Generator
    {
        $mutexKey = createAggregateMutexKey($id);

        if (\array_key_exists($mutexKey, $this->lockCollection) === false)
        {
            $mutex = $this->mutexFactory->create($mutexKey);

            /** @var Lock $lock */
            $lock = yield $mutex->acquire();

            $this->lockCollection[$mutexKey] = $lock;
        }
    }

    /**
     * Release the lock for the specified aggregate. Does nothing if this provider doesn't hold it.
     */
    private function releaseMutex(AggregateId $id): \Generator
    {
        $mutexKey = createAggregateMutexKey($id);

        if (\array_key_exists($mutexKey, $this->lockCollection) === true)
        {
            /** @var Lock $lock */
            $lock = $this->lockCollection[$mutexKey];

            /** Removed from the collection first, so a failed release is not attempted again */
            unset($this->lockCollection[$mutexKey]);

            yield $lock->release();
        }
    }
}
234