Completed
Push — master ( f72a69...d1bcf2 )
by Kristof
11s
created

EventStream   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 221
Duplicated Lines 10.41 %

Coupling/Cohesion

Components 3
Dependencies 8

Importance

Changes 0
Metric Value
wmc 19
lcom 3
cbo 8
dl 23
loc 221
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 13 13 1
A withStartId() 0 14 3
A withAggregateType() 0 6 1
A withCdbids() 0 14 3
A withDomainEventStreamDecorator() 0 6 1
A __invoke() 0 35 5
A getLastProcessedId() 0 4 1
A prepareLoadStatement() 0 23 3
A deserializeEvent() 10 10 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
<?php
2
3
namespace CultuurNet\UDB3\EventSourcing\DBAL;
4
5
use Broadway\Domain\DateTime;
6
use Broadway\Domain\DomainEventStream;
7
use Broadway\Domain\DomainMessage;
8
use Broadway\EventSourcing\EventStreamDecoratorInterface;
9
use Broadway\Serializer\SerializerInterface;
10
use Doctrine\DBAL\Connection;
11
use Doctrine\DBAL\Driver\Statement;
12
use Doctrine\DBAL\DBALException;
13
14
/**
 * Callable, generator-based reader over an event store table.
 *
 * Invoking an instance repeatedly queries the configured table for the next
 * row whose id is at or above the current start id, turns that row into a
 * DomainMessage, wraps it in a DomainEventStream and yields it. Iteration
 * ends as soon as a query returns no row.
 *
 * The with*() methods never modify the instance they are called on; they
 * return a configured clone instead.
 */
class EventStream
{
    /**
     * @var Connection
     */
    protected $connection;

    /**
     * @var SerializerInterface
     */
    protected $payloadSerializer;

    /**
     * @var SerializerInterface
     */
    protected $metadataSerializer;

    /**
     * @var string
     */
    protected $tableName;

    /**
     * Lowest row id that the next load query will consider.
     *
     * @var int
     */
    protected $startId;

    /**
     * Id of the row that was deserialized most recently, null as long as no
     * row has been processed.
     *
     * @var int|null
     */
    protected $lastProcessedId;

    /**
     * Optional uuid filter, null when no filter was configured.
     *
     * @var string[]|null
     */
    protected $cdbids;

    /**
     * Optional decorator applied to every yielded stream.
     *
     * @var EventStreamDecoratorInterface|null
     */
    private $domainEventStreamDecorator;

    /**
     * Optional aggregate type filter, an empty string disables the filter.
     *
     * @var string
     */
    private $aggregateType;

    /**
     * @param Connection $connection
     * @param SerializerInterface $payloadSerializer
     * @param SerializerInterface $metadataSerializer
     * @param string $tableName
     */
    public function __construct(
        Connection $connection,
        SerializerInterface $payloadSerializer,
        SerializerInterface $metadataSerializer,
        $tableName
    ) {
        $this->connection = $connection;
        $this->payloadSerializer = $payloadSerializer;
        $this->metadataSerializer = $metadataSerializer;
        $this->tableName = $tableName;
        $this->startId = 0;
        $this->aggregateType = '';
    }

    /**
     * Returns a copy of this stream that starts reading at the given row id.
     *
     * @param int $startId
     * @return EventStream
     * @throws \InvalidArgumentException When $startId is not an int or is not higher than 0.
     */
    public function withStartId($startId)
    {
        if (!is_int($startId)) {
            throw new \InvalidArgumentException('StartId should have type int.');
        }

        if ($startId <= 0) {
            throw new \InvalidArgumentException('StartId should be higher than 0.');
        }

        $copy = clone $this;
        $copy->startId = $startId;

        return $copy;
    }

    /**
     * Returns a copy of this stream that only reads rows with the given
     * aggregate type. An empty value leaves the stream unfiltered.
     *
     * @param string $aggregateType
     * @return EventStream
     */
    public function withAggregateType($aggregateType)
    {
        $copy = clone $this;
        $copy->aggregateType = $aggregateType;

        return $copy;
    }

