1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace CultuurNet\UDB3\EventSourcing\DBAL; |
4
|
|
|
|
5
|
|
|
use Broadway\Domain\DateTime; |
6
|
|
|
use Broadway\Domain\DomainEventStream; |
7
|
|
|
use Broadway\Domain\DomainEventStreamInterface; |
8
|
|
|
use Broadway\Domain\DomainMessage; |
9
|
|
|
use Broadway\EventStore\DBALEventStoreException; |
10
|
|
|
use Broadway\EventStore\EventStoreInterface; |
11
|
|
|
use Broadway\EventStore\EventStreamNotFoundException; |
12
|
|
|
use Broadway\Serializer\SerializerInterface; |
13
|
|
|
use Doctrine\DBAL\Connection; |
14
|
|
|
use Doctrine\DBAL\DBALException; |
15
|
|
|
use Doctrine\DBAL\Schema\Schema; |
16
|
|
|
use Doctrine\DBAL\Schema\Table; |
17
|
|
|
|
18
|
|
|
/** |
19
|
|
|
* Event store making use of Doctrine DBAL and aware of the aggregate type. |
20
|
|
|
* |
21
|
|
|
* Based on Broadways DBALEventStore. |
22
|
|
|
*/ |
23
|
|
|
class AggregateAwareDBALEventStore implements EventStoreInterface |
24
|
|
|
{ |
25
|
|
|
/** |
26
|
|
|
* @var Connection |
27
|
|
|
*/ |
28
|
|
|
private $connection; |
29
|
|
|
|
30
|
|
|
/** |
31
|
|
|
* @var SerializerInterface |
32
|
|
|
*/ |
33
|
|
|
private $payloadSerializer; |
34
|
|
|
|
35
|
|
|
/** |
36
|
|
|
* @var SerializerInterface |
37
|
|
|
*/ |
38
|
|
|
private $metadataSerializer; |
39
|
|
|
|
40
|
|
|
/** |
41
|
|
|
* @var null |
42
|
|
|
*/ |
43
|
|
|
private $loadStatement = null; |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* @var string |
47
|
|
|
*/ |
48
|
|
|
private $tableName; |
49
|
|
|
|
50
|
|
|
/** |
51
|
|
|
* @var string |
52
|
|
|
*/ |
53
|
|
|
private $aggregateType; |
54
|
|
|
|
55
|
|
|
/** |
56
|
|
|
* @param Connection $connection |
57
|
|
|
* @param SerializerInterface $payloadSerializer |
58
|
|
|
* @param SerializerInterface $metadataSerializer |
59
|
|
|
* @param string $tableName |
60
|
|
|
* @param mixed $aggregateType |
61
|
|
|
*/ |
62
|
|
View Code Duplication |
public function __construct( |
|
|
|
|
63
|
|
|
Connection $connection, |
64
|
|
|
SerializerInterface $payloadSerializer, |
65
|
|
|
SerializerInterface $metadataSerializer, |
66
|
|
|
$tableName, |
67
|
|
|
$aggregateType |
68
|
|
|
) { |
69
|
|
|
$this->connection = $connection; |
70
|
|
|
$this->payloadSerializer = $payloadSerializer; |
71
|
|
|
$this->metadataSerializer = $metadataSerializer; |
72
|
|
|
$this->tableName = $tableName; |
73
|
|
|
$this->aggregateType = (string) $aggregateType; |
74
|
|
|
} |
75
|
|
|
|
76
|
|
|
/** |
77
|
|
|
* {@inheritDoc} |
78
|
|
|
*/ |
79
|
|
|
public function load($id) |
80
|
|
|
{ |
81
|
|
|
$statement = $this->prepareLoadStatement(); |
82
|
|
|
$statement->bindValue('uuid', $id); |
83
|
|
|
$statement->execute(); |
84
|
|
|
|
85
|
|
|
$events = array(); |
86
|
|
|
while ($row = $statement->fetch()) { |
87
|
|
|
$events[] = $this->deserializeEvent($row); |
88
|
|
|
} |
89
|
|
|
|
90
|
|
|
if (empty($events)) { |
91
|
|
|
throw new EventStreamNotFoundException(sprintf('EventStream not found for aggregate with id %s', $id)); |
92
|
|
|
} |
93
|
|
|
|
94
|
|
|
return new DomainEventStream($events); |
95
|
|
|
} |
96
|
|
|
|
97
|
|
|
/** |
98
|
|
|
* {@inheritDoc} |
99
|
|
|
*/ |
100
|
|
|
public function append($id, DomainEventStreamInterface $eventStream) |
101
|
|
|
{ |
102
|
|
|
// The original Broadway DBALEventStore implementation did only check |
103
|
|
|
// the type of $id. It is better to test all UUIDs inside the event |
104
|
|
|
// stream. |
105
|
|
|
$this->guardStream($eventStream); |
|
|
|
|
106
|
|
|
|
107
|
|
|
// Make the transaction more robust by using the transactional statement. |
108
|
|
|
$this->connection->transactional(function (Connection $connection) use ($eventStream) { |
109
|
|
|
try { |
110
|
|
|
foreach ($eventStream as $domainMessage) { |
111
|
|
|
$this->insertMessage($connection, $domainMessage); |
112
|
|
|
} |
113
|
|
|
} catch (DBALException $exception) { |
114
|
|
|
throw DBALEventStoreException::create($exception); |
115
|
|
|
} |
116
|
|
|
}); |
117
|
|
|
} |
118
|
|
|
|
119
|
|
|
/** |
120
|
|
|
* @param Connection $connection |
121
|
|
|
* @param DomainMessage $domainMessage |
122
|
|
|
*/ |
123
|
|
|
private function insertMessage(Connection $connection, DomainMessage $domainMessage) |
124
|
|
|
{ |
125
|
|
|
$data = array( |
126
|
|
|
'uuid' => (string) $domainMessage->getId(), |
127
|
|
|
'playhead' => $domainMessage->getPlayhead(), |
128
|
|
|
'metadata' => json_encode($this->metadataSerializer->serialize($domainMessage->getMetadata())), |
129
|
|
|
'payload' => json_encode($this->payloadSerializer->serialize($domainMessage->getPayload())), |
130
|
|
|
'recorded_on' => $domainMessage->getRecordedOn()->toString(), |
131
|
|
|
'type' => $domainMessage->getType(), |
132
|
|
|
'aggregate_type' => $this->aggregateType, |
133
|
|
|
); |
134
|
|
|
|
135
|
|
|
$connection->insert($this->tableName, $data); |
136
|
|
|
} |
137
|
|
|
|
138
|
|
|
/** |
139
|
|
|
* @param Schema $schema |
140
|
|
|
* @return Table|null |
141
|
|
|
*/ |
142
|
|
|
public function configureSchema(Schema $schema) |
143
|
|
|
{ |
144
|
|
|
if ($schema->hasTable($this->tableName)) { |
145
|
|
|
return null; |
146
|
|
|
} |
147
|
|
|
|
148
|
|
|
return $this->configureTable(); |
149
|
|
|
} |
150
|
|
|
|
151
|
|
|
/** |
152
|
|
|
* @return mixed |
153
|
|
|
*/ |
154
|
|
|
public function configureTable() |
155
|
|
|
{ |
156
|
|
|
$schema = new Schema(); |
157
|
|
|
|
158
|
|
|
$table = $schema->createTable($this->tableName); |
159
|
|
|
|
160
|
|
|
$table->addColumn('id', 'integer', array('autoincrement' => true)); |
161
|
|
|
$table->addColumn('uuid', 'guid', array('length' => 36,)); |
162
|
|
|
$table->addColumn('playhead', 'integer', array('unsigned' => true)); |
163
|
|
|
$table->addColumn('payload', 'text'); |
164
|
|
|
$table->addColumn('metadata', 'text'); |
165
|
|
|
$table->addColumn('recorded_on', 'string', array('length' => 32)); |
166
|
|
|
$table->addColumn('type', 'string', array('length' => 128)); |
167
|
|
|
$table->addColumn('aggregate_type', 'string', array('length' => 128)); |
168
|
|
|
|
169
|
|
|
$table->setPrimaryKey(array('id')); |
|
|
|
|
170
|
|
|
|
171
|
|
|
$table->addUniqueIndex(array('uuid', 'playhead')); |
|
|
|
|
172
|
|
|
|
173
|
|
|
$table->addIndex(['type']); |
|
|
|
|
174
|
|
|
$table->addIndex(['aggregate_type']); |
|
|
|
|
175
|
|
|
|
176
|
|
|
return $table; |
177
|
|
|
} |
178
|
|
|
|
179
|
|
|
/** |
180
|
|
|
* @return \Doctrine\DBAL\Driver\Statement|null |
181
|
|
|
*/ |
182
|
|
|
private function prepareLoadStatement() |
183
|
|
|
{ |
184
|
|
|
if (null === $this->loadStatement) { |
185
|
|
|
$queryBuilder = $this->connection->createQueryBuilder(); |
186
|
|
|
|
187
|
|
|
$queryBuilder->select( |
188
|
|
|
['uuid', 'playhead', 'metadata', 'payload', 'recorded_on'] |
189
|
|
|
) |
190
|
|
|
->from($this->tableName) |
191
|
|
|
->where('uuid = :uuid') |
192
|
|
|
->orderBy('playhead', 'ASC'); |
193
|
|
|
|
194
|
|
|
$this->loadStatement = $this->connection->prepare( |
|
|
|
|
195
|
|
|
$queryBuilder->getSQL() |
196
|
|
|
); |
197
|
|
|
} |
198
|
|
|
|
199
|
|
|
return $this->loadStatement; |
200
|
|
|
} |
201
|
|
|
|
202
|
|
|
/** |
203
|
|
|
* @param $row |
204
|
|
|
* @return DomainMessage |
205
|
|
|
*/ |
206
|
|
View Code Duplication |
private function deserializeEvent($row) |
|
|
|
|
207
|
|
|
{ |
208
|
|
|
return new DomainMessage( |
209
|
|
|
$row['uuid'], |
210
|
|
|
$row['playhead'], |
211
|
|
|
$this->metadataSerializer->deserialize(json_decode($row['metadata'], true)), |
212
|
|
|
$this->payloadSerializer->deserialize(json_decode($row['payload'], true)), |
213
|
|
|
DateTime::fromString($row['recorded_on']) |
214
|
|
|
); |
215
|
|
|
} |
216
|
|
|
|
217
|
|
|
/** |
218
|
|
|
* Ensure that an error will be thrown if the ID in the domain messages is |
219
|
|
|
* not something that can be converted to a string. |
220
|
|
|
* |
221
|
|
|
* If we let this move on without doing this DBAL will eventually |
222
|
|
|
* give us a hard time but the true reason for the problem will be |
223
|
|
|
* obfuscated. |
224
|
|
|
* |
225
|
|
|
* @param DomainEventStreamInterface $eventStream |
226
|
|
|
*/ |
227
|
|
|
private function guardStream(DomainEventStreamInterface $eventStream) |
228
|
|
|
{ |
229
|
|
|
foreach ($eventStream as $domainMessage) { |
230
|
|
|
/** @var DomainMessage $domainMessage */ |
231
|
|
|
$id = (string) $domainMessage->getId(); |
|
|
|
|
232
|
|
|
} |
233
|
|
|
} |
234
|
|
|
} |
235
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.