Completed
Push — master ( a35003...7b49c3 )
by Daniel
03:21
created

MySqlJsonEventStore::readAllStreams()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 8

Duplication

Lines 14
Ratio 100 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
dl 14
loc 14
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 8
nc 2
nop 0
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
10
use DDDominio\EventSourcing\EventStore\InitializableInterface;
11
use DDDominio\EventSourcing\EventStore\StoredEvent;
12
use DDDominio\EventSourcing\Serialization\SerializerInterface;
13
use DDDominio\EventSourcing\Versioning\EventUpgrader;
14
use DDDominio\EventSourcing\Versioning\Version;
15
16
class MySqlJsonEventStore extends AbstractEventStore implements InitializableInterface
17
{
18
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
19
    const STREAMS_TABLE = 'streams';
20
    const EVENTS_TABLE = 'events';
21
22
    /**
23
     * @var \PDO
24
     */
25
    private $connection;
26
27
    /**
28
     * @param \PDO $connection
29
     * @param SerializerInterface $serializer
30
     * @param EventUpgrader $eventUpgrader
31
     */
32 20
    public function __construct(
33
        \PDO $connection,
34
        SerializerInterface $serializer,
35
        $eventUpgrader
36
    ) {
37 20
        $this->connection = $connection;
38 20
        parent::__construct($serializer, $eventUpgrader);
39 20
    }
40
41
    /**
42
     * @param string $streamId
43
     * @param int $start
44
     * @param int $count
45
     * @return EventStreamInterface
46
     */
47 4 View Code Duplication
    public function readStreamEvents($streamId, $start = 1, $count = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
48
    {
49 4
        if (!isset($count)) {
50 3
            $count = self::MAX_UNSIGNED_BIG_INT;
51
        }
52 4
        $stmt = $this->connection->prepare(
53
            'SELECT *
54
             FROM events
55
             WHERE stream_id = :streamId
56
             LIMIT :limit
57 4
             OFFSET :offset'
58
        );
59 4
        $stmt->bindValue(':streamId', $streamId);
60 4
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
61 4
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
62 4
        $stmt->execute();
63 4
        $results = $stmt->fetchAll();
64
65
        $storedEvents = array_map(function($event) {
66 3
            return new StoredEvent(
67 3
                $event['id'],
68 3
                $event['stream_id'],
69 3
                $event['type'],
70 3
                $event['event'],
71 3
                $event['metadata'],
72 3
                new \DateTimeImmutable($event['occurred_on']),
73 3
                Version::fromString($event['version'])
74
            );
75 4
        }, $results);
76
77 4
        return $this->domainEventStreamFromStoredEvents($storedEvents);
78
    }
79
80
    /**
81
     * @param string $streamId
82
     * @return EventStreamInterface
83
     */
84 7 View Code Duplication
    public function readFullStream($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
85
    {
86 7
        $stmt = $this->connection->prepare(
87
            'SELECT *
88
             FROM events
89 7
             WHERE stream_id = :streamId'
90
        );
91 7
        $stmt->bindValue(':streamId', $streamId);
92 7
        $stmt->execute();
93 7
        $results = $stmt->fetchAll();
94
95
        $storedEvents = array_map(function($event) {
96 6
            return new StoredEvent(
97 6
                $event['id'],
98 6
                $event['stream_id'],
99 6
                $event['type'],
100 6
                $event['event'],
101 6
                $event['metadata'],
102 6
                new \DateTimeImmutable($event['occurred_on']),
103 6
                Version::fromString($event['version'])
104
            );
105 7
        }, $results);
106
107 7
        return $this->domainEventStreamFromStoredEvents($storedEvents);
108
    }
109
110
    /**
111
     * @return EventStreamInterface[]
112
     */
113 1 View Code Duplication
    public function readAllStreams()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
114
    {
115 1
        $stmt = $this->connection->prepare(
116
            'SELECT *
117 1
             FROM streams'
118
        );
119 1
        $stmt->execute();
120
121 1
        $streams = [];
122 1
        foreach ($stmt->fetchAll() as $result) {
123 1
            $streams[] = $this->readFullStream($result['id']);
124
        }
125 1
        return $streams;
126
    }
127
128
    /**
129
     * @return EventStreamInterface
130
     */
131 1 View Code Duplication
    public function readAllEvents()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
132
    {
133 1
        $stmt = $this->connection->prepare(
134
            'SELECT *
135 1
             FROM events'
136
        );
137 1
        $stmt->execute();
138 1
        $results = $stmt->fetchAll();
139
140
        $storedEvents = array_map(function($event) {
141 1
            return new StoredEvent(
142 1
                $event['id'],
143 1
                $event['stream_id'],
144 1
                $event['type'],
145 1
                $event['event'],
146 1
                $event['metadata'],
147 1
                new \DateTimeImmutable($event['occurred_on']),
148 1
                Version::fromString($event['version'])
149
            );
150 1
        }, $results);
151
152 1
        return $this->domainEventStreamFromStoredEvents($storedEvents);
153
    }
154
155
    /**
156
     * @param string $streamId
157
     * @param StoredEvent[] $storedEvents
158
     * @param int $expectedVersion
159
     * @throws \Exception
160
     */
161 11
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
162
    {
163 11
        $this->connection->beginTransaction();
164
        try {
165 11 View Code Duplication
            if (!$this->streamExists($streamId)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
166 11
                $stmt = $this->connection
167 11
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
168 11
                $stmt->bindValue(':streamId', $streamId);
169 11
                $stmt->execute();
170
            }
171 11 View Code Duplication
            foreach ($storedEvents as $storedEvent) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
172 11
                $stmt = $this->connection->prepare(
173
                    'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
174 11
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
175
                );
176 11
                $stmt->bindValue(':streamId', $streamId);
177 11
                $stmt->bindValue(':type', $storedEvent->type());
178 11
                $stmt->bindValue(':event', $storedEvent->data());
179 11
                $stmt->bindValue(':metadata', $storedEvent->metadata());
180 11
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
181 11
                $stmt->bindValue(':version', $storedEvent->version());
182 11
                $stmt->execute();
183
            }
184 11
            $streamFinalVersion = $this->streamVersion($streamId);
185 11
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
186
                throw ConcurrencyException::fromVersions(
187
                    $this->streamVersion($streamId),
188
                    $expectedVersion
189
                );
190
            }
191 11
            $this->connection->commit();
192
        } catch (\Exception $e) {
193
            $this->connection->rollBack();
194
            throw $e;
195
        }
196 11
    }
197
198
    /**
199
     * @param string $streamId
200
     * @return bool
201
     */
202 13 View Code Duplication
    protected function streamExists($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
203
    {
204 13
        $stmt = $this->connection
205 13
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
206 13
        $stmt->bindValue(':streamId', $streamId);
207 13
        $stmt->execute();
208 13
        return boolval($stmt->fetchColumn());
209
    }
210
    /**
211
     * @param string $streamId
212
     * @return int
213
     */
214 11 View Code Duplication
    protected function streamVersion($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
215
    {
216 11
        $stmt = $this->connection
217 11
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
218 11
        $stmt->bindValue(':streamId', $streamId);
219 11
        $stmt->execute();
220 11
        return intval($stmt->fetchColumn());
221
    }
222
223
    /**
224
     * @param string $type
225
     * @param Version $version
226
     * @return EventStreamInterface
227
     */
228 1 View Code Duplication
    protected function readStoredEventsOfTypeAndVersion($type, $version)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
229
    {
230 1
        $stmt = $this->connection->prepare(
231
            'SELECT *
232
             FROM events
233
             WHERE type = :type
234 1
             AND version = :version'
235
        );
236 1
        $stmt->bindValue(':type', $type);
237 1
        $stmt->bindValue(':version', $version);
238 1
        $stmt->execute();
239 1
        $results = $stmt->fetchAll();
240
241 1
        $storedEvents = array_map(function($result) {
242 1
            return new StoredEvent(
243 1
                $result['id'],
244 1
                $result['stream_id'],
245 1
                $result['type'],
246 1
                $result['event'],
247 1
                $result['metadata'],
248 1
                new \DateTimeImmutable($result['occurred_on']),
249 1
                Version::fromString($result['version'])
250
            );
251 1
        }, $results);
252
253 1
        return new EventStream($storedEvents);
254
    }
255
256 20
    public function initialize()
257
    {
258
        try {
259 20
            $this->connection->beginTransaction();
260
261 20
            $this->connection->exec(
262 20
                'CREATE TABLE `'.self::STREAMS_TABLE.'` (
263
                    `id` varchar(255) NOT NULL,
264
                    PRIMARY KEY (`id`)
265 20
                )'
266
            );
267
268 20
            $this->connection->exec(
269 20
                'CREATE TABLE `'.self::EVENTS_TABLE.'` (
270
                    `id` int(11) NOT NULL AUTO_INCREMENT,
271
                    `stream_id` varchar(255) NOT NULL,
272
                    `type` varchar(255) NOT NULL,
273
                    `event` json NOT NULL,
274
                    `metadata` json NOT NULL,
275
                    `occurred_on` datetime NOT NULL,
276
                    `version` varchar(255) NOT NULL,
277
                    PRIMARY KEY (`id`),
278
                    KEY `stream_id` (`stream_id`),
279
                    CONSTRAINT `events_ibfk_1` FOREIGN KEY (`stream_id`) REFERENCES `streams` (`id`)
280 20
                )'
281
            );
282
283 20
            $this->connection->commit();
284
        } catch (\Exception $e) {
285
            $this->connection->rollBack();
286
            throw $e;
287
        }
288 20
    }
289
290
    /**
291
     * @return bool
292
     */
293 3 View Code Duplication
    public function initialized()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
294
    {
295
        try {
296 3
            $result = $this->connection->query('SELECT 1 FROM `'.self::STREAMS_TABLE.'` LIMIT 1');
297 1
        } catch (\Exception $e) {
298 1
            return false;
299
        }
300 2
        return $result !== false;
301
    }
302
303
    /**
304
     * @param string $streamId
305
     * @param \DateTimeImmutable $datetime
306
     * @return int
307
     * @throws EventStreamDoesNotExistException
308
     */
309 3 View Code Duplication
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
310
    {
311 3
        if (!$this->streamExists($streamId)) {
312 1
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
313
        }
314 2
        $stmt = $this->connection->prepare(
315
            'SELECT COUNT(*)
316
             FROM events
317
             WHERE stream_id = :streamId
318 2
             AND occurred_on <= :occurred_on'
319
        );
320 2
        $stmt->bindValue(':streamId', $streamId);
321 2
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
322 2
        $stmt->execute();
323 2
        return intval($stmt->fetchColumn());
324
    }
325
}
326