    /**
     * Returns a copy of this stream that only reads rows whose uuid is one of
     * the given cdbids.
     *
     * @param string[] $cdbids
     * @return EventStream
     * @throws \InvalidArgumentException When $cdbids is not an array or is empty.
     */
    public function withCdbids($cdbids)
    {
        if (!is_array($cdbids)) {
            throw new \InvalidArgumentException('Cdbids should have type array.');
        }

        if (empty($cdbids)) {
            throw new \InvalidArgumentException('Cdbids can\'t be empty.');
        }

        $copy = clone $this;
        $copy->cdbids = $cdbids;

        return $copy;
    }

    /**
     * Returns a copy of this stream that passes every yielded domain event
     * stream through the given decorator first.
     *
     * @param EventStreamDecoratorInterface $domainEventStreamDecorator
     * @return EventStream
     */
    public function withDomainEventStreamDecorator(EventStreamDecoratorInterface $domainEventStreamDecorator)
    {
        $copy = clone $this;
        $copy->domainEventStreamDecorator = $domainEventStreamDecorator;

        return $copy;
    }

    /**
     * Lazily walks the event table and yields one DomainEventStream per row.
     *
     * Note that iterating changes this instance: the start id and the last
     * processed id advance with every row that was read, so a second
     * iteration continues after the last row instead of starting over.
     *
     * @return \Generator Yields DomainEventStream instances.
     * @throws DBALException
     */
    public function __invoke()
    {
        do {
            $statement = $this->prepareLoadStatement();

            /* @var DomainMessage[] $events */
            $events = [];
            while ($row = $statement->fetch()) {
                // Database drivers may hand back the id as a numeric string;
                // normalise it once so both counters really hold an int.
                $rowId = (int) $row['id'];

                $events[] = $this->deserializeEvent($row);
                $this->lastProcessedId = $rowId;
                // Make sure to increment to prevent endless loop.
                $this->startId = $rowId + 1;
            }

            if (!empty($events)) {
                $firstEvent = $events[0];
                $domainEventStream = new DomainEventStream($events);

                if ($this->domainEventStreamDecorator !== null) {
                    // The load statement is limited to one row, so the stream
                    // holds a single domain message. Its payload class and id
                    // are therefore passed on as the aggregate type and
                    // aggregate id for the whole stream.
                    $domainEventStream = $this->domainEventStreamDecorator->decorateForWrite(
                        get_class($firstEvent->getPayload()),
                        $firstEvent->getId(),
                        $domainEventStream
                    );
                }

                yield $domainEventStream;
            }
        } while (!empty($events));
    }

    /**
     * Returns the id of the row that was processed most recently.
     *
     * @return int|null Null when no row has been processed yet.
     */
    public function getLastProcessedId()
    {
        return $this->lastProcessedId;
    }

    /**
     * Builds and executes the query that loads the next single row.
     *
     * The load statement can no longer be 'cached' because of using the query
     * builder. The query builder requires all parameters to be set before
     * using the execute command. The previous solution used the prepare
     * statement on the connection, this did not require all parameters to be
     * set up front.
     *
     * @return \Doctrine\DBAL\Driver\ResultStatement
     * @throws DBALException
     */
    protected function prepareLoadStatement()
    {
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on')
            ->from($this->tableName)
            ->where('id >= :startid')
            ->setParameter('startid', $this->startId)
            ->orderBy('id', 'ASC')
            ->setMaxResults(1);

        // Explicit emptiness check: the filter is either null or an array.
        if (!empty($this->cdbids)) {
            $queryBuilder->andWhere('uuid IN (:uuids)')
                ->setParameter('uuids', $this->cdbids, Connection::PARAM_STR_ARRAY);
        }

        if (!empty($this->aggregateType)) {
            $queryBuilder->andWhere('aggregate_type = :aggregate_type');
            $queryBuilder->setParameter('aggregate_type', $this->aggregateType);
        }

        return $queryBuilder->execute();
    }

    /**
     * Converts a fetched table row into a domain message.
     *
     * The metadata and payload columns are JSON decoded into associative
     * arrays before being handed to their respective serializers.
     *
     * @param array $row Row with at least the keys uuid, playhead, metadata,
     *                   payload and recorded_on.
     * @return DomainMessage
     */
    private function deserializeEvent($row)
    {
        $metadata = $this->metadataSerializer->deserialize(json_decode($row['metadata'], true));
        $payload = $this->payloadSerializer->deserialize(json_decode($row['payload'], true));

        return new DomainMessage(
            $row['uuid'],
            $row['playhead'],
            $metadata,
            $payload,
            DateTime::fromString($row['recorded_on'])
        );
    }
}
235