Completed
Push — master ( 493087...558d15 )
by Beñat
02:06
created

SqlEventStore::unSerialize()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 27
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 27
rs 8.439
cc 6
eloc 18
nc 5
nop 3
1
<?php
2
3
/*
4
 * This file is part of the Shared Kernel library.
5
 *
6
 * Copyright (c) 2016-present LIN3S <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace LIN3S\SharedKernel\Infrastructure\Persistence\Sql\Event;
15
16
use LIN3S\SharedKernel\Domain\Model\DomainEvent;
17
use LIN3S\SharedKernel\Event\EventStore;
18
use LIN3S\SharedKernel\Event\StoredEvent;
19
use LIN3S\SharedKernel\Event\StreamName;
20
use LIN3S\SharedKernel\Event\StreamVersion;
21
use LIN3S\SharedKernel\Infrastructure\Persistence\Sql\Pdo;
22
23
/**
 * Event store persisted in a SQL table.
 *
 * Stored events are written through the injected Pdo wrapper and, when read
 * back, the domain events are rebuilt by reflection (without invoking their
 * constructors) from the row's `type`, `payload` and `occurred_on` columns.
 *
 * @author Beñat Espiña <[email protected]>
 */
final class SqlEventStore implements EventStore
{
    private const TABLE_NAME = 'events';

    /**
     * @var Pdo
     */
    private $pdo;

    public function __construct(Pdo $pdo)
    {
        $this->pdo = $pdo;
    }

    /**
     * Inserts the given stored events into the events table.
     *
     * Does nothing when no events are given.
     *
     * @param StoredEvent ...$events The events to persist
     */
    public function append(StoredEvent ...$events) : void
    {
        if ([] === $events) {
            return;
        }

        $this->pdo->insert(self::TABLE_NAME, array_map(function (StoredEvent $event) {
            return $event->normalizeToAppend();
        }, $events));
    }

    /**
     * Returns the stored events whose `occurred_on` is greater than or equal
     * to the given date, sorted by their `order` column.
     *
     * @param \DateTimeInterface|null $since  Lower bound; null means timestamp 0
     * @param int                     $offset Number of rows to skip
     * @param int                     $limit  Maximum number of rows; a negative value means "no limit"
     *
     * @return StoredEvent[]
     */
    public function eventsSince(?\DateTimeInterface $since, int $offset = 0, int $limit = -1) : array
    {
        $since = null === $since ? 0 : $since->getTimestamp();
        $tableName = self::TABLE_NAME;

        // MySQL only accepts non-negative LIMIT values, so the "no limit"
        // sentinel (any negative number, -1 by default) is translated into
        // the largest integer available.
        if ($limit < 0) {
            $limit = PHP_INT_MAX;
        }

        // $limit and $offset are declared as int, so interpolating them is safe.
        $sql = <<<SQL
SELECT * FROM `$tableName`
WHERE occurred_on >= :occurredOn
ORDER BY `order` ASC
LIMIT $limit
OFFSET $offset
SQL;

        return $this->buildStoredEvents(
            $this->pdo->query($sql, ['occurredOn' => $since])
        );
    }

    /**
     * Maps raw rows of the events table to StoredEvent instances.
     *
     * @param array $storedEventRows Rows containing at least the `type`, `payload`,
     *                               `occurred_on`, `stream_name`, `stream_version`
     *                               and `order` keys
     *
     * @return StoredEvent[]
     */
    private function buildStoredEvents(array $storedEventRows) : array
    {
        // The reflected property does not depend on the row, so it is built once.
        $orderProperty = new \ReflectionProperty(StoredEvent::class, 'order');
        $orderProperty->setAccessible(true);

        $events = [];
        foreach ($storedEventRows as $storedEventRow) {
            $storedEvent = new StoredEvent(
                $this->buildDomainEvent($storedEventRow),
                StreamName::fromName($storedEventRow['stream_name']),
                new StreamVersion((int) $storedEventRow['stream_version'])
            );
            // "order" is not a constructor argument, so it is written via reflection.
            $orderProperty->setValue($storedEvent, $storedEventRow['order']);

            $events[] = $storedEvent;
        }

        return $events;
    }

