Completed
Push — v3.3 ( 6c1cb0...8faaca )
by Masiukevich
48:12 queued 46:35
created

EventSourcingProvider::save()   A

Complexity

Conditions 5
Paths 1

Size

Total Lines 55
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 5.0291

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
eloc 20
c 1
b 0
f 0
nc 1
nop 2
dl 0
loc 55
ccs 17
cts 19
cp 0.8947
crap 5.0291
rs 9.2888

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

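Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.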

1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
21
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
22
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
23
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
25
use ServiceBus\Mutex\InMemoryMutexFactory;
26
use ServiceBus\Mutex\Lock;
27
use ServiceBus\Mutex\MutexFactory;
28
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
29
30
/**
 * Loads, stores and reverts event sourced aggregates through an EventStreamRepository.
 *
 * Every operation is asynchronous and returns an Amp promise. While an aggregate is being worked on, the provider
 * holds a lock obtained from the configured MutexFactory under a key derived from the aggregate id:
 *   - load() acquires the lock and keeps it when the aggregate was found;
 *   - save() releases the lock (it never acquires one itself);
 *   - revert() acquires the lock and releases it before completing.
 */
final class EventSourcingProvider
{
    /**
     * Storage used to load, save, update and revert aggregates.
     *
     * @var EventStreamRepository
     */
    private $repository;

    /**
     * List of loaded/added aggregates (aggregate id as string => aggregate class name).
     *
     * save() uses the presence of an id in this list to choose between storing a new stream and updating an
     * existing one.
     *
     * @psalm-var array<string, string>
     *
     * @var array
     */
    private $aggregates = [];

    /**
     * Current locks collection (mutex key => acquired lock).
     *
     * @psalm-var array<string, \ServiceBus\Mutex\Lock>
     *
     * @var Lock[]
     */
    private $locks = [];

    /**
     * Mutex creator.
     *
     * @var MutexFactory
     */
    private $mutexFactory;

    /**
     * EventSourcingProvider constructor.
     *
     * @param EventStreamRepository $repository
     * @param MutexFactory|null     $mutexFactory If omitted, an InMemoryMutexFactory is used
     */
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
    {
        $this->repository   = $repository;
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
    }

    /**
     * Load aggregate.
     *
     * Acquires the lock for the aggregate and asks the repository for it. When the aggregate exists, it is
     * registered in the list of known aggregates and the lock stays held (a subsequent save() releases it).
     * When the aggregate does not exist, or loading fails, the lock is released right away.
     *
     * The promise resolves to the loaded Aggregate, or null if the repository returned nothing.
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param AggregateId $id
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
     *
     * @return Promise
     */
    public function load(AggregateId $id): Promise
    {
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(AggregateId $id): \Generator
            {
                try
                {
                    yield from $this->setupMutex($id);

                    /** @var Aggregate|null $aggregate */
                    $aggregate = yield $this->repository->load($id);

                    if (null !== $aggregate)
                    {
                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
                    }
                    else
                    {
                        /** Nothing was loaded, so there is nothing to protect any more */
                        yield from $this->releaseMutex($id);
                    }

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    yield from $this->releaseMutex($id);

                    throw LoadAggregateFailed::fromThrowable($throwable);
                }
            },
            $id
        );
    }

    /**
     * Save a new aggregate or update a previously loaded/added one.
     *
     * An aggregate whose id is not yet in the list of known aggregates is stored as a new stream and then
     * registered; otherwise the existing stream is updated. Every event returned by the repository is passed to
     * $context->delivery() and the method waits for all deliveries to finish.
     *
     * The lock held for the aggregate (if any) is released in every case, success or failure.
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param Aggregate         $aggregate
     * @param ServiceBusContext $context
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
     *
     * @return Promise
     */
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
    {
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(Aggregate $aggregate, ServiceBusContext $context): \Generator
            {
                try
                {
                    /** The aggregate hasn't been loaded before, which means it is new */
                    if (false === isset($this->aggregates[$aggregate->id()->toString()]))
                    {
                        /**
                         * @psalm-var  array<int, object> $events
                         *
                         * @var object[] $events
                         */
                        $events = yield $this->repository->save($aggregate);

                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
                    }
                    else
                    {
                        /**
                         * @psalm-var array<int, object> $events
                         *
                         * @var object[] $events
                         */
                        $events = yield $this->repository->update($aggregate);
                    }

                    $promises = [];

                    /** @var object $event */
                    foreach ($events as $event)
                    {
                        $promises[] = $context->delivery($event);
                    }

                    /** Yielding the array makes the coroutine wait for every delivery */
                    yield $promises;
                }
                catch (UniqueConstraintViolationCheckFailed $exception)
                {
                    throw DuplicateAggregate::create($aggregate->id());
                }
                catch (\Throwable $throwable)
                {
                    throw SaveAggregateFailed::fromThrowable($throwable);
                }
                finally
                {
                    yield from $this->releaseMutex($aggregate->id());
                }
            },
            $aggregate,
            $context
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * Mode options:
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
     *   be version conflicts in some situations
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * When $mode is null, EventStreamRepository::REVERT_MODE_SOFT_DELETE is used.
     *
     * The lock for the aggregate is acquired before the revert and released afterwards, whether it succeeded or not.
     *
     * @noinspection   PhpDocRedundantThrowsInspection
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     * @param int       $toVersion
     * @param int       $mode
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
     */
    public function revert(
        Aggregate $aggregate,
        int $toVersion,
        ?int $mode = null
    ): Promise {
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;

        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(Aggregate $aggregate, int $toVersion, int $mode): \Generator
            {
                /**
                 * Captured once so that the lock is released under exactly the id it was acquired with, regardless
                 * of what the repository hands back.
                 */
                $id = $aggregate->id();

                yield from $this->setupMutex($id);

                try
                {
                    /** @var Aggregate $reverted */
                    $reverted = yield $this->repository->revert($aggregate, $toVersion, $mode);

                    return $reverted;
                }
                catch (\Throwable $throwable)
                {
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
                }
                finally
                {
                    yield from $this->releaseMutex($id);
                }
            },
            $aggregate,
            $toVersion,
            $mode
        );
    }

    /**
     * Acquire the lock for the given aggregate, unless this provider already holds one for it.
     *
     * @param AggregateId $id
     *
     * @return \Generator
     */
    private function setupMutex(AggregateId $id): \Generator
    {
        $mutexKey = createAggregateMutexKey($id);

        if (false === isset($this->locks[$mutexKey]))
        {
            $mutex = $this->mutexFactory->create($mutexKey);

            /** @psalm-suppress InvalidPropertyAssignmentValue */
            $this->locks[$mutexKey] = yield $mutex->acquire();
        }
    }

    /**
     * Release the lock held for the given aggregate; does nothing when no lock is recorded for it.
     *
     * @param AggregateId $id
     *
     * @return \Generator
     */
    private function releaseMutex(AggregateId $id): \Generator
    {
        $mutexKey = createAggregateMutexKey($id);

        if (true === isset($this->locks[$mutexKey]))
        {
            /** @var Lock $lock */
            $lock = $this->locks[$mutexKey];

            try
            {
                yield $lock->release();
            }
            finally
            {
                /**
                 * Forget the lock even if releasing it failed: a stale entry would make setupMutex() skip
                 * acquisition for this aggregate from now on.
                 */
                unset($this->locks[$mutexKey]);
            }
        }
    }
}
286