1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace CultuurNet\UDB3\EventSourcing\DBAL; |
4
|
|
|
|
5
|
|
|
use Broadway\Domain\DateTime; |
6
|
|
|
use Broadway\Domain\DomainEventStream; |
7
|
|
|
use Broadway\Domain\DomainMessage; |
8
|
|
|
use Broadway\EventSourcing\EventStreamDecoratorInterface; |
9
|
|
|
use Broadway\Serializer\SerializerInterface; |
10
|
|
|
use Doctrine\DBAL\Connection; |
11
|
|
|
use Doctrine\DBAL\Driver\Statement; |
12
|
|
|
use Doctrine\DBAL\DBALException; |
13
|
|
|
|
14
|
|
|
/**
 * Reads rows from an event store table in ascending id order and yields them
 * as domain event streams.
 *
 * The with*() methods do not modify the instance they are called on; they
 * return a modified clone. Invoking the object (see __invoke()) does advance
 * the internal start id and last processed id of the invoked instance.
 */
class EventStream
{
    /**
     * @var Connection
     */
    protected $connection;

    /**
     * @var SerializerInterface
     */
    protected $payloadSerializer;

    /**
     * @var SerializerInterface
     */
    protected $metadataSerializer;

    /**
     * Name of the table the events are selected from.
     *
     * @var string
     */
    protected $tableName;

    /**
     * Lowest row id the next load query will match (id >= startId).
     *
     * @var int
     */
    protected $startId;

    /**
     * Id of the row that was fetched most recently, null as long as no row
     * has been fetched.
     *
     * @var int|null
     */
    protected $lastProcessedId;

    /**
     * When set, only rows whose uuid is in this list are loaded.
     *
     * @var string[]
     */
    protected $cdbids;

    /**
     * Optional decorator applied to every stream before it is yielded.
     *
     * @var EventStreamDecoratorInterface
     */
    private $domainEventStreamDecorator;

    /**
     * When not empty, only rows with this aggregate_type are loaded.
     *
     * @var string
     */
    private $aggregateType;

    /**
     * @param Connection $connection
     *   Connection used to build and execute the load queries.
     * @param SerializerInterface $payloadSerializer
     *   Deserializes the JSON-decoded payload column.
     * @param SerializerInterface $metadataSerializer
     *   Deserializes the JSON-decoded metadata column.
     * @param string $tableName
     *   Table to select the events from.
     */
    public function __construct(
        Connection $connection,
        SerializerInterface $payloadSerializer,
        SerializerInterface $metadataSerializer,
        $tableName
    ) {
        $this->connection = $connection;
        $this->payloadSerializer = $payloadSerializer;
        $this->metadataSerializer = $metadataSerializer;
        $this->tableName = $tableName;

        // Defaults: start at the lowest id and do not filter on aggregate type.
        $this->startId = 0;
        $this->aggregateType = '';
    }

    /**
     * Returns a copy of this stream that starts reading at the given row id.
     *
     * @param int $startId
     *   Must be an integer higher than 0.
     * @return EventStream
     *
     * @throws \InvalidArgumentException
     *   When $startId is not an int or not higher than 0.
     */
    public function withStartId($startId)
    {
        if (!is_int($startId)) {
            throw new \InvalidArgumentException('StartId should have type int.');
        }

        if ($startId <= 0) {
            throw new \InvalidArgumentException('StartId should be higher than 0.');
        }

        $c = clone $this;
        $c->startId = $startId;
        return $c;
    }

    /**
     * Returns a copy of this stream that only loads rows with the given
     * aggregate type. An empty value disables the filter.
     *
     * @param string $aggregateType
     * @return EventStream
     */
    public function withAggregateType($aggregateType)
    {
        $c = clone $this;
        $c->aggregateType = $aggregateType;
        return $c;
    }

    /**
     * Returns a copy of this stream that only loads rows whose uuid is one of
     * the given ids.
     *
     * @param string[] $cdbids
     *   Non-empty list of ids.
     * @return EventStream
     *
     * @throws \InvalidArgumentException
     *   When $cdbids is not an array or is empty.
     */
    public function withCdbids($cdbids)
    {
        if (!is_array($cdbids)) {
            throw new \InvalidArgumentException('Cdbids should have type array.');
        }

        if (empty($cdbids)) {
            throw new \InvalidArgumentException('Cdbids can\'t be empty.');
        }

        $c = clone $this;
        $c->cdbids = $cdbids;
        return $c;
    }

    /**
     * Returns a copy of this stream that passes every domain event stream
     * through the given decorator before yielding it.
     *
     * @param EventStreamDecoratorInterface $domainEventStreamDecorator
     * @return EventStream
     */
    public function withDomainEventStreamDecorator(EventStreamDecoratorInterface $domainEventStreamDecorator)
    {
        $c = clone $this;
        $c->domainEventStreamDecorator = $domainEventStreamDecorator;
        return $c;
    }

