Completed
Push — master ( e6854c...b0ed9c )
by Julián
05:36
created

InMemoryEventStore::loadEventsTo()   A

Complexity

Conditions 6
Paths 6

Size

Total Lines 25
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
eloc 10
c 1
b 0
f 0
nc 6
nop 3
dl 0
loc 25
rs 9.2222
1
<?php
2
3
/*
4
 * event-sourcing (https://github.com/phpgears/event-sourcing).
5
 * Event Sourcing base.
6
 *
7
 * @license MIT
8
 * @link https://github.com/phpgears/event-sourcing
9
 * @author Julián Gutiérrez <[email protected]>
10
 */
11
12
declare(strict_types=1);
13
14
namespace Gears\EventSourcing\Store\Event;
15
16
use Gears\EventSourcing\Aggregate\AggregateVersion;
17
use Gears\EventSourcing\Event\AggregateEvent;
18
use Gears\EventSourcing\Event\AggregateEventIteratorStream;
19
use Gears\EventSourcing\Event\AggregateEventStream;
20
use Gears\EventSourcing\Store\Event\Exception\EventStoreException;
21
use Gears\EventSourcing\Store\StoreStream;
22
23
/**
24
 * In memory Event Store implementation.
25
 */
26
final class InMemoryEventStore extends AbstractEventStore
27
{
28
    /**
29
     * AggregateEvents streams.
30
     *
31
     * @var array<AggregateEvent[]>
32
     */
33
    private $streams = [];
34
35
    /**
36
     * {@inheritdoc}
37
     */
38
    protected function loadEventsFrom(
39
        StoreStream $stream,
40
        AggregateVersion $fromVersion,
41
        ?int $count = null
42
    ): AggregateEventStream {
43
        $events = new \ArrayObject();
44
45
        $streamId = $this->getStreamId($stream);
46
        if (!isset($this->streams[$streamId][$fromVersion->getValue()])) {
47
            // @codeCoverageIgnoreStart
48
            return new AggregateEventIteratorStream($events->getIterator());
49
            // @codeCoverageIgnoreEnd
50
        }
51
52
        foreach ($this->streams[$streamId] as $version => $event) {
53
            if ($version >= $fromVersion->getValue()) {
54
                $events->append($event);
55
            }
56
57
            if ($count !== null && \count($events) === $count) {
58
                break;
59
            }
60
        }
61
62
        return new AggregateEventIteratorStream($events->getIterator());
63
    }
64
65
    /**
66
     * {@inheritdoc}
67
     */
68
    protected function loadEventsTo(
69
        StoreStream $stream,
70
        AggregateVersion $toVersion,
71
        AggregateVersion $fromVersion
72
    ): AggregateEventStream {
73
        $events = new \ArrayObject();
74
75
        $streamId = $this->getStreamId($stream);
76
        if (!isset($this->streams[$streamId][$fromVersion->getValue()])) {
77
            // @codeCoverageIgnoreStart
78
            return new AggregateEventIteratorStream($events->getIterator());
79
            // @codeCoverageIgnoreEnd
80
        }
81
82
        foreach ($this->streams[$streamId] as $version => $event) {
83
            if ($version >= $fromVersion->getValue() && $version <= $toVersion->getValue()) {
84
                $events->append($event);
85
            }
86
87
            if ($version >= $toVersion->getValue()) {
88
                break;
89
            }
90
        }
91
92
        return new AggregateEventIteratorStream($events->getIterator());
93
    }
94
95
    /**
96
     * {@inheritdoc}
97
     */
98
    protected function storeEvents(
99
        StoreStream $stream,
100
        AggregateEventStream $eventStream,
101
        AggregateVersion $expectedVersion
102
    ): void {
103
        $lastVersion = $this->getStreamVersion($stream);
104
105
        $streamId = $this->getStreamId($stream);
106
107
        foreach ($eventStream as $aggregateEvent) {
108
            $aggregateVersion = $aggregateEvent->getAggregateVersion();
109
110
            if (!$aggregateVersion->getPrevious()->isEqualTo($lastVersion)) {
111
                throw new EventStoreException(\sprintf(
112
                    'Aggregate event for version "%s" cannot be stored',
113
                    $aggregateVersion->getValue()
114
                ));
115
            }
116
117
            $this->streams[$streamId][$aggregateVersion->getValue()] = $aggregateEvent;
118
119
            $lastVersion = $lastVersion->getNext();
120
        }
121
122
        \ksort($this->streams[$streamId]);
123
    }
124
125
    /**
126
     * {@inheritdoc}
127
     */
128
    protected function getStreamVersion(StoreStream $stream): AggregateVersion
129
    {
130
        $streamId = $this->getStreamId($stream);
131
132
        if (\count($this->streams[$streamId]) === 0) {
133
            return new AggregateVersion(0);
134
        }
135
136
        $versions = \array_keys($this->streams[$streamId]);
137
        /** @var int $version */
138
        $version = \end($versions);
139
140
        return new AggregateVersion($version);
141
    }
142
143
    /**
144
     * {@inheritdoc}
145
     */
146
    protected function streamExists(StoreStream $stream): bool
147
    {
148
        return isset($this->streams[$this->getStreamId($stream)]);
149
    }
150
151
    /**
152
     * {@inheritdoc}
153
     */
154
    protected function createStream(StoreStream $stream): void
155
    {
156
        $this->streams[$this->getStreamId($stream)] = [];
157
    }
158
159
    /**
160
     * Get stream identifier.
161
     *
162
     * @param StoreStream $stream
163
     *
164
     * @return string
165
     */
166
    private function getStreamId(StoreStream $stream): string
167
    {
168
        return $stream->getAggregateId()->getValue();
169
    }
170
}
171