Aggregate::applyEvent()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 4
c 1
b 0
f 0
nc 2
nop 1
dl 0
loc 7
ccs 5
cts 5
cp 1
crap 2
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing;
14
15
use function ServiceBus\Common\now;
16
use function ServiceBus\Common\uuid;
17
use ServiceBus\EventSourcing\Contract\AggregateClosed;
18
use ServiceBus\EventSourcing\Contract\AggregateCreated;
19
use ServiceBus\EventSourcing\EventStream\AggregateEvent;
20
use ServiceBus\EventSourcing\EventStream\AggregateEventStream;
21
use ServiceBus\EventSourcing\Exceptions\AttemptToChangeClosedStream;
22
23
/**
 * Base class for event-sourced aggregates.
 *
 * State changes are expressed as events passed to {@see Aggregate::raise()}. Each raised event is
 * stored in the list of uncommitted events and then dispatched to a listener method whose name is
 * the "on" prefix followed by the short (namespace-less) class name of the event.
 */
abstract class Aggregate
{
    /** Version an aggregate has before any event is attached or replayed. */
    public const START_PLAYHEAD_INDEX = 0;

    /** Prefix used to build listener method names. */
    private const EVENT_APPLY_PREFIX = 'on';

    /** Event classes whose listeners are declared in this class rather than in a child class. */
    private const INTERNAL_EVENTS = [
        AggregateCreated::class,
        AggregateClosed::class,
    ];

    /** Amount by which the version grows for every attached or replayed event. */
    private const INCREASE_VERSION_STEP = 1;

    /**
     * Identifier of the aggregate.
     *
     * @var AggregateId
     */
    private $id;

    /**
     * Current version (number of events attached or replayed so far).
     *
     * @var int
     */
    private $version = self::START_PLAYHEAD_INDEX;

    /**
     * Events raised since the list was last cleared.
     *
     * @psalm-var array<int, \ServiceBus\EventSourcing\EventStream\AggregateEvent>
     *
     * @var \ServiceBus\EventSourcing\EventStream\AggregateEvent[]
     */
    private $events;

    /**
     * Creation datetime, taken from the AggregateCreated event.
     *
     * @var \DateTimeImmutable
     */
    private $createdAt;

    /**
     * Closing datetime, taken from the AggregateClosed event; null while the aggregate is open.
     *
     * @var \DateTimeImmutable|null
     */
    private $closedAt = null;

    /**
     * Store the identifier, reset the event list and raise the AggregateCreated event.
     */
    final public function __construct(AggregateId $id)
    {
        $this->id = $id;
        $this->clearEvents();

        $aggregateClass = \get_class($this);

        $this->raise(new AggregateCreated($id, $aggregateClass, now()));
    }

    /**
     * Identifier of the aggregate.
     */
    final public function id(): AggregateId
    {
        return $this->id;
    }

    /**
     * Datetime at which the aggregate was created.
     */
    final public function getCreatedAt(): \DateTimeImmutable
    {
        return $this->createdAt;
    }

    /**
     * Record the event as uncommitted and apply it to the aggregate.
     *
     * @throws \ServiceBus\EventSourcing\Exceptions\AttemptToChangeClosedStream The aggregate has been closed
     */
    final protected function raise(object $event): void
    {
        if ($this->closedAt !== null)
        {
            throw new AttemptToChangeClosedStream($this->id);
        }

        /** The event is recorded (and the version increased) before its listener runs. */
        $this->attachEvent($event);
        $this->applyEvent($event);
    }

    /**
     * Current version of the aggregate.
     */
    final public function version(): int
    {
        return $this->version;
    }

    /**
     * Raise the AggregateClosed event; once it is applied, further raise() calls throw.
     *
     * @throws \ServiceBus\EventSourcing\Exceptions\AttemptToChangeClosedStream The aggregate is already closed
     */
    final protected function close(): void
    {
        /** @psalm-var class-string<\ServiceBus\EventSourcing\Aggregate> $closedAggregateClass */
        $closedAggregateClass = \get_class($this);

        $closedEvent = new AggregateClosed($this->id, $closedAggregateClass, now());

        $this->raise($closedEvent);
    }

    /**
     * Listener for AggregateClosed: remember the closing datetime.
     *
     * @noinspection PhpUnusedPrivateMethodInspection
     */
    private function onAggregateClosed(AggregateClosed $event): void
    {
        $this->closedAt = $event->datetime;
    }

