Completed
Push — v4.0 ( ff495d...e0bacf )
by Masiukevich
01:44
created

Aggregate::onAggregateCreated()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 1
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
1
<?php
2
3
/**
4
 * Event Sourcing implementation.
5
 *
6
 * @author  Maksim Masiukevich <[email protected]>
7
 * @license MIT
8
 * @license https://opensource.org/licenses/MIT
9
 */
10
11
declare(strict_types = 1);
12
13
namespace ServiceBus\EventSourcing;
14
15
use function ServiceBus\Common\datetimeInstantiator;
16
use function ServiceBus\Common\uuid;
17
use ServiceBus\EventSourcing\Contract\AggregateClosed;
18
use ServiceBus\EventSourcing\Contract\AggregateCreated;
19
use ServiceBus\EventSourcing\EventStream\AggregateEvent;
20
use ServiceBus\EventSourcing\EventStream\AggregateEventStream;
21
use ServiceBus\EventSourcing\Exceptions\AttemptToChangeClosedStream;
22
23
/**
24
 * Aggregate base class.
25
 */
26
abstract class Aggregate
{
    /** Version value of an aggregate to which no event has been applied yet. */
    public const START_PLAYHEAD_INDEX = 0;

    /** Prefix prepended to an event's short class name to obtain its listener method name. */
    private const EVENT_APPLY_PREFIX = 'on';

    /** Event classes whose listeners are declared in this base class. */
    private const INTERNAL_EVENTS = [
        AggregateCreated::class,
        AggregateClosed::class,
    ];

    /** Amount added to the version for every attached or replayed event. */
    private const INCREASE_VERSION_STEP = 1;

    /**
     * Identifier of this aggregate.
     */
    private AggregateId $id;

    /**
     * Number of events attached to / replayed on this aggregate so far.
     */
    private int $version = self::START_PLAYHEAD_INDEX;

    /**
     * Events raised since the list was last cleared.
     *
     * @psalm-var array<int, \ServiceBus\EventSourcing\EventStream\AggregateEvent>
     *
     * @var \ServiceBus\EventSourcing\EventStream\AggregateEvent[]
     */
    private array $events;

    /**
     * Moment of creation (assigned by the AggregateCreated listener).
     */
    private \DateTimeImmutable $createdAt;

    /**
     * Moment of closing (assigned by the AggregateClosed listener); null while the aggregate is open.
     */
    private ?\DateTimeImmutable $closedAt = null;

    /**
     * Stores the identifier, resets the list of pending events and raises AggregateCreated.
     */
    final public function __construct(AggregateId $id)
    {
        $this->id = $id;

        $this->clearEvents();

        $this->raise(new AggregateCreated($id, static::class));
    }

    /**
     * Aggregate identifier.
     */
    final public function id(): AggregateId
    {
        return $this->id;
    }

    /**
     * Datetime at which the aggregate was created.
     */
    final public function getCreatedAt(): \DateTimeImmutable
    {
        return $this->createdAt;
    }

    /**
     * Record the event as pending and pass it to its listener.
     *
     * @throws \ServiceBus\EventSourcing\Exceptions\AttemptToChangeClosedStream The aggregate has already been closed
     */
    final protected function raise(object $event): void
    {
        if (null !== $this->closedAt)
        {
            throw new AttemptToChangeClosedStream($this->id);
        }

        /** The event is recorded (and the version bumped) before its listener runs. */
        $this->attachEvent($event);
        $this->applyEvent($event);
    }

    /**
     * Current aggregate version.
     */
    final public function version(): int
    {
        return $this->version;
    }

    /**
     * Raise AggregateClosed; once it is applied, every further raise() call is rejected.
     *
     * @throws \ServiceBus\EventSourcing\Exceptions\AttemptToChangeClosedStream The aggregate has already been closed
     */
    final protected function close(): void
    {
        $this->raise(new AggregateClosed($this->id, static::class));
    }

    /**
     * Listener for AggregateClosed: remembers the closing datetime carried by the event.
     *
     * Resolved by name in applyEvent(), never called directly.
     *
     * @noinspection PhpUnusedPrivateMethodInspection
     */
    private function onAggregateClosed(AggregateClosed $event): void
    {
        $this->closedAt = $event->datetime;
    }