    /**
     * Lazily walks the matching rows in ascending id order.
     *
     * A new load statement is executed for every iteration; each non-empty
     * result is wrapped in a DomainEventStream (optionally decorated) and
     * yielded. Iteration stops as soon as a load statement returns no rows.
     *
     * Side effect: the start id and last processed id of this instance are
     * updated for every fetched row.
     *
     * @return \Generator
     *   Yields DomainEventStream instances, or whatever the configured
     *   decorator returns for them.
     *
     * @throws \UnexpectedValueException
     *   When a row contains metadata or payload that is not valid JSON.
     */
    public function __invoke()
    {
        do {
            $statement = $this->prepareLoadStatement();

            $events = [];
            while ($row = $statement->fetch()) {
                $events[] = $this->deserializeEvent($row);

                // Drivers commonly hand back column values as strings. Cast
                // once so getLastProcessedId() really returns an int that can
                // be passed to withStartId() again.
                $rowId = (int) $row['id'];
                $this->lastProcessedId = $rowId;
                // Make sure to increment to prevent endless loop.
                $this->startId = $rowId + 1;
            }

            /* @var DomainMessage[] $events */
            if (!empty($events)) {
                $domainEventStream = new DomainEventStream($events);

                if ($this->domainEventStreamDecorator !== null) {
                    // Because the load statement always returns one row at a
                    // time, and we always wrap a single domain message in a
                    // stream as a result, we can simply get the aggregate type
                    // and aggregate id from the first domain message in the
                    // stream.
                    $firstEvent = $events[0];
                    $domainEventStream = $this->domainEventStreamDecorator->decorateForWrite(
                        get_class($firstEvent->getPayload()),
                        $firstEvent->getId(),
                        $domainEventStream
                    );
                }

                yield $domainEventStream;
            }
        } while (!empty($events));
    }

    /**
     * Returns the id of the row that was fetched most recently while
     * iterating this stream.
     *
     * @return int|null
     *   Null when no row has been fetched yet.
     */
    public function getLastProcessedId()
    {
        return $this->lastProcessedId;
    }

    /**
     * The load statement can no longer be 'cached' because of using the query
     * builder. The query builder requires all parameters to be set before
     * using the execute command. The previous solution used the prepare
     * statement on the connection, this did not require all parameters to be
     * set up front.
     *
     * The query selects at most one row with id >= the current start id,
     * ordered by id, optionally restricted by uuid and aggregate type.
     *
     * @return Statement
     * @throws DBALException
     */
    protected function prepareLoadStatement()
    {
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on')
            ->from($this->tableName)
            ->where('id >= :startid')
            ->setParameter('startid', $this->startId)
            ->orderBy('id', 'ASC')
            ->setMaxResults(1);

        if ($this->cdbids) {
            $queryBuilder->andWhere('uuid IN (:uuids)')
                ->setParameter('uuids', $this->cdbids, Connection::PARAM_STR_ARRAY);
        }

        if (!empty($this->aggregateType)) {
            $queryBuilder->andWhere('aggregate_type = :aggregate_type');
            $queryBuilder->setParameter('aggregate_type', $this->aggregateType);
        }

        return $queryBuilder->execute();
    }

    /**
     * Turns a fetched row into a domain message.
     *
     * @param array $row
     *   Row with the keys id, uuid, playhead, metadata, payload and
     *   recorded_on.
     * @return DomainMessage
     *
     * @throws \UnexpectedValueException
     *   When the metadata or payload column does not contain valid JSON.
     */
    private function deserializeEvent($row)
    {
        // Decode both columns up front so corrupt data is reported with the
        // offending column and row id, before any serializer is involved.
        $metadata = $this->decodeJsonColumn($row, 'metadata');
        $payload = $this->decodeJsonColumn($row, 'payload');

        return new DomainMessage(
            $row['uuid'],
            $row['playhead'],
            $this->metadataSerializer->deserialize($metadata),
            $this->payloadSerializer->deserialize($payload),
            DateTime::fromString($row['recorded_on'])
        );
    }

    /**
     * JSON-decodes one column of a fetched row into an associative array.
     *
     * @param array $row
     * @param string $column
     * @return mixed
     *
     * @throws \UnexpectedValueException
     *   When the column value is not valid JSON.
     */
    private function decodeJsonColumn($row, $column)
    {
        $decoded = json_decode($row[$column], true);

        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new \UnexpectedValueException(
                sprintf(
                    'Invalid JSON in column "%s" of row %s in table "%s": %s',
                    $column,
                    $row['id'],
                    $this->tableName,
                    json_last_error_msg()
                )
            );
        }

        return $decoded;
    }
}
235
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.