EventStore::afterRead()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 2
Code Lines 0

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 0
nc 1
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Blixit\EventSourcing\Store;
6
7
use Blixit\EventSourcing\Aggregate\AggregateAccessor;
8
use Blixit\EventSourcing\Aggregate\AggregateRoot;
9
use Blixit\EventSourcing\Aggregate\AggregateRootInterface;
10
use Blixit\EventSourcing\Event\EventAccessor;
11
use Blixit\EventSourcing\Event\EventInterface;
12
use Blixit\EventSourcing\Messaging\DispatcherInterface;
13
use Blixit\EventSourcing\Store\Exception\CorruptedReadEvent;
14
use Blixit\EventSourcing\Store\Exception\NonWritableEvent;
15
use Blixit\EventSourcing\Store\Matcher\EventMatcher;
16
use Blixit\EventSourcing\Store\Matcher\EventMatcherInterface;
17
use Blixit\EventSourcing\Store\Matcher\MatcherInterface;
18
use Blixit\EventSourcing\Store\Persistence\EventPersisterException;
19
use Blixit\EventSourcing\Store\Persistence\EventPersisterInterface;
20
use Blixit\EventSourcing\Stream\ReadableStream;
21
use Blixit\EventSourcing\Stream\StorableStream;
22
use Blixit\EventSourcing\Stream\Strategy\StreamStrategy;
23
use Blixit\EventSourcing\Stream\Stream;
24
use Blixit\EventSourcing\Stream\StreamName;
25
use ReflectionClass;
26
use ReflectionException;
27
use Throwable;
28
use function sprintf;
29
30
/**
 * Event store bound to a single aggregate class.
 *
 * Rebuilds aggregates by replaying the events returned by the persister
 * (see get()) and appends the events recorded on an aggregate to the stream
 * whose name is computed by the configured stream strategy (see store()).
 * Stored events are forwarded to the optional dispatcher.
 */
class EventStore implements EventStoreInterface
{
    /** @var EventPersisterInterface $eventPersister */
    protected $eventPersister;

    /** @var string $aggregateClass */
    private $aggregateClass;

    /** @var StreamStrategy $streamStrategy */
    private $streamStrategy;

    /** @var DispatcherInterface|null $messageBus Optional; no dispatching happens when null. */
    private $messageBus;

    /** @var AggregateAccessor $aggregateAccessor */
    private $aggregateAccessor;

    /** @var EventAccessor $eventAccessor */
    private $eventAccessor;

    /**
     * @param string                   $aggregateClass      Class name of the aggregates handled by this store.
     * @param EventPersisterInterface  $eventPersister      Backend used to read and write events.
     * @param string                   $streamStrategyClass Passed to StreamStrategy::resolveStreamStrategy().
     * @param DispatcherInterface|null $messageBus          When given, every stored event is dispatched on it.
     */
    public function __construct(
        string $aggregateClass,
        EventPersisterInterface $eventPersister,
        string $streamStrategyClass,
        ?DispatcherInterface $messageBus = null
    ) {
        $this->eventPersister = $eventPersister;
        $this->aggregateClass = $aggregateClass;
        $this->streamStrategy = StreamStrategy::resolveStreamStrategy($streamStrategyClass);

        $this->messageBus = $messageBus;

        $this->aggregateAccessor = AggregateAccessor::getInstance();
        $this->eventAccessor     = EventAccessor::getInstance();
    }

    /**
     * Computes the stream name for the given aggregate id by delegating to
     * the stream strategy with this store's aggregate class.
     *
     * @param mixed $aggregateId
     */
    public function getStreamNameForAggregateId($aggregateId = null) : StreamName
    {
        // TODO: add hook after stream computeName
        return $this->streamStrategy->computeName($this->aggregateClass, $aggregateId);
    }

