Passed
Push — master ( 8f1ad4...65e262 )
by Daniel
06:10
created

MySqlJsonEventStore::readStreamEventsUntil()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 3
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\InitializableInterface;
10
use DDDominio\EventSourcing\EventStore\StoredEvent;
11
use DDDominio\EventSourcing\Serialization\SerializerInterface;
12
use DDDominio\EventSourcing\Versioning\EventUpgrader;
13
use DDDominio\EventSourcing\Versioning\Version;
14
15
class MySqlJsonEventStore extends AbstractEventStore implements InitializableInterface
16
{
17
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
18
    const STREAMS_TABLE = 'streams';
19
    const EVENTS_TABLE = 'events';
20
21
    /**
22
     * @var \PDO
23
     */
24
    private $connection;
25
26
    /**
     * Builds the event store on top of an already opened PDO connection.
     *
     * The connection is kept by this class; the serializer and the event
     * upgrader are forwarded unchanged to the parent constructor.
     *
     * NOTE(review): $eventUpgrader carries no type declaration even though
     * it is documented as an EventUpgrader.
     *
     * @param \PDO $connection
     * @param SerializerInterface $serializer
     * @param EventUpgrader $eventUpgrader
     */
    public function __construct(\PDO $connection, SerializerInterface $serializer, $eventUpgrader)
    {
        $this->connection = $connection;

        parent::__construct($serializer, $eventUpgrader);
    }
39
40
    /**
     * Reads a page of events belonging to one stream.
     *
     * Rows are ordered by their auto-increment id so that LIMIT/OFFSET paging
     * is deterministic; without an ORDER BY clause MySQL gives no guarantee
     * about the order in which rows are returned.
     *
     * @param string $streamId
     * @param int $start 1-based position of the first event to return
     * @param int $count Maximum number of events; null means no explicit limit
     * @return EventStreamInterface
     */
    public function readStreamEvents($streamId, $start = 1, $count = null)
    {
        if (!isset($count)) {
            // No count requested: fall back to a very large row count.
            $count = self::MAX_UNSIGNED_BIG_INT;
        }
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id
             LIMIT :limit
             OFFSET :offset'
        );
        $stmt->bindValue(':streamId', $streamId);
        // $start is 1-based while OFFSET is 0-based.
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
        $stmt->execute();
        // Columns are only accessed by name, so fetch associative rows.
        $results = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        $storedEvents = array_map(function ($event) {
            return new StoredEvent(
                $event['id'],
                $event['stream_id'],
                $event['type'],
                $event['event'],
                $event['metadata'],
                new \DateTimeImmutable($event['occurred_on']),
                Version::fromString($event['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents($storedEvents);
    }
78
79
    /**
     * Not implemented yet for this store.
     *
     * NOTE(review): the method has no real body, so callers currently get
     * null instead of the documented EventStreamInterface.
     *
     * @param string $streamId
     * @param \DateTimeImmutable $datetime
     * @param int $start
     * @return EventStreamInterface
     */
    public function readStreamEventsUntil($streamId, $datetime, $start = 1)
    {
        // TODO: Implement readStreamEventsUntil() method.
        return null;
    }
89
90
    /**
     * Reads every event of one stream.
     *
     * Rows are ordered by their auto-increment id so events come back in the
     * order they were inserted; without an ORDER BY clause MySQL gives no
     * guarantee about row order.
     *
     * @param string $streamId
     * @return EventStreamInterface
     */
    public function readFullStream($streamId)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        // Columns are only accessed by name, so fetch associative rows.
        $results = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        $storedEvents = array_map(function ($event) {
            return new StoredEvent(
                $event['id'],
                $event['stream_id'],
                $event['type'],
                $event['event'],
                $event['metadata'],
                new \DateTimeImmutable($event['occurred_on']),
                Version::fromString($event['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents($storedEvents);
    }
119
120
    /**
     * Not implemented yet for this store.
     *
     * NOTE(review): the method has no real body, so callers currently get
     * null instead of the documented array of event streams.
     *
     * @return EventStreamInterface[]
     */
    public function readAllStreams()
    {
        // TODO: Implement readAllStreams() method.
        return null;
    }
127
128
    /**
     * Not implemented yet for this store.
     *
     * NOTE(review): the method has no real body, so callers currently get
     * null instead of the documented EventStreamInterface.
     *
     * @return EventStreamInterface
     */
    public function readAllEvents()
    {
        // TODO: Implement readAllEvents() method.
        return null;
    }
135
136
    /**
     * Appends stored events to a stream inside a single transaction.
     *
     * The stream row is created first when it does not exist yet. After the
     * inserts, the number of events in the stream is compared with
     * $expectedVersion plus the number of appended events; on a mismatch the
     * transaction is rolled back and a ConcurrencyException is thrown.
     *
     * @param string $streamId
     * @param StoredEvent[] $storedEvents
     * @param int $expectedVersion
     * @throws ConcurrencyException When the resulting stream version does not match
     * @throws \Exception Any other failure, rethrown after the rollback
     */
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
    {
        $this->connection->beginTransaction();
        try {
            if (!$this->streamExists($streamId)) {
                $stmt = $this->connection
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
                $stmt->bindValue(':streamId', $streamId);
                $stmt->execute();
            }

            // Prepare the insert once and re-execute it for every event
            // instead of re-preparing the same statement on each iteration.
            $insertEvent = $this->connection->prepare(
                'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
            );
            // The stream id is the same for every row; bound values persist
            // across executions of the same statement.
            $insertEvent->bindValue(':streamId', $streamId);
            foreach ($storedEvents as $storedEvent) {
                $insertEvent->bindValue(':type', $storedEvent->type());
                $insertEvent->bindValue(':event', $storedEvent->data());
                $insertEvent->bindValue(':metadata', $storedEvent->metadata());
                $insertEvent->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
                $insertEvent->bindValue(':version', $storedEvent->version());
                $insertEvent->execute();
            }

            $streamFinalVersion = $this->streamVersion($streamId);
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
                // Report the version that actually failed the check rather
                // than issuing a second COUNT query.
                throw ConcurrencyException::fromVersions(
                    $streamFinalVersion,
                    $expectedVersion
                );
            }
            $this->connection->commit();
        } catch (\Exception $e) {
            $this->connection->rollBack();
            throw $e;
        }
    }
178
179
    /**
     * Tells whether a row for the given stream id is present in the
     * streams table.
     *
     * @param string $streamId
     * @return bool
     */
    protected function streamExists($streamId)
    {
        $query = $this->connection->prepare(
            'SELECT COUNT(*) FROM streams WHERE id = :streamId'
        );
        $query->execute(array(':streamId' => $streamId));

        // A count of zero casts to false, any other count to true.
        return (bool) $query->fetchColumn();
    }
191
    /**
     * Returns the number of events stored for the given stream, which this
     * store uses as the stream version.
     *
     * @param string $streamId
     * @return int
     */
    protected function streamVersion($streamId)
    {
        $query = $this->connection->prepare(
            'SELECT COUNT(*) FROM events WHERE stream_id = :streamId'
        );
        $query->execute(array(':streamId' => $streamId));

        return (int) $query->fetchColumn();
    }
203
204
    /**
     * Loads every stored event that has the given type and version,
     * regardless of the stream it belongs to.
     *
     * The rows are wrapped as StoredEvent objects and returned in a plain
     * EventStream.
     *
     * @param string $type
     * @param Version $version
     * @return EventStreamInterface
     */
    protected function readStoredEventsOfTypeAndVersion($type, $version)
    {
        $query = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE type = :type
             AND version = :version'
        );
        $query->bindValue(':type', $type);
        $query->bindValue(':version', $version);
        $query->execute();

        $storedEvents = array();
        foreach ($query->fetchAll() as $row) {
            $occurredOn = new \DateTimeImmutable($row['occurred_on']);
            $rowVersion = Version::fromString($row['version']);

            $storedEvents[] = new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                $occurredOn,
                $rowVersion
            );
        }

        return new EventStream($storedEvents);
    }
236
237 11
    /**
     * Creates the streams and events tables used by this store.
     *
     * MySQL implicitly commits the current transaction when it runs a DDL
     * statement such as CREATE TABLE, so the transaction opened here may
     * already be closed by the time commit() or rollBack() is reached.
     * Both calls are therefore guarded with inTransaction(); calling them
     * without an active transaction raises a PDOException on PHP 8, which
     * would otherwise hide the original error.
     *
     * @throws \Exception Any failure while creating the tables is rethrown
     */
    public function initialize()
    {
        // Started outside the try block so that a failed begin never rolls
        // back a transaction that was opened by the caller.
        $this->connection->beginTransaction();

        try {
            $this->connection->exec(
                'CREATE TABLE `'.self::STREAMS_TABLE.'` (
                    `id` varchar(255) NOT NULL,
                    PRIMARY KEY (`id`)
                )'
            );

            $this->connection->exec(
                'CREATE TABLE `'.self::EVENTS_TABLE.'` (
                    `id` int(11) NOT NULL AUTO_INCREMENT,
                    `stream_id` varchar(255) NOT NULL,
                    `type` varchar(255) NOT NULL,
                    `event` json NOT NULL,
                    `metadata` json NOT NULL,
                    `occurred_on` datetime NOT NULL,
                    `version` varchar(255) NOT NULL,
                    PRIMARY KEY (`id`),
                    KEY `stream_id` (`stream_id`),
                    CONSTRAINT `events_ibfk_1` FOREIGN KEY (`stream_id`) REFERENCES `'.self::STREAMS_TABLE.'` (`id`)
                )'
            );

            if ($this->connection->inTransaction()) {
                $this->connection->commit();
            }
        } catch (\Exception $e) {
            if ($this->connection->inTransaction()) {
                $this->connection->rollBack();
            }
            throw $e;
        }
    }
270
271
    /**
     * Checks whether the store has been initialized by probing the streams
     * table with a trivial query.
     *
     * Returns false both when the query call returns false and when it
     * throws an exception.
     *
     * @return bool
     */
    public function initialized()
    {
        try {
            $probe = $this->connection->query('SELECT 1 FROM `'.self::STREAMS_TABLE.'` LIMIT 1');

            return false !== $probe;
        } catch (\Exception $e) {
            return false;
        }
    }
283
284
    /**
     * Not implemented yet for this store.
     *
     * NOTE(review): the method has no real body, so callers currently get
     * null instead of the documented int.
     *
     * @param string $streamId
     * @param \DateTimeImmutable $datetime
     * @return int
     */
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
    {
        // TODO: Implement findStreamVersionAt() method.
        return null;
    }
293
}
294