Completed
Pull Request — master (#293)
by
unknown
05:37
created

EventStream   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 198
Duplicated Lines 0 %

Coupling/Cohesion

Components 3
Dependencies 8

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 17
c 1
b 0
f 0
lcom 3
cbo 8
dl 0
loc 198
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 12 1
A withCdbid() 0 14 3
A withDomainEventStreamDecorator() 0 6 1
A withStartId() 0 14 3
B __invoke() 0 35 5
A getLastProcessedId() 0 4 1
A prepareLoadStatement() 0 18 2
A deserializeEvent() 0 10 1
1
<?php
2
3
namespace CultuurNet\UDB3\EventSourcing\DBAL;
4
5
use Broadway\Domain\DateTime;
6
use Broadway\Domain\DomainEventStream;
7
use Broadway\Domain\DomainMessage;
8
use Broadway\EventSourcing\EventStreamDecoratorInterface;
9
use Broadway\Serializer\SerializerInterface;
10
use Doctrine\DBAL\Connection;
11
use Doctrine\DBAL\Driver\Statement;
12
use Doctrine\DBAL\DBALException;
13
14
class EventStream
15
{
16
    /**
17
     * @var Connection
18
     */
19
    protected $connection;
20
21
    /**
22
     * @var SerializerInterface
23
     */
24
    protected $payloadSerializer;
25
26
    /**
27
     * @var SerializerInterface
28
     */
29
    protected $metadataSerializer;
30
31
    /**
32
     * @var string
33
     */
34
    protected $tableName;
35
36
    /**
37
     * @var int
38
     */
39
    protected $startId;
40
41
    /**
42
     * @var int
43
     */
44
    protected $lastProcessedId;
45
46
    /**
47
     * @var string
48
     */
49
    protected $cdbid;
50
51
    /**
52
     * @var EventStreamDecoratorInterface
53
     */
54
    private $domainEventStreamDecorator;
55
56
    /**
57
     * @param Connection $connection
58
     * @param SerializerInterface $payloadSerializer
59
     * @param SerializerInterface $metadataSerializer
60
     * @param string $tableName
61
     */
62
    public function __construct(
63
        Connection $connection,
64
        SerializerInterface $payloadSerializer,
65
        SerializerInterface $metadataSerializer,
66
        $tableName
67
    ) {
68
        $this->connection = $connection;
69
        $this->payloadSerializer = $payloadSerializer;
70
        $this->metadataSerializer = $metadataSerializer;
71
        $this->tableName = $tableName;
72
        $this->startId = 0;
73
    }
74
75
    /**
76
     * @param int $startId
77
     * @return EventStream
78
     */
79
    public function withStartId($startId)
80
    {
81
        if (!is_int($startId)) {
82
            throw new \InvalidArgumentException('StartId should have type int.');
83
        }
84
85
        if ($startId <= 0) {
86
            throw new \InvalidArgumentException('StartId should be higher than 0.');
87
        }
88
89
        $c = clone $this;
90
        $c->startId = $startId;
91
        return $c;
92
    }
93
94
    /**
95
     * @param string $cdbid
96
     * @return EventStream
97
     */
98
    public function withCdbid($cdbid)
99
    {
100
        if (!is_string($cdbid)) {
101
            throw new \InvalidArgumentException('Cdbid should have type string.');
102
        }
103
104
        if (empty($cdbid)) {
105
            throw new \InvalidArgumentException('Cdbid can\'t be empty.');
106
        }
107
108
        $c = clone $this;
109
        $c->cdbid = $cdbid;
110
        return $c;
111
    }
112
113
    /**
114
     * @param EventStreamDecoratorInterface $domainEventStreamDecorator
115
     * @return EventStream
116
     */
117
    public function withDomainEventStreamDecorator(EventStreamDecoratorInterface $domainEventStreamDecorator)
118
    {
119
        $c = clone $this;
120
        $c->domainEventStreamDecorator = $domainEventStreamDecorator;
121
        return $c;
122
    }
123
124
    public function __invoke()
125
    {
126
        do {
127
            $statement = $this->prepareLoadStatement();
128
129
            $events = [];
130
            while ($row = $statement->fetch()) {
131
                $events[] = $this->deserializeEvent($row);
132
                $this->lastProcessedId = $row['id'];
133
                // Make sure to increment to prevent endless loop.
134
                $this->startId = $row['id'] + 1;
135
            }
136
137
            /* @var DomainMessage[] $events */
138
            if (!empty($events)) {
139
                $event = $events[0];
140
                $domainEventStream = new DomainEventStream($events);
141
142
                if (!is_null($this->domainEventStreamDecorator)) {
143
                    // Because the load statement always returns one row at a
144
                    // time, and we always wrap a single domain message in a
145
                    // stream as a result, we can simply get the aggregate type
146
                    // and aggregate id from the first domain message in the
147
                    // stream.
148
                    $domainEventStream = $this->domainEventStreamDecorator->decorateForWrite(
149
                        get_class($event->getPayload()),
150
                        $event->getId(),
151
                        $domainEventStream
152
                    );
153
                }
154
155
                yield $domainEventStream;
156
            }
157
        } while (!empty($events));
158
    }
159
160
    /**
161
     * @return int
162
     */
163
    public function getLastProcessedId()
164
    {
165
        return $this->lastProcessedId;
166
    }
167
168
    /**
169
     * The load statement can no longer be 'cached' because of using the query
170
     * builder. The query builder requires all parameters to be set before
171
     * using the execute command. The previous solution used the prepare
172
     * statement on the connection, this did not require all parameters to be
173
     * set up front.
174
     *
175
     * @return Statement
176
     * @throws DBALException
177
     */
178
    protected function prepareLoadStatement()
179
    {
180
        $queryBuilder = $this->connection->createQueryBuilder();
181
182
        $queryBuilder->select('id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on')
183
            ->from($this->tableName)
184
            ->where('id >= :startid')
185
            ->setParameter('startid', $this->startId)
186
            ->orderBy('id', 'ASC')
187
            ->setMaxResults(1);
188
189
        if ($this->cdbid) {
190
            $queryBuilder->andWhere('uuid = :uuid')
191
                ->setParameter('uuid', $this->cdbid);
192
        }
193
194
        return $queryBuilder->execute();
0 ignored issues
show
Bug Compatibility introduced by
The expression $queryBuilder->execute(); of type Doctrine\DBAL\Driver\Statement|integer adds the type integer to the return on line 194 which is incompatible with the return type documented by CultuurNet\UDB3\EventSou...m::prepareLoadStatement of type Doctrine\DBAL\Driver\Statement.
Loading history...
195
    }
196
197
    /**
198
     * @param $row
199
     * @return DomainMessage
200
     */
201
    private function deserializeEvent($row)
202
    {
203
        return new DomainMessage(
204
            $row['uuid'],
205
            $row['playhead'],
206
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
207
            $this->payloadSerializer->deserialize(json_decode($row['payload'], true)),
208
            DateTime::fromString($row['recorded_on'])
209
        );
210
    }
211
}
212