    /**
     * Rebuilds an aggregate by replaying its persisted events.
     *
     * Returns null when the aggregate's sequence is still not greater than 0
     * after the replay.
     *
     * @param mixed $aggregateId
     *
     * @throws ReflectionException
     * @throws CorruptedReadEvent
     */
    public function get($aggregateId) : ?AggregateRootInterface
    {
        /** @var AggregateRootInterface $aggregate */
        $aggregate = $this->buildAggregate($aggregateId);
        // compute streamName based on stream strategy
        $streamName = $this->getStreamNameForAggregateId($aggregateId);
        // get events from store, starting from the sequence the aggregate already has
        $events = $this->eventPersister->getByStream($streamName, $aggregate->getSequence());
        // capture the protected beforeRead hook as a callable so the static closure can invoke it
        $beforeRead = [$this, 'beforeRead'];
        // build stream; the closure validates an event against the aggregate via beforeRead()
        $stream = new ReadableStream(
            $streamName,
            $events,
            static function (EventInterface $event) use ($beforeRead, $aggregate) : void {
                $beforeRead($aggregate, $event);
            }
        );

        // replay events
        foreach ($stream->getIterator() as $event) {
            /** @var EventInterface $event */
            $aggregate->apply($event);
            // keep the aggregate version aligned with the last replayed event
            $this->aggregateAccessor->setVersionSequence($aggregate, $event->getSequence());
            $this->afterRead($aggregate, $event);
        }

        return $aggregate->getSequence() > 0 ? $aggregate : null;
    }

    /**
     * Instantiates the aggregate class without running its constructor and
     * assigns the given id to it.
     *
     * @param mixed $aggregateId
     *
     * @throws ReflectionException
     */
    protected function getEmptyAggregate($aggregateId) : AggregateRootInterface
    {
        /** @var AggregateRootInterface $aggregate */
        $aggregate = (new ReflectionClass($this->aggregateClass))->newInstanceWithoutConstructor();
        $this->aggregateAccessor->setAggregateId($aggregate, $aggregateId);

        return $aggregate;
    }

    /**
     * Provides the aggregate instance that get() replays events onto.
     * This implementation simply returns an empty aggregate.
     *
     * @param mixed $aggregateId
     *
     * @throws ReflectionException
     */
    protected function buildAggregate($aggregateId) : AggregateRootInterface
    {
        return $this->getEmptyAggregate($aggregateId);
    }

    /**
     * Persists the events recorded on the aggregate and dispatches each of
     * them on the message bus when one is configured.
     *
     * @throws EventPersisterException
     * @throws NonWritableEvent
     */
    public function store(AggregateRootInterface &$aggregateRoot) : void
    {
        // compute streamName based on stream strategy
        $streamName = $this->getStreamNameForAggregateId($aggregateRoot->getAggregateId());
        // capture the protected beforeWrite hook as a callable so the static closure can invoke it
        $beforeWrite = [$this, 'beforeWrite'];

        // align the aggregate version with the last event already present in the stream
        $lastEvent = $this->eventPersister->getLastEvent($streamName);
        if ($lastEvent instanceof EventInterface) {
            $this->aggregateAccessor->setVersionSequence($aggregateRoot, $lastEvent->getSequence());
        }

        /** @var AggregateRoot $aggregateRoot */
        $stream = new StorableStream(
            $streamName,
            $aggregateRoot->getRecordedEvents(),
            static function (EventInterface $event) use ($beforeWrite, $aggregateRoot) : void {
                $beforeWrite($aggregateRoot, $event);
            }
        );

        foreach ($stream->getIterator() as $event) {
            $this->writeLoopIteration($aggregateRoot, $event);
            // dispatch event only when a message bus was provided
            if ($this->messageBus !== null) {
                $this->messageBus->dispatch($event);
            }
        }
    }

    /**
     * Handles one recorded event: assigns its sequence and aggregate class,
     * persists it, removes it from the aggregate's recorded events, bumps the
     * aggregate version, applies the committed event and calls afterWrite().
     *
     * @throws EventPersisterException
     */
    protected function writeLoopIteration(AggregateRootInterface &$aggregateRoot, EventInterface &$event) : void
    {
        // next sequence number
        $nextSequence = $aggregateRoot->getSequence() + 1;
        // set event sequence
        $this->eventAccessor->setSequence($event, $nextSequence);
        // set event aggregate class
        $this->eventAccessor->setAggregateClass($event, $this->aggregateClass);
        // save event; throws before any aggregate state is touched if persistence fails
        $committedEvent = $this->persist($event);
        // remove from recorded events
        $this->aggregateAccessor->shiftEvent($aggregateRoot);
        // persistence worked, so increment aggregate sequence
        $this->aggregateAccessor->setVersionSequence($aggregateRoot, $nextSequence);
        // apply event
        $aggregateRoot->apply($committedEvent);
        // do something with the updated aggregate like snapShotting
        $this->afterWrite($aggregateRoot, $committedEvent);
    }

