Passed
Pull Request — master (#20)
by Matthias
07:01
created

AggregateRepository::select()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 11
nc 2
nop 3
dl 0
loc 17
ccs 12
cts 12
cp 1
crap 2
rs 9.9
c 0
b 0
f 0
1
<?php
2
3
namespace TalisOrm;
4
5
use function array_diff;
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\Driver\ResultStatement;
8
use InvalidArgumentException;
9
use function is_a;
10
use LogicException;
11
use PDO;
12
use function spl_object_hash;
13
use TalisOrm\DomainEvents\EventDispatcher;
14
use Webmozart\Assert\Assert;
15
16
final class AggregateRepository
17
{
18
    /**
19
     * @var Connection
20
     */
21
    private $connection;
22
23
    /**
24
     * @var EventDispatcher
25
     */
26
    private $eventDispatcher;
27
28
    /**
29
     * @var Entity[]
30
     */
31
    private $knownEntities = [];
32
33
    /**
34
     * @var array
35
     */
36
    private $knownChildEntities = [];
37
38 27
    /**
     * @param Connection $connection Database connection used for all queries and transactions
     * @param EventDispatcher $eventDispatcher Receives the events released by an aggregate after it has been saved
     */
    public function __construct(Connection $connection, EventDispatcher $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
        $this->connection = $connection;
    }
43
44
    /**
     * Persists the aggregate and its child entities in a single transaction, then dispatches
     * the events released by the aggregate.
     *
     * Inside the transaction the aggregate itself is inserted or updated, child entities that
     * were remembered for this aggregate but are no longer part of it are deleted, and all
     * current child entities are inserted or updated.
     *
     * @param Aggregate $aggregate
     * @throws ConcurrentUpdateOccurred (only when you use optimistic concurrency locking)
     * @return void
     */
    public function save(Aggregate $aggregate)
    {
        $this->connection->transactional(function () use ($aggregate) {
            $this->insertOrUpdate($aggregate);
            $this->rememberEntity($aggregate);

            foreach ($this->deletedChildEntities($aggregate) as $childEntity) {
                $this->connection->delete(
                    $this->connection->quoteIdentifier($childEntity::tableName()),
                    $childEntity->identifier()
                );

                // The row no longer exists: forget the entity, so that it gets inserted
                // (instead of updated) in case the same object is added to an aggregate again.
                unset($this->knownEntities[spl_object_hash($childEntity)]);
            }

            $childEntitiesByType = $aggregate->childEntitiesByType();
            foreach ($childEntitiesByType as $childEntities) {
                foreach ($childEntities as $childEntity) {
                    $this->insertOrUpdate($childEntity);
                }
            }
            $this->rememberChildEntities($aggregate, $childEntitiesByType);
        });

        // Events are only dispatched once the transaction callback has completed.
        $this->eventDispatcher->dispatch($aggregate->releaseEvents());
    }
73
74
    /**
     * Loads an aggregate, including its child entities, and remembers all loaded objects
     * so that a later save() updates them instead of inserting them.
     *
     * @param string $aggregateClass Name of a class that implements Aggregate
     * @param AggregateId $aggregateId
     * @throws InvalidArgumentException When $aggregateClass does not implement Aggregate
     * @throws AggregateNotFoundException When no record was found for the given ID
     * @throws LogicException When fromState() does not return an instance of $aggregateClass
     * @return Aggregate
     */
    public function getById($aggregateClass, AggregateId $aggregateId)
    {
        Assert::string($aggregateClass);

        $implementsAggregate = is_a($aggregateClass, Aggregate::class, true);
        if (!$implementsAggregate) {
            $message = sprintf(
                'Class "%s" has to implement "%s"',
                $aggregateClass,
                Aggregate::class
            );

            throw new InvalidArgumentException($message);
        }

        $state = $this->getAggregateState($aggregateClass, $aggregateId);
        $childEntities = $this->getChildEntitiesByType($aggregateClass, $aggregateId);

        $aggregate = $aggregateClass::fromState($state, $childEntities);

        $isExpectedInstance = $aggregate instanceof $aggregateClass && $aggregate instanceof Aggregate;
        if (!$isExpectedInstance) {
            $message = sprintf(
                'Method "%s::fromState()" was expected to return an instance of "%1$s"',
                $aggregateClass
            );

            throw new LogicException($message);
        }

        $this->rememberEntity($aggregate);
        $this->rememberChildEntities($aggregate, $childEntities);

        return $aggregate;
    }
109
110
    /**
     * Fetches the stored state of a single aggregate: the first record matching its identifier.
     *
     * @param string $aggregateClass
     * @param AggregateId $aggregateId
     * @throws AggregateNotFoundException When no record matches the identifier
     * @return array
     */
    private function getAggregateState($aggregateClass, AggregateId $aggregateId)
    {
        $tableName = $aggregateClass::tableName();
        $identifier = $aggregateClass::identifierForQuery($aggregateId);
        $rows = $this->fetchAll($tableName, $identifier);

        if (\count($rows) > 0) {
            $firstRow = reset($rows);

            Assert::isArray($firstRow);

            return $firstRow;
        }

        throw new AggregateNotFoundException(sprintf(
            'Could not find aggregate of type "%s" with id "%s"',
            $aggregateClass,
            $aggregateId
        ));
    }
136
137
    /**
     * Loads the child entities of an aggregate, one query per child entity type.
     *
     * @param string $aggregateClass
     * @param AggregateId $aggregateId
     * @return array[] Child entity objects, grouped by child entity class name
     */
    private function getChildEntitiesByType($aggregateClass, AggregateId $aggregateId)
    {
        $entitiesByType = [];

        foreach ($aggregateClass::childEntityTypes() as $childEntityType) {
            $states = $this->fetchAll(
                $childEntityType::tableName(),
                $childEntityType::identifierForQuery($aggregateId)
            );

            // Turns one fetched record into a child entity object of the current type.
            $hydrate = function (array $childEntityState) use ($childEntityType) {
                return $childEntityType::fromState($childEntityState);
            };

            $entitiesByType[$childEntityType] = array_map($hydrate, $states);
        }

        return $entitiesByType;
    }
162
163 6
    /**
     * Deletes the aggregate and its current child entities in a single transaction
     * and forgets all of them, so that none is considered persisted anymore.
     *
     * @param Aggregate $aggregate
     * @return void
     */
    public function delete(Aggregate $aggregate)
    {
        $this->connection->transactional(function () use ($aggregate) {
            $this->connection->delete(
                $this->connection->quoteIdentifier($aggregate::tableName()),
                $aggregate->identifier()
            );

            foreach ($aggregate->childEntitiesByType() as $childEntities) {
                foreach ($childEntities as $childEntity) {
                    /** @var ChildEntity $childEntity */
                    $this->connection->delete(
                        $this->connection->quoteIdentifier($childEntity::tableName()),
                        $childEntity->identifier()
                    );

                    // The row no longer exists: forget the child entity too, otherwise saving
                    // the same object again would result in an update of a non-existing row.
                    unset($this->knownEntities[spl_object_hash($childEntity)]);
                }
            }

            $this->forgetAggregate($aggregate);
        });
    }
184
185 26
    /**
     * Inserts the entity if this repository has not seen it before, updates it otherwise.
     *
     * When the entity's state contains the version column, the version currently stored
     * is compared with the one in the state before updating.
     *
     * @param Entity $entity
     * @throws ConcurrentUpdateOccurred When the stored version is not lower than the entity's version
     * @return void
     */
    private function insertOrUpdate(Entity $entity)
    {
        if (!$this->exists($entity)) {
            $this->connection->insert(
                $this->connection->quoteIdentifier($entity::tableName()),
                $entity->state()
            );

            return;
        }

        $state = $entity->state();

        if (array_key_exists(Aggregate::VERSION_COLUMN, $state)) {
            $versionInState = $state[Aggregate::VERSION_COLUMN];

            $statement = $this->select(
                Aggregate::VERSION_COLUMN,
                $entity->tableName(),
                $entity->identifier()
            );
            $versionInDb = (int)$statement->fetchColumn();

            if ($versionInDb >= $versionInState) {
                throw ConcurrentUpdateOccurred::ofEntity($entity);
            }
        }

        $this->connection->update(
            $this->connection->quoteIdentifier($entity::tableName()),
            $state,
            $entity->identifier()
        );
    }
212
213
    /**
     * Fetches all columns of every record in the given table that matches the identifier.
     *
     * @param string $tableName
     * @param array $identifier Column name => value pairs used as conditions
     * @return array[] One associative array per record
     */
    private function fetchAll($tableName, array $identifier)
    {
        Assert::string($tableName);

        $statement = $this->select('*', $tableName, $identifier);

        return $statement->fetchAll(PDO::FETCH_ASSOC);
    }
224
225
    /**
     * Runs a SELECT query on a single table, with all conditions combined using AND.
     *
     * This method might have been on Connection itself...
     *
     * @param string $selectExpression Used as-is in the query (e.g. "*" or a column name)
     * @param string $tableExpression Table name, will be quoted as an identifier
     * @param array $where Column name => value pairs; values are passed as query parameters
     * @throws InvalidArgumentException When an argument has the wrong type or $where is empty
     * @return ResultStatement
     */
    private function select($selectExpression, $tableExpression, array $where)
    {
        Assert::string($selectExpression);
        Assert::string($tableExpression);
        // Without conditions the query would end in a dangling "WHERE", which is invalid SQL.
        Assert::notEmpty($where, 'Expected at least one condition for the WHERE clause');

        $conditions = [];
        $values = [];
        foreach ($where as $columnName => $value) {
            $conditions[] = $columnName . ' = ?';
            $values[] = $value;
        }

        $sql = 'SELECT ' . $selectExpression
            . ' FROM ' . $this->connection->quoteIdentifier($tableExpression)
            . ' WHERE ' . implode(' AND ', $conditions);

        return $this->connection->executeQuery($sql, $values);
    }
251
252 26
    /**
     * Registers the entity (keyed by its object hash) as known to this repository.
     *
     * @param Entity $entity
     * @return void
     */
    private function rememberEntity(Entity $entity)
    {
        $hash = spl_object_hash($entity);

        $this->knownEntities[$hash] = $entity;
    }
256
257
    /**
     * Tells whether this exact object has been remembered by this repository before.
     *
     * @param Entity $entity
     * @return bool
     */
    private function exists(Entity $entity)
    {
        $hash = spl_object_hash($entity);

        return isset($this->knownEntities[$hash]);
    }
265
266
    /**
     * Collects the child entities that were remembered for this aggregate,
     * but are no longer among the child entities it currently returns.
     *
     * Only the types returned by childEntitiesByType() are inspected.
     *
     * @param Aggregate $aggregate
     * @return Entity[]
     */
    private function deletedChildEntities(Aggregate $aggregate)
    {
        $aggregateHash = spl_object_hash($aggregate);
        $toHash = function (ChildEntity $childEntity) {
            return spl_object_hash($childEntity);
        };

        $removedEntities = [];

        foreach ($aggregate->childEntitiesByType() as $type => $currentChildEntities) {
            $currentHashes = array_map($toHash, $currentChildEntities);

            if (isset($this->knownChildEntities[$aggregateHash][$type])) {
                $knownOfType = $this->knownChildEntities[$aggregateHash][$type];
            } else {
                $knownOfType = [];
            }

            // Known hashes that are missing from the current set belong to removed entities.
            $removedHashes = array_diff(array_keys($knownOfType), $currentHashes);
            foreach ($removedHashes as $removedHash) {
                $removedEntities[] = $this->knownEntities[$removedHash];
            }
        }

        return $removedEntities;
    }
288
289 26
    /**
     * Replaces the remembered child entities of the aggregate by the given ones
     * and registers each of them as a known entity.
     *
     * @param Aggregate $aggregate
     * @param array $childEntitiesByType Child entity objects, grouped by type
     * @return void
     */
    private function rememberChildEntities(Aggregate $aggregate, array $childEntitiesByType)
    {
        $aggregateHash = spl_object_hash($aggregate);

        // Start from scratch, so that child entities which have been removed are dropped.
        $this->knownChildEntities[$aggregateHash] = [];

        foreach ($childEntitiesByType as $type => $entitiesOfType) {
            foreach ($entitiesOfType as $entity) {
                $entityHash = spl_object_hash($entity);

                $this->knownChildEntities[$aggregateHash][$type][$entityHash] = $entity;
                $this->rememberEntity($entity);
            }
        }
    }
300
301 6
    /**
     * Removes the aggregate from the known entities and drops the child entities remembered for it.
     *
     * @param Aggregate $aggregate
     * @return void
     */
    private function forgetAggregate(Aggregate $aggregate)
    {
        $hash = spl_object_hash($aggregate);

        unset($this->knownEntities[$hash]);
        unset($this->knownChildEntities[$hash]);
    }
308
}
309