1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace CultuurNet\UDB3\EventSourcing\DBAL; |
4
|
|
|
|
5
|
|
|
use Broadway\Domain\DateTime; |
6
|
|
|
use Broadway\Domain\DomainEventStream; |
7
|
|
|
use Broadway\Domain\DomainMessage; |
8
|
|
|
use Broadway\EventSourcing\EventStreamDecoratorInterface; |
9
|
|
|
use Broadway\Serializer\SerializerInterface; |
10
|
|
|
use Doctrine\DBAL\Connection; |
11
|
|
|
use Doctrine\DBAL\Driver\Statement; |
12
|
|
|
use Doctrine\DBAL\DBALException; |
13
|
|
|
|
14
|
|
|
/**
 * Lazily reads domain messages from a DBAL event store table.
 *
 * Invoking an instance returns a generator that loads one row at a time,
 * ordered by ascending id, and yields each row wrapped in its own
 * DomainEventStream. The with*() methods return modified clones and leave
 * the instance they are called on untouched.
 */
class EventStream
{
    /**
     * @var Connection
     */
    protected $connection;

    /**
     * @var SerializerInterface
     */
    protected $payloadSerializer;

    /**
     * @var SerializerInterface
     */
    protected $metadataSerializer;

    /**
     * @var string
     */
    protected $tableName;

    /**
     * Lowest row id that the next load statement will consider.
     *
     * @var int
     */
    protected $startId;

    /**
     * Id of the most recently fetched row, null until a row was fetched.
     *
     * @var int|null
     */
    protected $lastProcessedId;

    /**
     * Optional list of uuids to restrict the stream to.
     *
     * @var string[]|null
     */
    protected $cdbids;

    /**
     * @var EventStreamDecoratorInterface|null
     */
    private $domainEventStreamDecorator;

    /**
     * @param Connection $connection
     * @param SerializerInterface $payloadSerializer
     * @param SerializerInterface $metadataSerializer
     * @param string $tableName
     */
    public function __construct(
        Connection $connection,
        SerializerInterface $payloadSerializer,
        SerializerInterface $metadataSerializer,
        $tableName
    ) {
        $this->connection = $connection;
        $this->payloadSerializer = $payloadSerializer;
        $this->metadataSerializer = $metadataSerializer;
        $this->tableName = $tableName;
        $this->startId = 0;
    }

    /**
     * Returns a copy of this stream that starts reading at the given row id.
     *
     * @param int $startId
     * @return EventStream
     * @throws \InvalidArgumentException
     *   When $startId is not an integer, or is not higher than 0.
     */
    public function withStartId($startId)
    {
        if (!is_int($startId)) {
            throw new \InvalidArgumentException('StartId should have type int.');
        }

        if ($startId <= 0) {
            throw new \InvalidArgumentException('StartId should be higher than 0.');
        }

        $c = clone $this;
        $c->startId = $startId;
        return $c;
    }

    /**
     * Returns a copy of this stream that only reads rows whose uuid is in
     * the given list.
     *
     * @param string[] $cdbids
     * @return EventStream
     * @throws \InvalidArgumentException
     *   When $cdbids is not an array, or is an empty array.
     */
    public function withCdbids($cdbids)
    {
        if (!is_array($cdbids)) {
            throw new \InvalidArgumentException('Cdbids should have type array.');
        }

        if (empty($cdbids)) {
            throw new \InvalidArgumentException('Cdbids can\'t be empty.');
        }

        $c = clone $this;
        $c->cdbids = $cdbids;
        return $c;
    }

    /**
     * Returns a copy of this stream that passes every yielded stream through
     * the given decorator first.
     *
     * @param EventStreamDecoratorInterface $domainEventStreamDecorator
     * @return EventStream
     */
    public function withDomainEventStreamDecorator(EventStreamDecoratorInterface $domainEventStreamDecorator)
    {
        $c = clone $this;
        $c->domainEventStreamDecorator = $domainEventStreamDecorator;
        return $c;
    }

    /**
     * Generates the domain event streams, one per stored row.
     *
     * A fresh load statement is executed for every iteration; the generator
     * ends as soon as a statement returns no rows.
     *
     * @return \Generator
     *   Yields DomainEventStream instances, or whatever the configured
     *   decorator returns for them.
     */
    public function __invoke()
    {
        do {
            $statement = $this->prepareLoadStatement();

            $events = [];
            while ($row = $statement->fetch()) {
                $events[] = $this->deserializeEvent($row);

                // Database drivers commonly return integer columns as
                // strings; cast once so the documented int type holds.
                $id = (int) $row['id'];
                $this->lastProcessedId = $id;
                // Make sure to increment to prevent endless loop.
                $this->startId = $id + 1;
            }

            /* @var DomainMessage[] $events */
            if (!empty($events)) {
                $event = $events[0];
                $domainEventStream = new DomainEventStream($events);

                if (!is_null($this->domainEventStreamDecorator)) {
                    // Because the load statement always returns one row at a
                    // time, and we always wrap a single domain message in a
                    // stream as a result, we can simply get the aggregate type
                    // and aggregate id from the first domain message in the
                    // stream.
                    $domainEventStream = $this->domainEventStreamDecorator->decorateForWrite(
                        get_class($event->getPayload()),
                        $event->getId(),
                        $domainEventStream
                    );
                }

                yield $domainEventStream;
            }
        } while (!empty($events));
    }

    /**
     * Returns the id of the last row fetched by the generator.
     *
     * @return int|null
     *   Null when no row has been fetched yet.
     */
    public function getLastProcessedId()
    {
        return $this->lastProcessedId;
    }

    /**
     * The load statement can no longer be 'cached' because of using the query
     * builder. The query builder requires all parameters to be set before
     * using the execute command. The previous solution used the prepare
     * statement on the connection, this did not require all parameters to be
     * set up front.
     *
     * The query selects at most one row with an id equal to or higher than
     * the current start id, optionally restricted to the configured uuids.
     *
     * @return Statement
     * @throws DBALException
     */
    protected function prepareLoadStatement()
    {
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on')
            ->from($this->tableName)
            ->where('id >= :startid')
            ->setParameter('startid', $this->startId)
            ->orderBy('id', 'ASC')
            ->setMaxResults(1);

        // Explicit emptiness check instead of relying on the implicit
        // array-to-boolean conversion; covers both null and an empty array.
        if (!empty($this->cdbids)) {
            $queryBuilder->andWhere('uuid IN (:uuids)')
                ->setParameter('uuids', $this->cdbids, Connection::PARAM_STR_ARRAY);
        }

        return $queryBuilder->execute();
    }

    /**
     * Builds a domain message from a single event store row.
     *
     * The metadata and payload columns are JSON-decoded to arrays before
     * being handed to their respective serializers.
     *
     * @param array $row
     *   Row with the keys uuid, playhead, metadata, payload and recorded_on.
     * @return DomainMessage
     */
    private function deserializeEvent($row)
    {
        return new DomainMessage(
            $row['uuid'],
            $row['playhead'],
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
            $this->payloadSerializer->deserialize(json_decode($row['payload'], true)),
            DateTime::fromString($row['recorded_on'])
        );
    }
}
212
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.