Completed
Push — master ( 263533...6dddbe )
by Beñat
01:46
created

SqlEventStore   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 150
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 0
Metric Value
wmc 18
lcom 1
cbo 5
dl 0
loc 150
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A append() 0 21 4
A streamOfName() 0 9 1
A eventsSince() 0 18 2
A buildStoredEvents() 0 18 2
A buildDomainEvent() 0 21 3
A unSerialize() 0 21 3
A createSchema() 0 16 1
A removeSchema() 0 6 1
1
<?php
2
3
/*
4
 * This file is part of the Shared Kernel library.
5
 *
6
 * Copyright (c) 2016-present LIN3S <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace LIN3S\SharedKernel\Infrastructure\Persistence\Sql\Event;
15
16
use LIN3S\SharedKernel\Domain\Model\DomainEvent;
17
use LIN3S\SharedKernel\Event\EventStore;
18
use LIN3S\SharedKernel\Event\StoredEvent;
19
use LIN3S\SharedKernel\Event\Stream;
20
use LIN3S\SharedKernel\Event\StreamName;
21
use LIN3S\SharedKernel\Event\StreamVersion;
22
use LIN3S\SharedKernel\Infrastructure\Persistence\Sql\Pdo;
23
24
/**
 * Event store that writes and reads stored events in the `events` SQL table
 * through the injected Pdo wrapper, rebuilding domain events via reflection.
 *
 * @author Beñat Espiña <[email protected]>
 */
final class SqlEventStore implements EventStore
{
    private const TABLE_NAME = 'events';
    private const COLUMN_NAMES = ['type', 'payload', 'occurred_on', 'stream_name', 'stream_version'];

    /**
     * @var Pdo
     */
    private $pdo;

    /**
     * @param Pdo $pdo Wrapper used for every query and insert issued by this store
     */
    public function __construct(Pdo $pdo)
    {
        $this->pdo = $pdo;
    }

    /**
     * Persists every event of the given stream as one multi-row insert.
     *
     * Nothing is sent to the database when the stream carries no events.
     *
     * @param Stream $stream The stream whose events are stored
     */
    public function append(Stream $stream) : void
    {
        $storedEvents = [];
        foreach ($stream->events() as $event) {
            $storedEvents[] = new StoredEvent($event, $stream->name(), $stream->version());
        }

        $numberOfEvents = count($storedEvents);
        if (0 === $numberOfEvents) {
            return;
        }

        $this->pdo->insert(
            self::TABLE_NAME,
            self::COLUMN_NAMES,
            $numberOfEvents,
            function () use ($storedEvents) {
                $rows = [];
                foreach ($storedEvents as $storedEvent) {
                    $rows[] = $storedEvent->toArray();
                }

                // Merging once yields the same result as merging row by row,
                // without re-copying the accumulated data on every iteration.
                return array_merge(...$rows);
            }
        );
    }

    /**
     * Loads the rows of the named stream, ordered by insertion order, and
     * wraps them in a Stream.
     *
     * NOTE(review): static analysis reports two defects in this method that
     * cannot be resolved from this file alone: buildDomainEventsCollection()
     * is not defined on this class, and Stream::__construct() is reported to
     * require an additional $events argument. Fixing them requires confirming
     * the Stream constructor signature and the expected events container, so
     * both calls are intentionally left exactly as they were.
     *
     * @param StreamName $name The name of the stream to load
     *
     * @return Stream
     */
    public function streamOfName(StreamName $name) : Stream
    {
        $tableName = self::TABLE_NAME;
        $sql = "SELECT * FROM `$tableName` WHERE stream_name = :streamName ORDER BY `order` ASC";
        $storedEventRows = $this->pdo->query($sql, ['streamName' => $name->name()]);
        $domainEventsCollection = $this->buildDomainEventsCollection($storedEventRows);

        return new Stream($name, $domainEventsCollection);
    }

    /**
     * Returns the stored events that occurred at or after the given moment,
     * ordered by insertion order.
     *
     * @param \DateTimeInterface|null $since  Lower bound (inclusive); null means "from the beginning"
     * @param int                     $offset Number of rows to skip; negative values are treated as 0
     * @param int                     $limit  Maximum number of rows; any negative value means "no limit"
     *
     * @return StoredEvent[]
     */
    public function eventsSince(?\DateTimeInterface $since, int $offset = 0, int $limit = -1) : array
    {
        $occurredOn = null === $since ? 0 : $since->getTimestamp();
        $tableName = self::TABLE_NAME;

        // MySQL only accepts non-negative integer constants in LIMIT/OFFSET,
        // so "no limit" is expressed with the largest integer available.
        $rowCount = $limit < 0 ? PHP_INT_MAX : $limit;
        $rowsToSkip = max(0, $offset);

        $sql = <<<SQL
SELECT * FROM `$tableName`
WHERE occurred_on >= :occurredOn
ORDER BY `order` ASC
LIMIT $rowCount
OFFSET $rowsToSkip
SQL;

        $storedEventRows = $this->pdo->query($sql, ['occurredOn' => $occurredOn]);

        return $this->buildStoredEvents($storedEventRows);
    }

