AbstractEventStore::appendStoredEvents()
last analyzed

Size

Total Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 1
ccs 0
cts 0
cp 0
c 0
b 0
f 0
nc 1
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore;
4
5
use DDDominio\EventSourcing\Common\DomainEvent;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\Serialization\SerializerInterface;
8
use DDDominio\EventSourcing\Versioning\EventUpgraderInterface;
9
use DDDominio\EventSourcing\Versioning\UpgradableEventStoreInterface;
10
use DDDominio\EventSourcing\Versioning\Version;
11
use Ramsey\Uuid\Uuid;
12
13
abstract class AbstractEventStore implements EventStoreInterface, UpgradableEventStoreInterface
14
{
15
    /**
16
     * @var SerializerInterface
17
     */
18
    private $serializer;
19
20
    /**
21
     * @var EventUpgraderInterface
22
     */
23
    private $eventUpgrader;
24
25
    /**
26
     * @var EventStoreListenerInterface[]|callable[]
27
     */
28
    private $eventListeners;
29
30
    /**
31
     * @param SerializerInterface $serializer
32
     * @param EventUpgraderInterface $eventUpgrader
33
     */
34 79
    public function __construct(
35
        SerializerInterface $serializer,
36
        EventUpgraderInterface $eventUpgrader
37
    ) {
38 79
        $this->serializer = $serializer;
39 79
        $this->eventUpgrader = $eventUpgrader;
40 79
    }
41
42
    /**
43
     * @param string $streamId
44
     * @param DomainEvent[] $events
45
     * @param int $expectedVersion
46
     * @throws ConcurrencyException
47
     * @throws EventStreamDoesNotExistException
48
     */
49 44
    public function appendToStream($streamId, $events, $expectedVersion = self::EXPECTED_VERSION_EMPTY_STREAM)
50
    {
51 44
        if ($this->streamExists($streamId)) {
52 9
            $this->assertOptimisticConcurrency($streamId, $expectedVersion);
53
        } else {
54 40
            $this->assertEventStreamExistence($streamId, $expectedVersion);
55
        }
56 40
        $this->executeEventListeners($events, EventStoreEvents::PRE_APPEND);
57 40
        $this->appendStoredEvents(
58
            $streamId,
59 40
            $this->storedEventsFromEvents($streamId, $events),
60
            $expectedVersion
61
        );
62 38
        $this->executeEventListeners($events, EventStoreEvents::POST_APPEND);
63 38
    }
64
65
    /**
66
     * @param EventStreamInterface $eventStream
67
     * @return EventStreamInterface
68
     */
69
    protected function domainEventStreamFromStoredEvents($eventStream)
70
    {
71 43
        return $eventStream->map(function (StoredEvent $storedEvent) {
72 38
            $this->eventUpgrader->migrate($storedEvent);
73 38
            return new DomainEvent(
74 38
                $this->serializer->deserialize($storedEvent->data(), $storedEvent->type()),
75 38
                json_decode($storedEvent->metadata(), true),
76 38
                $storedEvent->occurredOn(),
77 38
                $storedEvent->version()
78
            );
79 43
        });
80
    }
81
82
    /**
83
     * @param string $streamId
84
     * @param int $expectedVersion
85
     * @throws ConcurrencyException
86
     */
87 9
    private function assertOptimisticConcurrency($streamId, $expectedVersion)
88
    {
89 9
        if ($expectedVersion !== self::EXPECTED_VERSION_ANY
90 9
            && $expectedVersion !== $this->streamVersion($streamId)) {
91 3
            throw ConcurrencyException::fromVersions(
92 3
                $this->streamVersion($streamId),
93
                $expectedVersion
94
            );
95
        }
96 6
    }
97
98
    /**
99
     * @param string $streamId
100
     * @param int $expectedVersion
101
     * @throws EventStreamDoesNotExistException
102
     */
103 40
    private function assertEventStreamExistence($streamId, $expectedVersion)
104
    {
105 40
        if ($expectedVersion > 0) {
106 3
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
107
        }
108 37
    }
109
110
    /**
111
     * @param string $streamId
112
     * @param DomainEvent[] $events
113
     * @return array
114
     */
115
    private function storedEventsFromEvents($streamId, $events)
116
    {
117 40
        return array_map(function (DomainEvent $event) use ($streamId) {
118 40
            return new StoredEvent(
119 40
                $this->nextStoredEventId(),
120
                $streamId,
121 40
                get_class($event->data()),
122 40
                $this->serializer->serialize($event->data()),
123 40
                json_encode($event->metadata()->all(), JSON_FORCE_OBJECT),
124 40
                $event->occurredOn(),
125 40
                is_null($event->version()) ? Version::fromString('1.0') : $event->version()
126
            );
127 40
        }, $events);
128
    }
129
130
    /**
131
     * @return string
132
     */
133 40
    protected function nextStoredEventId()
134
    {
135 40
        return Uuid::uuid4()->toString();
136
    }
137
138
    /**
139
     * @param string $type
140
     * @param Version $from
141
     * @param Version $to
142
     */
143 3
    public function migrate($type, $from, $to)
144
    {
145 3
        foreach ($this->readStoredEventsOfTypeAndVersion($type, $from) as $event) {
146 3
            $this->eventUpgrader->migrate($event, $to);
147
        }
148 3
    }
149
150
    /**
151
     * @param $eventStoreEvent
152
     * @param EventStoreListenerInterface|callable $eventStoreListener
153
     */
154 4
    public function addEventListener($eventStoreEvent, $eventStoreListener)
155
    {
156 4
        $this->eventListeners[$eventStoreEvent][] = $eventStoreListener;
157 4
    }
158
159
    /**
160
     * @param string $streamId
161
     * @param StoredEvent[] $storedEvents
162
     * @param int $expectedVersion
163
     */
164
    abstract protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion);
165
166
    /**
167
     * @param string $streamId
168
     * @return bool
169
     */
170
    abstract protected function streamExists($streamId);
171
172
    /**
173
     * @param string $streamId
174
     * @return int
175
     */
176
    abstract protected function streamVersion($streamId);
177
178
    /**
179
     * @param string $type
180
     * @param Version $version
181
     * @return EventStreamInterface
182
     */
183
    abstract protected function readStoredEventsOfTypeAndVersion($type, $version);
184
185
    /**
186
     * @param DomainEvent[] $events
187
     * @param string $eventStoreEvent
188
     */
189 40
    protected function executeEventListeners($events, $eventStoreEvent)
190
    {
191 40
        if (isset($this->eventListeners[$eventStoreEvent])) {
192 4
            foreach ($this->eventListeners[$eventStoreEvent] as $eventListener) {
0 ignored issues
show
Bug introduced by
The expression $this->eventListeners[$eventStoreEvent] of type callable is not traversable.
Loading history...
193 4
                if ($eventListener instanceof EventStoreListenerInterface) {
194 1
                    $eventListener->handle($events);
195
                } else {
196 4
                    $eventListener($events);
197
                }
198
            }
199
        }
200 40
    }
201
}
202