Passed
Push — v4.2 ( 50993c...c387ce )
by Masiukevich
10:37
created

EventSourcingProvider::setupMutex()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 5
c 1
b 0
f 0
nc 2
nop 1
dl 0
loc 12
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
19
use ServiceBus\EventSourcing\Exceptions\DuplicateAggregate;
20
use ServiceBus\EventSourcing\Exceptions\LoadAggregateFailed;
21
use ServiceBus\EventSourcing\Exceptions\RevertAggregateVersionFailed;
22
use ServiceBus\EventSourcing\Exceptions\SaveAggregateFailed;
23
use ServiceBus\Mutex\InMemory\InMemoryMutexFactory;
24
use ServiceBus\Mutex\Lock;
25
use ServiceBus\Mutex\MutexFactory;
26
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
27
28
/**
29
 *
30
 */
31
final class EventSourcingProvider
32
{
33
    /** @var EventStreamRepository */
34
    private $repository;
35
36
    /**
37
     * List of loaded/added aggregates.
38
     *
39
     * @psalm-var array<string, string>
40
     *
41
     * @var string[]
42
     */
43
    private $aggregates = [];
44
45
    /** @var MutexFactory */
46
    private $mutexFactory;
47
48
    /** @var Lock[] */
49
    private $lockCollection = [];
50
51
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
52
    {
53
        $this->repository   = $repository;
54
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
55
    }
56
57
    /**
58
     * Load aggregate.
59
     *
60
     * Returns \ServiceBus\EventSourcing\Aggregate|null
61
     *
62
     * @throws \ServiceBus\EventSourcing\Exceptions\LoadAggregateFailed
63
     */
64
    public function load(AggregateId $id): Promise
65
    {
66
        return call(
67
            function() use ($id): \Generator
68
            {
69
                yield from $this->setupMutex($id);
70
71
                try
72
                {
73
                    /** @var Aggregate|null $aggregate */
74
                    $aggregate = yield $this->repository->load($id);
75
76
                    if (null !== $aggregate)
77
                    {
78
                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
79
                    }
80
81
                    return $aggregate;
82
                }
83
                catch (\Throwable $throwable)
84
                {
85
                    throw LoadAggregateFailed::fromThrowable($throwable);
86
                }
87
                finally
88
                {
89
                    yield from $this->releaseMutex($id);
90
                }
91
            }
92
        );
93
    }
94
95
    /**
96
     * Save a new aggregate.
97
     *
98
     * @throws \ServiceBus\EventSourcing\Exceptions\SaveAggregateFailed
99
     * @throws \ServiceBus\EventSourcing\Exceptions\DuplicateAggregate
100
     */
101
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
102
    {
103
        return call(
104
            function() use ($aggregate, $context): \Generator
105
            {
106
                try
107
                {
108
                    /** The aggregate hasn't been loaded before, which means it is new */
109
                    if (false === isset($this->aggregates[$aggregate->id()->toString()]))
110
                    {
111
                        /**
112
                         * @psalm-var  array<int, object> $events
113
                         *
114
                         * @var object[] $events
115
                         */
116
                        $events = yield $this->repository->save($aggregate);
117
118
                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
119
                    }
120
                    else
121
                    {
122
                        /**
123
                         * @psalm-var array<int, object> $events
124
                         *
125
                         * @var object[] $events
126
                         */
127
                        $events = yield $this->repository->update($aggregate);
128
                    }
129
130
                    $promises = [];
131
132
                    foreach ($events as $event)
133
                    {
134
                        $promises[] = $context->delivery($event);
135
                    }
136
137
                    yield $promises;
138
                }
139
                catch (UniqueConstraintViolationCheckFailed $exception)
140
                {
141
                    throw DuplicateAggregate::create($aggregate->id());
142
                }
143
                catch (\Throwable $throwable)
144
                {
145
                    throw SaveAggregateFailed::fromThrowable($throwable);
146
                }
147
                finally
148
                {
149
                    yield from $this->releaseMutex($aggregate->id());
150
                }
151
            }
152
        );
153
    }
154
155
    /**
156
     * Revert aggregate to specified version.
157
     *
158
     * Returns \ServiceBus\EventSourcing\Aggregate
159
     *
160
     * Mode options:
161
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
162
     *   be version conflicts in some situations
163
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
164
     *
165
     * @throws \ServiceBus\EventSourcing\Exceptions\RevertAggregateVersionFailed
166
     */
167
    public function revert(
168
        Aggregate $aggregate,
169
        int $toVersion,
170
        ?int $mode = null
171
    ): Promise {
172
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;
173
174
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
175
        return call(
176
            function() use ($aggregate, $toVersion, $mode): \Generator
177
            {
178
                yield from $this->setupMutex($aggregate->id());
179
180
                try
181
                {
182
                    /** @var Aggregate $aggregate */
183
                    $aggregate = yield $this->repository->revert($aggregate, $toVersion, $mode);
184
185
                    return $aggregate;
186
                }
187
                catch (\Throwable $throwable)
188
                {
189
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
190
                }
191
                finally
192
                {
193
                    yield from $this->releaseMutex($aggregate->id());
194
                }
195
            }
196
        );
197
    }
198
199
    private function setupMutex(AggregateId $id): \Generator
200
    {
201
        $mutexKey = createAggregateMutexKey($id);
202
203
        if (false === \array_key_exists($mutexKey, $this->lockCollection))
204
        {
205
            $mutex = $this->mutexFactory->create($mutexKey);
206
207
            /** @var Lock $lock */
208
            $lock = yield $mutex->acquire();
209
210
            $this->lockCollection[$mutexKey] = $lock;
211
        }
212
    }
213
214
    private function releaseMutex(AggregateId $id): \Generator
215
    {
216
        $mutexKey = createAggregateMutexKey($id);
217
218
        if (\array_key_exists($mutexKey, $this->lockCollection))
219
        {
220
            /** @var Lock $lock */
221
            $lock = $this->lockCollection[$mutexKey];
222
223
            unset($this->lockCollection[$mutexKey]);
224
225
            yield $lock->release();
226
        }
227
    }
228
}
229