Completed
Push — master ( a5631d...69c428 )
by Daniel
03:14
created

EventSourcedAggregateRepository   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 162
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
dl 0
loc 162
c 0
b 0
f 0
wmc 10
lcom 1
cbo 6
ccs 53
cts 53
cp 1
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 13 1
A add() 0 8 1
A save() 0 9 1
A findById() 0 19 2
B findByIdAndVersion() 0 27 2
A findByIdAndDatetime() 0 6 1
A streamIdFromAggregate() 0 6 1
A streamIdFromAggregateId() 0 4 1
1
<?php
2
3
namespace DDDominio\EventSourcing\Common;
4
5
use DDDominio\EventSourcing\EventStore\EventStoreInterface;
6
use DDDominio\EventSourcing\Snapshotting\SnapshotStoreInterface;
7
8
/**
 * Repository that stores and rebuilds event-sourced aggregates of one class.
 *
 * Changes recorded by an aggregate are appended to an event stream, and
 * aggregates are rebuilt by handing a stream of events (plus an optional
 * snapshot) to the aggregate reconstructor.
 *
 * The stream ID of an aggregate is "<aggregateClass>-<aggregateId>".
 */
class EventSourcedAggregateRepository
{
    /**
     * Store the aggregate events are appended to and read from.
     *
     * @var EventStoreInterface
     */
    private $eventStore;

    /**
     * Store queried for a snapshot before reading events.
     *
     * @var SnapshotStoreInterface
     */
    private $snapshotStore;

    /**
     * Collaborator that turns (class, stream, snapshot) into an aggregate.
     *
     * @var AggregateReconstructor
     */
    private $aggregateReconstructor;

    /**
     * Name of the aggregate class handled by this repository; also used as
     * the prefix of every stream ID.
     *
     * @var string
     */
    private $aggregateClass;

    /**
     * Collaborator that obtains the ID of an aggregate instance.
     *
     * @var AggregateIdExtractorInterface
     */
    private $aggregateIdExtractor;

    /**
     * Wires the repository with its collaborators.
     *
     * NOTE(review): $aggregateReconstructor and $aggregateIdExtractor are
     * documented with class types but are not type-hinted in the signature;
     * left as-is so existing callers keep working.
     *
     * @param EventStoreInterface $eventStore
     * @param SnapshotStoreInterface $snapshotStore
     * @param AggregateReconstructor $aggregateReconstructor
     * @param AggregateIdExtractorInterface $aggregateIdExtractor
     * @param string $aggregateClass
     */
    public function __construct(
        EventStoreInterface $eventStore,
        SnapshotStoreInterface $snapshotStore,
        $aggregateReconstructor,
        $aggregateIdExtractor,
        $aggregateClass
    ) {
        $this->eventStore = $eventStore;
        $this->snapshotStore = $snapshotStore;
        $this->aggregateReconstructor = $aggregateReconstructor;
        $this->aggregateIdExtractor = $aggregateIdExtractor;
        $this->aggregateClass = $aggregateClass;
    }

    /**
     * Appends the aggregate's pending changes to its stream and then clears
     * them on the aggregate.
     *
     * Unlike save(), no expected version is passed to the event store.
     *
     * @param EventSourcedAggregateRootInterface $aggregate
     */
    public function add($aggregate)
    {
        $streamId = $this->streamIdFromAggregate($aggregate);

        // Deliberately a two-argument call: the expected version is omitted.
        $this->eventStore->appendToStream($streamId, $aggregate->changes());

        $aggregate->clearChanges();
    }

    /**
     * Appends the aggregate's pending changes to its stream, passing the
     * aggregate's original version along, and then clears the changes.
     *
     * NOTE(review): the original version is presumably used by the event
     * store as an expected-version check — confirm against the store.
     *
     * @param EventSourcedAggregateRootInterface $aggregate
     */
    public function save($aggregate)
    {
        $streamId = $this->streamIdFromAggregate($aggregate);

        $this->eventStore->appendToStream(
            $streamId,
            $aggregate->changes(),
            $aggregate->originalVersion()
        );

        $aggregate->clearChanges();
    }

