Completed
Push — master ( b7ec1d...7c6dec )
by Matthias
12s
created

AggregateRepository::getById()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 26
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 14
nc 3
nop 2
dl 0
loc 26
ccs 15
cts 15
cp 1
crap 4
rs 9.7998
c 0
b 0
f 0
1
<?php
2
3
namespace TalisOrm;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\Driver\ResultStatement;
7
use InvalidArgumentException;
8
use function is_a;
9
use LogicException;
10
use PDO;
11
use TalisOrm\DomainEvents\EventDispatcher;
12
use Webmozart\Assert\Assert;
13
14
final class AggregateRepository
15
{
16
    /**
17
     * @var Connection
18
     */
19
    private $connection;
20
21
    /**
22
     * @var EventDispatcher
23
     */
24
    private $eventDispatcher;
25
26 22
    public function __construct(Connection $connection, EventDispatcher $eventDispatcher)
27
    {
28 22
        $this->connection = $connection;
29 22
        $this->eventDispatcher = $eventDispatcher;
30 22
    }
31
32
    /**
33
     * @param Aggregate $aggregate
34
     * @throws ConcurrentUpdateOccurred (only when you use optimistic concurrency locking)
35
     * @return void
36
     */
37 21
    public function save(Aggregate $aggregate)
38
    {
39
        $this->connection->transactional(function () use ($aggregate) {
40 21
            $this->insertOrUpdate($aggregate);
41
42 21
            foreach ($aggregate->deletedChildEntities() as $childEntity) {
43 2
                $this->connection->delete(
44 2
                    $this->connection->quoteIdentifier($childEntity::tableName()),
45 2
                    $childEntity->identifier()
46 2
                );
47 21
            }
48
49 21
            foreach ($aggregate->childEntitiesByType() as $type => $childEntities) {
50 20
                foreach ($childEntities as $childEntity) {
51 10
                    $this->insertOrUpdate($childEntity);
52 20
                }
53 21
            }
54 21
        });
55
56 21
        $this->eventDispatcher->dispatch($aggregate->releaseEvents());
57 21
    }
58
59
    /**
60
     * @param string $aggregateClass
61
     * @param AggregateId $aggregateId
62
     * @return Aggregate
63
     */
64 22
    public function getById($aggregateClass, AggregateId $aggregateId)
65
    {
66 22
        Assert::string($aggregateClass);
67
68 22
        if (!is_a($aggregateClass, Aggregate::class, true)) {
69 1
            throw new InvalidArgumentException(sprintf(
70 1
                'Class "%s" has to implement "%s"',
71 1
                $aggregateClass,
72
                Aggregate::class
73 1
            ));
74
        }
75
76 21
        $aggregateState = $this->getAggregateState($aggregateClass, $aggregateId);
77
78 19
        $childEntityStatesByType = $this->getChildEntityStatesByType($aggregateClass, $aggregateId);
79
80 19
        $aggregate = $aggregateClass::fromState($aggregateState, $childEntityStatesByType);
81
82 19
        if (!$aggregate instanceof $aggregateClass || !$aggregate instanceof Aggregate) {
83 1
            throw new LogicException(sprintf(
84 1
                'Method "%s::fromState()" was expected to return an instance of "%1$s"',
85
                $aggregateClass
86 1
            ));
87
        }
88
89 18
        return $aggregate;
90
    }
91
92
    /**
93
     * @param string $aggregateClass
94
     * @param AggregateId $aggregateId
95
     * @return array
96
     */
97 21
    private function getAggregateState($aggregateClass, AggregateId $aggregateId)
98
    {
99 21
        $aggregateStates = $this->fetchAll(
100 21
            $aggregateClass::tableName(),
101 21
            $aggregateClass::identifierForQuery($aggregateId)
102 21
        );
103
104 21
        if (\count($aggregateStates) === 0) {
105 4
            throw new AggregateNotFoundException(sprintf(
106 4
                'Could not find aggregate of type "%s" with id "%s"',
107 4
                $aggregateClass,
108
                $aggregateId
109 4
            ));
110
        }
111
112 19
        $aggregateState = reset($aggregateStates);
113
114 19
        Assert::isArray($aggregateState);
115
116 19
        return $aggregateState;
117
    }
118
119
    /**
120
     * @param string $aggregateClass
121
     * @param AggregateId $aggregateId
122
     * @return array[]
123
     */
124 19
    private function getChildEntityStatesByType($aggregateClass, AggregateId $aggregateId)
125
    {
126 19
        $childEntityStatesByType = [];
127
128 19
        foreach ($aggregateClass::childEntityTypes() as $childEntityType) {
129 18
            $childEntityStatesByType = $this->fetchAll(
130 18
                $childEntityType::tableName(),
131 18
                $childEntityType::identifierForQuery($aggregateId)
132 18
            );
133
134 18
            $childEntityStatesByType[$childEntityType] = $childEntityStatesByType;
135 19
        }
136
137 19
        return $childEntityStatesByType;
138
    }
139
140
    public function delete(Aggregate $aggregate)
141
    {
142 4
        $this->connection->transactional(function () use ($aggregate) {
143 4
            $this->connection->delete(
144 4
                $this->connection->quoteIdentifier($aggregate::tableName()),
145 4
                $aggregate->identifier()
146 4
            );
147
148 4
            foreach ($aggregate->childEntitiesByType() as $type => $childEntities) {
149 4
                foreach ($childEntities as $childEntity) {
150
                    /** @var ChildEntity $childEntity */
151 2
                    $this->connection->delete(
152 2
                        $this->connection->quoteIdentifier($childEntity::tableName()),
153 2
                        $childEntity->identifier()
154 2
                    );
155 4
                }
156 4
            }
157 4
        });
158 4
    }
159
160 21
    private function insertOrUpdate(Entity $entity)
161
    {
162 21
        if ($this->exists($entity::tableName(), $entity->identifier())) {
163 10
            $state = $entity->state();
164 10
            if (array_key_exists(Aggregate::VERSION_COLUMN, $state)) {
165 10
                $aggregateVersion = $state[Aggregate::VERSION_COLUMN];
166 10
                $aggregateVersionInDb = (int)$this->select(
167 10
                    Aggregate::VERSION_COLUMN,
168 10
                    $entity->tableName(),
169 10
                    $entity->identifier()
170 10
                )->fetchColumn();
171 10
                if ($aggregateVersionInDb >= $aggregateVersion) {
172 4
                    throw ConcurrentUpdateOccurred::ofEntity($entity);
173
                }
174 10
            }
175 10
            $this->connection->update(
176 10
                $this->connection->quoteIdentifier($entity::tableName()),
177 10
                $state,
178 10
                $entity->identifier()
179 10
            );
180 10
        } else {
181 21
            $this->connection->insert(
182 21
                $this->connection->quoteIdentifier($entity::tableName()),
183 21
                $entity->state()
184 21
            );
185
        }
186 21
    }
187
188
    /**
189
     * @param string $tableName
190
     * @param array $identifier
191
     * @return bool
192
     */
193 21
    private function exists($tableName, array $identifier)
194
    {
195 21
        Assert::string($tableName);
196
197 21
        $count = $this->select('COUNT(*)', $tableName, $identifier)->fetchColumn();
198
199 21
        return (int)$count > 0;
200
    }
201
202
    /**
203
     * @param string $tableName
204
     * @param array $identifier
205
     * @return array[]
206
     */
207 21
    private function fetchAll($tableName, array $identifier)
208
    {
209 21
        Assert::string($tableName);
210
211 21
        return $this->select('*', $tableName, $identifier)->fetchAll(PDO::FETCH_ASSOC);
212
    }
213
214
    /**
215
     * This method might have been on Connection itself...
216
     *
217
     * @param string $selectExpression
218
     * @param string $tableExpression
219
     * @param array $where
220
     * @return ResultStatement
221
     */
222 21
    private function select($selectExpression, $tableExpression, array $where)
223
    {
224 21
        Assert::string($selectExpression);
225 21
        Assert::string($tableExpression);
226
227 21
        $conditions = [];
228 21
        $values = [];
229 21
        foreach ($where as $columnName => $value) {
230 21
            $conditions[] = $columnName . ' = ?';
231 21
            $values[] = $value;
232 21
        }
233
234
        $sql = 'SELECT ' . $selectExpression
235 21
            . ' FROM ' . $this->connection->quoteIdentifier($tableExpression)
236 21
            . ' WHERE ' . implode(' AND ', $conditions);
237
238 21
        return $this->connection->executeQuery($sql, $values);
239
    }
240
}
241