Completed
Push — master ( e6854c...b0ed9c )
by Julián
05:36
created

AbstractEventStore::loadTo()   A

Complexity

Conditions 5
Paths 5

Size

Total Lines 32
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
eloc 16
c 1
b 0
f 0
nc 5
nop 3
dl 0
loc 32
rs 9.4222
1
<?php
2
3
/*
4
 * event-sourcing (https://github.com/phpgears/event-sourcing).
5
 * Event Sourcing base.
6
 *
7
 * @license MIT
8
 * @link https://github.com/phpgears/event-sourcing
9
 * @author Julián Gutiérrez <[email protected]>
10
 */
11
12
declare(strict_types=1);
13
14
namespace Gears\EventSourcing\Store\Event;
15
16
use Gears\EventSourcing\Aggregate\AggregateVersion;
17
use Gears\EventSourcing\Event\AggregateEventEmptyStream;
18
use Gears\EventSourcing\Event\AggregateEventStream;
19
use Gears\EventSourcing\Store\Event\Exception\ConcurrencyException;
20
use Gears\EventSourcing\Store\Event\Exception\EventStoreException;
21
use Gears\EventSourcing\Store\StoreStream;
22
23
/**
 * Abstract base Event Store.
 *
 * Validates load/store arguments, creates missing streams and checks the stream version before and after
 * storing events. Actual persistence is delegated to the abstract methods implemented by concrete stores.
 */
abstract class AbstractEventStore implements EventStore
{
    /**
     * {@inheritdoc}
     *
     * A stream that does not exist yet is created and an empty event stream is returned.
     *
     * @throws EventStoreException when from version or count are lower than 1
     */
    public function loadFrom(
        StoreStream $stream,
        AggregateVersion $fromVersion,
        ?int $count = null
    ): AggregateEventStream {
        if ($fromVersion->getValue() < 1) {
            throw new EventStoreException(
                \sprintf('Event store load from version must be at least 1, "%s" given', $fromVersion->getValue())
            );
        }
        if ($count !== null && $count < 1) {
            throw new EventStoreException(\sprintf('Event store load count must be at least 1, "%s" given', $count));
        }

        if (!$this->streamExists($stream)) {
            // A freshly created stream cannot hold events, no need to query it.
            $this->createStream($stream);

            return new AggregateEventEmptyStream();
        }

        return $this->loadEventsFrom($stream, $fromVersion, $count);
    }

    /**
     * {@inheritdoc}
     *
     * From version defaults to version 1 when not provided. A stream that does not exist yet is created and an
     * empty event stream is returned.
     *
     * @throws EventStoreException when any version is lower than 1 or from version is greater than to version
     */
    public function loadTo(
        StoreStream $stream,
        AggregateVersion $toVersion,
        ?AggregateVersion $fromVersion = null
    ): AggregateEventStream {
        if ($toVersion->getValue() < 1) {
            throw new EventStoreException(
                \sprintf('Event store load to version must be at least 1, "%s" given', $toVersion->getValue())
            );
        }

        $fromVersion = $fromVersion ?? new AggregateVersion(1);
        if ($fromVersion->getValue() < 1) {
            throw new EventStoreException(
                \sprintf('Event store load from version must be at least 1, "%s" given', $fromVersion->getValue())
            );
        }
        // Equal versions are accepted, only an inverted range is rejected.
        if ($fromVersion->getValue() > $toVersion->getValue()) {
            throw new EventStoreException(\sprintf(
                'Event store load to version "%s" must be greater than from version "%s"',
                $toVersion->getValue(),
                $fromVersion->getValue()
            ));
        }

        if (!$this->streamExists($stream)) {
            // A freshly created stream cannot hold events, no need to query it.
            $this->createStream($stream);

            return new AggregateEventEmptyStream();
        }

        return $this->loadEventsTo($stream, $toVersion, $fromVersion);
    }

    /**
     * {@inheritdoc}
     *
     * Nothing is done when the event stream is empty. Otherwise the stream is created if missing, its current
     * version is compared with the expected version, events are stored and finally the resulting stream version
     * is compared with the version of the last event in the event stream.
     *
     * @throws ConcurrencyException when stream version does not match expected version before storing, or does
     *                              not match the last event version after storing
     */
    public function store(
        StoreStream $stream,
        AggregateEventStream $eventStream,
        AggregateVersion $expectedVersion
    ): void {
        if ($eventStream->count() === 0) {
            return;
        }
        $eventStream->rewind();

        if (!$this->streamExists($stream)) {
            $this->createStream($stream);
        }

        $startVersion = $this->getStreamVersion($stream);
        if (!$startVersion->isEqualTo($expectedVersion)) {
            throw new ConcurrencyException(\sprintf(
                'Expected stream version "%s" does not match current version "%s"',
                $expectedVersion->getValue(),
                $startVersion->getValue()
            ));
        }

        $this->storeEvents($stream, $eventStream, $expectedVersion);

        $eventStream->rewind();
        // Keys are discarded so the last array element is always the last event iterated, regardless of the
        // keys yielded by the stream (duplicated or unordered keys would otherwise move or overwrite entries).
        $events = \iterator_to_array($eventStream, false);
        /** @var AggregateVersion $finalVersion */
        $finalVersion = \end($events)->getAggregateVersion();

        $endVersion = $this->getStreamVersion($stream);
        if (!$endVersion->isEqualTo($finalVersion)) {
            throw new ConcurrencyException(\sprintf(
                'Expected final stream version "%s" does not match current version "%s"',
                $finalVersion->getValue(),
                $endVersion->getValue()
            ));
        }
    }

    /**
     * Check stream existence.
     *
     * @param StoreStream $stream
     *
     * @return bool
     */
    abstract protected function streamExists(StoreStream $stream): bool;

    /**
     * Create stream.
     *
     * Called by this class only after streamExists() returned false for the same stream.
     *
     * @param StoreStream $stream
     */
    abstract protected function createStream(StoreStream $stream): void;

    /**
     * Load aggregate events from a version.
     *
     * Called by loadFrom() once arguments have been validated and the stream is known to exist.
     *
     * @param StoreStream      $stream
     * @param AggregateVersion $fromVersion
     * @param int|null         $count
     *
     * @return AggregateEventStream
     */
    abstract protected function loadEventsFrom(
        StoreStream $stream,
        AggregateVersion $fromVersion,
        ?int $count = null
    ): AggregateEventStream;

    /**
     * Load aggregate events up to a version.
     *
     * Called by loadTo() once arguments have been validated and the stream is known to exist.
     *
     * @param StoreStream      $stream
     * @param AggregateVersion $toVersion
     * @param AggregateVersion $fromVersion
     *
     * @return AggregateEventStream
     */
    abstract protected function loadEventsTo(
        StoreStream $stream,
        AggregateVersion $toVersion,
        AggregateVersion $fromVersion
    ): AggregateEventStream;

    /**
     * Append events to store.
     *
     * Called by store() with a non-empty, rewound event stream, after the stream version has been checked
     * against the expected version.
     *
     * @param StoreStream          $stream
     * @param AggregateEventStream $eventStream
     * @param AggregateVersion     $expectedVersion
     */
    abstract protected function storeEvents(
        StoreStream $stream,
        AggregateEventStream $eventStream,
        AggregateVersion $expectedVersion
    ): void;

    /**
     * Get current stream version.
     *
     * @param StoreStream $stream
     *
     * @return AggregateVersion
     */
    abstract protected function getStreamVersion(StoreStream $stream): AggregateVersion;
}
202