1
|
|
|
<?php declare(strict_types=1); |
2
|
|
|
|
3
|
|
|
namespace SmoothPhp\LaravelAdapter\EventStore; |
4
|
|
|
|
5
|
|
|
use Illuminate\Database\Connection; |
6
|
|
|
use Illuminate\Database\DatabaseManager; |
7
|
|
|
use Illuminate\Database\QueryException; |
8
|
|
|
use SmoothPhp\Contracts\Domain\DomainEventStream; |
9
|
|
|
use SmoothPhp\Contracts\Domain\DomainMessage; |
10
|
|
|
use SmoothPhp\Contracts\EventStore\DomainEventStreamInterface; |
11
|
|
|
use SmoothPhp\Contracts\EventStore\EventStore; |
12
|
|
|
use SmoothPhp\Contracts\Serialization\Serializer; |
13
|
|
|
use SmoothPhp\Domain\DateTime; |
14
|
|
|
use SmoothPhp\EventStore\DuplicateAggregatePlayhead; |
15
|
|
|
use SmoothPhp\EventStore\EventStreamNotFound; |
16
|
|
|
|
17
|
|
|
/**
 * Class LaravelEventStore
 *
 * Event store that reads and writes domain messages through a Laravel
 * database connection. Each event is one row in the configured table, with
 * metadata and payload stored as JSON produced from the injected serializer.
 *
 * @package SmoothPhp\LaravelAdapter\EventStore
 * @author Simon Bennett <[email protected]>
 */
final class LaravelEventStore implements EventStore
{
    /** @var Serializer Converts metadata/payload objects to and from arrays. */
    private $serializer;

    /** @var string Name of the table holding the event rows. */
    private $eventStoreTableName;

    /** @var Connection Connection resolved from the database manager. */
    private $db;

    /**
     * @param DatabaseManager $databaseManager          Used once, to resolve the connection.
     * @param Serializer      $serializer
     * @param string          $eventStoreConnectionName Passed as-is to DatabaseManager::connection().
     * @param string          $eventStoreTableName
     */
    public function __construct(
        DatabaseManager $databaseManager,
        Serializer $serializer,
        $eventStoreConnectionName,
        $eventStoreTableName
    ) {
        $this->db = $databaseManager->connection($eventStoreConnectionName);
        $this->serializer = $serializer;
        $this->eventStoreTableName = $eventStoreTableName;
    }

    /**
     * Load every event recorded for one aggregate, ordered by ascending playhead.
     *
     * @param string $id Value matched against the `uuid` column.
     * @return DomainEventStream
     * @throws EventStreamNotFound When no rows exist for the given id.
     */
    public function load(string $id) : DomainEventStream
    {
        $rows = $this->db->table($this->eventStoreTableName)
            ->select(['uuid', 'playhead', 'metadata', 'payload', 'recorded_on'])
            ->where('uuid', $id)
            ->orderBy('playhead', 'asc')
            ->get();

        $events = [];

        foreach ($rows as $row) {
            $events[] = $this->deserializeEvent($row);
        }

        if (empty($events)) {
            throw new EventStreamNotFound(sprintf('EventStream not found for aggregate with id %s', $id));
        }

        return new \SmoothPhp\Domain\DomainEventStream($events);
    }

    /**
     * Persist all messages of the stream inside a single transaction.
     *
     * The connection is re-established before the transaction starts
     * (NOTE(review): presumably to survive stale connections in long-running
     * processes - confirm). Any failure while serializing or inserting rolls
     * the transaction back and rethrows the original exception unchanged.
     *
     * @param string            $id          Not used here; each row takes its uuid from the message itself.
     * @param DomainEventStream $eventStream
     * @throws \PDOException
     * @throws \SmoothPhp\EventStore\DuplicateAggregatePlayhead
     * @throws \Illuminate\Database\QueryException
     */
    public function append(string $id, DomainEventStream $eventStream) : void
    {
        $this->db->reconnect();
        $this->db->beginTransaction();

        try {
            foreach ($eventStream as $domainMessage) {
                $this->insertEvent($this->domainMessageToArray($domainMessage));
            }

            $this->db->commit();
        } catch (\Throwable $ex) {
            // Catch everything, not only QueryException: insertEvent() turns the
            // duplicate-key error into DuplicateAggregatePlayhead, and the
            // serializer may throw too. Either would otherwise leave the
            // transaction open on this connection.
            $this->db->rollBack();

            throw $ex;
        }
    }

