Completed
Pull Request — master (#161)
by Kristof
05:38
created

EventStream   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 160
Duplicated Lines 0 %

Coupling/Cohesion

Components 3
Dependencies 7

Importance

Changes 3
Bugs 0 Features 1
Metric Value
wmc 12
c 3
b 0
f 1
lcom 3
cbo 7
dl 0
loc 160
rs 10

6 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 18 2
A withDomainEventStreamDecorator() 0 6 1
B __invoke() 0 36 5
A getPreviousId() 0 4 1
A prepareLoadStatement() 0 15 2
A deserializeEvent() 0 10 1
1
<?php
2
/**
3
 * @file
4
 */
5
6
namespace CultuurNet\UDB3\EventSourcing\DBAL;
7
8
use Broadway\Domain\DateTime;
9
use Broadway\Domain\DomainEventStream;
10
use Broadway\Domain\DomainMessage;
11
use Broadway\EventSourcing\EventStreamDecoratorInterface;
12
use Broadway\Serializer\SerializerInterface;
13
use Doctrine\DBAL\Connection;
14
use Doctrine\DBAL\Driver\Statement;
15
use Doctrine\DBAL\DBALException;
16
use Doctrine\DBAL\Schema\Column;
17
use Doctrine\DBAL\Schema\Index;
18
19
class EventStream
20
{
21
    /**
22
     * @var Connection
23
     */
24
    protected $connection;
25
26
    /**
27
     * @var SerializerInterface
28
     */
29
    protected $payloadSerializer;
30
31
    /**
32
     * @var SerializerInterface
33
     */
34
    protected $metadataSerializer;
35
36
    /**
37
     * @var string
38
     */
39
    protected $tableName;
40
41
    /**
42
     * @var Statement
43
     */
44
    protected $loadStatement;
45
46
    /**
47
     * @var int
48
     */
49
    protected $previousId;
50
51
    /**
52
     * @var string
53
     */
54
    protected $primaryKey;
55
56
    /**
57
     * @var EventStreamDecoratorInterface
58
     */
59
    private $domainEventStreamDecorator;
60
61
    /**
62
     * @param Connection $connection
63
     * @param SerializerInterface $payloadSerializer
64
     * @param SerializerInterface $metadataSerializer
65
     * @param string $tableName
66
     * @param int $startId
67
     * @param string $primaryKey
68
     */
69
    public function __construct(
70
        Connection $connection,
71
        SerializerInterface $payloadSerializer,
72
        SerializerInterface $metadataSerializer,
73
        $tableName,
74
        $startId = 0,
75
        $primaryKey = 'id'
76
    ) {
77
        $this->connection = $connection;
78
        $this->payloadSerializer = $payloadSerializer;
79
        $this->metadataSerializer = $metadataSerializer;
80
        $this->tableName = $tableName;
81
        $this->previousId = $startId > 0 ? $startId - 1 : 0;
82
83
        $this->primaryKey = $primaryKey;
84
85
        $this->domainEventStreamDecorator = null;
86
    }
87
88
    /**
89
     * @param EventStreamDecoratorInterface $domainEventStreamDecorator
90
     * @return EventStream
91
     */
92
    public function withDomainEventStreamDecorator(EventStreamDecoratorInterface $domainEventStreamDecorator)
93
    {
94
        $c = clone $this;
95
        $c->domainEventStreamDecorator = $domainEventStreamDecorator;
96
        return $c;
97
    }
98
99
    public function __invoke()
100
    {
101
        $statement = $this->prepareLoadStatement();
102
103
        do {
104
            $statement->bindValue('previousid', $this->previousId, 'integer');
105
            $statement->execute();
106
107
            $events = [];
108
            while ($row = $statement->fetch()) {
109
                $events[] = $this->deserializeEvent($row);
110
                $this->previousId = $row[$this->primaryKey];
111
            }
112
113
            /* @var DomainMessage[] $events */
114
            if (!empty($events)) {
115
                $event = $events[0];
116
                $domainEventStream = new DomainEventStream($events);
117
118
                if (!is_null($this->domainEventStreamDecorator)) {
119
                    // Because the load statement always returns one row at a
120
                    // time, and we always wrap a single domain message in a
121
                    // stream as a result, we can simply get the aggregate type
122
                    // and aggregate id from the first domain message in the
123
                    // stream.
124
                    $domainEventStream = $this->domainEventStreamDecorator->decorateForWrite(
125
                        get_class($event->getPayload()),
126
                        $event->getId(),
127
                        $domainEventStream
128
                    );
129
                }
130
131
                yield $domainEventStream;
132
            }
133
        } while (!empty($events));
134
    }
135
136
    /**
137
     * @return int
138
     */
139
    public function getPreviousId()
140
    {
141
        return $this->previousId;
142
    }
143
144
    /**
145
     * @return Statement
146
     * @throws DBALException
147
     */
148
    protected function prepareLoadStatement()
149
    {
150
        if (null === $this->loadStatement) {
151
            $id = $this->primaryKey;
152
            $query = "SELECT $id, uuid, playhead, metadata, payload, recorded_on
153
                FROM $this->tableName
154
                WHERE $id > :previousid
155
                ORDER BY $id ASC
156
                LIMIT 1";
157
158
            $this->loadStatement = $this->connection->prepare($query);
159
        }
160
161
        return $this->loadStatement;
162
    }
163
164
    /**
165
     * @param $row
166
     * @return DomainMessage
167
     */
168
    private function deserializeEvent($row)
169
    {
170
        return new DomainMessage(
171
            $row['uuid'],
172
            $row['playhead'],
173
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
174
            $this->payloadSerializer->deserialize(json_decode($row['payload'], true)),
175
            DateTime::fromString($row['recorded_on'])
176
        );
177
    }
178
}
179