Passed
Push — master ( 3112ca...a45803 )
by Blixit
02:01
created

EventStore::load()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Blixit\EventSourcing\Store;
6
7
use Blixit\EventSourcing\Aggregate\AggregateAccessor;
8
use Blixit\EventSourcing\Aggregate\AggregateRoot;
9
use Blixit\EventSourcing\Aggregate\AggregateRootInterface;
10
use Blixit\EventSourcing\Event\EventAccessor;
11
use Blixit\EventSourcing\Event\EventInterface;
12
use Blixit\EventSourcing\Store\Exception\CorruptedReadEvent;
13
use Blixit\EventSourcing\Store\Exception\NonWritableEvent;
14
use Blixit\EventSourcing\Store\Persistence\EventPersisterException;
15
use Blixit\EventSourcing\Store\Persistence\EventPersisterInterface;
16
use Blixit\EventSourcing\Stream\ReadableStream;
17
use Blixit\EventSourcing\Stream\StorableStream;
18
use Blixit\EventSourcing\Stream\Strategy\StreamStrategy;
19
use Blixit\EventSourcing\Stream\Stream;
20
use Blixit\EventSourcing\Stream\StreamName;
21
use ReflectionClass;
22
use ReflectionException;
23
use Symfony\Component\Messenger\MessageBusInterface;
24
use Throwable;
25
use function sprintf;
26
27
class EventStore implements EventStoreInterface
28
{
29
    /**
     * Persistence backend used to fetch the events of a stream and to persist new events.
     *
     * @var EventPersisterInterface
     */
    protected $eventPersister;

    /**
     * Class name of the aggregate handled by this store.
     *
     * @var string
     */
    private $aggregateClass;

    /**
     * Strategy that computes the stream name for an aggregate class and id.
     *
     * @var StreamStrategy
     */
    private $streamStrategy;

    /**
     * Optional bus on which every written event is dispatched; null disables dispatching.
     *
     * @var MessageBusInterface|null
     */
    private $messageBus;

    /**
     * Accessor used to set the aggregate id and version sequence and to shift recorded events.
     *
     * @var AggregateAccessor
     */
    private $aggregateAccessor;

    /**
     * Accessor used to set the sequence and the aggregate class on events before they are persisted.
     *
     * @var EventAccessor
     */
    private $eventAccessor;
46
47
    /**
     * @param string                   $aggregateClass      class name of the aggregates handled by this store
     * @param EventPersisterInterface  $eventPersister      backend used to read and write events
     * @param string                   $streamStrategyClass class name resolved through StreamStrategy::resolveStreamStrategy()
     * @param MessageBusInterface|null $messageBus          optional bus; when given, written events are dispatched on it
     */
    public function __construct(
        string $aggregateClass,
        EventPersisterInterface $eventPersister,
        string $streamStrategyClass,
        ?MessageBusInterface $messageBus = null
    ) {
        // Plain collaborators handed in by the caller.
        $this->aggregateClass = $aggregateClass;
        $this->eventPersister = $eventPersister;
        $this->messageBus     = $messageBus;

        // The strategy is given as a class name and resolved to an instance here.
        $this->streamStrategy = StreamStrategy::resolveStreamStrategy($streamStrategyClass);

        // Shared accessor instances used to write aggregate / event internals.
        $this->aggregateAccessor = AggregateAccessor::getInstance();
        $this->eventAccessor     = EventAccessor::getInstance();
    }
62
63
    /**
     * Computes the stream name for the given aggregate id by delegating to the
     * configured stream strategy together with this store's aggregate class.
     *
     * @param mixed $aggregateId
     */
    public function getStreamNameForAggregateId($aggregateId = null) : StreamName
    {
        // TODO: add hook after stream computeName
        $streamName = $this->streamStrategy->computeName($this->aggregateClass, $aggregateId);

        return $streamName;
    }
71
72
    /**
     * Rebuilds an aggregate by replaying the events of its stream.
     *
     * The starting aggregate comes from buildAggregate(). Its current sequence is passed
     * to the persister together with the stream name (presumably as the read offset -
     * confirm against EventPersisterInterface). beforeRead() is handed to the stream as a
     * per-event callback; every iterated event is applied to the aggregate, the aggregate
     * version is set to the event sequence, and afterRead() is called.
     *
     * @param mixed $aggregateId
     *
     * @return AggregateRootInterface|null null when the aggregate sequence is not greater than 0 after replay
     *
     * @throws ReflectionException
     * @throws CorruptedReadEvent
     */
    public function get($aggregateId) : ?AggregateRootInterface
    {
        /** @var AggregateRootInterface $aggregate */
        $aggregate = $this->buildAggregate($aggregateId);

        // Stream name follows the configured stream strategy.
        $streamName = $this->getStreamNameForAggregateId($aggregateId);

        // Events currently stored for that stream.
        $storedEvents = $this->eventPersister->getByStream($streamName, $aggregate->getSequence());

        // Validation callback given to the stream for each event.
        $guard     = [$this, 'beforeRead'];
        $validator = static function (EventInterface $event) use ($guard, $aggregate) : void {
            $guard($aggregate, $event);
        };

        $readable = new ReadableStream($streamName, $storedEvents, $validator);

        // Replay the events on the aggregate.
        foreach ($readable->getIterator() as $event) {
            /** @var EventInterface $event */
            /** @var AggregateRoot $aggregate */
            $aggregate->apply($event);
            $this->aggregateAccessor->setVersionSequence($aggregate, $event->getSequence());
            $this->afterRead($aggregate, $event);
        }

        if ($aggregate->getSequence() > 0) {
            return $aggregate;
        }

        return null;
    }
109
110
    /**
     * Instantiates the configured aggregate class without running its constructor
     * and assigns the given id to it through the aggregate accessor.
     *
     * @param mixed $aggregateId
     *
     * @throws ReflectionException
     */
    protected function getEmptyAggregate($aggregateId) : AggregateRootInterface
    {
        $reflection = new ReflectionClass($this->aggregateClass);

        /** @var AggregateRootInterface $blank */
        $blank = $reflection->newInstanceWithoutConstructor();
        $this->aggregateAccessor->setAggregateId($blank, $aggregateId);

        return $blank;
    }
122
123
    /**
     * Provides the aggregate instance that get() replays events onto.
     *
     * This implementation simply returns an empty aggregate carrying the given id.
     *
     * @param mixed $aggregateId
     *
     * @throws ReflectionException
     */
    protected function buildAggregate($aggregateId) : AggregateRootInterface
    {
        $aggregate = $this->getEmptyAggregate($aggregateId);

        return $aggregate;
    }
132
133
    /**
     * Writes the recorded events of an aggregate to its stream.
     *
     * beforeWrite() is handed to the stream as a per-event callback. Each iterated event
     * goes through writeLoopIteration() and is then dispatched on the message bus when
     * one was configured.
     *
     * @throws EventPersisterException
     * @throws NonWritableEvent
     */
    public function store(AggregateRootInterface &$aggregateRoot) : void
    {
        // Stream name follows the configured stream strategy.
        $streamName = $this->getStreamNameForAggregateId($aggregateRoot->getAggregateId());

        // Validation callback given to the stream for each event.
        $guard     = [$this, 'beforeWrite'];
        $validator = static function (EventInterface $event) use ($guard, $aggregateRoot) : void {
            $guard($aggregateRoot, $event);
        };

        /** @var AggregateRoot $aggregateRoot */
        $storable = new StorableStream($streamName, $aggregateRoot->getRecordedEvents(), $validator);

        foreach ($storable->getIterator() as $event) {
            $this->writeLoopIteration($aggregateRoot, $event);

            // Dispatch the event only when a bus is available.
            if ($this->messageBus !== null) {
                $this->messageBus->dispatch($event);
            }
        }
    }
162
163
    /**
     * Handles one event of the write loop.
     *
     * Steps, in order: stamp the event with the next sequence number and this store's
     * aggregate class, persist it, shift it off the aggregate's recorded events, move the
     * aggregate version to the new sequence, apply the persisted event to the aggregate
     * and finally call afterWrite().
     *
     * @throws EventPersisterException
     */
    protected function writeLoopIteration(AggregateRootInterface &$aggregateRoot, EventInterface &$event) : void
    {
        // Sequence number the event is going to carry.
        $sequence = $aggregateRoot->getSequence() + 1;

        // Stamp the event before it is saved.
        $this->eventAccessor->setSequence($event, $sequence);
        $this->eventAccessor->setAggregateClass($event, $this->aggregateClass);

        // Save the event; a failure throws here, before the aggregate is touched.
        $persisted = $this->persist($event);

        // The event is stored: drop it from the recorded events ...
        $this->aggregateAccessor->shiftEvent($aggregateRoot);
        // ... and bump the aggregate version.
        $this->aggregateAccessor->setVersionSequence($aggregateRoot, $sequence);

        // Apply the persisted event to the aggregate.
        $aggregateRoot->apply($persisted);

        // Extension point, e.g. for snapshotting the updated aggregate.
        $this->afterWrite($aggregateRoot, $persisted);
    }
185
186
    /**
     * Persists a single event and returns a clone of what the persister handed back.
     *
     * Any throwable raised while persisting (or cloning) is wrapped in an
     * EventPersisterException. The original throwable is attached as the previous
     * exception so that its type, code and stack trace stay available to callers
     * and loggers instead of being reduced to a bare message.
     *
     * @throws EventPersisterException
     */
    protected function persist(EventInterface $event) : EventInterface
    {
        try {
            return clone $this->eventPersister->persist($event);
        } catch (Throwable $exception) {
            throw new EventPersisterException(
                $exception->getMessage(),
                // Some throwables (e.g. PDOException) expose non-integer codes.
                (int) $exception->getCode(),
                $exception
            );
        }
    }
197
198
    /**
     * Checks that an event may be written for the given aggregate.
     *
     * The event must carry the same aggregate id as the aggregate and must not yet
     * have a sequence number or a stream name.
     *
     * @throws NonWritableEvent
     */
    protected function beforeWrite(AggregateRootInterface $aggregateRoot, EventInterface $event) : void
    {
        // The event has to belong to this aggregate.
        if ($event->getAggregateId() !== $aggregateRoot->getAggregateId()) {
            $message = sprintf(
                'Event aggregate Id not expected. Expected: %s . Found: %s',
                $aggregateRoot->getAggregateId(),
                $event->getAggregateId()
            );

            throw new NonWritableEvent($message);
        }

        // The sequence number is assigned by the store, so it must still be empty.
        if (! empty($event->getSequence())) {
            throw new NonWritableEvent('Sequence number should be empty. Found: ' . $event->getSequence());
        }

        // Same for the stream name.
        if (! empty($event->getStreamName())) {
            throw new NonWritableEvent('Stream name should be empty. Found: ' . $event->getStreamName());
        }
    }
219
220
    /**
     * Hook called at the end of writeLoopIteration(), once the event has been
     * persisted and applied to the aggregate.
     *
     * The default implementation does nothing, which is why both parameters are
     * unused here; the method is protected so subclasses can override it.
     */
    protected function afterWrite(AggregateRootInterface $aggregateRoot, EventInterface $event) : void
    {
        // Intentionally empty.
    }
223
224
    /**
     * Checks that a stored event belongs to the aggregate being rebuilt.
     *
     * The event must carry the aggregate's id and this store's aggregate class;
     * otherwise reading is aborted with an exception.
     *
     * @throws CorruptedReadEvent
     */
    protected function beforeRead(AggregateRootInterface $aggregateRoot, EventInterface $event) : void
    {
        // Reject events recorded for another aggregate id.
        if ($event->getAggregateId() !== $aggregateRoot->getAggregateId()) {
            $message = sprintf(
                'Event aggregate Id not expected. Expected: %s . Found: %s',
                $aggregateRoot->getAggregateId(),
                $event->getAggregateId()
            );

            throw new CorruptedReadEvent($message);
        }

        // Reject events recorded for another aggregate class.
        if ($this->aggregateClass !== $event->getAggregateClass()) {
            $message = sprintf(
                'Event aggregate Class not expected. Expected: %s . Found: %s',
                $this->aggregateClass,
                $event->getAggregateClass()
            );

            throw new CorruptedReadEvent($message);
        }
    }
247
248
    /**
     * Hook called by get() after each replayed event has been applied to the
     * aggregate and the aggregate version has been updated.
     *
     * The default implementation does nothing, which is why both parameters are
     * unused here; the method is protected so subclasses can override it.
     */
    protected function afterRead(AggregateRootInterface $aggregateRoot, EventInterface $event) : void
    {
        // Intentionally empty.
    }
251
252
    /**
     * Not implemented yet.
     *
     * The previous body returned null, which is incompatible with the non-nullable
     * Stream return type and therefore always failed with a TypeError at call time.
     * The method now fails explicitly with a descriptive exception instead.
     *
     * @throws \LogicException always, until loading a stream is implemented
     */
    public function load() : Stream
    {
        throw new \LogicException(sprintf('%s is not implemented.', __METHOD__));
    }
256
}
257