Completed
Push — master ( 59f7a1...075b21 )
by Daniel
03:25
created

MySqlJsonEventStore::initialize()   B

Complexity

Conditions 2
Paths 5

Size

Total Lines 33
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2.0491

Importance

Changes 0
Metric Value
dl 0
loc 33
c 0
b 0
f 0
ccs 10
cts 13
cp 0.7692
rs 8.8571
cc 2
eloc 11
nc 5
nop 0
crap 2.0491
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
10
use DDDominio\EventSourcing\EventStore\InitializableInterface;
11
use DDDominio\EventSourcing\EventStore\StoredEvent;
12
use DDDominio\EventSourcing\Serialization\SerializerInterface;
13
use DDDominio\EventSourcing\Versioning\EventUpgrader;
14
use DDDominio\EventSourcing\Versioning\Version;
15
16
class MySqlJsonEventStore extends AbstractEventStore implements InitializableInterface
17
{
18
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
19
    const STREAMS_TABLE = 'streams';
20
    const EVENTS_TABLE = 'events';
21
22
    /**
23
     * @var \PDO
24
     */
25
    private $connection;
26
27
    /**
28
     * @param \PDO $connection
29
     * @param SerializerInterface $serializer
30
     * @param EventUpgrader $eventUpgrader
31
     */
32 17
    public function __construct(
33
        \PDO $connection,
34
        SerializerInterface $serializer,
35
        $eventUpgrader
36
    ) {
37 17
        $this->connection = $connection;
38 17
        parent::__construct($serializer, $eventUpgrader);
39 17
    }
40
41
    /**
42
     * @param string $streamId
43
     * @param int $start
44
     * @param int $count
45
     * @return EventStreamInterface
46
     */
47 4 View Code Duplication
    public function readStreamEvents($streamId, $start = 1, $count = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
48
    {
49 4
        if (!isset($count)) {
50 3
            $count = self::MAX_UNSIGNED_BIG_INT;
51
        }
52 4
        $stmt = $this->connection->prepare(
53
            'SELECT *
54
             FROM events
55
             WHERE stream_id = :streamId
56
             LIMIT :limit
57 4
             OFFSET :offset'
58
        );
59 4
        $stmt->bindValue(':streamId', $streamId);
60 4
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
61 4
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
62 4
        $stmt->execute();
63 4
        $results = $stmt->fetchAll();
64
65
        $storedEvents = array_map(function($event) {
66 3
            return new StoredEvent(
67 3
                $event['id'],
68 3
                $event['stream_id'],
69 3
                $event['type'],
70 3
                $event['event'],
71 3
                $event['metadata'],
72 3
                new \DateTimeImmutable($event['occurred_on']),
73 3
                Version::fromString($event['version'])
74
            );
75 4
        }, $results);
76
77 4
        return $this->domainEventStreamFromStoredEvents($storedEvents);
78
    }
79
80
    /**
81
     * @param string $streamId
82
     * @return EventStreamInterface
83
     */
84 5 View Code Duplication
    public function readFullStream($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
85
    {
86 5
        $stmt = $this->connection->prepare(
87
            'SELECT *
88
             FROM events
89 5
             WHERE stream_id = :streamId'
90
        );
91 5
        $stmt->bindValue(':streamId', $streamId);
92 5
        $stmt->execute();
93 5
        $results = $stmt->fetchAll();
94
95
        $storedEvents = array_map(function($event) {
96 4
            return new StoredEvent(
97 4
                $event['id'],
98 4
                $event['stream_id'],
99 4
                $event['type'],
100 4
                $event['event'],
101 4
                $event['metadata'],
102 4
                new \DateTimeImmutable($event['occurred_on']),
103 4
                Version::fromString($event['version'])
104
            );
105 5
        }, $results);
106
107 5
        return $this->domainEventStreamFromStoredEvents($storedEvents);
108
    }
109
110
    /**
111
     * @return EventStreamInterface[]
112
     */
113
    public function readAllStreams()
114
    {
115
        // TODO: Implement readAllStreams() method.
116
    }
117
118
    /**
119
     * @return EventStreamInterface
120
     */
121
    public function readAllEvents()
122
    {
123
        // TODO: Implement readAllEvents() method.
124
    }
125
126
    /**
127
     * @param string $streamId
128
     * @param StoredEvent[] $storedEvents
129
     * @param int $expectedVersion
130
     * @throws \Exception
131
     */
132 9
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
133
    {
134 9
        $this->connection->beginTransaction();
135
        try {
136 9 View Code Duplication
            if (!$this->streamExists($streamId)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
137 9
                $stmt = $this->connection
138 9
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
139 9
                $stmt->bindValue(':streamId', $streamId);
140 9
                $stmt->execute();
141
            }
142 9 View Code Duplication
            foreach ($storedEvents as $storedEvent) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
143 9
                $stmt = $this->connection->prepare(
144
                    'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
145 9
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
146
                );
147 9
                $stmt->bindValue(':streamId', $streamId);
148 9
                $stmt->bindValue(':type', $storedEvent->type());
149 9
                $stmt->bindValue(':event', $storedEvent->data());
150 9
                $stmt->bindValue(':metadata', $storedEvent->metadata());
151 9
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
152 9
                $stmt->bindValue(':version', $storedEvent->version());
153 9
                $stmt->execute();
154
            }
155 9
            $streamFinalVersion = $this->streamVersion($streamId);
156 9
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
157
                throw ConcurrencyException::fromVersions(
158
                    $this->streamVersion($streamId),
159
                    $expectedVersion
160
                );
161
            }
162 9
            $this->connection->commit();
163
        } catch (\Exception $e) {
164
            $this->connection->rollBack();
165
            throw $e;
166
        }
167 9
    }
168
169
    /**
170
     * @param string $streamId
171
     * @return bool
172
     */
173 11 View Code Duplication
    protected function streamExists($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
174
    {
175 11
        $stmt = $this->connection
176 11
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
177 11
        $stmt->bindValue(':streamId', $streamId);
178 11
        $stmt->execute();
179 11
        return boolval($stmt->fetchColumn());
180
    }
181
    /**
182
     * @param string $streamId
183
     * @return int
184
     */
185 9 View Code Duplication
    protected function streamVersion($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
186
    {
187 9
        $stmt = $this->connection
188 9
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
189 9
        $stmt->bindValue(':streamId', $streamId);
190 9
        $stmt->execute();
191 9
        return intval($stmt->fetchColumn());
192
    }
193
194
    /**
195
     * @param string $type
196
     * @param Version $version
197
     * @return EventStreamInterface
198
     */
199 View Code Duplication
    protected function readStoredEventsOfTypeAndVersion($type, $version)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
200
    {
201
        $stmt = $this->connection->prepare(
202
            'SELECT *
203
             FROM events
204
             WHERE type = :type
205
             AND version = :version'
206
        );
207
        $stmt->bindValue(':type', $type);
208
        $stmt->bindValue(':version', $version);
209
        $stmt->execute();
210
        $results = $stmt->fetchAll();
211
212
        $storedEvents = array_map(function($result) {
213
            return new StoredEvent(
214
                $result['id'],
215
                $result['stream_id'],
216
                $result['type'],
217
                $result['event'],
218
                $result['metadata'],
219
                new \DateTimeImmutable($result['occurred_on']),
220
                Version::fromString($result['version'])
221
            );
222
        }, $results);
223
224
        return new EventStream($storedEvents);
225
    }
226
227 17
    public function initialize()
228
    {
229
        try {
230 17
            $this->connection->beginTransaction();
231
232 17
            $this->connection->exec(
233 17
                'CREATE TABLE `'.self::STREAMS_TABLE.'` (
234
                    `id` varchar(255) NOT NULL,
235
                    PRIMARY KEY (`id`)
236 17
                )'
237
            );
238
239 17
            $this->connection->exec(
240 17
                'CREATE TABLE `'.self::EVENTS_TABLE.'` (
241
                    `id` int(11) NOT NULL AUTO_INCREMENT,
242
                    `stream_id` varchar(255) NOT NULL,
243
                    `type` varchar(255) NOT NULL,
244
                    `event` json NOT NULL,
245
                    `metadata` json NOT NULL,
246
                    `occurred_on` datetime NOT NULL,
247
                    `version` varchar(255) NOT NULL,
248
                    PRIMARY KEY (`id`),
249
                    KEY `stream_id` (`stream_id`),
250
                    CONSTRAINT `events_ibfk_1` FOREIGN KEY (`stream_id`) REFERENCES `streams` (`id`)
251 17
                )'
252
            );
253
254 17
            $this->connection->commit();
255
        } catch (\Exception $e) {
256
            $this->connection->rollBack();
257
            throw $e;
258
        }
259 17
    }
260
261
    /**
262
     * @return bool
263
     */
264 3 View Code Duplication
    public function initialized()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
265
    {
266
        try {
267 3
            $result = $this->connection->query('SELECT 1 FROM `'.self::STREAMS_TABLE.'` LIMIT 1');
268 1
        } catch (\Exception $e) {
269 1
            return false;
270
        }
271 2
        return $result !== false;
272
    }
273
274
    /**
275
     * @param string $streamId
276
     * @param \DateTimeImmutable $datetime
277
     * @return int
278
     * @throws EventStreamDoesNotExistException
279
     */
280 3 View Code Duplication
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
281
    {
282 3
        if (!$this->streamExists($streamId)) {
283 1
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
284
        }
285 2
        $stmt = $this->connection->prepare(
286
            'SELECT COUNT(*)
287
             FROM events
288
             WHERE stream_id = :streamId
289 2
             AND occurred_on <= :occurred_on'
290
        );
291 2
        $stmt->bindValue(':streamId', $streamId);
292 2
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
293 2
        $stmt->execute();
294 2
        return intval($stmt->fetchColumn());
295
    }
296
}
297