Completed
Push — v3.2 ( 8bd00d...440511 )
by Masiukevich
04:17
created

EventSourcingProvider::save()   A

Complexity

Conditions 5
Paths 1

Size

Total Lines 57
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5.0291

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
eloc 20
c 1
b 0
f 0
nc 1
nop 2
dl 0
loc 57
ccs 17
cts 19
cp 0.8947
crap 5.0291
rs 9.2888

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive group of statements into a new, well-named method.

1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
21
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
22
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
23
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
25
use ServiceBus\Mutex\InMemoryMutexFactory;
26
use ServiceBus\Mutex\Lock;
27
use ServiceBus\Mutex\MutexFactory;
28
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
29
30
/**
31
 *
32
 */
33
final class EventSourcingProvider
34
{
35
    /**
36
     * @var EventStreamRepository
37
     */
38
    private $repository;
39
40
    /**
41
     * List of loaded/added aggregates.
42
     *
43
     * @psalm-var array<string, string>
44
     *
45
     * @var array
46
     */
47
    private $aggregates = [];
48
49
    /**
50
     * Current locks collection.
51
     *
52
     * @psalm-var array<string, \ServiceBus\Mutex\Lock>
53
     *
54
     * @var Lock[]
55
     */
56
    private $locks = [];
57
58
    /**
59
     * Mutex creator.
60
     *
61
     * @var MutexFactory
62
     */
63
    private $mutexFactory;
64
65
    /**
     * Create the provider.
     *
     * When no mutex factory is supplied, an InMemoryMutexFactory is used.
     *
     * @param EventStreamRepository $repository   Storage used to load, save, update and revert aggregates
     * @param MutexFactory|null     $mutexFactory Creator of per-aggregate mutexes
     */
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
    {
        if (null === $mutexFactory)
        {
            $mutexFactory = new InMemoryMutexFactory();
        }

        $this->mutexFactory = $mutexFactory;
        $this->repository   = $repository;
    }
76
77
    /**
     * Load an aggregate by its identifier.
     *
     * The aggregate mutex is acquired before the repository is queried. When an aggregate is found, it is
     * remembered as loaded (so that save() updates it instead of inserting it) and the lock stays held
     * until save() or revert() releases it. When nothing is found, or loading fails, the lock is released
     * right away, because no subsequent save() is guaranteed to do it.
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param AggregateId $id
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
     *
     * @return Promise Resolves to the loaded Aggregate, or null when the repository returned none
     *
     */
    public function load(AggregateId $id): Promise
    {
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(AggregateId $id): \Generator
            {
                try
                {
                    yield from $this->setupMutex($id);

                    /**
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
                     *
                     * @var Aggregate|null $aggregate
                     */
                    $aggregate = yield $this->repository->load($id);

                    if (null === $aggregate)
                    {
                        /** Nothing was loaded: free the lock now instead of leaving it held indefinitely */
                        yield from $this->releaseMutex($id);

                        return null;
                    }

                    $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    yield from $this->releaseMutex($id);

                    throw LoadAggregateFailed::fromThrowable($throwable);
                }
            },
            $id
        );
    }
