This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace CultuurNet\UDB3\EventSourcing\DBAL; |
||
4 | |||
5 | use Broadway\Domain\DateTime; |
||
6 | use Broadway\Domain\DomainEventStream; |
||
7 | use Broadway\Domain\DomainMessage; |
||
8 | use Broadway\EventSourcing\EventStreamDecoratorInterface; |
||
9 | use Broadway\Serializer\SerializerInterface; |
||
10 | use Doctrine\DBAL\Connection; |
||
11 | use Doctrine\DBAL\Driver\Statement; |
||
12 | use Doctrine\DBAL\DBALException; |
||
13 | |||
/**
 * Lazily iterates over the rows of an event store table and yields each
 * stored event wrapped in a DomainEventStream.
 *
 * The stream is configured through immutable-style "with*" methods, each of
 * which returns a modified clone. Invoking the object returns a generator
 * that loads one row per query, ordered by ascending id, until no more rows
 * match the configured filters.
 */
class EventStream
{
    /**
     * @var Connection
     */
    protected $connection;

    /**
     * @var SerializerInterface
     */
    protected $payloadSerializer;

    /**
     * @var SerializerInterface
     */
    protected $metadataSerializer;

    /**
     * Name of the table the events are read from.
     *
     * @var string
     */
    protected $tableName;

    /**
     * Lowest row id (inclusive) that the next query will load.
     *
     * @var int
     */
    protected $startId;

    /**
     * Id of the last row that was fetched, or null when nothing has been
     * fetched yet.
     *
     * @var int|null
     */
    protected $lastProcessedId;

    /**
     * Optional list of uuids to restrict the stream to.
     *
     * @var string[]|null
     */
    protected $cdbids;

    /**
     * Optional decorator applied to every yielded stream.
     *
     * @var EventStreamDecoratorInterface|null
     */
    private $domainEventStreamDecorator;

    /**
     * Optional aggregate type filter; an empty string disables the filter.
     *
     * @var string
     */
    private $aggregateType;

    /**
     * @param Connection $connection
     * @param SerializerInterface $payloadSerializer
     * @param SerializerInterface $metadataSerializer
     * @param string $tableName
     */
    public function __construct(
        Connection $connection,
        SerializerInterface $payloadSerializer,
        SerializerInterface $metadataSerializer,
        $tableName
    ) {
        $this->connection = $connection;
        $this->payloadSerializer = $payloadSerializer;
        $this->metadataSerializer = $metadataSerializer;
        $this->tableName = $tableName;
        $this->startId = 0;
        $this->aggregateType = '';
    }

    /**
     * Returns a copy of this stream that starts reading at the given row id.
     *
     * @param int $startId
     *   Row id (inclusive) to start from; must be an int greater than 0.
     * @return EventStream
     * @throws \InvalidArgumentException
     *   When $startId is not an int or is not higher than 0.
     */
    public function withStartId($startId)
    {
        if (!is_int($startId)) {
            throw new \InvalidArgumentException('StartId should have type int.');
        }

        if ($startId <= 0) {
            throw new \InvalidArgumentException('StartId should be higher than 0.');
        }

        $c = clone $this;
        $c->startId = $startId;
        return $c;
    }

    /**
     * Returns a copy of this stream restricted to a single aggregate type.
     *
     * @param string $aggregateType
     *   Value compared against the aggregate_type column; an empty value
     *   leaves the stream unfiltered.
     * @return EventStream
     */
    public function withAggregateType($aggregateType)
    {
        $c = clone $this;
        $c->aggregateType = $aggregateType;
        return $c;
    }

    /**
     * Returns a copy of this stream restricted to the given uuids.
     *
     * @param string[] $cdbids
     *   Non-empty list of values matched against the uuid column.
     * @return EventStream
     * @throws \InvalidArgumentException
     *   When $cdbids is not an array or is empty.
     */
    public function withCdbids($cdbids)
    {
        if (!is_array($cdbids)) {
            throw new \InvalidArgumentException('Cdbids should have type array.');
        }

        if (empty($cdbids)) {
            throw new \InvalidArgumentException('Cdbids can\'t be empty.');
        }

        $c = clone $this;
        $c->cdbids = $cdbids;
        return $c;
    }

