Completed
Push — master ( 7c6dec...eed99e )
by Matthias
11s
created

AggregateRepository::getChildEntitiesByType()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 19
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 10
nc 2
nop 2
dl 0
loc 19
ccs 13
cts 13
cp 1
crap 2
rs 9.9332
c 0
b 0
f 0
1
<?php
2
3
namespace TalisOrm;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\Driver\ResultStatement;
7
use InvalidArgumentException;
8
use function is_a;
9
use LogicException;
10
use PDO;
11
use TalisOrm\DomainEvents\EventDispatcher;
12
use Webmozart\Assert\Assert;
13
14
final class AggregateRepository
15
{
16
    /**
17
     * @var Connection
18
     */
19
    private $connection;
20
21
    /**
22
     * @var EventDispatcher
23
     */
24
    private $eventDispatcher;
25
26 24
    /**
     * Stores the collaborators this repository works with.
     *
     * @param Connection $connection Used for every query and transaction issued by this repository
     * @param EventDispatcher $eventDispatcher Receives the events an aggregate releases after it has been saved
     */
    public function __construct(Connection $connection, EventDispatcher $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
        $this->connection = $connection;
    }
31
32
    /**
     * Persists an aggregate and its child entities, then dispatches the
     * events released by the aggregate.
     *
     * Inside one transaction, in this order: the aggregate itself is inserted
     * or updated, the child entities it reports as deleted are removed, and
     * every remaining child entity is inserted or updated. Events are
     * dispatched only after the transactional call has returned.
     *
     * @param Aggregate $aggregate
     * @throws ConcurrentUpdateOccurred (only when you use optimistic concurrency locking)
     * @return void
     */
    public function save(Aggregate $aggregate)
    {
        $connection = $this->connection;

        $persist = function () use ($aggregate, $connection) {
            $this->insertOrUpdate($aggregate);

            foreach ($aggregate->deletedChildEntities() as $removedEntity) {
                $quotedTable = $connection->quoteIdentifier($removedEntity::tableName());
                $connection->delete($quotedTable, $removedEntity->identifier());
            }

            foreach ($aggregate->childEntitiesByType() as $entitiesOfOneType) {
                foreach ($entitiesOfOneType as $entity) {
                    $this->insertOrUpdate($entity);
                }
            }
        };

        $connection->transactional($persist);

        $releasedEvents = $aggregate->releaseEvents();
        $this->eventDispatcher->dispatch($releasedEvents);
    }
58
59
    /**
     * Loads the aggregate of the given class with the given ID.
     *
     * The aggregate's own state and the state of its child entities are
     * fetched first; the aggregate class then rebuilds the object through its
     * static fromState() method.
     *
     * @param string $aggregateClass
     * @param AggregateId $aggregateId
     * @throws InvalidArgumentException When $aggregateClass does not name an Aggregate class
     * @throws AggregateNotFoundException When no state is stored for the given ID
     * @throws LogicException When fromState() does not return an instance of $aggregateClass
     * @return Aggregate
     */
    public function getById($aggregateClass, AggregateId $aggregateId)
    {
        Assert::string($aggregateClass);

        $isAggregateType = is_a($aggregateClass, Aggregate::class, true);
        if (!$isAggregateType) {
            $message = sprintf('Class "%s" has to implement "%s"', $aggregateClass, Aggregate::class);

            throw new InvalidArgumentException($message);
        }

        // The aggregate's own state is fetched first, so a missing aggregate
        // is reported before any child entity table is queried.
        $state = $this->getAggregateState($aggregateClass, $aggregateId);
        $children = $this->getChildEntitiesByType($aggregateClass, $aggregateId);

        $aggregate = $aggregateClass::fromState($state, $children);

        if ($aggregate instanceof $aggregateClass && $aggregate instanceof Aggregate) {
            return $aggregate;
        }

        throw new LogicException(sprintf(
            'Method "%s::fromState()" was expected to return an instance of "%1$s"',
            $aggregateClass
        ));
    }
91
92
    /**
     * Fetches the stored state of a single aggregate.
     *
     * The table name and the query identifier are both supplied by the
     * aggregate class itself.
     *
     * @param string $aggregateClass
     * @param AggregateId $aggregateId
     * @throws AggregateNotFoundException When the query returns no rows
     * @return array The first row returned for the aggregate
     */
    private function getAggregateState($aggregateClass, AggregateId $aggregateId)
    {
        $tableName = $aggregateClass::tableName();
        $identifier = $aggregateClass::identifierForQuery($aggregateId);
        $rows = $this->fetchAll($tableName, $identifier);

        if (\count($rows) > 0) {
            $firstRow = reset($rows);
            Assert::isArray($firstRow);

            return $firstRow;
        }

        throw new AggregateNotFoundException(sprintf(
            'Could not find aggregate of type "%s" with id "%s"',
            $aggregateClass,
            $aggregateId
        ));
    }
118
119
    /**
     * Loads all child entities of an aggregate, grouped by child entity class.
     *
     * For every class listed by $aggregateClass::childEntityTypes(), the
     * matching rows are fetched and each row is turned into an object through
     * that class's static fromState() method.
     *
     * @param string $aggregateClass
     * @param AggregateId $aggregateId
     * @return array[] Child entity class name => child entities built from the fetched rows
     */
    private function getChildEntitiesByType($aggregateClass, AggregateId $aggregateId)
    {
        $entitiesByClass = [];

        foreach ($aggregateClass::childEntityTypes() as $entityClass) {
            $rows = $this->fetchAll(
                $entityClass::tableName(),
                $entityClass::identifierForQuery($aggregateId)
            );

            $hydrate = function (array $row) use ($entityClass) {
                return $entityClass::fromState($row);
            };

            $entitiesByClass[$entityClass] = array_map($hydrate, $rows);
        }

        return $entitiesByClass;
    }
144
145
    /**
     * Removes an aggregate and the child entities it currently holds.
     *
     * All deletes run in one transaction. Child entity rows are removed
     * before the aggregate's own row, so that a foreign key pointing from a
     * child table to the aggregate table (without ON DELETE CASCADE) cannot
     * reject the removal of the aggregate row.
     *
     * Only the child entities returned by $aggregate->childEntitiesByType()
     * are deleted. Unlike save(), this method does not dispatch any events.
     *
     * @param Aggregate $aggregate
     * @return void
     */
    public function delete(Aggregate $aggregate)
    {
        $this->connection->transactional(function () use ($aggregate) {
            foreach ($aggregate->childEntitiesByType() as $childEntities) {
                foreach ($childEntities as $childEntity) {
                    /** @var ChildEntity $childEntity */
                    $this->connection->delete(
                        $this->connection->quoteIdentifier($childEntity::tableName()),
                        $childEntity->identifier()
                    );
                }
            }

            // The aggregate row goes last: nothing references it any more.
            $this->connection->delete(
                $this->connection->quoteIdentifier($aggregate::tableName()),
                $aggregate->identifier()
            );
        });
    }
164
165 23
    /**
     * Inserts the entity when it reports itself as new, otherwise updates its row.
     *
     * For an update whose state contains Aggregate::VERSION_COLUMN, the
     * version currently stored is read first. If the stored version is equal
     * to or higher than the version in the state, the update is refused.
     *
     * @param Entity $entity
     * @throws ConcurrentUpdateOccurred When the stored version is not lower than the version being saved
     * @return void
     */
    private function insertOrUpdate(Entity $entity)
    {
        // tableName() is static; call it with "::" like the rest of this class does.
        $quotedTableName = $this->connection->quoteIdentifier($entity::tableName());

        if ($entity->isNew()) {
            $this->connection->insert($quotedTableName, $entity->state());

            return;
        }

        $state = $entity->state();

        if (array_key_exists(Aggregate::VERSION_COLUMN, $state)) {
            $aggregateVersion = $state[Aggregate::VERSION_COLUMN];

            // select() quotes the table name itself, so it gets the raw name.
            $aggregateVersionInDb = (int)$this->select(
                Aggregate::VERSION_COLUMN,
                $entity::tableName(),
                $entity->identifier()
            )->fetchColumn();

            if ($aggregateVersionInDb >= $aggregateVersion) {
                throw ConcurrentUpdateOccurred::ofEntity($entity);
            }
        }

        $this->connection->update($quotedTableName, $state, $entity->identifier());
    }
192
193
    /**
     * Selects every column of the rows in $tableName that match $identifier.
     *
     * @param string $tableName
     * @param array $identifier Column name => value pairs a row has to match
     * @return array[] The matching rows, each fetched as an associative array
     */
    private function fetchAll($tableName, array $identifier)
    {
        Assert::string($tableName);

        $statement = $this->select('*', $tableName, $identifier);
        $rows = $statement->fetchAll(PDO::FETCH_ASSOC);

        return $rows;
    }
204
205
    /**
     * Builds and executes "SELECT <expression> FROM <table> WHERE <column> = ? AND ...".
     *
     * This method might have been on Connection itself...
     *
     * Only the table name is quoted. The select expression and the column
     * names (the keys of $where) are put into the SQL as given; the values of
     * $where are passed to the connection as positional parameters.
     *
     * @param string $selectExpression
     * @param string $tableExpression
     * @param array $where Column name => value pairs, combined with AND
     * @throws InvalidArgumentException When $where is empty
     * @return ResultStatement
     */
    private function select($selectExpression, $tableExpression, array $where)
    {
        Assert::string($selectExpression);
        Assert::string($tableExpression);

        // Without conditions the statement would end in a dangling "WHERE"
        // and only fail later as an SQL syntax error; fail early instead.
        if (\count($where) === 0) {
            throw new InvalidArgumentException(sprintf(
                'Cannot select from table "%s" without at least one WHERE condition',
                $tableExpression
            ));
        }

        $conditions = [];
        $values = [];
        foreach ($where as $columnName => $value) {
            $conditions[] = $columnName . ' = ?';
            $values[] = $value;
        }

        $sql = 'SELECT ' . $selectExpression
            . ' FROM ' . $this->connection->quoteIdentifier($tableExpression)
            . ' WHERE ' . implode(' AND ', $conditions);

        return $this->connection->executeQuery($sql, $values);
    }
231
}
232