Completed
Push — master ( 05eff2...70dca9 )
by Masiukevich
03:14
created

EventSourcingProvider::revert()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 26
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 8
nc 1
nop 3
dl 0
loc 26
ccs 8
cts 8
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Event Sourcing implementation module
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
21
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
22
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
23
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
25
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
26
27
/**
 * Entry point for working with event sourced aggregates: loads them from the event stream repository,
 * persists them (delivering the resulting events through the context) and reverts them to a given version
 */
final class EventSourcingProvider
{
    /**
     * Repository used to load, save, update and revert aggregate event streams
     *
     * @var EventStreamRepository
     */
    private $repository;

    /**
     * Aggregates that were loaded or saved through this provider
     *
     * Key: aggregate id cast to string; value: aggregate class name
     *
     * @var array<string, string>
     */
    private $aggregates = [];

    /**
     * @param EventStreamRepository $repository
     */
    public function __construct(EventStreamRepository $repository)
    {
        $this->repository = $repository;
    }

    /**
     * Load aggregate by its identifier
     *
     * A successfully loaded aggregate is remembered, so a subsequent save() updates it instead of inserting.
     * The promise resolves with null when the repository returns nothing for the specified id
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param AggregateId $id
     *
     * @return Promise
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
     */
    public function load(AggregateId $id): Promise
    {
        return call(
            function() use ($id): \Generator
            {
                try
                {
                    /**
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
                     * @var Aggregate|null $loaded
                     */
                    $loaded = yield $this->repository->load($id);

                    /** Nothing to remember if the repository returned no aggregate */
                    if(null === $loaded)
                    {
                        return null;
                    }

                    $this->aggregates[(string) $loaded->id()] = \get_class($loaded);

                    return $loaded;
                }
                catch(\Throwable $throwable)
                {
                    /** Any failure (storage or otherwise) is reported as a load failure */
                    throw LoadAggregateFailed::fromThrowable($throwable);
                }
            }
        );
    }

    /**
     * Persist aggregate and deliver its events
     *
     * An aggregate unknown to this provider (not loaded/saved through it before) is stored via
     * EventStreamRepository::save() and then remembered; a known one is stored via EventStreamRepository::update().
     * Every event returned by the repository is passed to ServiceBusContext::delivery()
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param Aggregate         $aggregate
     * @param ServiceBusContext $context
     *
     * @return Promise
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
     */
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
    {
        return call(
            function() use ($aggregate, $context): \Generator
            {
                try
                {
                    $alreadyKnown = isset($this->aggregates[(string) $aggregate->id()]);

                    if(true === $alreadyKnown)
                    {
                        /**
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
                         * @var array<int, \ServiceBus\Common\Messages\Event> $events
                         */
                        $events = yield $this->repository->update($aggregate);
                    }
                    else
                    {
                        /**
                         * The aggregate hasn't been loaded before, which means it is new
                         *
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
                         * @var array<int, \ServiceBus\Common\Messages\Event> $events
                         */
                        $events = yield $this->repository->save($aggregate);

                        /** Remember only after the repository call has succeeded */
                        $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);
                    }

                    $deliveries = [];

                    /** @var \ServiceBus\Common\Messages\Event $event */
                    foreach($events as $event)
                    {
                        $deliveries[] = $context->delivery($event);
                    }

                    /** Wait for all deliveries that were started above */
                    yield $deliveries;
                }
                catch(UniqueConstraintViolationCheckFailed $exception)
                {
                    throw DuplicateAggregate::create($aggregate->id());
                }
                catch(\Throwable $throwable)
                {
                    throw SaveAggregateFailed::fromThrowable($throwable);
                }
            }
        );
    }

    /**
     * Revert aggregate to specified version
     *
     * Mode options:
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may be version conflicts in some situations
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * The promise resolves with the value returned by EventStreamRepository::revert()
     *
     * @noinspection PhpDocRedundantThrowsInspection
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     * @param int       $toVersion
     * @param int       $mode
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
     */
    public function revert(
        Aggregate $aggregate,
        int $toVersion,
        int $mode = EventStreamRepository::REVERT_MODE_SOFT_DELETE
    ): Promise
    {
        return call(
            function() use ($aggregate, $toVersion, $mode): \Generator
            {
                try
                {
                    /**
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
                     * @var Aggregate $reverted
                     */
                    $reverted = yield $this->repository->revert($aggregate, $toVersion, $mode);

                    return $reverted;
                }
                catch(\Throwable $throwable)
                {
                    /** Any failure is reported as a revert failure */
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
                }
            }
        );
    }
}
204