Completed
Push — master ( d92413...ff1eea )
by Beñat
01:42
created

SqlEventStore   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 101
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 0
Metric Value
wmc 13
lcom 1
cbo 5
dl 0
loc 101
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A append() 0 8 2
A streamOfName() 0 11 1
B buildDomainEventsCollection() 0 24 4
A insert() 0 9 1
A prepareData() 0 9 2
A createSchema() 0 15 1
A removeSchema() 0 6 1
1
<?php
2
3
/*
4
 * This file is part of the Shared Kernel library.
5
 *
6
 * Copyright (c) 2016-present LIN3S <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace LIN3S\SharedKernel\Infrastructure\Persistence\Sql\Event;
15
16
use LIN3S\SharedKernel\Domain\Model\DomainEventCollection;
17
use LIN3S\SharedKernel\Event\EventStore;
18
use LIN3S\SharedKernel\Event\StoredEvent;
19
use LIN3S\SharedKernel\Event\Stream;
20
use LIN3S\SharedKernel\Event\StreamName;
21
use LIN3S\SharedKernel\Infrastructure\Persistence\Sql\Pdo;
22
23
/**
 * Event store backed by a SQL table.
 *
 * Appending a stream inserts one row per domain event into the table named by
 * TABLE_NAME; reading a stream selects the rows of that stream ordered by id
 * and rebuilds the domain event objects through reflection.
 *
 * @author Beñat Espiña <[email protected]>
 */
final class SqlEventStore implements EventStore
{
    private const TABLE_NAME = 'events';
    private const COLUMN_NAMES = ['type', 'payload', 'occurred_on', 'stream'];

    /**
     * @var Pdo
     */
    private $pdo;

    /**
     * @param Pdo $pdo Connection wrapper used to execute every statement
     */
    public function __construct(Pdo $pdo)
    {
        $this->pdo = $pdo;
    }

    /**
     * Persists every event of the given stream as a stored event row.
     *
     * A stream without events results in no statement being executed.
     *
     * @param Stream $stream The stream whose events are stored
     */
    public function append(Stream $stream) : void
    {
        $storedEvents = [];
        foreach ($stream->events() as $event) {
            $storedEvents[] = StoredEvent::fromDomainEvent($event, $stream->name());
        }

        $this->insert(...$storedEvents);
    }

    /**
     * Loads the rows stored under the given stream name, in ascending id
     * order, and returns them as a stream of rebuilt domain events.
     *
     * @param StreamName $name The name of the stream to load
     *
     * @return Stream
     */
    public function streamOfName(StreamName $name) : Stream
    {
        $tableName = self::TABLE_NAME;
        $sql = <<<SQL
SELECT * FROM $tableName WHERE stream = :stream ORDER BY id ASC
SQL;
        $storedEventRows = $this->pdo->execute($sql, ['stream' => $name->name()])->fetchAll(\PDO::FETCH_ASSOC);
        $domainEventsCollection = $this->buildDomainEventsCollection($storedEventRows);

        return new Stream($name, $domainEventsCollection);
    }

    /**
     * Rebuilds one domain event per fetched row, preserving the row order.
     *
     * @param array $storedEventRows Rows fetched as associative arrays
     *
     * @return DomainEventCollection
     */
    private function buildDomainEventsCollection(array $storedEventRows) : DomainEventCollection
    {
        $domainEvents = new DomainEventCollection();
        foreach ($storedEventRows as $storedEventRow) {
            $domainEvents->add($this->hydrateDomainEvent($storedEventRow));
        }

        return $domainEvents;
    }

    /**
     * Instantiates the class named in the row's "type" column without calling
     * its constructor and fills its properties from the stored data.
     *
     * @param array $storedEventRow A single row fetched as associative array
     *
     * @throws \ReflectionException when the stored type is not a loadable class
     *
     * @return object The rebuilt domain event
     */
    private function hydrateDomainEvent(array $storedEventRow)
    {
        $payload = json_decode($storedEventRow['payload'], true);
        if (!is_array($payload)) {
            // A payload that does not decode to an array carries no property values.
            $payload = [];
        }

        $eventReflection = new \ReflectionClass($storedEventRow['type']);
        $domainEvent = $eventReflection->newInstanceWithoutConstructor();

        foreach ($eventReflection->getProperties() as $property) {
            $property->setAccessible(true);
            $propertyName = $property->getName();

            // The decoded payload takes precedence over the raw row columns.
            if (array_key_exists($propertyName, $payload)) {
                $property->setValue($domainEvent, $payload[$propertyName]);
                continue;
            }

            // The row must be indexed by the property *name*; using the
            // ReflectionProperty object itself is an illegal array offset.
            // Properties with no matching column keep their default value.
            if (array_key_exists($propertyName, $storedEventRow)) {
                $property->setValue($domainEvent, $storedEventRow[$propertyName]);
            }
        }

        return $domainEvent;
    }

    /**
     * Inserts all the given stored events with a single multi-row INSERT.
     *
     * @param StoredEvent ...$events The events to insert
     */
    private function insert(StoredEvent ...$events) : void
    {
        if ([] === $events) {
            // "INSERT ... VALUES" followed by no tuples is invalid SQL.
            return;
        }

        $rowPlaces = '(' . implode(', ', array_fill(0, count(self::COLUMN_NAMES), '?')) . ')';
        $allPlaces = implode(', ', array_fill(0, count($events), $rowPlaces));

        $sql = 'INSERT INTO ' . self::TABLE_NAME . ' (' . implode(', ', self::COLUMN_NAMES) . ') VALUES ' . $allPlaces;

        $this->pdo->execute($sql, $this->prepareData(...$events));
    }

    /**
     * Flattens the events into one positional parameter list for insert().
     *
     * NOTE(review): assumes StoredEvent::toArray() yields its values in the
     * same order as COLUMN_NAMES; confirm against StoredEvent.
     *
     * @param StoredEvent ...$events The events whose values are bound
     *
     * @return array
     */
    private function prepareData(StoredEvent ...$events) : array
    {
        $rows = [];
        foreach ($events as $event) {
            // Positional "?" placeholders need plain values; dropping keys also
            // keeps rows with string keys from overwriting each other on merge.
            $rows[] = array_values($event->toArray());
        }

        return array_merge([], ...$rows);
    }

    /**
     * Returns the SQL statement that creates the events table if it does not
     * exist yet. The statement is only returned, not executed.
     *
     * @return string
     */
    public static function createSchema() : string
    {
        $tableName = self::TABLE_NAME;

        return <<<SQL
CREATE TABLE IF NOT EXISTS `$tableName` (
  `id` BIGINT(20) NOT NULL AUTO_INCREMENT,
  `type` VARCHAR(150) COLLATE utf8_bin NOT NULL,
  `payload` JSON NOT NULL,
  `occurred_on` INT(10) NOT NULL,
  `stream` VARCHAR(255) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
SQL;
    }

    /**
     * Returns the SQL statement that drops the events table. The statement is
     * only returned, not executed.
     *
     * @return string
     */
    public static function removeSchema() : string
    {
        $tableName = self::TABLE_NAME;

        return "DROP TABLE `$tableName`";
    }
}
127