    /**
     * Turns raw table rows into StoredEvent instances, restoring the `order`
     * value of each row through reflection.
     *
     * @param array $storedEventRows Rows as returned by the Pdo wrapper
     *
     * @return StoredEvent[]
     */
    private function buildStoredEvents(array $storedEventRows) : array
    {
        if ([] === $storedEventRows) {
            return [];
        }

        // The reflected property is the same for every row, so it is built once.
        $orderProperty = new \ReflectionProperty(StoredEvent::class, 'order');
        $orderProperty->setAccessible(true);

        $events = [];
        foreach ($storedEventRows as $storedEventRow) {
            $storedEvent = new StoredEvent(
                $this->buildDomainEvent($storedEventRow),
                StreamName::fromName($storedEventRow['stream_name']),
                new StreamVersion((int) $storedEventRow['stream_version'])
            );
            $orderProperty->setValue($storedEvent, $storedEventRow['order']);

            $events[] = $storedEvent;
        }

        return $events;
    }

    /**
     * Rebuilds the domain event described by a table row without invoking its
     * constructor, filling its properties from the decoded JSON payload.
     *
     * @param array $storedEventRow Row holding at least `type`, `payload` and `occurred_on`
     *
     * @throws \UnexpectedValueException when the payload is not valid JSON
     * @throws \ReflectionException      when the event class cannot be reflected
     *
     * @return DomainEvent
     */
    private function buildDomainEvent(array $storedEventRow) : DomainEvent
    {
        $type = $storedEventRow['type'];

        $payload = json_decode($storedEventRow['payload'], true);
        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new \UnexpectedValueException(sprintf(
                'Unable to decode the payload of the "%s" event: %s',
                $type,
                json_last_error_msg()
            ));
        }
        if (!is_array($payload)) {
            $payload = [];
        }

        // DateTimeImmutable::setTimestamp() returns a new instance, so its
        // result has to be kept; discarding it leaves the current time in place.
        $occurredOn = (new \DateTimeImmutable())->setTimestamp((int) $storedEventRow['occurred_on']);

        $eventReflection = new \ReflectionClass($type);
        $domainEvent = $eventReflection->newInstanceWithoutConstructor();
        foreach ($eventReflection->getProperties() as $property) {
            $property->setAccessible(true);

            if ('occurredOn' === $property->getName()) {
                $property->setValue($domainEvent, $occurredOn);
                continue;
            }

            // A key missing from the payload leaves the property set to null.
            $this->unSerialize($property, $payload[$property->getName()] ?? null, $domainEvent);
        }

        return $domainEvent;
    }

    /**
     * Assigns a decoded payload value to one property of the given object.
     *
     * Null and scalar values are assigned as they are. Any other value is
     * read as an array whose first key is a class name and whose value holds
     * that class' property values; the nested object is rebuilt recursively
     * and then assigned to the property.
     *
     * @param \ReflectionProperty $reflectedProperty The property to fill
     * @param mixed               $value             The decoded value for that property
     * @param object              $object            The object that owns the property
     *
     * @throws \ReflectionException when the nested class cannot be reflected
     *
     * @return object The same object that was given, with the property assigned
     */
    private function unSerialize(\ReflectionProperty $reflectedProperty, $value, $object)
    {
        $reflectedProperty->setAccessible(true);

        if (null === $value || is_scalar($value)) {
            $reflectedProperty->setValue($object, $value);

            return $object;
        }

        $className = key($value);
        $reflectedClass = new \ReflectionClass($className);
        $nestedObject = $reflectedClass->newInstanceWithoutConstructor();
        $nestedValues = $value[$className];
        foreach ($reflectedClass->getProperties() as $property) {
            $this->unSerialize($property, $nestedValues[$property->getName()] ?? null, $nestedObject);
        }

        // Assigned once after hydration so that a nested class without any
        // property is still attached to its owner.
        $reflectedProperty->setValue($object, $nestedObject);

        return $object;
    }

    /**
     * Returns the SQL statement that creates the events table when it does
     * not exist yet.
     *
     * @return string
     */
    public static function createSchema() : string
    {
        $tableName = self::TABLE_NAME;

        return <<<SQL
CREATE TABLE IF NOT EXISTS `$tableName` (
  `order` BIGINT(20) NOT NULL AUTO_INCREMENT,
  `type` VARCHAR(150) COLLATE utf8_bin NOT NULL,
  `payload` JSON NOT NULL,
  `occurred_on` INT(10) NOT NULL,
  `stream_name` VARCHAR(255) NOT NULL,
  `stream_version` INT NOT NULL,
  PRIMARY KEY (`order`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
SQL;
    }

    /**
     * Returns the SQL statement that drops the events table.
     *
     * @return string
     */
    public static function removeSchema() : string
    {
        $tableName = self::TABLE_NAME;

        return "DROP TABLE `$tableName`";
    }
}
177