    /**
     * Returns a copy of this stream that decorates every yielded stream.
     *
     * @param EventStreamDecoratorInterface $domainEventStreamDecorator
     * @return EventStream
     */
    public function withDomainEventStreamDecorator(EventStreamDecoratorInterface $domainEventStreamDecorator)
    {
        $c = clone $this;
        $c->domainEventStreamDecorator = $domainEventStreamDecorator;
        return $c;
    }

    /**
     * Generates the matching events, one DomainEventStream per stored row.
     *
     * The internal cursor (startId / lastProcessedId) is advanced on this
     * instance while iterating, so getLastProcessedId() reflects progress.
     *
     * @return \Generator
     *   Yields DomainEventStream instances, optionally decorated.
     * @throws DBALException
     */
    public function __invoke()
    {
        do {
            $statement = $this->prepareLoadStatement();

            $events = [];
            // NOTE(review): fetch() is reported as deprecated by the analyzer in
            // favour of fetchAssociative(); kept as-is because the DBAL version
            // in use is not visible from this file.
            while ($row = $statement->fetch()) {
                $events[] = $this->deserializeEvent($row);

                // Normalise the id to int so the documented int contract of
                // getLastProcessedId() holds even if the driver hands the
                // column back as a numeric string.
                $id = (int) $row['id'];
                $this->lastProcessedId = $id;
                // Make sure to increment to prevent endless loop.
                $this->startId = $id + 1;
            }

            /* @var DomainMessage[] $events */
            if (!empty($events)) {
                $event = $events[0];
                $domainEventStream = new DomainEventStream($events);

                if (!is_null($this->domainEventStreamDecorator)) {
                    // The load statement is limited to one row, so the stream
                    // holds a single domain message and its id can be used
                    // for the whole stream.
                    // NOTE(review): the payload's class name is passed as the
                    // first argument; confirm that this is what the decorator
                    // expects there.
                    $domainEventStream = $this->domainEventStreamDecorator->decorateForWrite(
                        get_class($event->getPayload()),
                        $event->getId(),
                        $domainEventStream
                    );
                }

                yield $domainEventStream;
            }
        } while (!empty($events));
    }

    /**
     * Returns the id of the last row fetched by the generator.
     *
     * @return int|null
     *   Null when no row has been fetched yet.
     */
    public function getLastProcessedId()
    {
        return $this->lastProcessedId;
    }

    /**
     * Builds and executes the query that loads the next row.
     *
     * The load statement can no longer be 'cached' because of using the query
     * builder. The query builder requires all parameters to be set before
     * using the execute command. The previous solution used the prepare
     * statement on the connection, this did not require all parameters to be
     * set up front.
     *
     * @return \Doctrine\DBAL\Driver\ResultStatement
     * @throws DBALException
     */
    protected function prepareLoadStatement()
    {
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('id', 'uuid', 'playhead', 'metadata', 'payload', 'recorded_on')
            ->from($this->tableName)
            ->where('id >= :startid')
            ->setParameter('startid', $this->startId)
            ->orderBy('id', 'ASC')
            ->setMaxResults(1);

        // Explicit emptiness check: cdbids is null until withCdbids() is used.
        if (!empty($this->cdbids)) {
            $queryBuilder->andWhere('uuid IN (:uuids)')
                ->setParameter('uuids', $this->cdbids, Connection::PARAM_STR_ARRAY);
        }

        if (!empty($this->aggregateType)) {
            $queryBuilder->andWhere('aggregate_type = :aggregate_type');
            $queryBuilder->setParameter('aggregate_type', $this->aggregateType);
        }

        return $queryBuilder->execute();
    }

    /**
     * Converts a database row into a DomainMessage.
     *
     * @param array $row
     *   Row with the keys uuid, playhead, metadata, payload and recorded_on;
     *   metadata and payload are JSON strings.
     * @return DomainMessage
     */
    private function deserializeEvent(array $row): DomainMessage
    {
        return new DomainMessage(
            $row['uuid'],
            $row['playhead'],
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
            $this->payloadSerializer->deserialize(json_decode($row['payload'], true)),
            DateTime::fromString($row['recorded_on'])
        );
    }
}
||
231 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.