1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Rawkode\Eidetic\EventStore\DBALEventStore; |
4
|
|
|
|
5
|
|
|
use Doctrine\DBAL\Connection; |
6
|
|
|
use Doctrine\DBAL\DriverManager; |
7
|
|
|
use Doctrine\DBAL\Schema\Table; |
8
|
|
|
use Rawkode\Eidetic\EventStore\InvalidEventException; |
9
|
|
|
use Rawkode\Eidetic\EventStore\EventStore; |
10
|
|
|
use Rawkode\Eidetic\EventStore\EventPublisherMixin; |
11
|
|
|
use Rawkode\Eidetic\EventStore\NoEventsFoundForKeyException; |
12
|
|
|
use Rawkode\Eidetic\EventStore\Subscriber; |
13
|
|
|
use Rawkode\Eidetic\EventStore\VerifyEventIsAClassTrait; |
14
|
|
|
|
15
|
|
|
final class DBALEventStore implements EventStore |
16
|
|
|
{ |
17
|
|
|
use EventPublisherMixin; |
18
|
|
|
use VerifyEventIsAClassTrait; |
19
|
|
|
|
20
|
|
|
/** |
21
|
|
|
* @var string |
22
|
|
|
*/ |
23
|
|
|
private $tableName; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* @var Connection |
27
|
|
|
*/ |
28
|
|
|
private $connection; |
29
|
|
|
|
30
|
|
|
/** @var array */ |
31
|
|
|
private $stagedEvents = [ ]; |
32
|
|
|
|
33
|
|
|
/** |
34
|
|
|
* @param string $tableName |
35
|
|
|
* @param Connection $connection |
36
|
|
|
*/ |
37
|
6 |
|
private function __construct($tableName, Connection $connection) |
38
|
|
|
{ |
39
|
6 |
|
$this->tableName = $tableName; |
40
|
6 |
|
$this->connection = $connection; |
41
|
6 |
|
} |
42
|
|
|
|
43
|
|
|
/** |
44
|
|
|
* @param string $tableName |
45
|
|
|
* @param array $options |
46
|
|
|
* |
47
|
|
|
* @return self |
48
|
|
|
*/ |
49
|
6 |
|
public static function createWithOptions($tableName, array $options) |
50
|
|
|
{ |
51
|
6 |
|
$connection = DriverManager::getConnection($options); |
52
|
|
|
|
53
|
6 |
|
return new self($tableName, $connection); |
54
|
|
|
} |
55
|
|
|
|
56
|
|
|
/** |
57
|
|
|
* @param string $key |
58
|
|
|
* @param array $events |
59
|
|
|
*/ |
60
|
5 |
View Code Duplication |
public function store($key, array $events) |
|
|
|
|
61
|
|
|
{ |
62
|
|
|
try { |
63
|
5 |
|
$this->startTransaction(); |
64
|
|
|
|
65
|
5 |
|
foreach ($events as $event) { |
66
|
5 |
|
$this->persistEvent($key, $event); |
67
|
5 |
|
} |
68
|
5 |
|
} catch (InvalidEventException $invalidEventException) { |
69
|
2 |
|
$this->abortTransaction(); |
70
|
|
|
|
71
|
2 |
|
throw $invalidEventException; |
72
|
|
|
} |
73
|
|
|
|
74
|
4 |
|
$this->completeTransaction(); |
75
|
4 |
|
} |
76
|
|
|
|
77
|
|
|
/** |
78
|
|
|
* @param string $key |
79
|
|
|
* |
80
|
|
|
* @return array |
81
|
|
|
*/ |
82
|
2 |
|
public function retrieve($key) |
83
|
|
|
{ |
84
|
2 |
|
$results = $this->eventLogs($key); |
85
|
|
|
|
86
|
|
|
return array_map(function ($eventLog) { |
87
|
1 |
|
return unserialize(base64_decode($eventLog['event'])); |
88
|
1 |
|
}, $results); |
89
|
|
|
} |
90
|
|
|
|
91
|
|
|
/** |
92
|
|
|
* @param string $key |
93
|
|
|
* |
94
|
|
|
* @return array |
95
|
|
|
*/ |
96
|
1 |
|
public function retrieveLogs($key) |
97
|
|
|
{ |
98
|
1 |
|
return $this->eventLogs($key); |
99
|
|
|
} |
100
|
|
|
|
101
|
|
|
/** |
102
|
|
|
* @param string $key |
103
|
|
|
* |
104
|
|
|
* @return array |
105
|
|
|
*/ |
106
|
3 |
|
private function eventLogs($key) |
107
|
|
|
{ |
108
|
3 |
|
$this->verifyEventExistsForKey($key); |
109
|
|
|
|
110
|
2 |
|
$statement = $this->eventLogQuery($key)->execute(); |
111
|
|
|
|
112
|
2 |
|
$results = $statement->fetchAll(); |
113
|
|
|
|
114
|
2 |
|
return array_map(function ($eventLog) { |
115
|
2 |
|
if (true === array_key_exists('recorded_at', $eventLog)) { |
116
|
2 |
|
$eventLog['recorded_at'] = new \DateTime($eventLog['recorded_at']); |
117
|
2 |
|
} |
118
|
|
|
|
119
|
2 |
|
return $eventLog; |
120
|
2 |
|
}, $results); |
121
|
|
|
} |
122
|
|
|
|
123
|
|
|
/** |
124
|
|
|
*/ |
125
|
5 |
|
private function startTransaction() |
126
|
|
|
{ |
127
|
5 |
|
$this->connection->beginTransaction(); |
128
|
5 |
|
$this->stagedEvents = [ ]; |
129
|
5 |
|
} |
130
|
|
|
|
131
|
|
|
/** |
132
|
|
|
*/ |
133
|
2 |
|
private function abortTransaction() |
134
|
|
|
{ |
135
|
2 |
|
$this->connection->rollBack(); |
136
|
2 |
|
$this->stagedEvents = [ ]; |
137
|
2 |
|
} |
138
|
|
|
|
139
|
|
|
/** |
140
|
|
|
*/ |
141
|
4 |
|
private function completeTransaction() |
142
|
|
|
{ |
143
|
4 |
|
$this->connection->commit(); |
144
|
|
|
|
145
|
4 |
|
foreach ($this->stagedEvents as $event) { |
146
|
4 |
|
$this->publish(self::EVENT_STORED, $event); |
147
|
4 |
|
} |
148
|
|
|
|
149
|
4 |
|
$this->stagedEvents = [ ]; |
150
|
4 |
|
} |
151
|
|
|
|
152
|
|
|
/** |
153
|
|
|
* @param string $key |
154
|
|
|
* @param object $event |
155
|
|
|
*/ |
156
|
6 |
|
private function persistEvent($key, $event) |
157
|
|
|
{ |
158
|
5 |
|
$this->verifyEventIsAClass($event); |
159
|
|
|
|
160
|
5 |
|
$this->connection->insert($this->tableName, [ |
161
|
6 |
|
'key' => $key, |
162
|
5 |
|
'recorded_at' => new \DateTime('now', new \DateTimeZone('UTC')), |
163
|
5 |
|
'event_class' => get_class($event), |
164
|
5 |
|
'event' => base64_encode(serialize($event)), |
165
|
5 |
|
], [ |
166
|
5 |
|
\PDO::PARAM_STR, |
167
|
5 |
|
'datetime', |
168
|
5 |
|
\PDO::PARAM_STR, |
169
|
5 |
|
\PDO::PARAM_STR, |
170
|
5 |
|
]); |
171
|
|
|
|
172
|
6 |
|
array_push($this->stagedEvents, $event); |
173
|
5 |
|
} |
174
|
|
|
|
175
|
|
|
/** |
176
|
|
|
*/ |
177
|
6 |
|
public function createTable() |
178
|
|
|
{ |
179
|
6 |
|
$schemaManager = $this->connection->getSchemaManager(); |
180
|
6 |
|
$schema = $schemaManager->createSchema(); |
181
|
|
|
|
182
|
6 |
|
if ($schema->hasTable($this->tableName)) { |
183
|
|
|
return; |
184
|
|
|
} |
185
|
|
|
|
186
|
6 |
|
$table = $schema->createTable($this->tableName); |
187
|
|
|
|
188
|
6 |
|
$serialNumberColumn = $table->addColumn('serial_number', 'integer'); |
189
|
6 |
|
$serialNumberColumn->setAutoincrement(true); |
190
|
6 |
|
$table->setPrimaryKey(['serial_number']); |
191
|
|
|
|
192
|
6 |
|
$table->addColumn('key', 'string', ['length' => 255]); |
193
|
6 |
|
$table->addColumn('recorded_at', 'datetime'); |
194
|
6 |
|
$table->addColumn('event_class', 'string', ['length' => 255]); |
195
|
6 |
|
$table->addColumn('event', 'text'); |
196
|
|
|
|
197
|
6 |
|
$table->addIndex(['key']); |
198
|
6 |
|
$table->addIndex(['recorded_at']); |
199
|
6 |
|
$table->addIndex(['event_class']); |
200
|
|
|
|
201
|
6 |
|
$schemaManager->createTable($table); |
202
|
6 |
|
} |
203
|
|
|
|
204
|
|
|
/** |
205
|
|
|
*/ |
206
|
6 |
|
public function dropTable() |
207
|
|
|
{ |
208
|
6 |
|
$this->connection->getSchemaManager()->dropTable($this->tableName); |
209
|
6 |
|
} |
210
|
|
|
|
211
|
|
|
/** |
212
|
|
|
* @param $key |
213
|
|
|
* @throws NoEventsFoundForKeyException |
214
|
|
|
*/ |
215
|
3 |
|
private function verifyEventExistsForKey($key) |
216
|
|
|
{ |
217
|
3 |
|
$results = $this->singleLogForKey($key); |
218
|
|
|
|
219
|
3 |
|
if (count($results) === 0) { |
220
|
1 |
|
throw new NoEventsFoundForKeyException(); |
221
|
|
|
} |
222
|
2 |
|
} |
223
|
|
|
|
224
|
|
|
/** |
225
|
|
|
* @param $key |
226
|
|
|
* @return \Doctrine\DBAL\Query\QueryBuilder |
227
|
|
|
*/ |
228
|
3 |
|
private function eventLogQuery($key) |
229
|
|
|
{ |
230
|
3 |
|
$queryBuilder = $this->connection->createQueryBuilder(); |
231
|
|
|
|
232
|
3 |
|
$queryBuilder->select('*'); |
233
|
3 |
|
$queryBuilder->from($this->tableName); |
234
|
3 |
|
$queryBuilder->where('key = :key'); |
235
|
3 |
|
$queryBuilder->orderBy('serial_number', 'ASC'); |
236
|
|
|
|
237
|
3 |
|
$queryBuilder->setParameter('key', $key); |
238
|
|
|
|
239
|
3 |
|
return $queryBuilder; |
240
|
|
|
} |
241
|
|
|
|
242
|
|
|
/** |
243
|
|
|
* @param $key |
244
|
|
|
* @return array |
245
|
|
|
*/ |
246
|
3 |
|
private function singleLogForKey($key) |
247
|
|
|
{ |
248
|
3 |
|
$queryBuilder = $this->eventLogQuery($key); |
249
|
|
|
|
250
|
3 |
|
$queryBuilder->setMaxResults(1); |
251
|
|
|
|
252
|
3 |
|
$statement = $queryBuilder->execute(); |
253
|
|
|
|
254
|
3 |
|
$results = $statement->fetchAll(); |
255
|
|
|
|
256
|
3 |
|
return $results; |
257
|
|
|
} |
258
|
|
|
} |
259
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.