Passed
Push — v3.2 ( 8a1d48...8a3142 )
by Masiukevich
01:41
created

EventSourcingProvider::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 2
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
21
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
22
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
23
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
25
use ServiceBus\Mutex\InMemoryMutexFactory;
26
use ServiceBus\Mutex\Lock;
27
use ServiceBus\Mutex\MutexFactory;
28
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
29
30
/**
31
 *
32
 */
33
final class EventSourcingProvider
34
{
35
    /**
36
     * @var EventStreamRepository
37
     */
38
    private $repository;
39
40
    /**
41
     * List of loaded/added aggregates.
42
     *
43
     * @psalm-var array<string, string>
44
     *
45
     * @var array
46
     */
47
    private $aggregates = [];
48
49
    /**
50
     * Current locks collection.
51
     *
52
     * @psalm-var array<string, \ServiceBus\Mutex\Lock>
53
     *
54
     * @var Lock[]
55
     */
56
    private $locks = [];
57
58
    /**
59
     * Mutex creator.
60
     *
61
     * @var MutexFactory
62
     */
63
    private $mutexFactory;
64
65
    /**
66
     * EventSourcingProvider constructor.
67
     *
68
     * @param EventStreamRepository $repository
69
     * @param MutexFactory|null     $mutexFactory
70
     */
71 6
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
72
    {
73 6
        $this->repository   = $repository;
74 6
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
75 6
    }
76
77
    /**
78
     * Load aggregate.
79
     *
80
     * @noinspection PhpDocRedundantThrowsInspection
81
     *
82
     * @param AggregateId $id
83
     *
84
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
85
     *
86
     * @return Promise
87
     *
88
     */
89 1
    public function load(AggregateId $id): Promise
90
    {
91
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
92 1
        return call(
93
            function(AggregateId $id): \Generator
94
            {
95
                try
96
                {
97 1
                    yield from $this->setupMutex($id);
98
99
                    /**
100
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
101
                     *
102
                     * @var Aggregate|null $aggregate
103
                     */
104 1
                    $aggregate = yield $this->repository->load($id);
105
106 1
                    if (null !== $aggregate)
107
                    {
108 1
                        $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);
109
                    }
110
111 1
                    return $aggregate;
112
                }
113
                catch (\Throwable $throwable)
114
                {
115
                    yield from $this->releaseMutex($id);
116
117
                    throw LoadAggregateFailed::fromThrowable($throwable);
118
                }
119 1
            },
120 1
            $id
121
        );
122
    }
123
124
    /**
125
     * Save a new aggregate.
126
     *
127
     * @noinspection PhpDocRedundantThrowsInspection
128
     *
129
     * @param Aggregate         $aggregate
130
     * @param ServiceBusContext $context
131
     *
132
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
133
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
134
     *
135
     * @return Promise
136
     *
137
     */
138 4
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
139
    {
140
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
141 4
        return call(
142
            function(Aggregate $aggregate, ServiceBusContext $context): \Generator
143
            {
144
                try
145
                {
146
                    /** The aggregate hasn't been loaded before, which means it is new */
147 4
                    if (false === isset($this->aggregates[(string) $aggregate->id()]))
148
                    {
149
                        /**
150
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
151
                         * @psalm-var      array<int, object> $events
152
                         *
153
                         * @var object[] $events
154
                         */
155 4
                        $events = yield $this->repository->save($aggregate);
156
157 4
                        $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);
158
                    }
159
                    else
160
                    {
161
                        /**
162
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
163
                         * @psalm-var      array<int, object> $events
164
                         *
165
                         * @var object[] $events
166
                         */
167 3
                        $events = yield $this->repository->update($aggregate);
168
                    }
169
170 4
                    $promises = [];
171
172
                    /** @var object $event */
173 4
                    foreach ($events as $event)
174
                    {
175 4
                        $promises[] = $context->delivery($event);
176
                    }
177
178 4
                    yield $promises;
179
                }
180 1
                catch (UniqueConstraintViolationCheckFailed $exception)
181
                {
182 1
                    throw DuplicateAggregate::create($aggregate->id());
183
                }
184
                catch (\Throwable $throwable)
185
                {
186
                    throw SaveAggregateFailed::fromThrowable($throwable);
187
                }
188
                finally
189 4
                {
190 4
                    yield from $this->releaseMutex($aggregate->id());
191
                }
192 4
            },
193 4
            $aggregate,
194 4
            $context
195
        );
196
    }
197
198
    /**
199
     * Revert aggregate to specified version.
200
     *
201
     * Mode options:
202
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
203
     *   be version conflicts in some situations
204
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
205
     *
206
     * @noinspection   PhpDocRedundantThrowsInspection
207
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
208
     *
209
     * @param Aggregate $aggregate
210
     * @param int       $toVersion
211
     * @param int       $mode
212
     *
213
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
214
     *
215
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
216
     *
217
     */
218 3
    public function revert(
219
        Aggregate $aggregate,
220
        int $toVersion,
221
        ?int $mode = null
222
    ): Promise {
223 3
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;
224
225
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
226 3
        return call(
227
            function(Aggregate $aggregate, int $toVersion, int $mode): \Generator
228
            {
229 3
                yield from $this->setupMutex($aggregate->id());
230
231
                try
232
                {
233
                    /**
234
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
235
                     *
236
                     * @var Aggregate $aggregate
237
                     */
238 3
                    $aggregate = yield $this->repository->revert($aggregate, $toVersion, $mode);
239
240 2
                    return $aggregate;
241
                }
242 2
                catch (\Throwable $throwable)
243
                {
244 2
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
245
                }
246
                finally
247
                {
248 3
                    yield from $this->releaseMutex($aggregate->id());
249
                }
250 3
            },
251 3
            $aggregate,
252 3
            $toVersion,
253 3
            $mode
254
        );
255
    }
256
257
    /**
258
     * @param AggregateId $id
259
     *
260
     * @return \Generator
261
     */
262 4
    private function setupMutex(AggregateId $id): \Generator
263
    {
264 4
        $mutexKey = createAggregateMutexKey($id);
265 4
        $mutex    = $this->mutexFactory->create($mutexKey);
266
267
        /**
268
         * @psalm-suppress TooManyTemplateParams
269
         * @psalm-suppress InvalidPropertyAssignmentValue
270
         */
271 4
        $this->locks[$mutexKey] = yield $mutex->acquire();
272 4
    }
273
274
    /**
275
     * @param AggregateId $id
276
     *
277
     * @return \Generator
278
     */
279 5
    private function releaseMutex(AggregateId $id): \Generator
280
    {
281 5
        $mutexKey = createAggregateMutexKey($id);
282
283 5
        if (true === isset($this->locks[$mutexKey]))
284
        {
285
            /** @var Lock $lock */
286 3
            $lock = $this->locks[$mutexKey];
287
288
            /** @psalm-suppress TooManyTemplateParams */
289 3
            yield $lock->release();
290
291 3
            unset($this->locks[$mutexKey]);
292
        }
293 5
    }
294
}
295