Passed
Push — 1.0.x ( 6a4ef4...259f9a )
by Koldo
03:14
created

DbalAggregateRootPerTableRepository::tableExist()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 13
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 20
ccs 0
cts 8
cp 0
crap 2
rs 9.8333
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Antidot\EventSource\Infrastructure\Repository;
6
7
use Antidot\EventSource\Domain\Event\AggregateChanged;
8
use Antidot\EventSource\Domain\Model\Aggregate\AggregateRoot;
9
use Antidot\EventSource\Domain\Model\ValueObject\AggregateRootId;
10
use DateTimeImmutable;
11
use Doctrine\DBAL\Connection;
12
use function get_class;
13
use function json_encode;
14
15
/**
 * Base repository that stores the event stream of every aggregate root in its
 * own MySQL table.
 *
 * The table of an aggregate is named "<streamName>_<aggregateId>", with every
 * "-" of the aggregate id replaced by "_". Tables are created lazily the first
 * time an aggregate is saved.
 */
abstract class DbalAggregateRootPerTableRepository
{
    /** Format used to write and read back the `occurred_on` DATETIME column. */
    private const OCCURRED_ON_FORMAT = 'Y-m-d H:i:s';

    /**
     * Maximum nesting accepted when decoding a payload. Kept equal to the
     * default depth json_encode() uses on write, so that every payload that
     * could be stored can also be loaded again.
     */
    private const PAYLOAD_MAX_DEPTH = 512;

    protected Connection $connection;
    protected string $streamName;

    /**
     * @param Connection $connection DBAL connection used for every query.
     * @param string     $streamName Prefix shared by all stream tables of this repository.
     */
    public function __construct(Connection $connection, string $streamName)
    {
        $this->connection = $connection;
        $this->streamName = $streamName;
    }

    /**
     * Rebuilds an aggregate by replaying its stored events in insertion order.
     *
     * @param AggregateRootId $aggregateRootId Identifier of the aggregate to load.
     * @param string          $aggregateClass  Class instantiated (without arguments) to receive the events.
     *
     * @return AggregateRoot|null The rehydrated aggregate, or null when no stream table exists for the id.
     *
     * @throws \JsonException            When a stored payload is not valid JSON.
     * @throws \UnexpectedValueException When a stored `occurred_on` value cannot be parsed.
     */
    protected function getAggregate(AggregateRootId $aggregateRootId, string $aggregateClass): ?AggregateRoot
    {
        $aggregateId = $aggregateRootId->value();
        $tableName = $this->streamTableName($aggregateId);
        if (false === $this->tableExist($tableName)) {
            return null;
        }

        $quotedTable = $this->quoteTableName($tableName);
        $statement = $this->connection->prepare(<<<SQL
            SELECT * FROM
                {$quotedTable}
            WHERE
                aggregate_id = :aggregate_id
            ORDER BY
                no ASC;
            SQL
        );
        $statement->bindValue('aggregate_id', $aggregateId);
        $statement->execute();
        /** @var null|array $queryResult */
        $queryResult = $statement->fetchAll();
        /** @var AggregateRoot $aggregate */
        $aggregate = new $aggregateClass();

        /** @var array<string, mixed> $item */
        foreach ($queryResult ?? [] as $item) {
            /** @var string $eventClass */
            $eventClass = $item['event_class'];
            /** @var string $payload */
            $payload = $item['payload'];
            /** @var string $occurredOn */
            $occurredOn = $item['occurred_on'];

            // createFromFormat() signals failure by returning false; fail loudly
            // here instead of handing `false` to the event constructor.
            $occurredAt = DateTimeImmutable::createFromFormat(self::OCCURRED_ON_FORMAT, $occurredOn);
            if (false === $occurredAt) {
                throw new \UnexpectedValueException(sprintf(
                    'Unable to parse occurred_on value "%s" read from stream table "%s".',
                    $occurredOn,
                    $tableName
                ));
            }

            /** @var AggregateChanged $event */
            $event = new $eventClass(
                $item['event_id'],
                $item['aggregate_id'],
                json_decode($payload, true, self::PAYLOAD_MAX_DEPTH, JSON_THROW_ON_ERROR),
                $occurredAt
            );
            $aggregate->apply($event);
        }

        return $aggregate;
    }