    /**
     * Listener for AggregateCreated: remembers the creation datetime carried by the event.
     *
     * Resolved by name in applyEvent(), never called directly.
     *
     * @noinspection PhpUnusedPrivateMethodInspection
     */
    private function onAggregateCreated(AggregateCreated $event): void
    {
        $this->createdAt = $event->datetime;
    }

    /**
     * Hand over the pending events as a stream and empty the pending list.
     *
     * Not called from within this class.
     *
     * @noinspection PhpUnusedPrivateMethodInspection
     *
     * @see          EventSourcingProvider::save()
     */
    private function makeStream(): AggregateEventStream
    {
        /** Copy first: the list is emptied before the stream object is built. */
        $pendingEvents = $this->events;

        $this->clearEvents();

        return new AggregateEventStream(
            $this->id,
            static::class,
            $pendingEvents,
            $this->createdAt,
            $this->closedAt
        );
    }

    /**
     * Rebuild state from a stream: take over its id and replay its events one by one.
     *
     * Replayed events are only applied; they are not added to the pending list.
     * Not called from within this class.
     *
     * @noinspection PhpUnusedPrivateMethodInspection
     *
     * @see          EventSourcingProvider::load()
     */
    private function appendStream(AggregateEventStream $aggregateEventsStream): void
    {
        $this->clearEvents();

        $this->id = $aggregateEventsStream->id;

        /** @var AggregateEvent $storedEvent */
        foreach ($aggregateEventsStream->events as $storedEvent)
        {
            $this->applyEvent($storedEvent->event);

            $this->increaseVersion(self::INCREASE_VERSION_STEP);
        }
    }

    /**
     * Bump the version and append the event, wrapped in an AggregateEvent, to the pending list.
     */
    private function attachEvent(object $event): void
    {
        $this->increaseVersion(self::INCREASE_VERSION_STEP);

        /** @var \DateTimeImmutable $occurredAt */
        $occurredAt = datetimeInstantiator('NOW');

        /** The wrapper carries the version value obtained after the bump above. */
        $this->events[] = AggregateEvent::create(uuid(), $event, $this->version, $occurredAt);
    }

    /**
     * Dispatch the event to its listener: internal events to this class, all others to the child class.
     */
    private function applyEvent(object $event): void
    {
        $listenerName = self::createListenerName($event);

        if (true === self::isInternalEvent($event))
        {
            $this->processInternalEvent($listenerName, $event);

            return;
        }

        $this->processChildEvent($listenerName, $event);
    }

    /**
     * Whether the event's exact class is one of INTERNAL_EVENTS.
     */
    private static function isInternalEvent(object $event): bool
    {
        return \in_array(\get_class($event), self::INTERNAL_EVENTS, true);
    }

    /**
     * Invoke a listener declared in this base class.
     */
    private function processInternalEvent(string $listenerName, object $event): void
    {
        $this->{$listenerName}($event);
    }

    /**
     * Invoke a listener declared in the concrete aggregate; an event without a listener is skipped.
     */
    private function processChildEvent(string $listenerName, object $event): void
    {
        $invokeListener = function(object $event) use ($listenerName): void
        {
            if (false === \method_exists($this, $listenerName))
            {
                return;
            }

            $this->{$listenerName}($event);
        };

        /**
         * Closure::call() runs the closure in the class scope of the given object, which allows
         * invoking a listener that the child class declares as private.
         */
        $invokeListener->call($this, $event);
    }

    /**
     * Build the listener method name: EVENT_APPLY_PREFIX followed by the event's short class name.
     */
    private static function createListenerName(object $event): string
    {
        $eventClass = \get_class($event);

        $lastSeparatorPosition = \strrpos($eventClass, '\\');

        /** A class outside of any namespace has no separator; its name is already the short name. */
        $shortName = false === $lastSeparatorPosition
            ? $eventClass
            : \substr($eventClass, $lastSeparatorPosition + 1);

        return self::EVENT_APPLY_PREFIX . $shortName;
    }

    /**
     * Add the given step to the aggregate version.
     */
    private function increaseVersion(int $step): void
    {
        $this->version += $step;
    }

    /**
     * Empty the list of pending events.
     */
    private function clearEvents(): void
    {
        $this->events = [];
    }
}
296