Passed
Push — v3.0 ( 80e567...6feae5 )
by Masiukevich
02:24
created

EventSourcingProvider::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * Event Sourcing implementation module.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcingModule;
14
15
use function Amp\call;
16
use Amp\Promise;
17
use ServiceBus\Common\Context\ServiceBusContext;
18
use ServiceBus\EventSourcing\Aggregate;
19
use ServiceBus\EventSourcing\AggregateId;
20
use ServiceBus\EventSourcing\EventStream\EventStreamRepository;
21
use ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate;
22
use ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed;
23
use ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed;
24
use ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed;
25
use ServiceBus\Storage\Common\Exceptions\UniqueConstraintViolationCheckFailed;
26
27
/**
 * Entry point for working with event sourced aggregates.
 *
 * Delegates loading, saving and reverting to the EventStreamRepository and remembers every aggregate that was
 * loaded or saved through this instance, so that save() can choose between storing a new aggregate and
 * updating an existing one.
 */
final class EventSourcingProvider
{
    /**
     * Repository all load/save/update/revert operations are delegated to.
     *
     * @var EventStreamRepository
     */
    private $repository;

    /**
     * List of loaded/added aggregates (aggregate id cast to string => aggregate class name).
     *
     * @psalm-var array<string, string>
     *
     * @var array
     */
    private $aggregates = [];

    /**
     * @param EventStreamRepository $repository
     */
    public function __construct(EventStreamRepository $repository)
    {
        $this->repository = $repository;
    }

    /**
     * Load aggregate.
     *
     * The promise resolves with the aggregate returned by the repository, or with null when the repository
     * returned nothing. A found aggregate is remembered, so a later save() call updates it instead of
     * storing it as a new one.
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param AggregateId $id
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\LoadAggregateFailed
     *
     * @return Promise
     */
    public function load(AggregateId $id): Promise
    {
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(AggregateId $id): \Generator
            {
                try
                {
                    /**
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
                     *
                     * @var Aggregate|null $aggregate
                     */
                    $aggregate = yield $this->repository->load($id);

                    if (null !== $aggregate)
                    {
                        $this->aggregates[(string) $aggregate->id()] = \get_class($aggregate);
                    }

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    /** Any failure is reported to the caller as LoadAggregateFailed */
                    throw LoadAggregateFailed::fromThrowable($throwable);
                }
            },
            $id
        );
    }

    /**
     * Save a new aggregate or update a previously loaded/saved one.
     *
     * The events returned by the repository are passed one by one to $context->delivery(). The coroutine does
     * not return a value.
     *
     * @noinspection PhpDocRedundantThrowsInspection
     *
     * @param Aggregate         $aggregate
     * @param ServiceBusContext $context
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\DuplicateAggregate
     * @throws \ServiceBus\EventSourcingModule\Exceptions\SaveAggregateFailed
     *
     * @return Promise
     */
    public function save(Aggregate $aggregate, ServiceBusContext $context): Promise
    {
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(Aggregate $aggregate, ServiceBusContext $context): \Generator
            {
                try
                {
                    /** Key under which the aggregate is tracked; computed once for the lookup and the registration */
                    $aggregateKey = (string) $aggregate->id();

                    /** The aggregate hasn't been loaded before, which means it is new */
                    if (false === isset($this->aggregates[$aggregateKey]))
                    {
                        /**
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
                         * @psalm-var      array<int, object> $events
                         *
                         * @var object[] $events
                         */
                        $events = yield $this->repository->save($aggregate);

                        /** Registered only after a successful save, so a failed attempt is not remembered */
                        $this->aggregates[$aggregateKey] = \get_class($aggregate);
                    }
                    else
                    {
                        /**
                         * @psalm-suppress TooManyTemplateParams Wrong Promise template
                         * @psalm-var      array<int, object> $events
                         *
                         * @var object[] $events
                         */
                        $events = yield $this->repository->update($aggregate);
                    }

                    $promises = [];

                    /** @var object $event */
                    foreach ($events as $event)
                    {
                        $promises[] = $context->delivery($event);
                    }

                    /** All delivery promises are yielded together as a single array */
                    yield $promises;
                }
                /** Must precede the generic handler, otherwise it would be reported as SaveAggregateFailed */
                catch (UniqueConstraintViolationCheckFailed $exception)
                {
                    throw DuplicateAggregate::create($aggregate->id());
                }
                catch (\Throwable $throwable)
                {
                    throw SaveAggregateFailed::fromThrowable($throwable);
                }
            },
            $aggregate,
            $context
        );
    }

    /**
     * Revert aggregate to specified version.
     *
     * Mode options:
     *   - 1 (EventStreamRepository::REVERT_MODE_SOFT_DELETE): Mark tail events as deleted (soft deletion). There may
     *   be version conflicts in some situations
     *   - 2 (EventStreamRepository::REVERT_MODE_DELETE): Removes tail events from the database (the best option)
     *
     * The promise resolves with the aggregate returned by the repository.
     *
     * @noinspection   PhpDocRedundantThrowsInspection
     * @psalm-suppress MixedTypeCoercion Incorrect resolving the value of the promise
     *
     * @param Aggregate $aggregate
     * @param int       $toVersion
     * @param int       $mode
     *
     * @throws \ServiceBus\EventSourcingModule\Exceptions\RevertAggregateVersionFailed
     *
     * @return Promise<\ServiceBus\EventSourcing\Aggregate>
     */
    public function revert(
        Aggregate $aggregate,
        int $toVersion,
        int $mode = EventStreamRepository::REVERT_MODE_SOFT_DELETE
    ): Promise {
        /** @psalm-suppress InvalidArgument Incorrect psalm unpack parameters (...$args) */
        return call(
            function(Aggregate $aggregate, int $toVersion, int $mode): \Generator
            {
                try
                {
                    /**
                     * @psalm-suppress TooManyTemplateParams Wrong Promise template
                     *
                     * @var Aggregate $aggregate
                     */
                    $aggregate = yield $this->repository->revert($aggregate, $toVersion, $mode);

                    return $aggregate;
                }
                catch (\Throwable $throwable)
                {
                    /** Any failure is reported to the caller as RevertAggregateVersionFailed */
                    throw RevertAggregateVersionFailed::fromThrowable($throwable);
                }
            },
            $aggregate,
            $toVersion,
            $mode
        );
    }
}
215