Completed
Push — master ( eb0ddf...6e23e2 )
by Daniel
03:16
created

assertValidAggregate()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 1
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\Common;
4
5
use DDDominio\EventSourcing\EventStore\EventStoreInterface;
6
use DDDominio\EventSourcing\Snapshotting\SnapshotStoreInterface;
7
8
class EventSourcedAggregateRepository
9
{
10
    /**
11
     * @var EventStoreInterface
12
     */
13
    private $eventStore;
14
15
    /**
16
     * @var SnapshotStoreInterface
17
     */
18
    private $snapshotStore;
19
20
    /**
21
     * @var AggregateReconstructor
22
     */
23
    private $aggregateReconstructor;
24
25
    /**
26
     * @var string
27
     */
28
    private $aggregateClass;
29
30
    /**
31
     * @var AggregateIdExtractorInterface
32
     */
33
    private $aggregateIdExtractor;
34
35
    /**
36
     * @param EventStoreInterface $eventStore
37
     * @param SnapshotStoreInterface $snapshotStore
38
     * @param AggregateReconstructor $aggregateReconstructor
39
     * @param AggregateIdExtractorInterface $aggregateIdExtractor
40
     * @param string $aggregateClass
41
     */
42 14
    public function __construct(
43
        EventStoreInterface $eventStore,
44
        SnapshotStoreInterface $snapshotStore,
45
        $aggregateReconstructor,
46
        $aggregateIdExtractor,
47
        $aggregateClass
48
    ) {
49 14
        $this->eventStore = $eventStore;
50 14
        $this->snapshotStore = $snapshotStore;
51 14
        $this->aggregateReconstructor = $aggregateReconstructor;
52 14
        $this->aggregateIdExtractor = $aggregateIdExtractor;
53 14
        $this->aggregateClass = $aggregateClass;
54 14
    }
55
56
    /**
57
     * @param EventSourcedAggregateRootInterface $aggregate
58
     */
59 8
    public function save($aggregate)
60
    {
61 8
        $this->assertValidAggregate($aggregate);
62 6
        $this->eventStore->appendToStream(
63 6
            $this->streamIdFromAggregate($aggregate),
64 6
            $aggregate->changes(),
65 6
            $aggregate->originalVersion()
66
        );
67 6
        $aggregate->clearChanges();
68 6
    }
69
70
    /**
71
     * @param $aggregate
72
     */
73 8
    private function assertValidAggregate($aggregate)
74
    {
75 8
        if (!($aggregate instanceof $this->aggregateClass)) {
76 2
            throw new \InvalidArgumentException(
77 2
                sprintf('EventSourcedAggregateRepository expects an aggregate of type "%s" but "%s" given', $this->aggregateClass, get_class($aggregate))
78
            );
79
        }
80 6
    }
81
82
    /**
83
     * @param string $id
84
     * @return EventSourcedAggregateRootInterface
85
     */
86 2
    public function findById($id)
87
    {
88 2
        $snapshot = $this->snapshotStore
89 2
            ->findLastSnapshot($this->aggregateClass, $id);
90
91 2
        $streamId = $this->streamIdFromAggregateId($id);
92 2
        if ($snapshot) {
93 1
            $stream = $this->eventStore
94 1
                ->readStreamEvents($streamId, $snapshot->version() + 1);
95
        } else {
96 1
            $stream = $this->eventStore->readFullStream($streamId);
97
        }
98
99 2
        return $this->aggregateReconstructor->reconstitute(
100 2
            $this->aggregateClass,
101
            $stream,
102
            $snapshot
103
        );
104
    }
105
106
    /**
107
     * @param string $id
108
     * @param int $version
109
     * @return EventSourcedAggregateRootInterface
110
     */
111 4
    public function findByIdAndVersion($id, $version)
112
    {
113 4
        $snapshot = $this->snapshotStore
114 4
            ->findNearestSnapshotToVersion($this->aggregateClass, $id, $version);
115
116 4
        $streamId = $this->streamIdFromAggregateId($id);
117 4
        if ($snapshot) {
118 2
            $stream = $this->eventStore
119 2
                ->readStreamEvents(
120
                    $streamId,
121 2
                    $snapshot->version() + 1,
122 2
                    $version - $snapshot->version()
123
                );
124
        } else {
125 2
            $stream = $this->eventStore
126 2
                ->readStreamEvents(
127
                    $streamId,
128 2
                    1,
129
                    $version
130
                );
131
        }
132 4
        return $this->aggregateReconstructor->reconstitute(
133 4
            $this->aggregateClass,
134
            $stream,
135
            $snapshot
136
        );
137
    }
138
139
    /**
140
     * @param string $id
141
     * @param \DateTimeImmutable $datetime
142
     * @return EventSourcedAggregateRootInterface
143
     */
144 2
    public function findByIdAndDatetime($id, $datetime)
145
    {
146 2
        $streamId = $this->streamIdFromAggregateId($id);
147 2
        $version = $this->eventStore->getStreamVersionAt($streamId, $datetime);
148 2
        return $this->findByIdAndVersion($id, $version);
149
    }
150
151
    /**
152
     * @param EventSourcedAggregateRootInterface $aggregate
153
     * @return string
154
     */
155 6
    private function streamIdFromAggregate($aggregate)
156
    {
157 6
        return $this->streamIdFromAggregateId(
158 6
            $this->aggregateIdExtractor->extract($aggregate)
159
        );
160
    }
161
162
    /**
163
     * @param string $aggregateId
164
     * @return string
165
     */
166 12
    protected function streamIdFromAggregateId($aggregateId)
167
    {
168 12
        return $this->aggregateClass . '-' . $aggregateId;
169
    }
170
}
171