1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
namespace Antidot\EventSource\Infrastructure\Repository; |
6
|
|
|
|
7
|
|
|
use Antidot\EventSource\Domain\Event\AggregateChanged; |
8
|
|
|
use Antidot\EventSource\Domain\Model\Aggregate\AggregateRoot; |
9
|
|
|
use Antidot\EventSource\Domain\Model\ValueObject\AggregateRootId; |
10
|
|
|
use DateTimeImmutable; |
11
|
|
|
use Doctrine\DBAL\Connection; |
12
|
|
|
use function get_class; |
13
|
|
|
use function json_encode; |
14
|
|
|
|
15
|
|
|
/**
 * Base repository that stores the events of each aggregate root in a dedicated table.
 *
 * The table name is built from the configured stream name and the aggregate id
 * (see streamTableName()). Aggregates are rebuilt by replaying their stored events
 * in insertion order.
 *
 * NOTE(review): the SQL uses backtick quoting, AUTO_INCREMENT and ENGINE=InnoDB, so it
 * looks MySQL/MariaDB specific — confirm before using another platform.
 * NOTE(review): the table name is interpolated directly into SQL, so the stream name and
 * aggregate id must come from trusted/validated sources.
 */
abstract class DbalAggregateRootPerTableRepository
{
    /** Date format used both when writing and when reading the `occurred_on` column. */
    private const OCCURRED_ON_FORMAT = 'Y-m-d H:i:s';

    /**
     * Maximum nesting depth accepted when decoding a payload. It equals the default depth
     * of json_encode(), which is what saveAggregate() uses, so any payload that could be
     * written can also be read back.
     */
    private const PAYLOAD_MAX_DEPTH = 512;

    protected Connection $connection;
    protected string $streamName;

    /**
     * @param Connection $connection DBAL connection used for every query of this repository.
     * @param string     $streamName Prefix of the per-aggregate stream tables.
     */
    public function __construct(Connection $connection, string $streamName)
    {
        $this->connection = $connection;
        $this->streamName = $streamName;
    }

    /**
     * Rebuilds an aggregate by replaying the events stored in its stream table.
     *
     * @param AggregateRootId $aggregateRootId Identifier of the aggregate to load.
     * @param string          $aggregateClass  Class instantiated (without constructor arguments)
     *                                         to receive the events.
     *
     * @return AggregateRoot|null Null when the stream table does not exist; otherwise the
     *                            aggregate with all stored events applied (possibly none).
     *
     * @throws \JsonException            When a stored payload is not valid JSON.
     * @throws \UnexpectedValueException When a stored `occurred_on` value cannot be parsed.
     */
    protected function getAggregate(AggregateRootId $aggregateRootId, string $aggregateClass): ?AggregateRoot
    {
        $tableName = $this->streamTableName($aggregateRootId->value());
        if (false === $this->tableExist($tableName)) {
            return null;
        }

        $statement = $this->connection->prepare(<<<SQL
            SELECT * FROM
                `{$tableName}`
            WHERE
                aggregate_id = :aggregate_id
            ORDER BY
                no ASC;
            SQL
        );
        $statement->bindValue('aggregate_id', $aggregateRootId->value());
        $statement->execute();
        /** @var null|array $queryResult */
        $queryResult = $statement->fetchAll();

        /** @var AggregateRoot $aggregate */
        $aggregate = new $aggregateClass();

        /** @var array<string, mixed> $item */
        foreach ($queryResult ?? [] as $item) {
            /** @var string $eventClass */
            $eventClass = $item['event_class'];
            /** @var string $payload */
            $payload = $item['payload'];
            /** @var string $occurredOn */
            $occurredOn = $item['occurred_on'];

            // createFromFormat() returns false instead of throwing; fail loudly here rather
            // than handing `false` to the event constructor.
            $occurredOnDate = DateTimeImmutable::createFromFormat(self::OCCURRED_ON_FORMAT, $occurredOn);
            if (false === $occurredOnDate) {
                throw new \UnexpectedValueException(sprintf(
                    'Unable to parse occurred_on value "%s" of event "%s" in stream "%s".',
                    $occurredOn,
                    $eventClass,
                    $tableName
                ));
            }

            /** @var AggregateChanged $event */
            $event = new $eventClass(
                $item['event_id'],
                $item['aggregate_id'],
                json_decode($payload, true, self::PAYLOAD_MAX_DEPTH, JSON_THROW_ON_ERROR),
                $occurredOnDate
            );
            $aggregate->apply($event);
        }

        return $aggregate;
    }

