Completed
Push — master ( 4bac89...da3993 )
by Julián
02:37
created

InMemoryEventStore::loadEvents()   B

Complexity

Conditions 8
Paths 6

Size

Total Lines 24
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 10
nc 6
nop 3
dl 0
loc 24
rs 8.4444
c 0
b 0
f 0
1
<?php
2
3
/*
4
 * event-sourcing (https://github.com/phpgears/event-sourcing).
5
 * Event Sourcing base.
6
 *
7
 * @license MIT
8
 * @link https://github.com/phpgears/event-sourcing
9
 * @author Julián Gutiérrez <[email protected]>
10
 */
11
12
declare(strict_types=1);
13
14
namespace Gears\EventSourcing\Store\Event;
15
16
use Gears\EventSourcing\Aggregate\AggregateVersion;
17
use Gears\EventSourcing\Event\AggregateEvent;
18
use Gears\EventSourcing\Event\AggregateEventEmptyStream;
19
use Gears\EventSourcing\Event\AggregateEventIteratorStream;
20
use Gears\EventSourcing\Event\AggregateEventStream;
21
use Gears\EventSourcing\Store\StoreStream;
22
23
/**
24
 * In memory Event Store implementation.
25
 */
26
final class InMemoryEventStore extends AbstractEventStore
{
    /**
     * Stored aggregate events, indexed first by stream identifier and then by aggregate version value.
     *
     * Each inner array is kept sorted by version (ascending) by storeEvents().
     *
     * @var array<string, array<int, AggregateEvent>>
     */
    private $streams = [];

    /**
     * {@inheritdoc}
     *
     * Returns an empty stream when no event is stored under the exact $fromVersion value for the stream.
     * Otherwise returns every stored event whose version lies between $fromVersion and $toVersion
     * (both inclusive), or from $fromVersion onwards when $toVersion is null.
     */
    protected function loadEvents(
        StoreStream $stream,
        AggregateVersion $fromVersion,
        ?AggregateVersion $toVersion = null
    ): AggregateEventStream {
        $streamId = $this->getStreamId($stream);

        // Resolve both bounds once instead of on every loop iteration.
        $from = $fromVersion->getValue();
        $to = $toVersion !== null ? $toVersion->getValue() : null;

        if (!isset($this->streams[$streamId][$from])) {
            // @codeCoverageIgnoreStart
            return new AggregateEventEmptyStream();
            // @codeCoverageIgnoreEnd
        }

        $events = [];
        foreach ($this->streams[$streamId] as $version => $event) {
            if ($version >= $from && ($to === null || $version <= $to)) {
                $events[] = $event;
            }

            // Versions are stored in ascending order, so nothing past the upper bound can match.
            if ($to !== null && $version >= $to) {
                break;
            }
        }

        return new AggregateEventIteratorStream(new \ArrayIterator($events));
    }

    /**
     * {@inheritdoc}
     *
     * Events are written under their aggregate version value; an event already stored under the same
     * version is overwritten. The stream is re-sorted by version after the write.
     */
    protected function storeEvents(StoreStream $stream, AggregateEventStream $eventStream): void
    {
        $streamId = $this->getStreamId($stream);

        // Work on a local copy that defaults to an empty list: this keeps \ksort() from ever receiving
        // an undefined entry, and the stored stream is only replaced once the whole event stream has
        // been consumed.
        $events = $this->streams[$streamId] ?? [];

        foreach ($eventStream as $aggregateEvent) {
            $events[$aggregateEvent->getAggregateVersion()->getValue()] = $aggregateEvent;
        }

        \ksort($events);

        $this->streams[$streamId] = $events;
    }

    /**
     * {@inheritdoc}
     */
    protected function streamExists(StoreStream $stream): bool
    {
        return isset($this->streams[$this->getStreamId($stream)]);
    }

    /**
     * {@inheritdoc}
     *
     * Any events previously held under the same stream identifier are discarded.
     */
    protected function createStream(StoreStream $stream): void
    {
        $this->streams[$this->getStreamId($stream)] = [];
    }

    /**
     * {@inheritdoc}
     *
     * A stream that holds no events, or that has not been created, is reported as version 0.
     */
    protected function getStreamVersion(StoreStream $stream): AggregateVersion
    {
        // Fall back to an empty list so an unknown stream does not raise on \count().
        $events = $this->streams[$this->getStreamId($stream)] ?? [];

        if (\count($events) === 0) {
            return new AggregateVersion(0);
        }

        // Keys are kept sorted ascending by storeEvents(), so the last key is the highest version.
        $versions = \array_keys($events);
        /** @var int $version */
        $version = \end($versions);

        return new AggregateVersion($version);
    }

    /**
     * Get stream identifier.
     *
     * The identifier is the value of the stream's aggregate identity.
     *
     * @param StoreStream $stream
     *
     * @return string
     */
    private function getStreamId(StoreStream $stream): string
    {
        return $stream->getAggregateId()->getValue();
    }
}
126