Completed
Push — v3.2 ( 440511...e1d649 )
by Masiukevich
04:34
created

EventSourcingProvider::save()   A

Complexity

Conditions 5
Paths 1

Size

Total Lines 57
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5.0291

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
eloc 20
c 1
b 0
f 0
nc 1
nop 2
dl 0
loc 57
ccs 17
cts 19
cp 0.8947
crap 5.0291
rs 9.2888

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
21
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
22
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
23
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
25
use ServiceBus\Mutex\InMemoryMutexFactory;
26
use ServiceBus\Mutex\Lock;
27
use ServiceBus\Mutex\MutexFactory;
28
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
29
30
/**
31
 *
32
 */
33
final class EventSourcingProvider
34
{
35
    /**
36
     * @var EventStreamRepository
37
     */
38
    private $repository;
39
40
    /**
41
     * List of loaded/added aggregates.
42
     *
43
     * @psalm-var array<string, string>
44
     *
45
     * @var array
46
     */
47
    private $aggregates = [];
48
49
    /**
50
     * Current locks collection.
51
     *
52
     * @psalm-var array<string, \ServiceBus\Mutex\Lock>
53
     *
54
     * @var Lock[]
55
     */
56
    private $locks = [];
57
58
    /**
59
     * Mutex creator.
60
     *
61
     * @var MutexFactory
62
     */
63
    private $mutexFactory;
64
65
    /**
66
     * EventSourcingProvider constructor.
67
     *
68
     * @param EventStreamRepository $repository
69
     * @param MutexFactory|null     $mutexFactory
70
     */
71 6
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
72
    {
73 6
        $this->repository   = $repository;
74 6
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
75 6
    }
76
77
    /**
78
     * Load aggregate.
79
     *
80
     * @noinspection PhpDocRedundantThrowsInspection
81
     *
82
     * @param AggregateId $id
83
     *
84
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
85
     *
86
     * @return Promise
87
     *
88
     */
89 1
    public function load(AggregateId $id): Promise
90
    {
91
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
92 1
        return call(
93
            function(AggregateId $id): \Generator
94
            {
95
                try
96
                {
97 1
                    yield from $this->setupMutex($id);
98
99
                    /**
100
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
101
                     *
102
                     * @var Aggregate|null $aggregate
103
                     */
104 1
                    $aggregate = yield $this->repository->load($id);
105
106 1
                    if (null !== $aggregate)
107
                    {
108 1
                        $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);
109
                    }
110
                    else
111
                    {
112
                        yield from $this->releaseMutex($id);
113
                    }
114
115 1
                    return $aggregate;
116
                }
117
                catch (\Throwable $throwable)
118
                {
119
                    yield from $this->releaseMutex($id);
120
121
                    throw LoadAggregateFailed::fromThrowable($throwable);
122
                }
123 1
            },
124 1
            $id
125
        );
126
    }
127
128
    /**
129
     * Save a new aggregate.
130
     *
131
     * @noinspection PhpDocRedundantThrowsInspection
132
     *
133
     * @param Aggregate         $aggregate
134
     * @param ServiceBusContext $context
135
     *
136
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
137
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
138
     *
139
     * @return Promise
140
     *
141
     */
142 4
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
143
    {
144
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
145 4
        return call(
146
            function(Aggregate $aggregate, ServiceBusContext $context): \Generator
147
            {
148
                try
149
                {
150
                    /** The aggregate hasn't been loaded before, which means it is new */
151 4
                    if (false === isset($this->aggregates[(string) $aggregate->id()]))
152
                    {
153
                        /**
154
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
155
                         * @psalm-var      array<int, object> $events
156
                         *
157
                         * @var object[] $events
158
                         */
159 4
                        $events = yield $this->repository->save($aggregate);
160
161 4
                        $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);
162
                    }
163
                    else
164
                    {
165
                        /**
166
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
167
                         * @psalm-var      array<int, object> $events
168
                         *
169
                         * @var object[] $events
170
                         */
171 3
                        $events = yield $this->repository->update($aggregate);
172
                    }
173
174 4
                    $promises = [];
175
176
                    /** @var object $event */
177 4
                    foreach ($events as $event)
178
                    {
179 4
                        $promises[] = $context->delivery($event);
180
                    }
181
182 4
                    yield $promises;
183
                }
184 1
                catch (UniqueConstraintViolationCheckFailed $exception)
185
                {
186 1
                    throw DuplicateAggregate::create($aggregate->id());
187
                }
188
                catch (\Throwable $throwable)
189
                {
190
                    throw SaveAggregateFailed::fromThrowable($throwable);
191
                }
192
                finally
193 4
                {
194 4
                    yield from $this->releaseMutex($aggregate->id());
195
                }
196 4
            },
197 4
            $aggregate,
198 4
            $context
199
        );
200
    }
201
202
    /**
203
     * Revert aggregate to specified version.
204
     *
205
     * Mode options:
206
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
207
     *   be version conflicts in some situations
208
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
209
     *
210
     * @noinspection   PhpDocRedundantThrowsInspection
211
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
212
     *
213
     * @param Aggregate $aggregate
214
     * @param int       $toVersion
215
     * @param int       $mode
216
     *
217
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
218
     *
219
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
220
     *
221
     */
222 3
    public function revert(
223
        Aggregate $aggregate,
224
        int $toVersion,
225
        ?int $mode = null
226
    ): Promise {
227 3
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;
228
229
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
230 3
        return call(
231
            function(Aggregate $aggregate, int $toVersion, int $mode): \Generator
232
            {
233 3
                yield from $this->setupMutex($aggregate->id());
234
235
                try
236
                {
237
                    /**
238
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
239
                     *
240
                     * @var Aggregate $aggregate
241
                     */
242 3
                    $aggregate = yield $this->repository->revert($aggregate, $toVersion, $mode);
243
244 2
                    return $aggregate;
245
                }
246 2
                catch (\Throwable $throwable)
247
                {
248 2
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
249
                }
250
                finally
251
                {
252 3
                    yield from $this->releaseMutex($aggregate->id());
253
                }
254 3
            },
255 3
            $aggregate,
256 3
            $toVersion,
257 3
            $mode
258
        );
259
    }
260
261
    /**
262
     * @param AggregateId $id
263
     *
264
     * @return \Generator
265
     */
266 4
    private function setupMutex(AggregateId $id): \Generator
267
    {
268 4
        $mutexKey = createAggregateMutexKey($id);
269
270 4
        if (false === isset($this->locks[$mutexKey]))
271
        {
272 4
            $mutex = $this->mutexFactory->create($mutexKey);
273
274
            /**
275
             * @psalm-suppress TooManyTemplateParams
276
             * @psalm-suppress InvalidPropertyAssignmentValue
277
             */
278 4
            $this->locks[$mutexKey] = yield $mutex->acquire();
279
        }
280 4
    }
281
282
    /**
283
     * @param AggregateId $id
284
     *
285
     * @return \Generator
286
     */
287 5
    private function releaseMutex(AggregateId $id): \Generator
288
    {
289 5
        $mutexKey = createAggregateMutexKey($id);
290
291 5
        if (true === isset($this->locks[$mutexKey]))
292
        {
293
            /** @var Lock $lock */
294 3
            $lock = $this->locks[$mutexKey];
295
296
            /** @psalm-suppress TooManyTemplateParams */
297 3
            yield $lock->release();
298
299 3
            unset($this->locks[$mutexKey]);
300
        }
301 5
    }
302
}
303