InMemoryEventStore::appendStoredEvents()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 3
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\Serialization\SerializerInterface;
8
use DDDominio\EventSourcing\Versioning\EventUpgraderInterface;
9
use DDDominio\EventSourcing\Versioning\Version;
10
11
class InMemoryEventStore extends AbstractEventStore
12
{
13
    /**
14
     * @var IdentifiedEventStream[]
15
     */
16
    private $streams;
17
18
    /**
19
     * @param SerializerInterface $serializer
20
     * @param EventUpgraderInterface $eventUpgrader
21
     * @param IdentifiedEventStream[] $streams
22
     */
23 35
    public function __construct(
24
        SerializerInterface $serializer,
25
        EventUpgraderInterface $eventUpgrader,
26
        array $streams = []
27
    ) {
28 35
        parent::__construct($serializer, $eventUpgrader);
29 35
        $this->streams = $streams;
30 35
    }
31
32
    /**
33
     * @param string $streamId
34
     * @param StoredEvent[] $storedEvents
35
     * @param int $expectedVersion
36
     */
37 14
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
38
    {
39 14
        if ($this->streamExists($streamId)) {
40 4
            $this->streams[$streamId] = $this->streams[$streamId]->append($storedEvents);
41
        } else {
42 11
            $this->streams[$streamId] = new IdentifiedEventStream($streamId, $storedEvents);
43
        }
44 14
    }
45
46
    /**
47
     * @param string $streamId
48
     * @return EventStreamInterface
49
     */
50 18
    public function readFullStream($streamId)
51
    {
52 18
        if ($this->streamExists($streamId)) {
53 14
            return $this->domainEventStreamFromStoredEvents($this->streams[$streamId]);
54
        } else {
55 4
            return EventStream::buildEmpty();
56
        }
57
    }
58
59
    /**
60
     * @param string $streamId
61
     * @param int $start
62
     * @param int $count
63
     * @return EventStreamInterface
64
     */
65 6
    public function readStreamEvents($streamId, $start = 1, $count = null)
66
    {
67 6
        if (!$this->streamExists($streamId)) {
68 1
            return EventStream::buildEmpty();
69
        }
70
71 5
        return $this->domainEventStreamFromStoredEvents(
72 5
            $this->streams[$streamId]->slice($start - 1, $count)
73
        );
74
    }
75
76
    /**
77
     * @return EventStreamInterface[]
78
     */
79 2
    public function readAllStreams()
80
    {
81 2
        $allStreams = [];
82 2
        foreach ($this->streams as $stream) {
83 1
            $allStreams[] = $this->domainEventStreamFromStoredEvents($stream);
84
        }
85 2
        return $allStreams;
86
    }
87
88
    /**
89
     * @return EventStreamInterface
90
     */
91 1
    public function readAllEvents()
92
    {
93 1
        $allEventsStream = EventStream::buildEmpty();
94 1
        foreach ($this->streams as $stream) {
95 1
            $streamEvents = $this->domainEventStreamFromStoredEvents($stream);
96 1
            $allEventsStream = $allEventsStream->append($streamEvents->events());
97
        }
98 1
        return $allEventsStream;
99
    }
100
101
    /**
102
     * @param string $streamId
103
     * @return int
104
     */
105 4
    protected function streamVersion($streamId)
106
    {
107 4
        return $this->streamExists($streamId) ?
108 4
            $this->streams[$streamId]->count() : 0;
109
    }
110
111
    /**
112
     * @param string $type
113
     * @param Version $version
114
     * @return EventStreamInterface
115
     */
116 1
    protected function readStoredEventsOfTypeAndVersion($type, $version)
117
    {
118 1
        $storedEvents = [];
119 1
        foreach ($this->streams as $stream) {
120
            /** @var StoredEvent $event */
121 1
            foreach ($stream as $event) {
122 1
                if ($event->type() === $type && $event->version()->equalTo($version)) {
123 1
                    $storedEvents[] = $event;
124
                }
125
            }
126
        }
127 1
        return new EventStream($storedEvents);
128
    }
129
130
    /**
131
     * @param string $streamId
132
     * @return bool
133
     */
134 34
    protected function streamExists($streamId)
135
    {
136 34
        return isset($this->streams[$streamId]);
137
    }
138
139
    /**
140
     * @param string $streamId
141
     * @param \DateTimeImmutable $datetime
142
     * @return int
143
     * @throws EventStreamDoesNotExistException
144
     */
145 4
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
146
    {
147 4
        if (!$this->streamExists($streamId)) {
148 1
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
149
        }
150
151 3
        $filteredStoredEvents = $this->streams[$streamId]->filter(function(StoredEvent $event) use ($datetime) {
152 3
            return $event->occurredOn()->getTimestamp() <= $datetime->getTimestamp();
153 3
        });
154
155
        return count($filteredStoredEvents);
156
    }
157
}
158