123
124
    /**
     * Persist an aggregate and deliver the events returned by the repository.
     *
     * An aggregate that was not previously loaded or saved through this provider is considered new and is
     * stored with the repository's save(); otherwise the repository's update() is used. Each returned
     * event is handed to the context for delivery. The aggregate mutex, if this provider holds one, is
     * released at the end regardless of the outcome.
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param Aggregate         $aggregate
     * @param ServiceBusContext $context
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
     *
     * @return Promise
     *
     */
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
    {
        return call(
            function() use ($aggregate, $context): \Generator
            {
                try
                {
                    $aggregateKey = (string) $aggregate->id();

                    /** An aggregate unknown to this provider has never been loaded, which means it is new */
                    $alreadyKnown = isset($this->aggregates[$aggregateKey]);

                    $persisting = true === $alreadyKnown
                        ? $this->repository->update($aggregate)
                        : $this->repository->save($aggregate);

                    /**
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
                     * @psalm-var      array<int, object> $recordedEvents
                     *
                     * @var object[] $recordedEvents
                     */
                    $recordedEvents = yield $persisting;

                    if (false === $alreadyKnown)
                    {
                        /** Registered only after a successful insert, so a failed save() can be retried as new */
                        $this->aggregates[$aggregateKey] = \get_class($aggregate);
                    }

                    $deliveries = [];

                    /** @var object $recordedEvent */
                    foreach ($recordedEvents as $recordedEvent)
                    {
                        $deliveries[] = $context->delivery($recordedEvent);
                    }

                    /** Wait for all deliveries at once */
                    yield $deliveries;
                }
                catch (UniqueConstraintViolationCheckFailed $duplicate)
                {
                    throw DuplicateAggregate::create($aggregate->id());
                }
                catch (\Throwable $failure)
                {
                    throw SaveAggregateFailed::fromThrowable($failure);
                }
                finally
                {
                    yield from $this->releaseMutex($aggregate->id());
                }
            }
        );
    }
197
198
    /**
     * Revert aggregate to specified version.
     *
     * Mode options:
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
     *   be version conflicts in some situations
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * When no mode is given, EventStreamRepository::REVERT_MODE_SOFT_DELETE is used. The aggregate mutex is
     * acquired before the repository is called and released afterwards, whether the revert succeeded or not.
     *
     * @noinspection   PhpDocRedundantThrowsInspection
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     * @param int       $toVersion
     * @param int|null  $mode
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
     *
     */
    public function revert(
        Aggregate $aggregate,
        int $toVersion,
        ?int $mode = null
    ): Promise {
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(Aggregate $aggregate, int $toVersion, int $mode): \Generator
            {
                /**
                 * Captured once so that the lock released below is exactly the one acquired here,
                 * independently of what the repository returns.
                 */
                $id = $aggregate->id();

                yield from $this->setupMutex($id);

                try
                {
                    /**
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
                     *
                     * @var Aggregate $reverted
                     */
                    $reverted = yield $this->repository->revert($aggregate, $toVersion, $mode);

                    return $reverted;
                }
                catch (\Throwable $throwable)
                {
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
                }
                finally
                {
                    yield from $this->releaseMutex($id);
                }
            },
            $aggregate,
            $toVersion,
            $mode
        );
    }
256
257
    /**
     * Acquire the mutex of the given aggregate, unless this provider already holds a lock for it.
     *
     * The acquired lock is stored in the locks collection under the aggregate mutex key.
     *
     * @param AggregateId $id
     *
     * @return \Generator
     */
    private function setupMutex(AggregateId $id): \Generator
    {
        $key = createAggregateMutexKey($id);

        /** A lock is already held under this key: nothing to acquire */
        if (true === isset($this->locks[$key]))
        {
            return;
        }

        $acquiring = $this->mutexFactory->create($key)->acquire();

        /**
         * @psalm-suppress TooManyTemplateParams
         * @psalm-suppress InvalidPropertyAssignmentValue
         */
        $this->locks[$key] = yield $acquiring;
    }
277
278
    /**
     * Release the lock held for the given aggregate (if any) and remove it from the locks collection.
     *
     * @param AggregateId $id
     *
     * @return \Generator
     */
    private function releaseMutex(AggregateId $id): \Generator
    {
        $key = createAggregateMutexKey($id);

        /** @var Lock|null $heldLock */
        $heldLock = $this->locks[$key] ?? null;

        /** No lock is held under this key: nothing to release */
        if (null === $heldLock)
        {
            return;
        }

        /** @psalm-suppress TooManyTemplateParams */
        yield $heldLock->release();

        unset($this->locks[$key]);
    }
298
}
299