1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace DDDominio\EventSourcing\Snapshotting\Vendor; |
4
|
|
|
|
5
|
|
|
use DDDominio\EventSourcing\EventStore\InitializableInterface; |
6
|
|
|
use DDDominio\EventSourcing\Snapshotting\SnapshotInterface; |
7
|
|
|
use DDDominio\EventSourcing\Snapshotting\SnapshotStoreInterface; |
8
|
|
|
use DDDominio\EventSourcing\Serialization\SerializerInterface; |
9
|
|
|
use Doctrine\DBAL\Connection; |
10
|
|
|
use Doctrine\DBAL\Schema\Schema; |
11
|
|
|
|
12
|
|
|
class DoctrineDbalSnapshotStore implements SnapshotStoreInterface, InitializableInterface |
13
|
|
|
{ |
14
|
|
|
const SNAPSHOTS_TABLE = 'snapshots'; |
15
|
|
|
|
16
|
|
|
/** |
17
|
|
|
* @var Connection |
18
|
|
|
*/ |
19
|
|
|
private $connection; |
20
|
|
|
|
21
|
|
|
/** |
22
|
|
|
* @var SerializerInterface |
23
|
|
|
*/ |
24
|
|
|
private $serializer; |
25
|
|
|
|
26
|
|
|
/** |
27
|
|
|
* @param Connection $connection |
28
|
|
|
* @param SerializerInterface $serializer |
29
|
|
|
*/ |
30
|
6 |
|
public function __construct(Connection $connection, SerializerInterface $serializer) |
31
|
|
|
{ |
32
|
6 |
|
$this->connection = $connection; |
33
|
6 |
|
$this->serializer = $serializer; |
34
|
6 |
|
} |
35
|
|
|
|
36
|
|
|
/** |
37
|
|
|
* @param SnapshotInterface $snapshot |
38
|
|
|
*/ |
39
|
4 |
View Code Duplication |
public function addSnapshot($snapshot) |
|
|
|
|
40
|
|
|
{ |
41
|
4 |
|
$stmt = $this->connection |
42
|
4 |
|
->prepare('INSERT INTO snapshots (aggregate_type, aggregate_id, type, version, snapshot) VALUES (:aggregateType, :aggregateId, :type, :version, :snapshot)'); |
43
|
4 |
|
$stmt->bindValue(':aggregateType', $snapshot->aggregateClass()); |
44
|
4 |
|
$stmt->bindValue(':aggregateId', $snapshot->aggregateId()); |
45
|
4 |
|
$stmt->bindValue(':type', get_class($snapshot)); |
46
|
4 |
|
$stmt->bindValue(':version', $snapshot->version()); |
47
|
4 |
|
$stmt->bindValue(':snapshot', $this->serializer->serialize($snapshot)); |
48
|
4 |
|
$stmt->execute(); |
49
|
4 |
|
} |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* @param string $aggregateClass |
53
|
|
|
* @param string $aggregateId |
54
|
|
|
* @return SnapshotInterface|null |
55
|
|
|
*/ |
56
|
2 |
View Code Duplication |
public function findLastSnapshot($aggregateClass, $aggregateId) |
|
|
|
|
57
|
|
|
{ |
58
|
2 |
|
$stmt = $this->connection |
59
|
2 |
|
->prepare('SELECT type, snapshot FROM snapshots WHERE aggregate_type = :aggregateType AND aggregate_id = :aggregateId ORDER BY id DESC LIMIT 1'); |
60
|
2 |
|
$stmt->bindValue(':aggregateType', $aggregateClass); |
61
|
2 |
|
$stmt->bindValue(':aggregateId', $aggregateId); |
62
|
2 |
|
$stmt->execute(); |
63
|
2 |
|
$snapshot = $stmt->fetch(); |
|
|
|
|
64
|
2 |
|
return $snapshot ? $this->serializer->deserialize($snapshot['snapshot'], $snapshot['type']) : null; |
|
|
|
|
65
|
|
|
} |
66
|
|
|
|
67
|
|
|
/** |
68
|
|
|
* @param string $aggregateClass |
69
|
|
|
* @param string $aggregateId |
70
|
|
|
* @param int $version |
71
|
|
|
* @return SnapshotInterface|null |
72
|
|
|
*/ |
73
|
2 |
View Code Duplication |
public function findNearestSnapshotToVersion($aggregateClass, $aggregateId, $version) |
|
|
|
|
74
|
|
|
{ |
75
|
2 |
|
$stmt = $this->connection |
76
|
2 |
|
->prepare('SELECT type, snapshot FROM snapshots WHERE aggregate_type = :aggregateType AND aggregate_id = :aggregateId AND version <= :version ORDER BY version DESC LIMIT 1'); |
77
|
2 |
|
$stmt->bindValue(':aggregateType', $aggregateClass); |
78
|
2 |
|
$stmt->bindValue(':aggregateId', $aggregateId); |
79
|
2 |
|
$stmt->bindValue(':version', $version); |
80
|
2 |
|
$stmt->execute(); |
81
|
2 |
|
$snapshot = $stmt->fetch(); |
|
|
|
|
82
|
2 |
|
return $snapshot ? $this->serializer->deserialize($snapshot['snapshot'], $snapshot['type']) : null; |
|
|
|
|
83
|
|
|
} |
84
|
|
|
|
85
|
|
|
/** |
86
|
|
|
* Initialize the Event Store |
87
|
|
|
*/ |
88
|
6 |
|
public function initialize() |
89
|
|
|
{ |
90
|
6 |
|
$schema = new Schema(); |
91
|
|
|
|
92
|
6 |
|
$snapshotsTable = $schema->createTable(self::SNAPSHOTS_TABLE); |
93
|
6 |
|
$snapshotsTable->addColumn('id', 'integer', ['autoincrement' => true]); |
94
|
6 |
|
$snapshotsTable->addColumn('aggregate_type', 'string'); |
95
|
6 |
|
$snapshotsTable->addColumn('aggregate_id', 'string'); |
96
|
6 |
|
$snapshotsTable->addColumn('type', 'string'); |
97
|
6 |
|
$snapshotsTable->addColumn('version', 'integer'); |
98
|
6 |
|
$snapshotsTable->addColumn('snapshot', 'text'); |
99
|
6 |
|
$snapshotsTable->setPrimaryKey(['id']); |
100
|
|
|
|
101
|
6 |
|
$queries = $schema->toSql($this->connection->getDatabasePlatform()); |
102
|
6 |
|
$this->connection->transactional(function(Connection $connection) use ($queries) { |
103
|
6 |
|
foreach ($queries as $query) { |
104
|
6 |
|
$connection->exec($query); |
|
|
|
|
105
|
|
|
} |
106
|
6 |
|
}); |
107
|
6 |
|
} |
108
|
|
|
|
109
|
|
|
/** |
110
|
|
|
* Check if the Event Store has been initialized |
111
|
|
|
* |
112
|
|
|
* @return bool |
113
|
|
|
*/ |
114
|
2 |
|
public function initialized() |
115
|
|
|
{ |
116
|
2 |
|
return $this->connection->getSchemaManager()->tablesExist([self::SNAPSHOTS_TABLE]); |
117
|
|
|
} |
118
|
|
|
} |
119
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.