MySqlJsonEventStore::readStreamEvents()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 32

Duplication

Lines 32
Ratio 100 %

Code Coverage

Tests 21
CRAP Score 2

Importance

Changes 0
Metric  Value
dl      32
loc     32
ccs     21
cts     21
cp      1
rs      9.408
c       0
b       0
f       0
cc      2
nc      2
nop     3
crap    2
<?php

namespace DDDominio\EventSourcing\EventStore\Vendor;

use DDDominio\EventSourcing\Common\EventStream;
use DDDominio\EventSourcing\Common\EventStreamInterface;
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
use DDDominio\EventSourcing\EventStore\InitializableInterface;
use DDDominio\EventSourcing\EventStore\StoredEvent;
use DDDominio\EventSourcing\Serialization\SerializerInterface;
use DDDominio\EventSourcing\Versioning\EventUpgraderInterface;
use DDDominio\EventSourcing\Versioning\Version;

class MySqlJsonEventStore extends AbstractEventStore implements InitializableInterface
{
    const STREAMS_TABLE = 'streams';
    const EVENTS_TABLE = 'events';

    /**
     * @var \PDO
     */
    private $connection;

    /**
     * @param \PDO $connection
     * @param SerializerInterface $serializer
     * @param EventUpgraderInterface $eventUpgrader
     */
    public function __construct(
        \PDO $connection,
        SerializerInterface $serializer,
        EventUpgraderInterface $eventUpgrader
    ) {
        $this->connection = $connection;
        parent::__construct($serializer, $eventUpgrader);
    }

    /**
     * @param string $streamId
     * @param int $start
     * @param int $count
     * @return EventStreamInterface
     */
    public function readStreamEvents($streamId, $start = 1, $count = null)
    {
        // No explicit count means "read to the end of the stream".
        if (!isset($count)) {
            $count = PHP_INT_MAX;
        }
        // ORDER BY keeps LIMIT/OFFSET paging deterministic; ids are AUTO_INCREMENT.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id
             LIMIT :limit
             OFFSET :offset'
        );
        $stmt->bindValue(':streamId', $streamId);
        // $start is 1-based, while OFFSET is 0-based.
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
        $stmt->execute();
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($event) {
            return new StoredEvent(
                $event['id'],
                $event['stream_id'],
                $event['type'],
                $event['event'],
                $event['metadata'],
                new \DateTimeImmutable($event['occurred_on']),
                Version::fromString($event['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents(new EventStream($storedEvents));
    }

    /**
     * @param string $streamId
     * @return EventStreamInterface
     */
    public function readFullStream($streamId)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($event) {
            return new StoredEvent(
                $event['id'],
                $event['stream_id'],
                $event['type'],
                $event['event'],
                $event['metadata'],
                new \DateTimeImmutable($event['occurred_on']),
                Version::fromString($event['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents(new EventStream($storedEvents));
    }

    /**
     * @return EventStreamInterface[]
     */
    public function readAllStreams()
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM streams'
        );
        $stmt->execute();

        $streams = [];
        foreach ($stmt->fetchAll() as $result) {
            $streams[] = $this->readFullStream($result['id']);
        }
        return $streams;
    }

    /**
     * @return EventStreamInterface
     */
    public function readAllEvents()
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events'
        );
        $stmt->execute();
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($event) {
            return new StoredEvent(
                $event['id'],
                $event['stream_id'],
                $event['type'],
                $event['event'],
                $event['metadata'],
                new \DateTimeImmutable($event['occurred_on']),
                Version::fromString($event['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents(new EventStream($storedEvents));
    }

    /**
     * @param string $streamId
     * @param StoredEvent[] $storedEvents
     * @param int $expectedVersion
     * @throws \Exception
     */
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
    {
        $this->connection->beginTransaction();
        try {
            // Register the stream the first time events are appended to it.
            if (!$this->streamExists($streamId)) {
                $stmt = $this->connection
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
                $stmt->bindValue(':streamId', $streamId);
                $stmt->execute();
            }
            $stmt = $this->connection->prepare(
                'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
            );
            foreach ($storedEvents as $storedEvent) {
                $stmt->bindValue(':streamId', $streamId);
                $stmt->bindValue(':type', $storedEvent->type());
                $stmt->bindValue(':event', $storedEvent->data());
                $stmt->bindValue(':metadata', $storedEvent->metadata());
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
                $stmt->bindValue(':version', $storedEvent->version());
                $stmt->execute();
            }

            // Optimistic concurrency check: after the inserts, the stream must hold
            // exactly $expectedVersion + count($storedEvents) events.
            if ($expectedVersion !== self::EXPECTED_VERSION_ANY) {
                $streamFinalVersion = $this->streamVersion($streamId);
                if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
                    throw ConcurrencyException::fromVersions(
                        $this->streamVersion($streamId),
                        $expectedVersion
                    );
                }
            }
            $this->connection->commit();
        } catch (\Exception $e) {
            // Undo the stream and event inserts before rethrowing.
            $this->connection->rollBack();
            throw $e;
        }
    }

    /**
     * @param string $streamId
     * @return bool
     */
    protected function streamExists($streamId)
    {
        $stmt = $this->connection
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        return boolval($stmt->fetchColumn());
    }

    /**
     * @param string $streamId
     * @return int
     */
    protected function streamVersion($streamId)
    {
        $stmt = $this->connection
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        return intval($stmt->fetchColumn());
    }

    /**
     * @param string $type
     * @param Version $version
     * @return EventStreamInterface
     */
    protected function readStoredEventsOfTypeAndVersion($type, $version)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE type = :type
             AND version = :version'
        );
        $stmt->bindValue(':type', $type);
        $stmt->bindValue(':version', $version);
        $stmt->execute();
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($result) {
            return new StoredEvent(
                $result['id'],
                $result['stream_id'],
                $result['type'],
                $result['event'],
                $result['metadata'],
                new \DateTimeImmutable($result['occurred_on']),
                Version::fromString($result['version'])
            );
        }, $results);

        return new EventStream($storedEvents);
    }

    public function initialize()
    {
        try {
            // MySQL implicitly commits on CREATE TABLE, so the DDL below cannot be rolled back.
            $this->connection->beginTransaction();

            $this->connection->exec(
                'CREATE TABLE `'.self::STREAMS_TABLE.'` (
                    `id` varchar(255) NOT NULL,
                    PRIMARY KEY (`id`)
                )'
            );

            $this->connection->exec(
                'CREATE TABLE `'.self::EVENTS_TABLE.'` (
                    `id` int(11) NOT NULL AUTO_INCREMENT,
                    `stream_id` varchar(255) NOT NULL,
                    `type` varchar(255) NOT NULL,
                    `event` json NOT NULL,
                    `metadata` json NOT NULL,
                    `occurred_on` datetime NOT NULL,
                    `version` varchar(255) NOT NULL,
                    PRIMARY KEY (`id`),
                    KEY `stream_id` (`stream_id`),
                    CONSTRAINT `events_ibfk_1` FOREIGN KEY (`stream_id`) REFERENCES `streams` (`id`)
                )'
            );

            $this->connection->commit();
        } catch (\Exception $e) {
            $this->connection->rollBack();
            throw $e;
        }
    }

    /**
     * @return bool
     */
    public function initialized()
    {
        try {
            $result = $this->connection->query('SELECT 1 FROM `'.self::STREAMS_TABLE.'` LIMIT 1');
        } catch (\Exception $e) {
            return false;
        }
        return $result !== false;
    }

    /**
     * @param string $streamId
     * @param \DateTimeImmutable $datetime
     * @return int
     * @throws EventStreamDoesNotExistException
     */
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
    {
        if (!$this->streamExists($streamId)) {
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
        }
        $stmt = $this->connection->prepare(
            'SELECT COUNT(*)
             FROM events
             WHERE stream_id = :streamId
             AND occurred_on <= :occurred_on'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
        $stmt->execute();
        return intval($stmt->fetchColumn());
    }
}
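
readStreamEvents(), readFullStream(), readAllEvents() and readStoredEventsOfTypeAndVersion() each build their result with the same array_map closure. One possible way to remove that repetition is a single row-mapping helper. The sketch below is only an illustration: `StoredEventStub`, `VersionStub` and `storedEventsFromRows` are hypothetical names introduced so the snippet runs on its own, since the real `StoredEvent` and `Version` classes are not part of this listing.

```php
<?php

// Hypothetical stand-in for Version; only fromString() mirrors the listing.
final class VersionStub
{
    private $value;

    private function __construct($value)
    {
        $this->value = $value;
    }

    public static function fromString($value)
    {
        return new self((string) $value);
    }

    public function __toString()
    {
        return $this->value;
    }
}

// Hypothetical stand-in for StoredEvent with the same seven constructor arguments.
final class StoredEventStub
{
    public $id;
    public $streamId;
    public $type;
    public $data;
    public $metadata;
    public $occurredOn;
    public $version;

    public function __construct(
        $id,
        $streamId,
        $type,
        $data,
        $metadata,
        \DateTimeImmutable $occurredOn,
        VersionStub $version
    ) {
        $this->id = $id;
        $this->streamId = $streamId;
        $this->type = $type;
        $this->data = $data;
        $this->metadata = $metadata;
        $this->occurredOn = $occurredOn;
        $this->version = $version;
    }
}

/**
 * Maps rows fetched from the events table to stored events.
 *
 * @param array[] $rows rows keyed by the column names used in the listing
 * @return StoredEventStub[]
 */
function storedEventsFromRows(array $rows)
{
    return array_map(function (array $row) {
        return new StoredEventStub(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            new \DateTimeImmutable($row['occurred_on']),
            VersionStub::fromString($row['version'])
        );
    }, $rows);
}

// Example row shaped like a fetchAll() result from the events table.
$events = storedEventsFromRows([
    [
        'id' => 1,
        'stream_id' => 'order-42',
        'type' => 'OrderPlaced',
        'event' => '{"total": 10}',
        'metadata' => '{}',
        'occurred_on' => '2017-01-01 12:00:00',
        'version' => '1.0',
    ],
]);

echo $events[0]->type, ' @ ', $events[0]->occurredOn->format('Y-m-d'), PHP_EOL;
```

Inside the class, the same idea could become a private method that the four read methods call with their `$results` array.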
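
The version check in appendStoredEvents() relies on streamVersion() returning the number of events in the stream. A small sketch of that arithmetic as a standalone function follows; `appendIsConsistent` is a hypothetical name used only for illustration and is not part of the library.

```php
<?php

/**
 * Returns true when the stream grew by exactly the number of appended events.
 *
 * @param int $expectedVersion version the writer believed the stream had
 * @param int $appendedCount   number of events written in this append
 * @param int $finalVersion    event count of the stream after the inserts
 * @return bool
 */
function appendIsConsistent($expectedVersion, $appendedCount, $finalVersion)
{
    return $appendedCount === $finalVersion - $expectedVersion;
}

// The writer expected version 3 and appended 2 events; the stream now holds 5.
var_dump(appendIsConsistent(3, 2, 5)); // bool(true)

// Another writer appended one event in between, so the stream holds 6.
var_dump(appendIsConsistent(3, 2, 6)); // bool(false)
```

In the listing, a failed check throws a ConcurrencyException, and the surrounding catch block rolls the inserts back.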