findByIdAndVersion()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 27
ccs 14
cts 14
cp 1
rs 9.488
c 0
b 0
f 0
cc 2
nc 2
nop 2
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\Common;
4
5
use DDDominio\EventSourcing\EventStore\EventStoreInterface;
6
use DDDominio\EventSourcing\Snapshotting\SnapshotStoreInterface;
7
8
class EventSourcedAggregateRepository
9
{
10
    /**
11
     * @var EventStoreInterface
12
     */
13
    private $eventStore;
14
15
    /**
16
     * @var SnapshotStoreInterface
17
     */
18
    private $snapshotStore;
19
20
    /**
21
     * @var AggregateReconstructor
22
     */
23
    private $aggregateReconstructor;
24
25
    /**
26
     * @var string
27
     */
28
    private $aggregateClass;
29
30
    /**
31
     * @var AggregateIdExtractorInterface
32
     */
33
    private $aggregateIdExtractor;
34
35
    /**
36
     * @param EventStoreInterface $eventStore
37
     * @param SnapshotStoreInterface $snapshotStore
38
     * @param AggregateReconstructor $aggregateReconstructor
39
     * @param AggregateIdExtractorInterface $aggregateIdExtractor
40
     * @param string $aggregateClass
41
     */
42 14
    public function __construct(
43
        EventStoreInterface $eventStore,
44
        SnapshotStoreInterface $snapshotStore,
45
        $aggregateReconstructor,
46
        $aggregateIdExtractor,
47
        $aggregateClass
48
    ) {
49 14
        $this->eventStore = $eventStore;
50 14
        $this->snapshotStore = $snapshotStore;
51 14
        $this->aggregateReconstructor = $aggregateReconstructor;
52 14
        $this->aggregateIdExtractor = $aggregateIdExtractor;
53 14
        $this->aggregateClass = $aggregateClass;
54 14
    }
55
56
    /**
57
     * @param EventSourcedAggregateRootInterface $aggregate
58
     */
59 8
    public function save($aggregate)
60
    {
61 8
        $this->assertValidAggregate($aggregate);
62 6
        $this->eventStore->appendToStream(
63 6
            $this->streamIdFromAggregate($aggregate),
64 6
            $aggregate->changes()->events(),
65 6
            $aggregate->originalVersion()
66
        );
67 6
        $aggregate->clearChanges();
68 6
    }
69
70
    /**
71
     * @param $aggregate
72
     */
73 8
    private function assertValidAggregate($aggregate)
74
    {
75 8
        if (is_null($aggregate)) {
76 1
            throw new \InvalidArgumentException(
77 1
                sprintf('EventSourcedAggregateRepository expects an aggregate of type "%s" but "NULL" given', $this->aggregateClass)
78
            );
79
        }
80
81 7
        if (!($aggregate instanceof $this->aggregateClass)) {
82 1
            throw new \InvalidArgumentException(
83 1
                sprintf('EventSourcedAggregateRepository expects an aggregate of type "%s" but "%s" given', $this->aggregateClass, get_class($aggregate))
84
            );
85
        }
86 6
    }
87
88
    /**
89
     * @param string $id
90
     * @return EventSourcedAggregateRootInterface
91
     */
92 2
    public function findById($id)
93
    {
94 2
        $snapshot = $this->snapshotStore
95 2
            ->findLastSnapshot($this->aggregateClass, $id);
96
97 2
        $streamId = $this->streamIdFromAggregateId($id);
98 2
        if ($snapshot) {
99 1
            $stream = $this->eventStore
100 1
                ->readStreamEvents($streamId, $snapshot->version() + 1);
101
        } else {
102 1
            $stream = $this->eventStore->readFullStream($streamId);
103
        }
104
105 2
        return $this->aggregateReconstructor->reconstitute(
106 2
            $this->aggregateClass,
107
            $stream,
108
            $snapshot
109
        );
110
    }
111
112
    /**
113
     * @param string $id
114
     * @param int $version
115
     * @return EventSourcedAggregateRootInterface
116
     */
117 4
    public function findByIdAndVersion($id, $version)
118
    {
119 4
        $snapshot = $this->snapshotStore
120 4
            ->findNearestSnapshotToVersion($this->aggregateClass, $id, $version);
121
122 4
        $streamId = $this->streamIdFromAggregateId($id);
123 4
        if ($snapshot) {
124 2
            $stream = $this->eventStore
125 2
                ->readStreamEvents(
126
                    $streamId,
127 2
                    $snapshot->version() + 1,
128 2
                    $version - $snapshot->version()
129
                );
130
        } else {
131 2
            $stream = $this->eventStore
132 2
                ->readStreamEvents(
133
                    $streamId,
134 2
                    1,
135
                    $version
136
                );
137
        }
138 4
        return $this->aggregateReconstructor->reconstitute(
139 4
            $this->aggregateClass,
140
            $stream,
141
            $snapshot
142
        );
143
    }
144
145
    /**
146
     * @param string $id
147
     * @param \DateTimeImmutable $datetime
148
     * @return EventSourcedAggregateRootInterface
149
     */
150 2
    public function findByIdAndDatetime($id, $datetime)
151
    {
152 2
        $streamId = $this->streamIdFromAggregateId($id);
153 2
        $version = $this->eventStore->getStreamVersionAt($streamId, $datetime);
154 2
        return $this->findByIdAndVersion($id, $version);
155
    }
156
157
    /**
158
     * @param EventSourcedAggregateRootInterface $aggregate
159
     * @return string
160
     */
161 6
    private function streamIdFromAggregate($aggregate)
162
    {
163 6
        return $this->streamIdFromAggregateId(
164 6
            $this->aggregateIdExtractor->extract($aggregate)
165
        );
166
    }
167
168
    /**
169
     * @param string $aggregateId
170
     * @return string
171
     */
172 12
    protected function streamIdFromAggregateId($aggregateId)
173
    {
174 12
        return $this->aggregateClass . '-' . $aggregateId;
175
    }
176
}
177