    /**
     * Rebuilds the domain event described by a row, bypassing its constructor.
     *
     * The `occurredOn` property is restored from the `occurred_on` column;
     * every other property is restored from the JSON payload.
     *
     * @param array $storedEventRow Row with the `type`, `payload` and `occurred_on` keys
     *
     * @return DomainEvent
     */
    private function buildDomainEvent(array $storedEventRow) : DomainEvent
    {
        $payload = json_decode($storedEventRow['payload'], true);
        if (!is_array($payload)) {
            // A payload that does not decode to an array carries no property values.
            $payload = [];
        }

        $eventReflection = new \ReflectionClass($storedEventRow['type']);
        $domainEvent = $eventReflection->newInstanceWithoutConstructor();
        foreach ($eventReflection->getProperties() as $property) {
            $property->setAccessible(true);
            $propertyName = $property->getName();

            if ('occurredOn' === $propertyName) {
                // DateTimeImmutable::setTimestamp() returns a new instance instead of
                // mutating the receiver, so the returned object is the one to store.
                $occurredOn = (new \DateTimeImmutable())->setTimestamp(
                    (int) $storedEventRow['occurred_on']
                );
                $property->setValue($domainEvent, $occurredOn);
                continue;
            }
            $this->unSerialize($property, $payload[$propertyName] ?? null, $domainEvent);
        }

        return $domainEvent;
    }

    /**
     * Writes a decoded payload value into the given property of the given object.
     *
     * - Scalars are assigned as they are.
     * - Null, empty arrays and arrays whose first key is an integer leave the
     *   property untouched.
     * - An array whose first key is a string is read as [className => values]:
     *   the class is instantiated without its constructor, its properties are
     *   restored recursively and the resulting instance is assigned.
     *
     * @param \ReflectionProperty $reflectedProperty The property to populate (already accessible
     *                                               when the value is scalar)
     * @param mixed               $value             The decoded value
     * @param object              $object            The object that owns the property
     *
     * @return object The same object received as third argument
     */
    private function unSerialize(\ReflectionProperty $reflectedProperty, $value, $object)
    {
        if (null === $value) {
            return $object;
        }
        if (is_scalar($value)) {
            $reflectedProperty->setValue($object, $value);

            return $object;
        }

        $className = key($value);
        if (null === $className || is_int($className)) {
            return $object;
        }

        $reflectedClass = new \ReflectionClass($className);
        $nestedObject = $reflectedClass->newInstanceWithoutConstructor();
        $nestedValues = $value[$className];
        foreach ($reflectedClass->getProperties() as $property) {
            $property->setAccessible(true);
            $this->unSerialize($property, $nestedValues[$property->getName()] ?? null, $nestedObject);
        }

        // Assigned once, after the loop, so that nested objects without any
        // property are restored as well.
        $reflectedProperty->setAccessible(true);
        $reflectedProperty->setValue($object, $nestedObject);

        return $object;
    }

    /**
     * Returns the SQL statement that creates the events table if it does not exist.
     */
    public static function createSchema() : string
    {
        $tableName = self::TABLE_NAME;

        return <<<SQL
CREATE TABLE IF NOT EXISTS `$tableName` (
  `order` BIGINT(20) NOT NULL AUTO_INCREMENT,
  `type` VARCHAR(150) COLLATE utf8_bin NOT NULL,
  `payload` LONGTEXT NOT NULL,
  `occurred_on` INT(10) NOT NULL,
  `stream_name` VARCHAR(255) NOT NULL,
  `stream_version` INT NOT NULL,
  PRIMARY KEY (`order`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
SQL;
    }

    /**
     * Returns the SQL statement that drops the events table.
     */
    public static function removeSchema() : string
    {
        $tableName = self::TABLE_NAME;

        return "DROP TABLE `$tableName`";
    }
}
161