    /**
     * Persists the event and returns a clone of what the persister returned.
     *
     * Any failure is rethrown as EventPersisterException with the original
     * throwable attached as "previous", so the root cause and its stack
     * trace remain available to callers and logs.
     *
     * @throws EventPersisterException
     */
    protected function persist(EventInterface $event) : EventInterface
    {
        try {
            return clone $this->eventPersister->persist($event);
        } catch (Throwable $exception) {
            // NOTE(review): relies on EventPersisterException keeping the standard
            // (message, code, previous) exception constructor - confirm.
            throw new EventPersisterException($exception->getMessage(), 0, $exception);
        }
    }

    /**
     * Validates an event before it is written: it must carry the aggregate's
     * id and must not yet have a sequence number or a stream name.
     *
     * @throws NonWritableEvent
     */
    protected function beforeWrite(AggregateRootInterface $aggregateRoot, EventInterface $event) : void
    {
        if ($event->getAggregateId() !== $aggregateRoot->getAggregateId()) {
            throw new NonWritableEvent(sprintf(
                'Event aggregate Id not expected. Expected: %s . Found: %s',
                $aggregateRoot->getAggregateId(),
                $event->getAggregateId()
            ));
        }
        if (! empty($event->getSequence())) {
            throw new NonWritableEvent('Sequence number should be empty. Found: ' . $event->getSequence());
        }
        if (! empty($event->getStreamName())) {
            throw new NonWritableEvent('Stream name should be empty. Found: ' . $event->getStreamName());
        }
    }

    /**
     * Hook called by writeLoopIteration() once an event has been persisted
     * and applied. Deliberately empty in this class, so both parameters are
     * unused here; it exists as a protected extension point (the call site
     * mentions snapshotting as an example).
     */
    protected function afterWrite(AggregateRootInterface $aggregateRoot, EventInterface $event) : void
    {
    }

    /**
     * Validates an event read from the store before it is replayed: it must
     * carry the aggregate's id and this store's aggregate class.
     *
     * @throws CorruptedReadEvent
     */
    protected function beforeRead(AggregateRootInterface $aggregateRoot, EventInterface $event) : void
    {
        // reject events that belong to another aggregate
        if ($event->getAggregateId() !== $aggregateRoot->getAggregateId()) {
            throw new CorruptedReadEvent(sprintf(
                'Event aggregate Id not expected. Expected: %s . Found: %s',
                $aggregateRoot->getAggregateId(),
                $event->getAggregateId()
            ));
        }

        // reject events recorded for another aggregate class
        if ($this->aggregateClass !== $event->getAggregateClass()) {
            throw new CorruptedReadEvent(sprintf(
                'Event aggregate Class not expected. Expected: %s . Found: %s',
                $this->aggregateClass,
                $event->getAggregateClass()
            ));
        }
    }

    /**
     * Hook called by get() after each replayed event has been applied and the
     * aggregate version updated. Deliberately empty in this class, so both
     * parameters are unused here; it exists as a protected extension point.
     */
    protected function afterRead(AggregateRootInterface $aggregateRoot, EventInterface $event) : void
    {
    }

    /**
     * Builds a readable stream from the events the persister finds for the
     * matcher. The stream is named after the matcher's aggregate id and is
     * created without a per-event callback.
     */
    public function load(EventMatcherInterface $matcher) : Stream
    {
        // compute streamName based on stream strategy
        $streamName = $this->getStreamNameForAggregateId($matcher->getAggregateId());
        // get events from store
        $events = $this->eventPersister->find($matcher);

        // build stream
        return new ReadableStream($streamName, $events);
    }
}
268