    /**
     * Listener for AggregateCreated: remember the creation datetime.
     *
     * @noinspection PhpUnusedPrivateMethodInspection
     */
    private function onAggregateCreated(AggregateCreated $event): void
    {
        $this->createdAt = $event->datetime;
    }

    /**
     * Build a stream from the uncommitted events and empty the internal list.
     *
     * Not called from within this class.
     *
     * @noinspection PhpUnusedPrivateMethodInspection
     *
     * @see          EventSourcingProvider::save()
     */
    private function makeStream(): AggregateEventStream
    {
        /** Take a copy first: the list is emptied before the stream is built. */
        $uncommittedEvents = $this->events;
        $this->clearEvents();

        /** @psalm-var class-string<\ServiceBus\EventSourcing\Aggregate> $streamAggregateClass */
        $streamAggregateClass = \get_class($this);

        return new AggregateEventStream(
            $this->id,
            $streamAggregateClass,
            $uncommittedEvents,
            $this->createdAt,
            $this->closedAt
        );
    }

    /**
     * Rebuild the aggregate state by replaying every event of the given stream.
     *
     * Replayed events are applied but not added to the uncommitted list; the version grows by one per event.
     * Not called from within this class.
     *
     * @noinspection PhpUnusedPrivateMethodInspection
     *
     * @see          EventSourcingProvider::load()
     */
    private function appendStream(AggregateEventStream $aggregateEventsStream): void
    {
        $this->clearEvents();
        $this->id = $aggregateEventsStream->id;

        /** @var AggregateEvent $storedEvent */
        foreach ($aggregateEventsStream->events as $storedEvent)
        {
            $this->applyEvent($storedEvent->event);
            $this->increaseVersion(self::INCREASE_VERSION_STEP);
        }
    }

    /**
     * Increase the version and append the event, wrapped as AggregateEvent, to the uncommitted list.
     */
    private function attachEvent(object $event): void
    {
        /** The version must grow first: the wrapped event carries the new value. */
        $this->increaseVersion(self::INCREASE_VERSION_STEP);

        $wrappedEvent = AggregateEvent::create(uuid(), $event, $this->version, now());

        $this->events[] = $wrappedEvent;
    }

    /**
     * Dispatch the event to its listener, either in this class or in the child class.
     */
    private function applyEvent(object $event): void
    {
        $listenerName = self::createListenerName($event);

        if (self::isInternalEvent($event))
        {
            $this->processInternalEvent($listenerName, $event);

            return;
        }

        $this->processChildEvent($listenerName, $event);
    }

    /**
     * Whether the event is one of the classes listed in INTERNAL_EVENTS.
     */
    private static function isInternalEvent(object $event): bool
    {
        $eventClass = \get_class($event);

        return \in_array($eventClass, self::INTERNAL_EVENTS, true);
    }

    /**
     * Invoke a listener declared in this class.
     */
    private function processInternalEvent(string $listenerName, object $event): void
    {
        $this->$listenerName($event);
    }

    /**
     * Invoke a listener on the concrete aggregate, if one exists; otherwise do nothing.
     */
    private function processChildEvent(string $listenerName, object $event): void
    {
        if (false === \method_exists($this, $listenerName))
        {
            return;
        }

        /**
         * Closure::call() runs the closure in the class scope of the concrete object.
         *
         * @param object $event
         *
         * @return void
         */
        $invokeListener = function(object $event) use ($listenerName): void
        {
            $this->{$listenerName}($event);
        };

        $invokeListener->call($this, $event);
    }

    /**
     * Build the listener method name: the "on" prefix followed by the event's short class name.
     */
    private static function createListenerName(object $event): string
    {
        $classNameParts = \explode('\\', \get_class($event));

        /** @var string $shortClassName */
        $shortClassName = \end($classNameParts);

        return self::EVENT_APPLY_PREFIX . $shortClassName;
    }

    /**
     * Add the given step to the aggregate version.
     */
    private function increaseVersion(int $step): void
    {
        $this->version = $this->version + $step;
    }

    /**
     * Reset the list of uncommitted events to an empty array.
     */
    private function clearEvents(): void
    {
        $this->events = [];
    }
}
299