Passed
Push — master ( 36a807...bb7b0f )
by Daniel
03:08
created

findByIdAndVersion()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 27
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 27
ccs 14
cts 14
cp 1
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 20
nc 2
nop 2
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\Common;
4
5
use DDDominio\EventSourcing\Common\Annotation\AggregateId;
6
use DDDominio\EventSourcing\EventStore\EventStoreInterface;
7
use DDDominio\EventSourcing\Snapshotting\SnapshotStoreInterface;
8
use Doctrine\Common\Annotations\AnnotationReader;
9
10
/**
 * Repository for event-sourced aggregates of a single class.
 *
 * Writing appends the aggregate's pending changes to the event store; reading
 * looks up a snapshot, reads the events that follow it (or the stream from its
 * beginning when there is no snapshot) and hands both to the aggregate
 * reconstructor.
 */
class EventSourcedAggregateRepository
{
    /**
     * @var EventStoreInterface
     */
    private $eventStore;

    /**
     * @var SnapshotStoreInterface
     */
    private $snapshotStore;

    /**
     * @var AggregateReconstructor
     */
    private $aggregateReconstructor;

    /**
     * @var string
     */
    private $aggregateClass;

    /**
     * Per-class cache of the "@AggregateId"-annotated method name, keyed by
     * aggregate class name. A null value means the class was scanned and no
     * annotated method was found.
     *
     * @var array
     */
    private $annotatedIdMethodNames = [];

    /**
     * @param EventStoreInterface $eventStore
     * @param SnapshotStoreInterface $snapshotStore
     * @param AggregateReconstructor $aggregateReconstructor Only its reconstitute() method is used here.
     * @param string $aggregateClass Class name of the aggregates handled; also the stream id prefix.
     */
    public function __construct(
        EventStoreInterface $eventStore,
        SnapshotStoreInterface $snapshotStore,
        $aggregateReconstructor,
        $aggregateClass
    ) {
        $this->eventStore = $eventStore;
        $this->snapshotStore = $snapshotStore;
        $this->aggregateReconstructor = $aggregateReconstructor;
        $this->aggregateClass = $aggregateClass;
    }

    /**
     * Appends the aggregate's pending changes to its stream without passing an
     * expected version, then clears the changes on the aggregate.
     *
     * @param EventSourcedAggregateRootInterface $aggregate
     * @throws \RuntimeException When the aggregate id cannot be determined.
     */
    public function add($aggregate)
    {
        $this->eventStore->appendToStream(
            $this->streamIdFromAggregate($aggregate),
            $aggregate->changes()
        );
        $aggregate->clearChanges();
    }

    /**
     * Appends the aggregate's pending changes to its stream, passing the
     * aggregate's original version to the event store, then clears the changes.
     *
     * @param EventSourcedAggregateRootInterface $aggregate
     * @throws \RuntimeException When the aggregate id cannot be determined.
     */
    public function save($aggregate)
    {
        $this->eventStore->appendToStream(
            $this->streamIdFromAggregate($aggregate),
            $aggregate->changes(),
            $aggregate->originalVersion()
        );
        $aggregate->clearChanges();
    }

    /**
     * Rebuilds the aggregate with the given id from its last snapshot (if any)
     * plus the events that follow it.
     *
     * @param string $id
     * @return EventSourcedAggregateRootInterface
     */
    public function findById($id)
    {
        $snapshot = $this->snapshotStore->findLastSnapshot($this->aggregateClass, $id);
        $streamId = $this->streamIdFromAggregateId($id);

        if ($snapshot) {
            // Only the events recorded after the snapshot are needed.
            $stream = $this->eventStore->readStreamEventsForward(
                $streamId,
                $snapshot->version() + 1
            );
        } else {
            $stream = $this->eventStore->readFullStream($streamId);
        }

        return $this->reconstituteAggregate($stream, $snapshot);
    }

