Completed
Push — master ( da3993...4a1e8e )
by Julián
02:25
created

InMemoryEventStore   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 88
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 21
c 2
b 0
f 0
dl 0
loc 88
rs 10
wmc 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
A loadEvents() 0 16 3
A getStreamId() 0 3 1
A getStreamVersion() 0 13 2
A storeEvents() 0 9 2
A createStream() 0 3 1
A streamExists() 0 3 1
1
<?php
2
3
/*
4
 * event-sourcing (https://github.com/phpgears/event-sourcing).
5
 * Event Sourcing base.
6
 *
7
 * @license MIT
8
 * @link https://github.com/phpgears/event-sourcing
9
 * @author Julián Gutiérrez <[email protected]>
10
 */
11
12
declare(strict_types=1);
13
14
namespace Gears\EventSourcing\Store\Event;
15
16
use Gears\EventSourcing\Aggregate\AggregateVersion;
17
use Gears\EventSourcing\Event\AggregateEvent;
18
use Gears\EventSourcing\Event\AggregateEventEmptyStream;
19
use Gears\EventSourcing\Event\AggregateEventIteratorStream;
20
use Gears\EventSourcing\Event\AggregateEventStream;
21
use Gears\EventSourcing\Store\StoreStream;
22
23
/**
 * In memory Event Store implementation.
 *
 * Events are held in a private array on the instance, indexed first by stream
 * identifier and then by aggregate version.
 */
final class InMemoryEventStore extends AbstractEventStore
{
    /**
     * AggregateEvents streams, indexed by stream identifier and then by aggregate version.
     *
     * @var array<string, array<int, AggregateEvent>>
     */
    private $streams = [];

    /**
     * {@inheritdoc}
     *
     * Returns an empty stream when no event is stored under $fromVersion. Otherwise returns
     * the events whose version lies between $fromVersion and $toVersion (both inclusive),
     * or up to the end of the stream when $toVersion is null.
     */
    protected function loadEvents(
        StoreStream $stream,
        AggregateVersion $fromVersion,
        ?AggregateVersion $toVersion = null
    ): AggregateEventStream {
        $streamId = $this->getStreamId($stream);
        $from = $fromVersion->getValue();

        if (!isset($this->streams[$streamId][$from])) {
            // @codeCoverageIgnoreStart
            return new AggregateEventEmptyStream();
            // @codeCoverageIgnoreEnd
        }

        $to = $toVersion !== null ? $toVersion->getValue() : null;

        // Select by version key rather than by array position: positional slicing only
        // matches versions when the stored keys are contiguous and start at 1, and it
        // misbehaves when $toVersion is lower than $fromVersion (negative slice length).
        $events = [];
        foreach ($this->streams[$streamId] as $version => $aggregateEvent) {
            if ($version < $from) {
                continue;
            }

            // storeEvents() keeps every stream sorted by version, so nothing past $to can match.
            if ($to !== null && $version > $to) {
                break;
            }

            $events[] = $aggregateEvent;
        }

        return new AggregateEventIteratorStream((new \ArrayObject($events))->getIterator());
    }

    /**
     * {@inheritdoc}
     *
     * Each event is stored under its aggregate version; an event already stored under the
     * same version is overwritten.
     */
    protected function storeEvents(StoreStream $stream, AggregateEventStream $eventStream): void
    {
        $streamId = $this->getStreamId($stream);

        foreach ($eventStream as $aggregateEvent) {
            $this->streams[$streamId][$aggregateEvent->getAggregateVersion()->getValue()] = $aggregateEvent;
        }

        // Nothing to sort when the stream was never created and no event was written;
        // calling ksort() on the missing index would fail.
        if (isset($this->streams[$streamId])) {
            // Keep the stream ordered by version so readers can rely on key order.
            \ksort($this->streams[$streamId]);
        }
    }

    /**
     * {@inheritdoc}
     */
    protected function streamExists(StoreStream $stream): bool
    {
        return isset($this->streams[$this->getStreamId($stream)]);
    }

    /**
     * {@inheritdoc}
     *
     * Any events previously stored for the stream are discarded.
     */
    protected function createStream(StoreStream $stream): void
    {
        $this->streams[$this->getStreamId($stream)] = [];
    }

    /**
     * {@inheritdoc}
     *
     * Returns version 0 when the stream holds no events, otherwise the highest stored version.
     */
    protected function getStreamVersion(StoreStream $stream): AggregateVersion
    {
        // An unknown stream is treated like an empty one instead of reading an undefined index.
        $versions = \array_keys($this->streams[$this->getStreamId($stream)] ?? []);

        if (\count($versions) === 0) {
            return new AggregateVersion(0);
        }

        // Streams are kept sorted by version, so the last key is the highest one.
        /** @var int $version */
        $version = \end($versions);

        return new AggregateVersion($version);
    }

    /**
     * Get stream identifier.
     *
     * @param StoreStream $stream
     *
     * @return string value of the stream's aggregate identifier
     */
    private function getStreamId(StoreStream $stream): string
    {
        return $stream->getAggregateId()->getValue();
    }
}
116