Passed
Pull Request — master (#20)
by Matthias
02:20
created

AggregateRepository::save()   A

Complexity

Conditions 4
Paths 1

Size

Total Lines 23
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 13
nc 1
nop 1
dl 0
loc 23
ccs 18
cts 18
cp 1
crap 4
rs 9.8333
c 0
b 0
f 0
1
<?php
2
3
namespace TalisOrm;
4
5
use function array_diff;
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\Driver\ResultStatement;
8
use InvalidArgumentException;
9
use function is_a;
10
use LogicException;
11
use PDO;
12
use function spl_object_hash;
13
use TalisOrm\DomainEvents\EventDispatcher;
14
use Webmozart\Assert\Assert;
15
16
/**
 * Persists aggregates and their child entities through a Doctrine DBAL connection.
 *
 * The repository tracks object identity (via spl_object_hash()) of every entity it has
 * loaded or saved. That bookkeeping decides whether an entity needs an INSERT or an UPDATE
 * and which child entities have been removed from an aggregate since it was last seen.
 */
final class AggregateRepository
{
    /**
     * @var Connection
     */
    private $connection;

    /**
     * @var EventDispatcher
     */
    private $eventDispatcher;

    /**
     * Entities that are known to have a row in the database, keyed by spl_object_hash().
     *
     * @var Entity[]
     */
    private $knownEntities = [];

    /**
     * Child entities last seen for an aggregate:
     * [aggregate object hash => [child entity type => [child object hash => child entity]]].
     *
     * @var array
     */
    private $knownChildEntities = [];

    public function __construct(Connection $connection, EventDispatcher $eventDispatcher)
    {
        $this->connection = $connection;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Inserts or updates the aggregate and its child entities in a single transaction,
     * deletes the rows of child entities that were removed from the aggregate, and finally
     * dispatches the aggregate's released events.
     *
     * @param Aggregate $aggregate
     * @throws ConcurrentUpdateOccurred (only when you use optimistic concurrency locking)
     * @return void
     */
    public function save(Aggregate $aggregate)
    {
        $deletedChildEntities = [];
        $childEntitiesByType = [];

        $this->connection->transactional(
            function () use ($aggregate, &$deletedChildEntities, &$childEntitiesByType) {
                $this->insertOrUpdate($aggregate);

                $deletedChildEntities = $this->deletedChildEntities($aggregate);
                foreach ($deletedChildEntities as $childEntity) {
                    $this->deleteEntity($childEntity);
                }

                $childEntitiesByType = $aggregate->childEntitiesByType();
                foreach ($childEntitiesByType as $childEntities) {
                    foreach ($childEntities as $childEntity) {
                        $this->insertOrUpdate($childEntity);
                    }
                }
            }
        );

        // Identity bookkeeping happens only after transactional() returned without throwing.
        // Doing it inside the closure would leave entities marked as persisted when a later
        // statement fails and the whole transaction is rolled back.
        foreach ($deletedChildEntities as $childEntity) {
            // A removed child no longer has a row; if the same object is added again
            // later it has to be inserted, not updated.
            $this->forgetEntity($childEntity);
        }
        $this->rememberEntity($aggregate);
        $this->rememberChildEntities($aggregate, $childEntitiesByType);

        $this->eventDispatcher->dispatch($aggregate->releaseEvents());
    }

    /**
     * Loads the aggregate of the given class with the given ID, including its child entities.
     *
     * @param string $aggregateClass Name of a class implementing Aggregate
     * @param AggregateId $aggregateId
     * @throws InvalidArgumentException When $aggregateClass does not implement Aggregate
     * @throws AggregateNotFoundException When no row exists for the given ID
     * @throws LogicException When fromState() does not return an instance of $aggregateClass
     * @return Aggregate
     */
    public function getById($aggregateClass, AggregateId $aggregateId)
    {
        Assert::string($aggregateClass);

        if (!is_a($aggregateClass, Aggregate::class, true)) {
            throw new InvalidArgumentException(sprintf(
                'Class "%s" has to implement "%s"',
                $aggregateClass,
                Aggregate::class
            ));
        }

        $aggregateState = $this->getAggregateState($aggregateClass, $aggregateId);

        $childEntitiesByType = $this->getChildEntitiesByType($aggregateClass, $aggregateId);

        $aggregate = $aggregateClass::fromState($aggregateState, $childEntitiesByType);

        if (!$aggregate instanceof $aggregateClass || !$aggregate instanceof Aggregate) {
            throw new LogicException(sprintf(
                'Method "%s::fromState()" was expected to return an instance of "%1$s"',
                $aggregateClass
            ));
        }

        // Everything that was just loaded exists in the database, so a later save()
        // has to UPDATE these objects instead of inserting them.
        $this->rememberEntity($aggregate);
        $this->rememberChildEntities($aggregate, $childEntitiesByType);

        return $aggregate;
    }

    /**
     * Fetches the single state row of an aggregate.
     *
     * @param string $aggregateClass
     * @param AggregateId $aggregateId
     * @throws AggregateNotFoundException When the query yields no rows
     * @return array
     */
    private function getAggregateState($aggregateClass, AggregateId $aggregateId)
    {
        $aggregateStates = $this->fetchAll(
            $aggregateClass::tableName(),
            $aggregateClass::identifierForQuery($aggregateId)
        );

        if (\count($aggregateStates) === 0) {
            throw new AggregateNotFoundException(sprintf(
                'Could not find aggregate of type "%s" with id "%s"',
                $aggregateClass,
                $aggregateId
            ));
        }

        // Only the first matching row is used.
        $aggregateState = reset($aggregateStates);

        Assert::isArray($aggregateState);

        return $aggregateState;
    }

    /**
     * Loads and reconstitutes all child entities of an aggregate, grouped by child entity type.
     *
     * @param string $aggregateClass
     * @param AggregateId $aggregateId
     * @return array[] Child entity objects, keyed by child entity type
     */
    private function getChildEntitiesByType($aggregateClass, AggregateId $aggregateId)
    {
        $childEntitiesByType = [];

        foreach ($aggregateClass::childEntityTypes() as $childEntityType) {
            $childEntityStates = $this->fetchAll(
                $childEntityType::tableName(),
                $childEntityType::identifierForQuery($aggregateId)
            );

            $childEntitiesByType[$childEntityType] = array_map(
                function (array $childEntityState) use ($childEntityType) {
                    return $childEntityType::fromState($childEntityState);
                },
                $childEntityStates
            );
        }

        return $childEntitiesByType;
    }

    /**
     * Deletes the aggregate's row and the rows of its child entities in a single transaction.
     *
     * Child entities that were removed from the aggregate since it was last loaded or saved
     * are deleted as well, so they do not stay behind as orphan rows. Afterwards the
     * repository forgets all involved objects: they no longer have a row, so saving them
     * again has to result in an INSERT.
     *
     * @param Aggregate $aggregate
     * @return void
     */
    public function delete(Aggregate $aggregate)
    {
        $childEntitiesToDelete = [];
        foreach ($aggregate->childEntitiesByType() as $childEntities) {
            foreach ($childEntities as $childEntity) {
                /** @var ChildEntity $childEntity */
                $childEntitiesToDelete[] = $childEntity;
            }
        }
        foreach ($this->deletedChildEntities($aggregate) as $childEntity) {
            $childEntitiesToDelete[] = $childEntity;
        }

        $this->connection->transactional(function () use ($aggregate, $childEntitiesToDelete) {
            $this->deleteEntity($aggregate);

            foreach ($childEntitiesToDelete as $childEntity) {
                $this->deleteEntity($childEntity);
            }
        });

        // Only reached when the transaction did not throw.
        $this->forgetEntity($aggregate);
        foreach ($childEntitiesToDelete as $childEntity) {
            $this->forgetEntity($childEntity);
        }
        unset($this->knownChildEntities[spl_object_hash($aggregate)]);
    }

    /**
     * Updates the entity's row when the entity is known to this repository, inserts it otherwise.
     *
     * When the entity's state contains Aggregate::VERSION_COLUMN, the version currently stored
     * in the database is read first; if it is not lower than the version about to be written,
     * the update is rejected.
     *
     * @param Entity $entity
     * @throws ConcurrentUpdateOccurred
     * @return void
     */
    private function insertOrUpdate(Entity $entity)
    {
        $tableName = $entity::tableName();

        if (!$this->exists($entity)) {
            $this->connection->insert(
                $this->connection->quoteIdentifier($tableName),
                $entity->state()
            );

            return;
        }

        $state = $entity->state();

        if (array_key_exists(Aggregate::VERSION_COLUMN, $state)) {
            $aggregateVersion = $state[Aggregate::VERSION_COLUMN];
            $aggregateVersionInDb = (int)$this->select(
                Aggregate::VERSION_COLUMN,
                $tableName,
                $entity->identifier()
            )->fetchColumn();

            if ($aggregateVersionInDb >= $aggregateVersion) {
                throw ConcurrentUpdateOccurred::ofEntity($entity);
            }
        }

        $this->connection->update(
            $this->connection->quoteIdentifier($tableName),
            $state,
            $entity->identifier()
        );
    }

    /**
     * Deletes the row identified by the entity's identifier from the entity's table.
     *
     * @param Entity $entity
     * @return void
     */
    private function deleteEntity(Entity $entity)
    {
        $this->connection->delete(
            $this->connection->quoteIdentifier($entity::tableName()),
            $entity->identifier()
        );
    }

    /**
     * @param string $tableName
     * @param array $identifier Column name => value pairs, combined with AND
     * @return array[] All matching rows as associative arrays
     */
    private function fetchAll($tableName, array $identifier)
    {
        Assert::string($tableName);

        return $this->select('*', $tableName, $identifier)->fetchAll(PDO::FETCH_ASSOC);
    }

    /**
     * This method might have been on Connection itself...
     *
     * Runs "SELECT <expression> FROM <table> WHERE <column> = ? AND ...". The table name is
     * quoted as an identifier and the values are bound as parameters; the select expression
     * and the column names are used verbatim.
     *
     * @param string $selectExpression
     * @param string $tableExpression
     * @param array $where Column name => value pairs, combined with AND; must not be empty
     * @return ResultStatement
     */
    private function select($selectExpression, $tableExpression, array $where)
    {
        Assert::string($selectExpression);
        Assert::string($tableExpression);
        // Without at least one condition the statement would end in a dangling "WHERE".
        Assert::notEmpty($where, 'Expected at least one condition for the WHERE clause');

        $conditions = [];
        $values = [];
        foreach ($where as $columnName => $value) {
            $conditions[] = $columnName . ' = ?';
            $values[] = $value;
        }

        $sql = 'SELECT ' . $selectExpression
            . ' FROM ' . $this->connection->quoteIdentifier($tableExpression)
            . ' WHERE ' . implode(' AND ', $conditions);

        return $this->connection->executeQuery($sql, $values);
    }

    /**
     * Marks the entity as having a row in the database.
     *
     * @param Entity $entity
     * @return void
     */
    private function rememberEntity(Entity $entity)
    {
        $this->knownEntities[spl_object_hash($entity)] = $entity;
    }

    /**
     * Marks the entity as no longer having a row in the database.
     *
     * @param Entity $entity
     * @return void
     */
    private function forgetEntity(Entity $entity)
    {
        unset($this->knownEntities[spl_object_hash($entity)]);
    }

    /**
     * @param Entity $entity
     * @return bool Whether this repository has loaded or saved this very object before
     */
    private function exists(Entity $entity)
    {
        return isset($this->knownEntities[spl_object_hash($entity)]);
    }

    /**
     * Returns the child entities that were part of the aggregate when it was last loaded or
     * saved, but are no longer returned by childEntitiesByType().
     *
     * The known child entities are the starting point (instead of the current ones), so a
     * child entity type that no longer shows up in childEntitiesByType() at all still gets
     * its removed entities reported.
     *
     * @param Aggregate $aggregate
     * @return Entity[]
     */
    private function deletedChildEntities(Aggregate $aggregate)
    {
        $currentChildEntityHashes = [];
        foreach ($aggregate->childEntitiesByType() as $childEntities) {
            foreach ($childEntities as $childEntity) {
                $currentChildEntityHashes[spl_object_hash($childEntity)] = true;
            }
        }

        $aggregateHash = spl_object_hash($aggregate);
        $knownChildEntitiesByType = isset($this->knownChildEntities[$aggregateHash])
            ? $this->knownChildEntities[$aggregateHash]
            : [];

        $deletedChildEntities = [];
        foreach ($knownChildEntitiesByType as $knownChildEntities) {
            foreach ($knownChildEntities as $hash => $childEntity) {
                if (!isset($currentChildEntityHashes[$hash])) {
                    $deletedChildEntities[] = $childEntity;
                }
            }
        }

        return $deletedChildEntities;
    }

    /**
     * Replaces the remembered set of child entities of the aggregate with the given one and
     * marks each of these child entities as having a row in the database.
     *
     * @param Aggregate $aggregate
     * @param array $childEntitiesByType Child entity objects, keyed by child entity type
     * @return void
     */
    private function rememberChildEntities(Aggregate $aggregate, array $childEntitiesByType)
    {
        $aggregateHash = spl_object_hash($aggregate);
        $this->knownChildEntities[$aggregateHash] = [];

        foreach ($childEntitiesByType as $type => $childEntities) {
            foreach ($childEntities as $childEntity) {
                $this->knownChildEntities[$aggregateHash][$type][spl_object_hash($childEntity)] = $childEntity;
                $this->rememberEntity($childEntity);
            }
        }
    }
}
299