AggregateRepository::getChildEntitiesByType()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 26
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 12
nc 2
nop 3
dl 0
loc 26
ccs 13
cts 13
cp 1
crap 2
rs 9.8666
c 0
b 0
f 0
1
<?php
2
3
namespace TalisOrm;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\Driver\ResultStatement;
7
use InvalidArgumentException;
8
use function is_a;
9
use LogicException;
10
use PDO;
11
use TalisOrm\DomainEvents\EventDispatcher;
12
use Webmozart\Assert\Assert;
13
14
/**
 * Saves, loads and deletes aggregates, together with their child entities,
 * through a Doctrine DBAL connection.
 *
 * @phpstan-template T of Aggregate
 */
final class AggregateRepository
{
    /**
     * @var Connection
     */
    private $connection;

    /**
     * @var EventDispatcher
     */
    private $eventDispatcher;

    public function __construct(Connection $connection, EventDispatcher $eventDispatcher)
    {
        $this->connection = $connection;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Write the aggregate and its child entities inside one transaction, then
     * dispatch the events released by the aggregate.
     *
     * Within the transaction the aggregate itself is inserted or updated first,
     * rows of deleted child entities are removed, and the remaining child
     * entities are inserted or updated.
     *
     * @throws ConcurrentUpdateOccurred (only when you use optimistic concurrency locking)
     *
     * @phpstan-param T $aggregate
     */
    public function save(Aggregate $aggregate): void
    {
        $this->connection->transactional(function () use ($aggregate) {
            $persistedEntities = [];

            $this->insertOrUpdate($aggregate);
            $persistedEntities[] = $aggregate;

            foreach ($aggregate->deletedChildEntities() as $childEntity) {
                $this->connection->delete(
                    $this->connection->quoteIdentifier($childEntity::tableName()),
                    $childEntity->identifier()
                );
            }

            foreach ($aggregate->childEntitiesByType() as $childEntities) {
                foreach ($childEntities as $childEntity) {
                    $this->insertOrUpdate($childEntity);
                    $persistedEntities[] = $childEntity;
                }
            }

            // Entities are only marked once every insert/update/delete call
            // above has returned without throwing.
            foreach ($persistedEntities as $persistedObject) {
                $persistedObject->markAsPersisted();
            }
        });

        // Events are dispatched after transactional() has returned.
        $this->eventDispatcher->dispatch($aggregate->releaseEvents());
    }

    /**
     * Load an aggregate of the given class by its ID.
     *
     * The aggregate's row is fetched, `$extraState` is merged over it (entries
     * in `$extraState` win for identical string keys), the child entities are
     * loaded, and everything is handed to `$aggregateClass::fromState()`.
     *
     * @param array<string, mixed> $extraState
     * @return mixed|Aggregate
     *
     * @throws InvalidArgumentException when $aggregateClass does not implement Aggregate
     * @throws AggregateNotFoundException when no row matches the given ID
     * @throws LogicException when fromState() does not return an instance of $aggregateClass
     *
     * @phpstan-param class-string<T> $aggregateClass
     * @phpstan-return T
     */
    public function getById(string $aggregateClass, AggregateId $aggregateId, array $extraState = []): Aggregate
    {
        if (!is_a($aggregateClass, Aggregate::class, true)) {
            throw new InvalidArgumentException(sprintf(
                'Class "%s" has to implement "%s"',
                $aggregateClass,
                Aggregate::class
            ));
        }

        $aggregateState = $this->getAggregateState($aggregateClass, $aggregateId);
        $aggregateState = array_merge($aggregateState, $extraState);

        $childEntitiesByType = $this->getChildEntitiesByType($aggregateClass, $aggregateId, $aggregateState);

        $aggregate = $aggregateClass::fromState($aggregateState, $childEntitiesByType);

        if (!$aggregate instanceof $aggregateClass || !$aggregate instanceof Aggregate) {
            throw new LogicException(sprintf(
                'Method "%s::fromState()" was expected to return an instance of "%1$s"',
                $aggregateClass
            ));
        }

        $aggregate->markAsPersisted();

        return $aggregate;
    }

    /**
     * Fetch the first row of the aggregate's table that matches the given ID.
     *
     * @return array<string, mixed>
     *
     * @throws AggregateNotFoundException when the query returns no rows
     *
     * @phpstan-param class-string<T> $aggregateClass
     */
    private function getAggregateState(string $aggregateClass, AggregateId $aggregateId): array
    {
        $aggregateStates = $this->fetchAll(
            $aggregateClass::tableName(),
            $aggregateClass::identifierForQuery($aggregateId)
        );

        if (\count($aggregateStates) === 0) {
            throw new AggregateNotFoundException(sprintf(
                'Could not find aggregate of type "%s" with id "%s"',
                $aggregateClass,
                $aggregateId
            ));
        }

        $aggregateState = reset($aggregateStates);

        Assert::isArray($aggregateState);

        return $aggregateState;
    }

    /**
     * Load the child entities of every type declared by the aggregate class.
     *
     * Each fetched row is turned into an object with
     * `$childEntityType::fromState($row, $aggregateState)` and marked as
     * persisted. The result is keyed by child entity class name.
     *
     * @param array<string, mixed> $aggregateState
     * @return array<string, mixed[]>
     *
     * @phpstan-param class-string<T> $aggregateClass
     * @phpstan-return array<class-string<ChildEntity>, mixed[]>
     */
    private function getChildEntitiesByType(
        string $aggregateClass,
        AggregateId $aggregateId,
        array $aggregateState
    ): array {
        $childEntitiesByType = [];

        foreach ($aggregateClass::childEntityTypes() as $childEntityType) {
            $childEntityStates = $this->fetchAll(
                $childEntityType::tableName(),
                $childEntityType::identifierForQuery($aggregateId)
            );

            $childEntitiesByType[$childEntityType] = array_map(
                // static: the callback never touches $this
                static function (array $childEntityState) use ($childEntityType, $aggregateState) {
                    $childEntity = $childEntityType::fromState($childEntityState, $aggregateState);

                    $childEntity->markAsPersisted();

                    return $childEntity;
                },
                $childEntityStates
            );
        }

        return $childEntitiesByType;
    }

    /**
     * Delete the aggregate's row and the rows of its current child entities
     * inside one transaction.
     *
     * @phpstan-param T $aggregate
     */
    public function delete(Aggregate $aggregate): void
    {
        $this->connection->transactional(function () use ($aggregate) {
            $this->connection->delete(
                $this->connection->quoteIdentifier($aggregate::tableName()),
                $aggregate->identifier()
            );

            foreach ($aggregate->childEntitiesByType() as $childEntities) {
                foreach ($childEntities as $childEntity) {
                    $this->connection->delete(
                        $this->connection->quoteIdentifier($childEntity::tableName()),
                        $childEntity->identifier()
                    );
                }
            }
        });
    }

    /**
     * Insert the entity when it reports itself as new, otherwise update it.
     *
     * On update, when the entity's state contains Aggregate::VERSION_COLUMN,
     * the stored version is read first; if it is greater than or equal to the
     * version in the state, the update is refused.
     *
     * @throws ConcurrentUpdateOccurred when the stored version is not older than the one being written
     */
    private function insertOrUpdate(Entity $entity): void
    {
        if ($entity->isNew()) {
            $this->connection->insert(
                $this->connection->quoteIdentifier($entity::tableName()),
                $entity->state()
            );

            return;
        }

        $state = $entity->state();

        if (array_key_exists(Aggregate::VERSION_COLUMN, $state)) {
            $aggregateVersion = $state[Aggregate::VERSION_COLUMN];
            $aggregateVersionInDb = (int)$this->select(
                Aggregate::VERSION_COLUMN,
                $entity::tableName(),
                $entity->identifier()
            )->fetchColumn();

            if ($aggregateVersionInDb >= $aggregateVersion) {
                throw ConcurrentUpdateOccurred::ofEntity($entity);
            }
        }

        $this->connection->update(
            $this->connection->quoteIdentifier($entity::tableName()),
            $state,
            $entity->identifier()
        );
    }

    /**
     * Fetch all rows of a table matching the identifier as associative arrays.
     *
     * @param array<string, mixed> $identifier
     * @return array<int, array<string, mixed>>
     */
    private function fetchAll(string $tableName, array $identifier): array
    {
        return $this->select('*', $tableName, $identifier)->fetchAll(PDO::FETCH_ASSOC);
    }

    /**
     * This method might have been on Connection itself...
     *
     * Builds `SELECT <expr> FROM <quoted table> WHERE col = ? AND ...` and
     * executes it with the values of `$where` bound as positional parameters.
     * The table name is quoted; the select expression and the column names
     * are used as given.
     *
     * @param array<string, mixed> $where column name => value; must not be empty
     * @return ResultStatement<mixed>
     *
     * @throws InvalidArgumentException when $where is empty
     */
    private function select(string $selectExpression, string $tableExpression, array $where): ResultStatement
    {
        // Without conditions the statement would end in a dangling "WHERE",
        // which is invalid SQL; fail early with a clear message instead.
        if ($where === []) {
            throw new InvalidArgumentException(sprintf(
                'Cannot select from table "%s" without at least one condition',
                $tableExpression
            ));
        }

        $conditions = [];
        $values = [];
        foreach ($where as $columnName => $value) {
            $conditions[] = $columnName . ' = ?';
            $values[] = $value;
        }

        $sql = 'SELECT ' . $selectExpression
            . ' FROM ' . $this->connection->quoteIdentifier($tableExpression)
            . ' WHERE ' . implode(' AND ', $conditions);

        return $this->connection->executeQuery($sql, $values);
    }
}
254