    /**
     * Appends all pending events of the aggregate to its stream table inside a
     * single transaction, creating the table first when it does not exist yet.
     *
     * @param AggregateRoot $aggregateRoot Aggregate whose popped events are persisted.
     *
     * @throws \Throwable Any failure is rethrown after the transaction has been rolled back.
     */
    protected function saveAggregate(AggregateRoot $aggregateRoot): void
    {
        $tableName = $this->streamTableName($aggregateRoot->aggregateId());
        // The table is created before the transaction is opened.
        $this->createAggregateStream($tableName);

        // Connection::insert() places the table expression into the SQL as given,
        // so it is quoted here exactly like in the CREATE TABLE / SELECT statements.
        $quotedTable = $this->quoteTableName($tableName);
        $aggregateClass = get_class($aggregateRoot);

        $this->connection->beginTransaction();
        try {
            /** @var AggregateChanged $event */
            foreach ($aggregateRoot->popEvents() as $event) {
                $this->connection->insert($quotedTable, [
                    'event_id' => $event->eventId(),
                    'event_class' => get_class($event),
                    'aggregate_id' => $event->aggregateId(),
                    'aggregate_class' => $aggregateClass,
                    'payload' => json_encode($event->payload(), JSON_THROW_ON_ERROR),
                    'occurred_on' => $event->occurredOn()->format(self::OCCURRED_ON_FORMAT),
                    'version' => $event->version(),
                ]);
            }
            $this->connection->commit();
        } catch (\Throwable $exception) {
            $this->connection->rollBack();
            throw $exception;
        }
    }

    /**
     * Creates the event stream table when it does not exist yet.
     *
     * @param string $tableName Unquoted name of the stream table.
     */
    protected function createAggregateStream(string $tableName): void
    {
        // Fast path: skip issuing DDL when the table is already there.
        if ($this->tableExist($tableName)) {
            return;
        }

        // IF NOT EXISTS keeps a concurrent first save of the same aggregate from
        // failing when another writer creates the table between check and create.
        $quotedTable = $this->quoteTableName($tableName);
        $statement = $this->connection->prepare(<<<SQL
            CREATE TABLE IF NOT EXISTS {$quotedTable} (
              `no` BIGINT(20) NOT NULL AUTO_INCREMENT,
              `event_id` VARCHAR(36) NOT NULL,
              `event_class` CHAR(180) NOT NULL,
              `aggregate_id` VARCHAR(36) NOT NULL,
              `aggregate_class` CHAR(180) NOT NULL,
              `payload` JSON,
              `occurred_on` DATETIME NOT NULL,
              `version` INT(3) NOT NULL,
              PRIMARY KEY (`no`),
              UNIQUE KEY `ix_rsn` (`event_id`),
              KEY `ix_aggregate_id` (`aggregate_id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
            SQL
        );
        $statement->execute();
    }

    /**
     * Tells whether a table with the given name exists in the database the
     * connection is currently using.
     *
     * @param string $tableName Unquoted table name to look up.
     */
    private function tableExist(string $tableName): bool
    {
        $statement = $this->connection->prepare(<<<SQL
            SELECT 1 FROM
                information_schema.tables t
            WHERE
                t.table_schema = :db_name
                AND t.table_name = :aggregate_stream
            LIMIT 1;
            SQL
        );

        $statement->bindValue(':db_name', $this->connection->getDatabase());
        $statement->bindValue(':aggregate_stream', $tableName);
        $statement->execute();

        /** @var array|false $result */
        $result = $statement->fetch();

        return false === empty($result);
    }

    /**
     * Derives the unquoted stream table name for an aggregate id:
     * "<streamName>_<aggregateId>" with "-" replaced by "_" in the id.
     */
    private function streamTableName(string $aggregateId): string
    {
        return sprintf('%s_%s', $this->streamName, str_replace('-', '_', $aggregateId));
    }

    /**
     * Wraps a table name in backticks, doubling any backtick it contains so the
     * name cannot terminate the quoted identifier early.
     */
    private function quoteTableName(string $tableName): string
    {
        return '`' . str_replace('`', '``', $tableName) . '`';
    }
}
143