Passed
Push — master ( 7b49c3...eb0ddf )
by Daniel
03:30
created

MySqlJsonEventStore::initialize()   B

Complexity

Conditions 2
Paths 5

Size

Total Lines 33
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 33
ccs 13
cts 13
cp 1
rs 8.8571
cc 2
eloc 11
nc 5
nop 0
crap 2
<?php

namespace DDDominio\EventSourcing\EventStore\Vendor;

use DDDominio\EventSourcing\Common\EventStream;
use DDDominio\EventSourcing\Common\EventStreamInterface;
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
use DDDominio\EventSourcing\EventStore\InitializableInterface;
use DDDominio\EventSourcing\EventStore\StoredEvent;
use DDDominio\EventSourcing\Serialization\SerializerInterface;
use DDDominio\EventSourcing\Versioning\EventUpgrader;
use DDDominio\EventSourcing\Versioning\Version;

class MySqlJsonEventStore extends AbstractEventStore implements InitializableInterface
{
    // Despite the name, this is the largest *signed* 64-bit integer.
    // It is used as an effectively unbounded LIMIT value.
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
    const STREAMS_TABLE = 'streams';
    const EVENTS_TABLE = 'events';

    /**
     * @var \PDO
     */
    private $connection;

    /**
     * @param \PDO $connection
     * @param SerializerInterface $serializer
     * @param EventUpgrader $eventUpgrader
     */
    public function __construct(
        \PDO $connection,
        SerializerInterface $serializer,
        $eventUpgrader
    ) {
        $this->connection = $connection;
        parent::__construct($serializer, $eventUpgrader);
    }

