Completed
Push — master ( ea11b7...4be545 )
by Beñat
01:45
created

SqlEventStore::createSchema()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 16
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 16
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 5
nc 1
nop 0
1
<?php
2
3
/*
4
 * This file is part of the Shared Kernel library.
5
 *
6
 * Copyright (c) 2016-present LIN3S <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace LIN3S\SharedKernel\Infrastructure\Persistence\Sql\Event;
15
16
use LIN3S\SharedKernel\Domain\Model\DomainEventCollection;
17
use LIN3S\SharedKernel\Event\EventStore;
18
use LIN3S\SharedKernel\Event\StoredEvent;
19
use LIN3S\SharedKernel\Event\Stream;
20
use LIN3S\SharedKernel\Event\StreamName;
21
use LIN3S\SharedKernel\Infrastructure\Persistence\Sql\Pdo;
22
23
/**
24
 * @author Beñat Espiña <[email protected]>
25
 */
26
final class SqlEventStore implements EventStore
27
{
28
    private const TABLE_NAME = 'events';
29
    private const COLUMN_NAMES = ['type', 'payload', 'occurred_on', 'stream_name', 'stream_version'];
30
31
    private $pdo;
32
33
    public function __construct(Pdo $pdo)
34
    {
35
        $this->pdo = $pdo;
36
    }
37
38
    public function append(Stream $stream) : void
39
    {
40
        $storedEvents = [];
41
        foreach ($stream->events() as $event) {
42
            $storedEvents[] = StoredEvent::fromDomainEvent($event, $stream->name(), $stream->version());
43
        }
44
45
        $numberOfEvents = count($storedEvents);
46
        if (0 === $numberOfEvents) {
47
            return;
48
        }
49
50
        $this->pdo->insert(self::TABLE_NAME, self::COLUMN_NAMES, $numberOfEvents, function () use ($storedEvents) {
51
            $data = [];
52
            foreach ($storedEvents as $event) {
53
                $data = array_merge($data, $event->toArray());
54
            }
55
56
            return $data;
57
        });
58
    }
59
60
    public function streamOfName(StreamName $name) : Stream
61
    {
62
        $tableName = self::TABLE_NAME;
63
        $sql = "SELECT * FROM `$tableName` WHERE stream_name = :stream_name ORDER BY id ASC";
64
        $storedEventRows = $this->pdo->query($sql, ['stream_name' => $name->name()]);
65
        $domainEventsCollection = $this->buildDomainEventsCollection($storedEventRows);
66
67
        return new Stream($name, $domainEventsCollection);
0 ignored issues
show
Bug introduced by
The call to Stream::__construct() misses a required argument $events.

This check looks for function calls that miss required arguments.

Loading history...
Documentation introduced by
$domainEventsCollection is of type object<LIN3S\SharedKerne...\DomainEventCollection>, but the function expects a object<LIN3S\SharedKernel\Event\StreamVersion>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
68
    }
69
70
    private function buildDomainEventsCollection(array $storedEventRows) : DomainEventCollection
71
    {
72
        $domainEvents = new DomainEventCollection();
73
        foreach ($storedEventRows as $storedEventRow) {
74
            $eventType = $storedEventRow['type'];
75
            $payload = json_decode($storedEventRow['payload'], true);
76
77
            $eventReflection = new \ReflectionClass($eventType);
78
            $domainEvent = $eventReflection->newInstanceWithoutConstructor();
79
            foreach ($eventReflection->getProperties() as $property) {
80
                $property->setAccessible(true);
81
82
                if (isset($payload[$property->name])) {
83
                    $property->setValue($domainEvent, $payload[$property->name]);
84
                    continue;
85
                }
86
                $property->setValue($domainEvent, $storedEventRow[$property]);
87
            }
88
89
            $domainEvents->add($domainEvent);
90
        }
91
92
        return $domainEvents;
93
    }
94
95
    public static function createSchema() : string
96
    {
97
        $tableName = self::TABLE_NAME;
98
99
        return <<<SQL
100
CREATE TABLE IF NOT EXISTS `$tableName` (
101
  `order` BIGINT(20) NOT NULL AUTO_INCREMENT,
102
  `type` VARCHAR(150) COLLATE utf8_bin NOT NULL,
103
  `payload` JSON NOT NULL,
104
  `occurred_on` INT(10) NOT NULL,
105
  `stream_name` VARCHAR(255) NOT NULL,
106
  `stream_version` INT NOT NULL,
107
  PRIMARY KEY (`order`)
108
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
109
SQL;
110
    }
111
112
    public static function removeSchema() : string
113
    {
114
        $tableName = self::TABLE_NAME;
115
116
        return "DROP TABLE `$tableName`";
117
    }
118
}
119