Passed
Push — master ( 8f1ad4...65e262 )
by Daniel
06:10
created

findByIdAndDatetime()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 4
cts 4
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 2
crap 1
1
<?php
2
3
namespace DDDominio\EventSourcing\Common;
4
5
use DDDominio\EventSourcing\Common\Annotation\AggregateId;
6
use DDDominio\EventSourcing\EventStore\EventStoreInterface;
7
use DDDominio\EventSourcing\Snapshotting\SnapshotStoreInterface;
8
use Doctrine\Common\Annotations\AnnotationReader;
9
10
class EventSourcedAggregateRepository
11
{
12
    /**
13
     * @var EventStoreInterface
14
     */
15
    private $eventStore;
16
17
    /**
18
     * @var SnapshotStoreInterface
19
     */
20
    private $snapshotStore;
21
22
    /**
23
     * @var AggregateReconstructor
24
     */
25
    private $aggregateReconstructor;
26
27
    /**
28
     * @var string
29
     */
30
    private $aggregateClass;
31
32
    /**
33
     * @param EventStoreInterface $eventStore
34
     * @param SnapshotStoreInterface $snapshotStore
35
     * @param AggregateReconstructor $aggregateReconstructor
36
     * @param string $aggregateClass
37
     */
38 12
    public function __construct(
39
        EventStoreInterface $eventStore, SnapshotStoreInterface $snapshotStore, $aggregateReconstructor, $aggregateClass
40
    ) {
41 12
        $this->eventStore = $eventStore;
42 12
        $this->snapshotStore = $snapshotStore;
43 12
        $this->aggregateReconstructor = $aggregateReconstructor;
44 12
        $this->aggregateClass = $aggregateClass;
45 12
    }
46
47
    /**
48
     * @param EventSourcedAggregateRootInterface $aggregate
49
     */
50 3
    public function add($aggregate)
51
    {
52 3
        $this->eventStore->appendToStream(
53 3
            $this->streamIdFromAggregate($aggregate),
54 3
            $aggregate->changes()
55
        );
56 3
        $aggregate->clearChanges();
57 3
    }
58
59
    /**
60
     * @param EventSourcedAggregateRootInterface $aggregate
61
     */
62 3
    public function save($aggregate)
63
    {
64 3
        $this->eventStore->appendToStream(
65 3
            $this->streamIdFromAggregate($aggregate),
66 3
            $aggregate->changes(),
67 3
            $aggregate->originalVersion()
68
        );
69 3
        $aggregate->clearChanges();
70 3
    }
71
72
    /**
73
     * @param string $id
74
     * @return EventSourcedAggregateRootInterface
75
     */
76 2
    public function findById($id)
77
    {
78 2
        $snapshot = $this->snapshotStore
79 2
            ->findLastSnapshot($this->aggregateClass, $id);
80
81 2
        $streamId = $this->streamIdFromAggregateId($id);
82 2
        if ($snapshot) {
83 1
            $stream = $this->eventStore
84 1
                ->readStreamEvents($streamId, $snapshot->version() + 1);
85
        } else {
86 1
            $stream = $this->eventStore->readFullStream($streamId);
87
        }
88
89 2
        return $this->aggregateReconstructor->reconstitute(
90 2
            $this->aggregateClass,
91
            $stream,
92
            $snapshot
93
        );
94
    }
95
96
    /**
97
     * @param string $id
98
     * @param int $version
99
     * @return EventSourcedAggregateRootInterface
100
     */
101 4
    public function findByIdAndVersion($id, $version)
102
    {
103 4
        $snapshot = $this->snapshotStore
104 4
            ->findNearestSnapshotToVersion($this->aggregateClass, $id, $version);
105
106 4
        $streamId = $this->streamIdFromAggregateId($id);
107 4
        if ($snapshot) {
108 2
            $stream = $this->eventStore
109 2
                ->readStreamEvents(
110
                    $streamId,
111 2
                    $snapshot->version() + 1,
112 2
                    $version - $snapshot->version()
113
                );
114
        } else {
115 2
            $stream = $this->eventStore
116 2
                ->readStreamEvents(
117
                    $streamId,
118 2
                    1,
119
                    $version
120
                );
121
        }
122 4
        return $this->aggregateReconstructor->reconstitute(
123 4
            $this->aggregateClass,
124
            $stream,
125
            $snapshot
126
        );
127
    }
128
129
    /**
130
     * @param string $id
131
     * @param \DateTimeImmutable $datetime
132
     * @return EventSourcedAggregateRootInterface
133
     */
134 2
    public function findByIdAndDatetime($id, $datetime)
135
    {
136 2
        $streamId = $this->streamIdFromAggregateId($id);
137 2
        $version = $this->eventStore->getStreamVersionAt($streamId, $datetime);
138 2
        return $this->findByIdAndVersion($id, $version);
139
    }
140
141
    /**
142
     * @param EventSourcedAggregateRootInterface $aggregate
143
     * @return string
144
     */
145 6
    private function streamIdFromAggregate($aggregate)
146
    {
147 6
        return $this->streamIdFromAggregateId($this->aggregateId($aggregate));
148
    }
149
150
    /**
151
     * @param string $aggregateId
152
     * @return string
153
     */
154 12
    protected function streamIdFromAggregateId($aggregateId)
155
    {
156 12
        return $this->aggregateClass . '-' . $aggregateId;
157
    }
158
159
    /**
160
     * @param object $aggregate
161
     * @return string
162
     * @throws \Exception
163
     */
164 6
    private function aggregateId($aggregate)
165
    {
166 6
        if (method_exists($aggregate, 'id')) {
167 6
            return (string) $aggregate->id();
168
        }
169
        if (method_exists($aggregate, 'getId')) {
170
            return (string) $aggregate->getId();
171
        }
172
        $reflection = new \ReflectionClass($aggregate);
173
        $annotationReader = new AnnotationReader();
174
        $aggregateIdMethodName = null;
175
        foreach ($reflection->getMethods() as $reflectionMethod) {
176
            $annotation = $annotationReader->getMethodAnnotation(
177
                $reflectionMethod,
178
                AggregateId::class
179
            );
180
            if (!is_null($annotation)) {
181
                $aggregateIdMethodName = $reflectionMethod->getName();
0 ignored issues
show
Bug introduced by
Consider using $reflectionMethod->name. There is an issue with getName() and APC-enabled PHP versions.
Loading history...
182
                break;
183
            }
184
        }
185
        if (is_null($aggregateIdMethodName)) {
186
            throw new \RuntimeException('No method "id", "getId" or with "@AggregateId" annotation found in '. get_class($aggregate));
187
        }
188
        return (string) $aggregate->{$aggregateIdMethodName}();
189
    }
190
}
191