Completed
Pull Request — master (#293)
by Luc
04:35
created

EventStream::withStartId()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 14
rs 9.2
c 0
b 0
f 0
cc 4
eloc 8
nc 4
nop 1
1
<?php
2
3
namespace CultuurNet\UDB3\EventSourcing\DBAL;
4
5
use Broadway\Domain\DateTime;
6
use Broadway\Domain\DomainEventStream;
7
use Broadway\Domain\DomainMessage;
8
use Broadway\EventSourcing\EventStreamDecoratorInterface;
9
use Broadway\Serializer\SerializerInterface;
10
use Doctrine\DBAL\Connection;
11
use Doctrine\DBAL\Driver\Statement;
12
use Doctrine\DBAL\DBALException;
13
14
/**
 * Iterates over the rows of an event store table, one row per query, and
 * yields every row as a DomainEventStream holding a single DomainMessage.
 *
 * The object is invokable: calling it returns a generator. Rows are read in
 * ascending `id` order, starting after the id returned by getPreviousId(), and
 * can optionally be restricted to a single `uuid` (see withCdbid()).
 *
 * The with*() methods do not modify the instance they are called on; they
 * return a modified clone.
 */
class EventStream
{
    /**
     * Connection used to build and prepare the load query.
     *
     * @var Connection
     */
    protected $connection;

    /**
     * Deserializes the JSON-decoded `payload` column.
     *
     * @var SerializerInterface
     */
    protected $payloadSerializer;

    /**
     * Deserializes the JSON-decoded `metadata` column.
     *
     * @var SerializerInterface
     */
    protected $metadataSerializer;

    /**
     * Name of the table the events are read from.
     *
     * @var string
     */
    protected $tableName;

    /**
     * Lazily prepared statement, see prepareLoadStatement().
     *
     * @var Statement
     */
    protected $loadStatement;

    /**
     * Id of the last row that was read; only rows with a higher id are loaded.
     *
     * @var int
     */
    protected $previousId;

    /**
     * When set, only rows whose `uuid` column equals this value are loaded.
     *
     * @var string
     */
    protected $cdbid;

    /**
     * Optional decorator applied to every stream before it is yielded.
     *
     * @var EventStreamDecoratorInterface
     */
    private $domainEventStreamDecorator;

    /**
     * @param Connection $connection
     * @param SerializerInterface $payloadSerializer
     * @param SerializerInterface $metadataSerializer
     * @param string $tableName
     */
    public function __construct(
        Connection $connection,
        SerializerInterface $payloadSerializer,
        SerializerInterface $metadataSerializer,
        $tableName
    ) {
        $this->connection = $connection;
        $this->payloadSerializer = $payloadSerializer;
        $this->metadataSerializer = $metadataSerializer;
        $this->tableName = $tableName;
        $this->previousId = 0;
    }

    /**
     * Returns a copy of this stream that starts reading at the given row id.
     *
     * @param int $startId
     *   Id of the first row that should be loaded. Has to be a non-zero int.
     * @return EventStream
     * @throws \InvalidArgumentException
     *   When $startId is not an int, or is 0.
     */
    public function withStartId($startId)
    {
        if (!is_int($startId)) {
            throw new \InvalidArgumentException('StartId should have type int.');
        }

        if (empty($startId)) {
            throw new \InvalidArgumentException('StartId can\'t be empty.');
        }

        $c = clone $this;
        // The load query selects rows with "id > previousId", so the row with
        // the start id itself is included by storing the id right before it.
        // Negative start ids fall back to 0.
        $c->previousId = $startId > 0 ? $startId - 1 : 0;
        return $c;
    }

    /**
     * Returns a copy of this stream that only loads rows for the given uuid.
     *
     * @param string $cdbid
     *   Value the `uuid` column has to match. Has to be a non-empty string.
     * @return EventStream
     * @throws \InvalidArgumentException
     *   When $cdbid is not a string, or is considered empty by empty().
     */
    public function withCdbid($cdbid)
    {
        if (!is_string($cdbid)) {
            throw new \InvalidArgumentException('Cdbid should have type string.');
        }

        if (empty($cdbid)) {
            throw new \InvalidArgumentException('Cdbid can\'t be empty.');
        }

        $c = clone $this;
        $c->cdbid = $cdbid;
        // A statement that was already prepared on the original instance may
        // have been built without the "uuid = :uuid" condition. Drop it so the
        // copy prepares a statement that matches its own filter.
        $c->loadStatement = null;
        return $c;
    }

    /**
     * Returns a copy of this stream that decorates every yielded stream.
     *
     * @param EventStreamDecoratorInterface $domainEventStreamDecorator
     * @return EventStream
     */
    public function withDomainEventStreamDecorator(EventStreamDecoratorInterface $domainEventStreamDecorator)
    {
        $c = clone $this;
        $c->domainEventStreamDecorator = $domainEventStreamDecorator;
        return $c;
    }

    /**
     * Loads the events one by one and yields each of them as a stream.
     *
     * The id of every loaded row is stored on this instance, so
     * getPreviousId() reflects how far the iteration has progressed.
     *
     * @return \Generator
     *   Yields DomainEventStream instances, or whatever the decorator returns
     *   when a decorator is set.
     * @throws DBALException
     */
    public function __invoke()
    {
        $statement = $this->prepareLoadStatement();

        do {
            $statement->bindValue('previousid', $this->previousId, \PDO::PARAM_INT);
            if ($this->cdbid) {
                $statement->bindValue('uuid', $this->cdbid, \PDO::PARAM_STR);
            }

            $statement->execute();

            /** @var DomainMessage[] $events */
            $events = [];
            while ($row = $statement->fetch()) {
                $events[] = $this->deserializeEvent($row);
                // Remember where we are, the next query continues after it.
                $this->previousId = $row['id'];
            }

            if (!empty($events)) {
                $event = $events[0];
                $domainEventStream = new DomainEventStream($events);

                if (!is_null($this->domainEventStreamDecorator)) {
                    // Because the load statement always returns one row at a
                    // time, and we always wrap a single domain message in a
                    // stream as a result, we can simply get the aggregate type
                    // and aggregate id from the first domain message in the
                    // stream.
                    $domainEventStream = $this->domainEventStreamDecorator->decorateForWrite(
                        get_class($event->getPayload()),
                        $event->getId(),
                        $domainEventStream
                    );
                }

                yield $domainEventStream;
            }
            // An empty result means there are no rows left to load.
        } while (!empty($events));
    }

    /**
     * Returns the id of the last row that was loaded, or the id the stream
     * was configured to start after when nothing has been loaded yet.
     *
     * @return int
     */
    public function getPreviousId()
    {
        return $this->previousId;
    }

    /**
     * Builds the statement that loads the next event, on first use only.
     *
     * The statement selects a single row with an id higher than the
     * `:previousid` parameter, additionally filtered on `:uuid` when a cdbid
     * is set. Binding the parameters and executing is left to the caller.
     *
     * @return Statement
     * @throws DBALException
     */
    protected function prepareLoadStatement()
    {
        if (null === $this->loadStatement) {
            $queryBuilder = $this->connection->createQueryBuilder();

            $queryBuilder->select('id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on')
                ->from($this->tableName)
                ->where('id > :previousid')
                ->orderBy('id', 'ASC')
                ->setMaxResults(1);

            if ($this->cdbid) {
                $queryBuilder->andWhere('uuid = :uuid');
            }

            // Only prepare the statement here. QueryBuilder::execute() would
            // run the query right away, before any parameter is bound, and is
            // not guaranteed to return a statement (it can also return an
            // integer), which conflicts with the documented Statement type.
            $this->loadStatement = $this->connection->prepare($queryBuilder->getSQL());
        }

        return $this->loadStatement;
    }

    /**
     * Turns a row of the event store table into a domain message.
     *
     * @param array $row
     *   Row with the keys uuid, playhead, metadata, payload and recorded_on.
     *   The metadata and payload values are expected to be JSON strings.
     * @return DomainMessage
     */
    private function deserializeEvent($row)
    {
        return new DomainMessage(
            $row['uuid'],
            $row['playhead'],
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
            $this->payloadSerializer->deserialize(json_decode($row['payload'], true)),
            DateTime::fromString($row['recorded_on'])
        );
    }
}
213