|
1
|
|
|
<?php |
|
2
|
|
|
|
|
3
|
|
|
namespace CultuurNet\UDB3\EventSourcing\DBAL; |
|
4
|
|
|
|
|
5
|
|
|
use Broadway\Domain\DateTime; |
|
6
|
|
|
use Broadway\Domain\DomainEventStream; |
|
7
|
|
|
use Broadway\Domain\DomainMessage; |
|
8
|
|
|
use Broadway\EventSourcing\EventStreamDecoratorInterface; |
|
9
|
|
|
use Broadway\Serializer\SerializerInterface; |
|
10
|
|
|
use Doctrine\DBAL\Connection; |
|
11
|
|
|
use Doctrine\DBAL\Driver\Statement; |
|
12
|
|
|
use Doctrine\DBAL\DBALException; |
|
13
|
|
|
|
|
14
|
|
|
/**
 * Callable, generator-based reader for an event store table.
 *
 * Invoking an instance walks the table in ascending id order, starting after
 * the configured previous id, and yields one DomainEventStream per fetched
 * row. The reader can optionally be restricted to a single aggregate (cdbid)
 * and can pass every yielded stream through a decorator.
 *
 * The with*() methods return modified clones and leave the instance they are
 * called on unchanged.
 */
class EventStream
{
    /**
     * @var Connection
     */
    protected $connection;

    /**
     * @var SerializerInterface
     */
    protected $payloadSerializer;

    /**
     * @var SerializerInterface
     */
    protected $metadataSerializer;

    /**
     * Name of the table the events are read from.
     *
     * @var string
     */
    protected $tableName;

    /**
     * Lazily prepared, cached load statement (see prepareLoadStatement()).
     *
     * @var Statement
     */
    protected $loadStatement;

    /**
     * Id of the last row that was read; the next query selects ids above it.
     *
     * @var int
     */
    protected $previousId;

    /**
     * Optional aggregate id; when set only rows with this uuid are read.
     *
     * @var string
     */
    protected $cdbid;

    /**
     * Optional decorator applied to every stream before it is yielded.
     *
     * @var EventStreamDecoratorInterface
     */
    private $domainEventStreamDecorator;

    /**
     * @param Connection $connection
     * @param SerializerInterface $payloadSerializer
     * @param SerializerInterface $metadataSerializer
     * @param string $tableName
     */
    public function __construct(
        Connection $connection,
        SerializerInterface $payloadSerializer,
        SerializerInterface $metadataSerializer,
        $tableName
    ) {
        $this->connection = $connection;
        $this->payloadSerializer = $payloadSerializer;
        $this->metadataSerializer = $metadataSerializer;
        $this->tableName = $tableName;
        $this->previousId = 0;
    }

    /**
     * Drops the cached load statement on every clone.
     *
     * The SQL of the statement depends on whether a cdbid filter is set, and
     * a statement carries its own cursor. A clone therefore has to prepare
     * its own statement instead of sharing the one of the instance it was
     * cloned from.
     */
    public function __clone()
    {
        $this->loadStatement = null;
    }

    /**
     * Returns a copy that starts reading at the given row id (inclusive).
     *
     * @param int $startId
     * @return EventStream
     *
     * @throws \InvalidArgumentException
     *   When $startId is not an int, or is 0.
     */
    public function withStartId($startId)
    {
        if (!is_int($startId)) {
            throw new \InvalidArgumentException('StartId should have type int.');
        }

        if (empty($startId)) {
            throw new \InvalidArgumentException('StartId can\'t be empty.');
        }

        $c = clone $this;
        // The query selects "id > previousId", so step one back to include
        // the start id itself. Negative start ids fall back to 0.
        $c->previousId = $startId > 0 ? $startId - 1 : 0;
        return $c;
    }