    /**
     * Rebuilds the aggregate with the given ID.
     *
     * When the snapshot store returns a snapshot, only the events starting
     * at the version right after the snapshot are read; otherwise the full
     * stream is read.
     *
     * @param string $id
     * @return EventSourcedAggregateRootInterface
     */
    public function findById($id)
    {
        $snapshot = $this->snapshotStore->findLastSnapshot($this->aggregateClass, $id);
        $streamId = $this->streamIdFromAggregateId($id);

        if ($snapshot) {
            // Resume right after the snapshot; no event count is passed.
            $stream = $this->eventStore->readStreamEvents(
                $streamId,
                $snapshot->version() + 1
            );
        } else {
            $stream = $this->eventStore->readFullStream($streamId);
        }

        return $this->reconstituteFrom($stream, $snapshot);
    }

    /**
     * Rebuilds the aggregate with the given ID as of the given version.
     *
     * With a snapshot, reading starts right after the snapshot version and
     * is limited to the events between the snapshot and $version. Without
     * one, reading starts at 1 and is limited to $version events.
     *
     * NOTE(review): when the snapshot version equals $version the requested
     * count is 0; how readStreamEvents() treats a zero count is not visible
     * here — confirm.
     *
     * @param string $id
     * @param int $version
     * @return EventSourcedAggregateRootInterface
     */
    public function findByIdAndVersion($id, $version)
    {
        $snapshot = $this->snapshotStore->findNearestSnapshotToVersion(
            $this->aggregateClass,
            $id,
            $version
        );
        $streamId = $this->streamIdFromAggregateId($id);

        if ($snapshot) {
            $snapshotVersion = $snapshot->version();
            $start = $snapshotVersion + 1;
            $count = $version - $snapshotVersion;
        } else {
            $start = 1;
            $count = $version;
        }

        $stream = $this->eventStore->readStreamEvents($streamId, $start, $count);

        return $this->reconstituteFrom($stream, $snapshot);
    }

    /**
     * Rebuilds the aggregate with the given ID as of the given moment.
     *
     * The event store is asked for the stream version at $datetime and the
     * lookup is then delegated to findByIdAndVersion().
     *
     * @param string $id
     * @param \DateTimeImmutable $datetime
     * @return EventSourcedAggregateRootInterface
     */
    public function findByIdAndDatetime($id, $datetime)
    {
        $version = $this->eventStore->getStreamVersionAt(
            $this->streamIdFromAggregateId($id),
            $datetime
        );

        return $this->findByIdAndVersion($id, $version);
    }

    /**
     * Hands a stream and an optional snapshot to the aggregate reconstructor
     * for this repository's aggregate class.
     *
     * @param mixed $stream Whatever the event store returned.
     * @param mixed $snapshot Snapshot returned by the snapshot store, or a
     *                        falsy value when none was found.
     * @return EventSourcedAggregateRootInterface
     */
    private function reconstituteFrom($stream, $snapshot)
    {
        return $this->aggregateReconstructor->reconstitute(
            $this->aggregateClass,
            $stream,
            $snapshot
        );
    }

    /**
     * Builds the stream ID for an aggregate instance, using the ID returned
     * by the aggregate ID extractor.
     *
     * @param EventSourcedAggregateRootInterface $aggregate
     * @return string
     */
    private function streamIdFromAggregate($aggregate)
    {
        $aggregateId = $this->aggregateIdExtractor->extract($aggregate);

        return $this->streamIdFromAggregateId($aggregateId);
    }

    /**
     * Builds the stream ID for an aggregate ID: the aggregate class name,
     * a dash, and the ID.
     *
     * @param string $aggregateId
     * @return string
     */
    protected function streamIdFromAggregateId($aggregateId)
    {
        return $this->aggregateClass . '-' . $aggregateId;
    }
}
170