    /**
     * Appends the pending events of an aggregate to its stream table.
     *
     * The stream table is created first when missing (outside the transaction), then all
     * events returned by popEvents() are inserted inside a single transaction.
     *
     * @param AggregateRoot $aggregateRoot Aggregate whose popped events are persisted.
     *
     * @throws \Throwable Any failure during the inserts or commit is rethrown after the
     *                    transaction has been rolled back.
     */
    protected function saveAggregate(AggregateRoot $aggregateRoot): void
    {
        $tableName = $this->streamTableName($aggregateRoot->aggregateId());
        $this->createAggregateStream($tableName);

        $this->connection->beginTransaction();
        try {
            /** @var AggregateChanged $event */
            foreach ($aggregateRoot->popEvents() as $event) {
                $this->connection->insert($tableName, [
                    'event_id' => $event->eventId(),
                    'event_class' => get_class($event),
                    'aggregate_id' => $event->aggregateId(),
                    'aggregate_class' => get_class($aggregateRoot),
                    'payload' => json_encode($event->payload(), JSON_THROW_ON_ERROR),
                    'occurred_on' => $event->occurredOn()->format(self::OCCURRED_ON_FORMAT),
                    'version' => $event->version(),
                ]);
            }
            $this->connection->commit();
        } catch (\Throwable $exception) {
            $this->connection->rollBack();

            throw $exception;
        }
    }

    /**
     * Creates the stream table for one aggregate unless it already exists.
     *
     * @param string $tableName Name of the stream table to create.
     */
    protected function createAggregateStream(string $tableName): void
    {
        if ($this->tableExist($tableName)) {
            return;
        }

        $statement = $this->connection->prepare(<<<SQL
            CREATE TABLE `{$tableName}` (
                `no` BIGINT(20) NOT NULL AUTO_INCREMENT,
                `event_id` VARCHAR(36) NOT NULL,
                `event_class` CHAR(180) NOT NULL,
                `aggregate_id` VARCHAR(36) NOT NULL,
                `aggregate_class` CHAR(180) NOT NULL,
                `payload` JSON,
                `occurred_on` DATETIME NOT NULL,
                `version` INT(3) NOT NULL,
                PRIMARY KEY (`no`),
                UNIQUE KEY `ix_rsn` (`event_id`),
                KEY `ix_aggregate_id` (`aggregate_id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
            SQL
        );
        $statement->execute();
    }

    /**
     * Builds the name of the stream table for the given aggregate id.
     *
     * Dashes in the id are replaced by underscores and the result is prefixed with the
     * stream name, e.g. stream "user" and id "a-b" give "user_a_b".
     *
     * @param string $aggregateId Raw aggregate identifier.
     */
    private function streamTableName(string $aggregateId): string
    {
        return sprintf('%s_%s', $this->streamName, str_replace('-', '_', $aggregateId));
    }

    /**
     * Tells whether a table with the given name exists in the connection's current database.
     *
     * The lookup goes through information_schema.tables with bound parameters.
     *
     * @param string $tableName Table name to look for.
     */
    private function tableExist(string $tableName): bool
    {
        $statement = $this->connection->prepare(<<<SQL
            SELECT * FROM
                information_schema.tables t
            WHERE
                t.table_schema = :db_name
                AND t.table_name = :aggregate_stream
            LIMIT 1;
            SQL
        );

        $statement->bindValue(':db_name', $this->connection->getDatabase());
        $statement->bindValue(':aggregate_stream', $tableName);
        $statement->execute();

        /** @var array|false $result */
        $result = $statement->fetch();

        // fetch() yields false (or an empty row) when nothing matched.
        return !empty($result);
    }
}
143
|
|
|
|
This function has been deprecated. The supplier of the function has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the function will be removed and what other function to use instead.