Passed
Push — master ( d20dcf...a35003 )
by Daniel
03:16
created

MySqlJsonEventStore::readAllEvents()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 23
Code Lines 16

Duplication

Lines 23
Ratio 100 %

Code Coverage

Tests 15
CRAP Score 1

Importance

Changes 0
Metric Value
dl 23
loc 23
ccs 15
cts 15
cp 1
rs 9.0856
c 0
b 0
f 0
cc 1
eloc 16
nc 1
nop 0
crap 1
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
10
use DDDominio\EventSourcing\EventStore\InitializableInterface;
11
use DDDominio\EventSourcing\EventStore\StoredEvent;
12
use DDDominio\EventSourcing\Serialization\SerializerInterface;
13
use DDDominio\EventSourcing\Versioning\EventUpgrader;
14
use DDDominio\EventSourcing\Versioning\Version;
15
16
class MySqlJsonEventStore extends AbstractEventStore implements InitializableInterface
17
{
18
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
19
    const STREAMS_TABLE = 'streams';
20
    const EVENTS_TABLE = 'events';
21
22
    /**
23
     * @var \PDO
24
     */
25
    private $connection;
26
27
    /**
28
     * @param \PDO $connection
29
     * @param SerializerInterface $serializer
30
     * @param EventUpgrader $eventUpgrader
31
     */
32 19
    public function __construct(
33
        \PDO $connection,
34
        SerializerInterface $serializer,
35
        $eventUpgrader
36
    ) {
37 19
        $this->connection = $connection;
38 19
        parent::__construct($serializer, $eventUpgrader);
39 19
    }
40
41
    /**
42
     * @param string $streamId
43
     * @param int $start
44
     * @param int $count
45
     * @return EventStreamInterface
46
     */
47 4 View Code Duplication
    public function readStreamEvents($streamId, $start = 1, $count = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
48
    {
49 4
        if (!isset($count)) {
50 3
            $count = self::MAX_UNSIGNED_BIG_INT;
51
        }
52 4
        $stmt = $this->connection->prepare(
53
            'SELECT *
54
             FROM events
55
             WHERE stream_id = :streamId
56
             LIMIT :limit
57 4
             OFFSET :offset'
58
        );
59 4
        $stmt->bindValue(':streamId', $streamId);
60 4
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
61 4
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
62 4
        $stmt->execute();
63 4
        $results = $stmt->fetchAll();
64
65
        $storedEvents = array_map(function($event) {
66 3
            return new StoredEvent(
67 3
                $event['id'],
68 3
                $event['stream_id'],
69 3
                $event['type'],
70 3
                $event['event'],
71 3
                $event['metadata'],
72 3
                new \DateTimeImmutable($event['occurred_on']),
73 3
                Version::fromString($event['version'])
74
            );
75 4
        }, $results);
76
77 4
        return $this->domainEventStreamFromStoredEvents($storedEvents);
78
    }
79
80
    /**
81
     * @param string $streamId
82
     * @return EventStreamInterface
83
     */
84 6 View Code Duplication
    public function readFullStream($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
85
    {
86 6
        $stmt = $this->connection->prepare(
87
            'SELECT *
88
             FROM events
89 6
             WHERE stream_id = :streamId'
90
        );
91 6
        $stmt->bindValue(':streamId', $streamId);
92 6
        $stmt->execute();
93 6
        $results = $stmt->fetchAll();
94
95
        $storedEvents = array_map(function($event) {
96 5
            return new StoredEvent(
97 5
                $event['id'],
98 5
                $event['stream_id'],
99 5
                $event['type'],
100 5
                $event['event'],
101 5
                $event['metadata'],
102 5
                new \DateTimeImmutable($event['occurred_on']),
103 5
                Version::fromString($event['version'])
104
            );
105 6
        }, $results);
106
107 6
        return $this->domainEventStreamFromStoredEvents($storedEvents);
108
    }
109
110
    /**
111
     * @return EventStreamInterface[]
112
     */
113
    public function readAllStreams()
114
    {
115
        // TODO: Implement readAllStreams() method.
116
    }
117
118
    /**
119
     * @return EventStreamInterface
120
     */
121 1 View Code Duplication
    public function readAllEvents()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
122
    {
123 1
        $stmt = $this->connection->prepare(
124
            'SELECT *
125 1
             FROM events'
126
        );
127 1
        $stmt->execute();
128 1
        $results = $stmt->fetchAll();
129
130
        $storedEvents = array_map(function($event) {
131 1
            return new StoredEvent(
132 1
                $event['id'],
133 1
                $event['stream_id'],
134 1
                $event['type'],
135 1
                $event['event'],
136 1
                $event['metadata'],
137 1
                new \DateTimeImmutable($event['occurred_on']),
138 1
                Version::fromString($event['version'])
139
            );
140 1
        }, $results);
141
142 1
        return $this->domainEventStreamFromStoredEvents($storedEvents);
143
    }
144
145
    /**
146
     * @param string $streamId
147
     * @param StoredEvent[] $storedEvents
148
     * @param int $expectedVersion
149
     * @throws \Exception
150
     */
151 10
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
152
    {
153 10
        $this->connection->beginTransaction();
154
        try {
155 10 View Code Duplication
            if (!$this->streamExists($streamId)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
156 10
                $stmt = $this->connection
157 10
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
158 10
                $stmt->bindValue(':streamId', $streamId);
159 10
                $stmt->execute();
160
            }
161 10 View Code Duplication
            foreach ($storedEvents as $storedEvent) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
162 10
                $stmt = $this->connection->prepare(
163
                    'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
164 10
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
165
                );
166 10
                $stmt->bindValue(':streamId', $streamId);
167 10
                $stmt->bindValue(':type', $storedEvent->type());
168 10
                $stmt->bindValue(':event', $storedEvent->data());
169 10
                $stmt->bindValue(':metadata', $storedEvent->metadata());
170 10
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
171 10
                $stmt->bindValue(':version', $storedEvent->version());
172 10
                $stmt->execute();
173
            }
174 10
            $streamFinalVersion = $this->streamVersion($streamId);
175 10
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
176
                throw ConcurrencyException::fromVersions(
177
                    $this->streamVersion($streamId),
178
                    $expectedVersion
179
                );
180
            }
181 10
            $this->connection->commit();
182
        } catch (\Exception $e) {
183
            $this->connection->rollBack();
184
            throw $e;
185
        }
186 10
    }
187
188
    /**
189
     * @param string $streamId
190
     * @return bool
191
     */
192 12 View Code Duplication
    protected function streamExists($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
193
    {
194 12
        $stmt = $this->connection
195 12
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
196 12
        $stmt->bindValue(':streamId', $streamId);
197 12
        $stmt->execute();
198 12
        return boolval($stmt->fetchColumn());
199
    }
200
    /**
201
     * @param string $streamId
202
     * @return int
203
     */
204 10 View Code Duplication
    protected function streamVersion($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
205
    {
206 10
        $stmt = $this->connection
207 10
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
208 10
        $stmt->bindValue(':streamId', $streamId);
209 10
        $stmt->execute();
210 10
        return intval($stmt->fetchColumn());
211
    }
212
213
    /**
214
     * @param string $type
215
     * @param Version $version
216
     * @return EventStreamInterface
217
     */
218 1 View Code Duplication
    protected function readStoredEventsOfTypeAndVersion($type, $version)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
219
    {
220 1
        $stmt = $this->connection->prepare(
221
            'SELECT *
222
             FROM events
223
             WHERE type = :type
224 1
             AND version = :version'
225
        );
226 1
        $stmt->bindValue(':type', $type);
227 1
        $stmt->bindValue(':version', $version);
228 1
        $stmt->execute();
229 1
        $results = $stmt->fetchAll();
230
231 1
        $storedEvents = array_map(function($result) {
232 1
            return new StoredEvent(
233 1
                $result['id'],
234 1
                $result['stream_id'],
235 1
                $result['type'],
236 1
                $result['event'],
237 1
                $result['metadata'],
238 1
                new \DateTimeImmutable($result['occurred_on']),
239 1
                Version::fromString($result['version'])
240
            );
241 1
        }, $results);
242
243 1
        return new EventStream($storedEvents);
244
    }
245
246 19
    public function initialize()
247
    {
248
        try {
249 19
            $this->connection->beginTransaction();
250
251 19
            $this->connection->exec(
252 19
                'CREATE TABLE `'.self::STREAMS_TABLE.'` (
253
                    `id` varchar(255) NOT NULL,
254
                    PRIMARY KEY (`id`)
255 19
                )'
256
            );
257
258 19
            $this->connection->exec(
259 19
                'CREATE TABLE `'.self::EVENTS_TABLE.'` (
260
                    `id` int(11) NOT NULL AUTO_INCREMENT,
261
                    `stream_id` varchar(255) NOT NULL,
262
                    `type` varchar(255) NOT NULL,
263
                    `event` json NOT NULL,
264
                    `metadata` json NOT NULL,
265
                    `occurred_on` datetime NOT NULL,
266
                    `version` varchar(255) NOT NULL,
267
                    PRIMARY KEY (`id`),
268
                    KEY `stream_id` (`stream_id`),
269
                    CONSTRAINT `events_ibfk_1` FOREIGN KEY (`stream_id`) REFERENCES `streams` (`id`)
270 19
                )'
271
            );
272
273 19
            $this->connection->commit();
274
        } catch (\Exception $e) {
275
            $this->connection->rollBack();
276
            throw $e;
277
        }
278 19
    }
279
280
    /**
281
     * @return bool
282
     */
283 3 View Code Duplication
    public function initialized()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
284
    {
285
        try {
286 3
            $result = $this->connection->query('SELECT 1 FROM `'.self::STREAMS_TABLE.'` LIMIT 1');
287 1
        } catch (\Exception $e) {
288 1
            return false;
289
        }
290 2
        return $result !== false;
291
    }
292
293
    /**
294
     * @param string $streamId
295
     * @param \DateTimeImmutable $datetime
296
     * @return int
297
     * @throws EventStreamDoesNotExistException
298
     */
299 3 View Code Duplication
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
300
    {
301 3
        if (!$this->streamExists($streamId)) {
302 1
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
303
        }
304 2
        $stmt = $this->connection->prepare(
305
            'SELECT COUNT(*)
306
             FROM events
307
             WHERE stream_id = :streamId
308 2
             AND occurred_on <= :occurred_on'
309
        );
310 2
        $stmt->bindValue(':streamId', $streamId);
311 2
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
312 2
        $stmt->execute();
313 2
        return intval($stmt->fetchColumn());
314
    }
315
}
316