InMemoryEventStore::storeEvents()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 2
eloc 4
c 2
b 0
f 0
nc 2
nop 2
dl 0
loc 9
rs 10
1
<?php
2
3
/*
4
 * event-sourcing (https://github.com/phpgears/event-sourcing).
5
 * Event Sourcing base.
6
 *
7
 * @license MIT
8
 * @link https://github.com/phpgears/event-sourcing
9
 * @author Julián Gutiérrez <[email protected]>
10
 */
11
12
declare(strict_types=1);
13
14
namespace Gears\EventSourcing\Store\Event;
15
16
use Gears\EventSourcing\Aggregate\AggregateVersion;
17
use Gears\EventSourcing\Event\AggregateEvent;
18
use Gears\EventSourcing\Event\AggregateEventEmptyStream;
19
use Gears\EventSourcing\Event\AggregateEventIteratorStream;
20
use Gears\EventSourcing\Event\AggregateEventStream;
21
use Gears\EventSourcing\Store\StoreStream;
22
23
/**
 * In memory Event Store implementation.
 *
 * Events are held in a private array property, grouped by stream identifier
 * and keyed by aggregate version; nothing is written outside of this object.
 */
final class InMemoryEventStore extends AbstractEventStore
{
    /**
     * AggregateEvents streams, indexed by stream identifier and then by aggregate version.
     *
     * @var array<string, array<int, AggregateEvent>>
     */
    private $streams = [];

    /**
     * {@inheritdoc}
     *
     * Returns an empty stream when no event is stored under $fromVersion for the
     * given stream. Otherwise returns every stored event whose version lies between
     * $fromVersion and $toVersion (both inclusive), or from $fromVersion onwards
     * when $toVersion is null.
     */
    protected function loadEvents(
        StoreStream $stream,
        AggregateVersion $fromVersion,
        ?AggregateVersion $toVersion = null
    ): AggregateEventStream {
        $streamId = $this->getStreamId($stream);
        $from = $fromVersion->getValue();

        if (!isset($this->streams[$streamId][$from])) {
            // @codeCoverageIgnoreStart
            return new AggregateEventEmptyStream();
            // @codeCoverageIgnoreEnd
        }

        $to = $toVersion !== null ? $toVersion->getValue() : null;

        // Events are keyed by version, so select them by key. A positional slice
        // only matches the requested versions when the stream starts at version 1
        // and has no gaps, and misbehaves when $toVersion is lower than $fromVersion.
        $events = [];
        foreach ($this->streams[$streamId] as $version => $aggregateEvent) {
            if ($version < $from || ($to !== null && $version > $to)) {
                continue;
            }

            // Appended without keys: the resulting list is re-indexed from 0.
            $events[] = $aggregateEvent;
        }

        return new AggregateEventIteratorStream(new \ArrayIterator($events));
    }

    /**
     * {@inheritdoc}
     *
     * Each event is stored under its aggregate version, replacing any event already
     * stored under that version, and the stream is kept sorted by ascending version.
     */
    protected function storeEvents(StoreStream $stream, AggregateEventStream $eventStream): void
    {
        $streamId = $this->getStreamId($stream);

        foreach ($eventStream as $aggregateEvent) {
            $this->streams[$streamId][$aggregateEvent->getAggregateVersion()->getValue()] = $aggregateEvent;
        }

        // Nothing to sort when the stream has no entry (empty event stream on a stream
        // that was never created); sorting by reference would otherwise create a null entry.
        if (isset($this->streams[$streamId])) {
            \ksort($this->streams[$streamId]);
        }
    }

    /**
     * {@inheritdoc}
     */
    protected function streamExists(StoreStream $stream): bool
    {
        return isset($this->streams[$this->getStreamId($stream)]);
    }

    /**
     * {@inheritdoc}
     *
     * Registers an empty event list for the stream, discarding any events already
     * held under the same stream identifier.
     */
    protected function createStream(StoreStream $stream): void
    {
        $this->streams[$this->getStreamId($stream)] = [];
    }

    /**
     * {@inheritdoc}
     *
     * Returns version 0 when the stream holds no events, otherwise the last
     * (highest, as the stream is kept sorted) stored version.
     */
    protected function getStreamVersion(StoreStream $stream): AggregateVersion
    {
        // A stream without an entry is treated the same as an empty one.
        $events = $this->streams[$this->getStreamId($stream)] ?? [];

        if (\count($events) === 0) {
            return new AggregateVersion(0);
        }

        $versions = \array_keys($events);
        /** @var int $version */
        $version = \end($versions);

        return new AggregateVersion($version);
    }

    /**
     * Get stream identifier.
     *
     * The value of the stream's aggregate identity is used as the key into the streams array.
     *
     * @param StoreStream $stream
     *
     * @return string
     */
    private function getStreamId(StoreStream $stream): string
    {
        return $stream->getAggregateId()->getValue();
    }
}
116