    /**
     * Rebuilds the aggregate with the given id as it was at the given version.
     *
     * Starts from the snapshot the snapshot store reports as nearest to that
     * version, if there is one, and reads only the events between the snapshot
     * and the requested version.
     *
     * @param string $id
     * @param int $version
     * @return EventSourcedAggregateRootInterface
     */
    public function findByIdAndVersion($id, $version)
    {
        $snapshot = $this->snapshotStore
            ->findNearestSnapshotToVersion($this->aggregateClass, $id, $version);
        $streamId = $this->streamIdFromAggregateId($id);

        // Without a snapshot, read $version events starting at the first one.
        $firstEvent = 1;
        $eventCount = $version;
        if ($snapshot) {
            $snapshotVersion = $snapshot->version();
            $firstEvent = $snapshotVersion + 1;
            // NOTE(review): this is 0 when the snapshot is exactly at $version;
            // confirm the event store reads no events (not "no limit") for 0.
            $eventCount = $version - $snapshotVersion;
        }

        $stream = $this->eventStore->readStreamEventsForward($streamId, $firstEvent, $eventCount);

        return $this->reconstituteAggregate($stream, $snapshot);
    }

    /**
     * Delegates to the aggregate reconstructor for this repository's class.
     *
     * @param mixed $stream Event stream as returned by the event store.
     * @param mixed $snapshot Snapshot as returned by the snapshot store, or a falsy value.
     * @return EventSourcedAggregateRootInterface
     */
    private function reconstituteAggregate($stream, $snapshot)
    {
        return $this->aggregateReconstructor->reconstitute(
            $this->aggregateClass,
            $stream,
            $snapshot
        );
    }

    /**
     * @param EventSourcedAggregateRootInterface $aggregate
     * @return string
     * @throws \RuntimeException When the aggregate id cannot be determined.
     */
    private function streamIdFromAggregate($aggregate)
    {
        return $this->streamIdFromAggregateId($this->aggregateId($aggregate));
    }

    /**
     * Builds the stream id as "<aggregate class>-<aggregate id>".
     *
     * @param string $aggregateId
     * @return string
     */
    protected function streamIdFromAggregateId($aggregateId)
    {
        return $this->aggregateClass . '-' . $aggregateId;
    }

    /**
     * Resolves the aggregate's id as a string.
     *
     * Tries, in order: an id() method, a getId() method, then the first method
     * carrying the "@AggregateId" annotation.
     *
     * @param object $aggregate
     * @return string
     * @throws \RuntimeException When none of the three lookups yields a method.
     */
    private function aggregateId($aggregate)
    {
        if (method_exists($aggregate, 'id')) {
            return (string) $aggregate->id();
        }
        if (method_exists($aggregate, 'getId')) {
            return (string) $aggregate->getId();
        }

        $idMethodName = $this->annotatedAggregateIdMethodName($aggregate);
        if ($idMethodName === null) {
            throw new \RuntimeException(sprintf(
                'No method "id", "getId" or with "@AggregateId" annotation found in %s',
                get_class($aggregate)
            ));
        }

        return (string) $aggregate->{$idMethodName}();
    }

    /**
     * Finds the first method of the aggregate's class annotated with
     * "@AggregateId", caching the result per class so the reflection and
     * annotation scan runs once per aggregate class.
     *
     * @param object $aggregate
     * @return string|null Method name, or null when no method is annotated.
     */
    private function annotatedAggregateIdMethodName($aggregate)
    {
        $aggregateClass = get_class($aggregate);
        // array_key_exists (not isset) so a cached null is honoured too.
        if (array_key_exists($aggregateClass, $this->annotatedIdMethodNames)) {
            return $this->annotatedIdMethodNames[$aggregateClass];
        }

        $reflection = new \ReflectionClass($aggregate);
        $annotationReader = new AnnotationReader();
        $methodName = null;
        foreach ($reflection->getMethods() as $reflectionMethod) {
            $annotation = $annotationReader->getMethodAnnotation(
                $reflectionMethod,
                AggregateId::class
            );
            if ($annotation !== null) {
                // Read the public "name" property instead of calling getName():
                // static analysis flags getName() as problematic on
                // APC-enabled PHP versions.
                $methodName = $reflectionMethod->name;
                break;
            }
        }

        $this->annotatedIdMethodNames[$aggregateClass] = $methodName;

        return $methodName;
    }
}
179