    /**
     * Insert a single event row, translating the duplicate-key error.
     *
     * @param array $eventRow Row as built by domainMessageToArray().
     * @throws DuplicateAggregatePlayhead When the driver reports code 23000
     *                                    (SQLSTATE integrity-constraint violation).
     * @throws \PDOException              For any other database failure.
     */
    private function insertEvent(array $eventRow) : void
    {
        try {
            $this->db->table($this->eventStoreTableName)->insert($eventRow);
        } catch (\PDOException $ex) {
            // getCode() may be an int or a string depending on the driver,
            // hence the cast before the strict comparison.
            if ((string)$ex->getCode() === '23000') {
                throw new DuplicateAggregatePlayhead($eventRow['uuid'], $eventRow['playhead'], $ex);
            }

            throw $ex;
        }
    }

    /**
     * Rebuild a domain message from a database row.
     *
     * @param \stdClass $row Row exposing uuid, playhead, metadata, payload and recorded_on.
     * @return DomainMessage
     */
    private function deserializeEvent($row) : DomainMessage
    {
        return new \SmoothPhp\Domain\DomainMessage(
            $row->uuid,
            $row->playhead,
            $this->serializer->deserialize(json_decode($row->metadata, true)),
            $this->serializer->deserialize(json_decode($row->payload, true)),
            new DateTime($row->recorded_on)
        );
    }

    /**
     * Count the stored events whose `type` column is one of the given types.
     *
     * @param string[] $eventTypes
     * @return int
     */
    public function getEventCountByTypes(array $eventTypes) : int
    {
        return $this->db->table($this->eventStoreTableName)
            ->whereIn('type', $eventTypes)
            ->count();
    }

    /**
     * Page through all events of the given types in ascending `id` order.
     *
     * Each page is fetched with `id > last seen id` and a limit of $take, then
     * yielded as one DomainEventStream. The loop only stops after a page comes
     * back empty, so the last yielded stream is always an empty one.
     *
     * The query uses a MySQL-style `FORCE INDEX` hint on the index named
     * `eventstore_type_index`.
     *
     * @param string[] $eventTypes
     * @param int      $take Maximum number of rows per page.
     * @return \Generator Yields \SmoothPhp\Domain\DomainEventStream instances.
     */
    public function getEventsByType(array $eventTypes, int $take) : \Generator
    {
        $lastId = 0;

        do {
            $rows = $this->db->table(
                $this->db->raw("`{$this->eventStoreTableName}` FORCE INDEX (eventstore_type_index)")
            )
                ->select(['id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on'])
                ->whereIn('type', $eventTypes)
                ->where('id', '>', $lastId)
                ->take($take)
                ->orderBy('id')
                ->get();

            $events = [];

            foreach ($rows as $row) {
                $events[] = $this->deserializeEvent($row);
                // Remember the highest id seen; the next page starts after it.
                $lastId = $row->id;
            }

            yield new \SmoothPhp\Domain\DomainEventStream($events);
        } while (count($rows) > 0);
    }

    /**
     * Flatten a domain message into the column => value array that is inserted.
     *
     * @param DomainMessage $domainMessage
     * @return array Keys: uuid, playhead, metadata, payload, recorded_on, type.
     */
    private function domainMessageToArray(DomainMessage $domainMessage) : array
    {
        return [
            'uuid'        => (string)$domainMessage->getId(),
            'playhead'    => $domainMessage->getPlayHead(),
            'metadata'    => json_encode($this->serializer->serialize($domainMessage->getMetadata())),
            'payload'     => json_encode($this->serializer->serialize($domainMessage->getPayload())),
            'recorded_on' => $domainMessage->getRecordedOn()->format('Y-m-d H:i:s'),
            'type'        => $domainMessage->getType(),
        ];
    }

    /**
     * Delete every row whose `uuid` column equals the given stream id.
     *
     * @param string $streamId
     */
    public function deleteStream(string $streamId) : void
    {
        $this->db->table($this->eventStoreTableName)
            ->where('uuid', $streamId)
            ->delete();
    }
}
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is WordPress.
In this case you can add the `@ignore` PhpDoc annotation to the duplicate definition and it will be ignored.