Completed
Push — v4.0 ( d1a754...a6a10d )
by Masiukevich
01:55
created

EventSourcingProvider::revert()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 27
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 11
c 1
b 0
f 0
nc 1
nop 3
dl 0
loc 27
ccs 0
cts 9
cp 0
crap 6
rs 9.9
1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use ServiceBus\Mutex\InMemory\InMemoryMutexFactory;
16
use function Amp\call;
17
use Amp\Promise;
18
use ServiceBus\Common\Context\ServiceBusContext;
19
use ServiceBus\EventSourcing\Aggregate;
20
use ServiceBus\EventSourcing\AggregateId;
21
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
22
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
23
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
25
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
26
use ServiceBus\Mutex\Lock;
27
use ServiceBus\Mutex\MutexFactory;
28
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
29
30
/**
31
 *
32
 */
33
final class EventSourcingProvider
34
{
35
    /** @var EventStreamRepository */
36
    private $repository;
37
38
    /**
39
     * List of loaded/added aggregates.
40
     *
41
     * @psalm-var array<string, string>
42
     *
43
     * @var string[]
44
     */
45
    private $aggregates = [];
46
47
    /** @var MutexFactory */
48
    private $mutexFactory;
49
50
    /** @var Lock[] */
51
    private $lockCollection = [];
52
53 6
    public function __construct(EventStreamRepository $repository, ?MutexFactory $mutexFactory = null)
54
    {
55 6
        $this->repository   = $repository;
56 6
        $this->mutexFactory = $mutexFactory ?? new InMemoryMutexFactory();
57 6
    }
58
59
    /**
60
     * Load aggregate.
61
     *
62
     * Returns \ServiceBus\EventSourcing\Aggregate|null
63
     *
64
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
65
     */
66
    public function load(AggregateId $id): Promise
67
    {
68
        return call(
69
            function () use ($id): \Generator
70
            {
71
                yield from $this->setupMutex($id);
72
73
                try
74
                {
75
                    /** @var Aggregate|null $aggregate */
76
                    $aggregate = yield $this->repository->load($id);
77
78
                    if ($aggregate !== null)
79
                    {
80
                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
81
                    }
82
83
                    return $aggregate;
84
                }
85
                catch (\Throwable $throwable)
86
                {
87
                    throw LoadAggregateFailed::fromThrowable($throwable);
88
                }
89
                finally
90
                {
91
                    yield from $this->releaseMutex($id);
92
                }
93
            }
94
        );
95
    }
96
97
    /**
98
     * Save a new aggregate.
99
     *
100
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
101
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
102
     */
103
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
104
    {
105
        return call(
106
            function () use ($aggregate, $context): \Generator
107
            {
108
                try
109
                {
110
                    /** The aggregate hasn't been loaded before, which means it is new */
111
                    if (isset($this->aggregates[$aggregate->id()->toString()]) === false)
112
                    {
113
                        /**
114
                         * @psalm-var  array<int, object> $events
115
                         *
116
                         * @var object[] $events
117
                         */
118
                        $events = yield $this->repository->save($aggregate);
119
120
                        $this->aggregates[$aggregate->id()->toString()] = \get_class($aggregate);
121
                    }
122
                    else
123
                    {
124
                        /**
125
                         * @psalm-var array<int, object> $events
126
                         *
127
                         * @var object[] $events
128
                         */
129
                        $events = yield $this->repository->update($aggregate);
130
                    }
131
132
                    $promises = [];
133
134
                    /** @var object $event */
135
                    foreach ($events as $event)
136
                    {
137
                        $promises[] = $context->delivery($event);
138
                    }
139
140
                    yield $promises;
141
                }
142
                catch (UniqueConstraintViolationCheckFailed $exception)
143
                {
144
                    throw DuplicateAggregate::create($aggregate->id());
145
                }
146
                catch (\Throwable $throwable)
147
                {
148
                    throw SaveAggregateFailed::fromThrowable($throwable);
149
                }
150
                finally
151
                {
152
                    yield from $this->releaseMutex($aggregate->id());
153
                }
154
            }
155
        );
156
    }
157
158
    /**
159
     * Revert aggregate to specified version.
160
     *
161
     * Returns \ServiceBus\EventSourcing\Aggregate
162
     *
163
     * Mode options:
164
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
165
     *   be version conflicts in some situations
166
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
167
     *
168
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
169
     */
170
    public function revert(
171
        Aggregate $aggregate,
172
        int $toVersion,
173
        ?int $mode = null
174
    ): Promise {
175
        $mode = $mode ?? EventStreamRepository::REVERT_MODE_SOFT_DELETE;
176
177
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
178
        return call(
179
            function () use ($aggregate, $toVersion, $mode): \Generator
180
            {
181
                yield from $this->setupMutex($aggregate->id());
182
183
                try
184
                {
185
                    /** @var Aggregate $aggregate */
186
                    $aggregate = yield $this->repository->revert($aggregate, $toVersion, $mode);
187
188
                    return $aggregate;
189
                }
190
                catch (\Throwable $throwable)
191
                {
192
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
193
                }
194
                finally
195
                {
196
                    yield from $this->releaseMutex($aggregate->id());
197
                }
198
            }
199
        );
200
    }
201
202
    private function setupMutex(AggregateId $id): \Generator
203
    {
204
        $mutexKey = createAggregateMutexKey($id);
205
206
        if (\array_key_exists($mutexKey, $this->lockCollection) === false)
207
        {
208
            $mutex = $this->mutexFactory->create($mutexKey);
209
210
            /** @var Lock $lock */
211
            $lock = yield $mutex->acquire();
212
213
            $this->lockCollection[$mutexKey] = $lock;
214
        }
215
    }
216
217
    private function releaseMutex(AggregateId $id): \Generator
218
    {
219
        $mutexKey = createAggregateMutexKey($id);
220
221
        if (\array_key_exists($mutexKey, $this->lockCollection) === true)
222
        {
223
            /** @var Lock $lock */
224
            $lock = $this->lockCollection[$mutexKey];
225
226
            unset($this->lockCollection[$mutexKey]);
227
228
            yield $lock->release();
229
        }
230
    }
231
}
232