AbstractEventStore::store()   A
last analyzed

Complexity

Conditions 6
Paths 9

Size

Total Lines 34
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
eloc 19
c 1
b 0
f 0
nc 9
nop 2
dl 0
loc 34
rs 9.0111
1
<?php
2
3
/*
4
 * event-sourcing (https://github.com/phpgears/event-sourcing).
5
 * Event Sourcing base.
6
 *
7
 * @license MIT
8
 * @link https://github.com/phpgears/event-sourcing
9
 * @author Julián Gutiérrez <[email protected]>
10
 */
11
12
declare(strict_types=1);
13
14
namespace Gears\EventSourcing\Store\Event;
15
16
use Gears\EventSourcing\Aggregate\AggregateVersion;
17
use Gears\EventSourcing\Event\AggregateEventEmptyStream;
18
use Gears\EventSourcing\Event\AggregateEventStream;
19
use Gears\EventSourcing\Store\Event\Exception\ConcurrencyException;
20
use Gears\EventSourcing\Store\Event\Exception\EventStoreException;
21
use Gears\EventSourcing\Store\StoreStream;
22
23
/**
24
 * In memory Event Store implementation.
25
 */
26
abstract class AbstractEventStore implements EventStore
27
{
28
    /**
29
     * {@inheritdoc}
30
     */
31
    public function loadFrom(
32
        StoreStream $stream,
33
        AggregateVersion $fromVersion,
34
        ?int $count = null
35
    ): AggregateEventStream {
36
        if ($fromVersion->isEqualTo(new AggregateVersion(0))) {
37
            throw new EventStoreException(
38
                \sprintf('Event store load from version must be at least 1, "%s" given', $fromVersion->getValue())
39
            );
40
        }
41
        if ($count !== null && $count < 1) {
42
            throw new EventStoreException(\sprintf('Event store load count must be at least 1, "%s" given', $count));
43
        }
44
45
        if (!$this->streamExists($stream)) {
46
            return new AggregateEventEmptyStream();
47
        }
48
49
        $toVersion = $count !== null
50
            ? new AggregateVersion($fromVersion->getValue() + $count - 1)
51
            : null;
52
53
        return $this->loadEvents($stream, $fromVersion, $toVersion);
54
    }
55
56
    /**
57
     * {@inheritdoc}
58
     */
59
    public function loadTo(
60
        StoreStream $stream,
61
        AggregateVersion $toVersion,
62
        ?AggregateVersion $fromVersion = null
63
    ): AggregateEventStream {
64
        if ($toVersion->isEqualTo(new AggregateVersion(0))) {
65
            throw new EventStoreException(
66
                \sprintf('Event store load to version must be at least 1, "%s" given', $toVersion->getValue())
67
            );
68
        }
69
70
        $fromVersion = $fromVersion ?? new AggregateVersion(1);
71
        if ($fromVersion->isEqualTo(new AggregateVersion(0))) {
72
            throw new EventStoreException(
73
                \sprintf('Event store load from version must be at least 1, "%s" given', $fromVersion->getValue())
74
            );
75
        }
76
        if ($fromVersion->getValue() > $toVersion->getValue()) {
77
            throw new EventStoreException(\sprintf(
78
                'Event store load to version "%s" must be greater than from version "%s"',
79
                $toVersion->getValue(),
80
                $fromVersion->getValue()
81
            ));
82
        }
83
84
        if (!$this->streamExists($stream)) {
85
            return new AggregateEventEmptyStream();
86
        }
87
88
        return $this->loadEvents($stream, $fromVersion, $toVersion);
89
    }
90
91
    /**
92
     * Get aggregate event stream.
93
     *
94
     * @param StoreStream           $stream
95
     * @param AggregateVersion      $fromVersion
96
     * @param AggregateVersion|null $toVersion
97
     *
98
     * @return AggregateEventStream
99
     */
100
    abstract protected function loadEvents(
101
        StoreStream $stream,
102
        AggregateVersion $fromVersion,
103
        ?AggregateVersion $toVersion = null
104
    ): AggregateEventStream;
105
106
    /**
107
     * {@inheritdoc}
108
     */
109
    public function store(StoreStream $stream, AggregateEventStream $eventStream): void
110
    {
111
        if ($eventStream->count() === 0) {
112
            return;
113
        }
114
115
        if (!$this->streamExists($stream)) {
116
            $this->createStream($stream);
117
        }
118
119
        $eventStream->rewind();
120
        $expectedVersion = $eventStream->current()->getAggregateVersion()->getPrevious();
121
122
        $currentVersion = $this->getStreamVersion($stream);
123
        if (!$currentVersion->isEqualTo($expectedVersion)) {
124
            throw new ConcurrencyException(\sprintf(
125
                'Expected stream version "%s" does not match current version "%s"',
126
                $expectedVersion->getValue(),
127
                $currentVersion->getValue()
128
            ));
129
        }
130
131
        foreach ($eventStream as $aggregateEvent) {
132
            $aggregateVersion = $aggregateEvent->getAggregateVersion();
133
134
            if (!$aggregateVersion->getPrevious()->isEqualTo($currentVersion)) {
135
                throw new ConcurrencyException('Event stream cannot be stored due to versions mismatch');
136
            }
137
138
            $currentVersion = $currentVersion->getNext();
139
        }
140
141
        $eventStream->rewind();
142
        $this->storeEvents($stream, $eventStream);
143
    }
144
145
    /**
146
     * Append events to store.
147
     *
148
     * @param StoreStream          $stream
149
     * @param AggregateEventStream $eventStream
150
     */
151
    abstract protected function storeEvents(StoreStream $stream, AggregateEventStream $eventStream): void;
152
153
    /**
154
     * Check stream existence.
155
     *
156
     * @param StoreStream $stream
157
     *
158
     * @return bool
159
     */
160
    abstract protected function streamExists(StoreStream $stream): bool;
161
162
    /**
163
     * Create stream.
164
     *
165
     * @param StoreStream $stream
166
     */
167
    abstract protected function createStream(StoreStream $stream): void;
168
169
    /**
170
     * Get current stream version.
171
     *
172
     * @param StoreStream $stream
173
     *
174
     * @return AggregateVersion
175
     */
176
    abstract protected function getStreamVersion(StoreStream $stream): AggregateVersion;
177
}
178