    /**
     * Returns a copy that only reads the events of the given aggregate.
     *
     * @param string $cdbid
     * @return EventStream
     *
     * @throws \InvalidArgumentException
     *   When $cdbid is not a string, or is empty.
     */
    public function withCdbid($cdbid)
    {
        if (!is_string($cdbid)) {
            throw new \InvalidArgumentException('Cdbid should have type string.');
        }

        if (empty($cdbid)) {
            throw new \InvalidArgumentException('Cdbid can\'t be empty.');
        }

        $c = clone $this;
        $c->cdbid = $cdbid;
        return $c;
    }

    /**
     * Returns a copy that decorates every stream before yielding it.
     *
     * @param EventStreamDecoratorInterface $domainEventStreamDecorator
     * @return EventStream
     */
    public function withDomainEventStreamDecorator(EventStreamDecoratorInterface $domainEventStreamDecorator)
    {
        $c = clone $this;
        $c->domainEventStreamDecorator = $domainEventStreamDecorator;
        return $c;
    }

    /**
     * Generator that yields the stored events as DomainEventStream objects.
     *
     * The load statement is re-executed with the id of the last row read
     * until a query returns no rows. The previous id is advanced while
     * iterating, so getPreviousId() reflects the progress.
     *
     * @return \Generator
     *   Yields DomainEventStream instances.
     */
    public function __invoke()
    {
        $statement = $this->prepareLoadStatement();

        do {
            $statement->bindValue('previousid', $this->previousId, \PDO::PARAM_INT);
            if ($this->cdbid) {
                $statement->bindValue('uuid', $this->cdbid, \PDO::PARAM_STR);
            }

            $statement->execute();

            $events = [];
            while ($row = $statement->fetch()) {
                $events[] = $this->deserializeEvent($row);
                // Drivers usually hand back column values as strings; keep
                // the documented int type of the property and its getter.
                $this->previousId = (int) $row['id'];
            }

            /* @var DomainMessage[] $events */
            if (!empty($events)) {
                $event = $events[0];
                $domainEventStream = new DomainEventStream($events);

                if (!is_null($this->domainEventStreamDecorator)) {
                    // Because the load statement always returns one row at a
                    // time, and we always wrap a single domain message in a
                    // stream as a result, we can simply get the aggregate type
                    // and aggregate id from the first domain message in the
                    // stream.
                    $domainEventStream = $this->domainEventStreamDecorator->decorateForWrite(
                        get_class($event->getPayload()),
                        $event->getId(),
                        $domainEventStream
                    );
                }

                yield $domainEventStream;
            }
        } while (!empty($events));
    }

    /**
     * @return int
     */
    public function getPreviousId()
    {
        return $this->previousId;
    }

    /**
     * Builds the load statement on first use and returns the cached one
     * afterwards.
     *
     * The statement is only prepared here. Binding the parameters and
     * executing it is left to the caller, so the same statement can be run
     * repeatedly with a new previous id.
     *
     * @return Statement
     * @throws DBALException
     */
    protected function prepareLoadStatement()
    {
        if (null === $this->loadStatement) {
            $queryBuilder = $this->connection->createQueryBuilder();

            $queryBuilder->select('id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on')
                ->from($this->tableName)
                ->where('id > :previousid')
                ->orderBy('id', 'ASC')
                ->setMaxResults(1);

            if ($this->cdbid) {
                $queryBuilder->andWhere('uuid = :uuid');
            }

            // Prepare instead of QueryBuilder::execute(): executing here would
            // run the query with its placeholders still unbound, and its
            // return type is not guaranteed to be a statement.
            $this->loadStatement = $this->connection->prepare($queryBuilder->getSQL());
        }

        return $this->loadStatement;
    }

    /**
     * Turns a row of the event store table into a domain message.
     *
     * @param array $row
     *   Row with the keys uuid, playhead, metadata, payload and recorded_on;
     *   metadata and payload hold JSON.
     * @return DomainMessage
     */
    private function deserializeEvent($row)
    {
        return new DomainMessage(
            $row['uuid'],
            $row['playhead'],
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
            $this->payloadSerializer->deserialize(json_decode($row['payload'], true)),
            DateTime::fromString($row['recorded_on'])
        );
    }
}
|
213
|
|
|
|
Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.
For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.
Either this assignment is in error or a type check should be added for that assignment.