    /**
     * @param string $streamId
     * @param int $start 1-based position of the first event to read
     * @param int|null $count maximum number of events; null reads to the end
     * @return EventStreamInterface
     */
    public function readStreamEvents($streamId, $start = 1, $count = null)
    {
        if (!isset($count)) {
            $count = self::MAX_UNSIGNED_BIG_INT;
        }
        // ORDER BY makes LIMIT/OFFSET paging deterministic; without it
        // MySQL does not guarantee the order of the returned rows.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id
             LIMIT :limit
             OFFSET :offset'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
        $stmt->execute();
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($event) {
            return new StoredEvent(
                $event['id'],
                $event['stream_id'],
                $event['type'],
                $event['event'],
                $event['metadata'],
                new \DateTimeImmutable($event['occurred_on']),
                Version::fromString($event['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents($storedEvents);
    }

    /**
     * @param string $streamId
     * @return EventStreamInterface
     */
    public function readFullStream($streamId)
    {
        // ORDER BY id returns the events in insertion order.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($event) {
            return new StoredEvent(
                $event['id'],
                $event['stream_id'],
                $event['type'],
                $event['event'],
                $event['metadata'],
                new \DateTimeImmutable($event['occurred_on']),
                Version::fromString($event['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents($storedEvents);
    }

    /**
     * @return EventStreamInterface[]
     */
    public function readAllStreams()
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM streams'
        );
        $stmt->execute();

        $streams = [];
        foreach ($stmt->fetchAll() as $result) {
            $streams[] = $this->readFullStream($result['id']);
        }
        return $streams;
    }

    /**
     * @return EventStreamInterface
     */
    public function readAllEvents()
    {
        // ORDER BY id returns the events in insertion order.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             ORDER BY id'
        );
        $stmt->execute();
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($event) {
            return new StoredEvent(
                $event['id'],
                $event['stream_id'],
                $event['type'],
                $event['event'],
                $event['metadata'],
                new \DateTimeImmutable($event['occurred_on']),
                Version::fromString($event['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents($storedEvents);
    }

    /**
     * @param string $streamId
     * @param StoredEvent[] $storedEvents
     * @param int $expectedVersion
     * @throws ConcurrencyException if the stream version does not match after the append
     * @throws \Exception
     */
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
    {
        $this->connection->beginTransaction();
        try {
            // Register the stream the first time events are appended to it.
            if (!$this->streamExists($streamId)) {
                $stmt = $this->connection
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
                $stmt->bindValue(':streamId', $streamId);
                $stmt->execute();
            }
            foreach ($storedEvents as $storedEvent) {
                $stmt = $this->connection->prepare(
                    'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
                     VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
                );
                $stmt->bindValue(':streamId', $streamId);
                $stmt->bindValue(':type', $storedEvent->type());
                $stmt->bindValue(':event', $storedEvent->data());
                $stmt->bindValue(':metadata', $storedEvent->metadata());
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
                $stmt->bindValue(':version', $storedEvent->version());
                $stmt->execute();
            }
            // Optimistic concurrency check: the stream must have grown by
            // exactly the number of events appended in this call.
            $streamFinalVersion = $this->streamVersion($streamId);
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
                throw ConcurrencyException::fromVersions(
                    $streamFinalVersion,
                    $expectedVersion
                );
            }
            $this->connection->commit();
        } catch (\Exception $e) {
            $this->connection->rollBack();
            throw $e;
        }
    }

    /**
     * @param string $streamId
     * @return bool
     */
    protected function streamExists($streamId)
    {
        $stmt = $this->connection
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        return boolval($stmt->fetchColumn());
    }

    /**
     * @param string $streamId
     * @return int
     */
    protected function streamVersion($streamId)
    {
        $stmt = $this->connection
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        return intval($stmt->fetchColumn());
    }

    /**
     * @param string $type
     * @param Version $version
     * @return EventStreamInterface
     */
    protected function readStoredEventsOfTypeAndVersion($type, $version)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE type = :type
             AND version = :version'
        );
        $stmt->bindValue(':type', $type);
        $stmt->bindValue(':version', $version);
        $stmt->execute();
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($result) {
            return new StoredEvent(
                $result['id'],
                $result['stream_id'],
                $result['type'],
                $result['event'],
                $result['metadata'],
                new \DateTimeImmutable($result['occurred_on']),
                Version::fromString($result['version'])
            );
        }, $results);

        return new EventStream($storedEvents);
    }

    /**
     * Creates the streams and events tables.
     *
     * @throws \Exception
     */
    public function initialize()
    {
        try {
            $this->connection->beginTransaction();

            $this->connection->exec(
                'CREATE TABLE `'.self::STREAMS_TABLE.'` (
                    `id` varchar(255) NOT NULL,
                    PRIMARY KEY (`id`)
                )'
            );

            $this->connection->exec(
                'CREATE TABLE `'.self::EVENTS_TABLE.'` (
                    `id` int(11) NOT NULL AUTO_INCREMENT,
                    `stream_id` varchar(255) NOT NULL,
                    `type` varchar(255) NOT NULL,
                    `event` json NOT NULL,
                    `metadata` json NOT NULL,
                    `occurred_on` datetime NOT NULL,
                    `version` varchar(255) NOT NULL,
                    PRIMARY KEY (`id`),
                    KEY `stream_id` (`stream_id`),
                    CONSTRAINT `events_ibfk_1` FOREIGN KEY (`stream_id`) REFERENCES `'.self::STREAMS_TABLE.'` (`id`)
                )'
            );

            // MySQL commits implicitly on DDL statements such as CREATE TABLE,
            // so the transaction may already be closed at this point.
            if ($this->connection->inTransaction()) {
                $this->connection->commit();
            }
        } catch (\Exception $e) {
            // Roll back only if a transaction is still open.
            if ($this->connection->inTransaction()) {
                $this->connection->rollBack();
            }
            throw $e;
        }
    }

    /**
     * @return bool
     */
    public function initialized()
    {
        try {
            $result = $this->connection->query('SELECT 1 FROM `'.self::STREAMS_TABLE.'` LIMIT 1');
        } catch (\Exception $e) {
            return false;
        }
        return $result !== false;
    }

    /**
     * @param string $streamId
     * @param \DateTimeImmutable $datetime
     * @return int
     * @throws EventStreamDoesNotExistException
     */
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
    {
        if (!$this->streamExists($streamId)) {
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
        }
        $stmt = $this->connection->prepare(
            'SELECT COUNT(*)
             FROM events
             WHERE stream_id = :streamId
             AND occurred_on <= :occurred_on'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
        $stmt->execute();
        return intval($stmt->fetchColumn());
    }
}