Completed
Push — master ( ab1322...e55add )
by Daniel
03:00
created

MySqlJsonEventStore::initialize()   B

Complexity

Conditions 2
Paths 5

Size

Total Lines 33
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2.0491

Importance

Changes 0
Metric Value
dl 0
loc 33
c 0
b 0
f 0
ccs 10
cts 13
cp 0.7692
rs 8.8571
cc 2
eloc 11
nc 5
nop 0
crap 2.0491
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\InitializableInterface;
10
use DDDominio\EventSourcing\EventStore\StoredEvent;
11
use DDDominio\EventSourcing\Serialization\SerializerInterface;
12
use DDDominio\EventSourcing\Versioning\EventUpgrader;
13
use DDDominio\EventSourcing\Versioning\Version;
14
15
class MySqlJsonEventStore extends AbstractEventStore implements InitializableInterface
16
{
17
    /**
     * Row limit applied by readStreamEventsForward() when the caller does not
     * pass a count, i.e. "read everything from the offset onwards".
     *
     * NOTE(review): despite the name, 9223372036854775807 is 2^63 - 1, the
     * largest *signed* 64-bit integer, not the unsigned BIGINT maximum.
     */
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
18
    const STREAMS_TABLE = 'streams';
19
    const EVENTS_TABLE = 'events';
20
21
    /**
22
     * @var \PDO
23
     */
24
    private $connection;
25
26
    /**
27
     * @param \PDO $connection
28
     * @param SerializerInterface $serializer
29
     * @param EventUpgrader $eventUpgrader
30
     */
31 11
    public function __construct(
        \PDO $connection,
        SerializerInterface $serializer,
        $eventUpgrader
    ) {
        // Keep the PDO handle; every query issued by this store goes through it.
        $this->connection = $connection;

        // The serializer and the event upgrader are handed to the base store.
        parent::__construct($serializer, $eventUpgrader);
    }
39
40
    /**
41
     * @param string $streamId
42
     * @param int $start
43
     * @param int $count
44
     * @return EventStreamInterface
45
     */
46 4 View Code Duplication
    public function readStreamEventsForward($streamId, $start = 1, $count = null)
    {
        // No count means "read to the end of the stream". MySQL cannot express
        // an OFFSET without a LIMIT, so a very large row limit is used instead.
        if (!isset($count)) {
            $count = self::MAX_UNSIGNED_BIG_INT;
        }

        // ORDER BY is required here: without it the row order is undefined and
        // LIMIT/OFFSET paging may skip or repeat events between calls. The
        // auto-increment id reflects the order in which events were appended.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM `' . self::EVENTS_TABLE . '`
             WHERE stream_id = :streamId
             ORDER BY id ASC
             LIMIT :limit
             OFFSET :offset'
        );
        $stmt->bindValue(':streamId', $streamId);
        // $start is 1-based while OFFSET is 0-based.
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
        $stmt->execute();

        // Only the named columns are read below, so skip the numeric duplicates.
        $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        // Turn each database row into a StoredEvent value object.
        $storedEvents = array_map(function ($row) {
            return new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                new \DateTimeImmutable($row['occurred_on']),
                Version::fromString($row['version'])
            );
        }, $rows);

        return $this->domainEventStreamFromStoredEvents($storedEvents);
    }
78
79
    /**
80
     * @param string $streamId
81
     * @return EventStreamInterface
82
     */
83 5 View Code Duplication
    public function readFullStream($streamId)
    {
        // ORDER BY makes the replay order explicit; without it MySQL is free
        // to return the rows of a stream in any order.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM `' . self::EVENTS_TABLE . '`
             WHERE stream_id = :streamId
             ORDER BY id ASC'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();

        // Only the named columns are read below, so skip the numeric duplicates.
        $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        // Turn each database row into a StoredEvent value object.
        $storedEvents = array_map(function ($row) {
            return new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                new \DateTimeImmutable($row['occurred_on']),
                Version::fromString($row['version'])
            );
        }, $rows);

        return $this->domainEventStreamFromStoredEvents($storedEvents);
    }
108
109
    /**
110
     * @return EventStreamInterface[]
111
     */
112
    public function readAllStreams()
    {
        // Start from the streams table so that streams which currently hold
        // no events are still reported (as empty streams).
        $streamsStmt = $this->connection->prepare(
            'SELECT id FROM `' . self::STREAMS_TABLE . '` ORDER BY id ASC'
        );
        $streamsStmt->execute();
        $eventsByStream = array_fill_keys(
            $streamsStmt->fetchAll(\PDO::FETCH_COLUMN),
            []
        );

        // Load every event in append order and group it under its stream.
        $eventsStmt = $this->connection->prepare(
            'SELECT *
             FROM `' . self::EVENTS_TABLE . '`
             ORDER BY id ASC'
        );
        $eventsStmt->execute();
        foreach ($eventsStmt->fetchAll(\PDO::FETCH_ASSOC) as $row) {
            $eventsByStream[$row['stream_id']][] = new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                new \DateTimeImmutable($row['occurred_on']),
                Version::fromString($row['version'])
            );
        }

        // Build one event stream per stream id, the same way readFullStream()
        // builds its single stream.
        $streams = [];
        foreach ($eventsByStream as $storedEvents) {
            $streams[] = $this->domainEventStreamFromStoredEvents($storedEvents);
        }

        return $streams;
    }
116
117
    /**
118
     * @return EventStreamInterface
119
     */
120
    public function readAllEvents()
    {
        // Every event of every stream, in the order it was appended
        // (the auto-increment id grows with each insert).
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM `' . self::EVENTS_TABLE . '`
             ORDER BY id ASC'
        );
        $stmt->execute();
        $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        // Turn each database row into a StoredEvent value object.
        $storedEvents = array_map(function ($row) {
            return new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                new \DateTimeImmutable($row['occurred_on']),
                Version::fromString($row['version'])
            );
        }, $rows);

        return $this->domainEventStreamFromStoredEvents($storedEvents);
    }
124
125
    /**
126
     * @param string $streamId
127
     * @param StoredEvent[] $storedEvents
128
     * @param int $expectedVersion
129
     * @throws \Exception
130
     */
131 7
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
    {
        // Stream registration, event inserts and the version check either all
        // succeed or are rolled back together.
        $this->connection->beginTransaction();
        try {
            // The first write to a stream also registers it in the streams
            // table, which the events table references by foreign key.
            if (!$this->streamExists($streamId)) {
                $createStream = $this->connection->prepare(
                    'INSERT INTO `' . self::STREAMS_TABLE . '` (id) VALUES (:streamId)'
                );
                $createStream->bindValue(':streamId', $streamId);
                $createStream->execute();
            }

            // Prepare the insert once and re-execute it per event instead of
            // re-preparing the same statement on every iteration.
            $insertEvent = $this->connection->prepare(
                'INSERT INTO `' . self::EVENTS_TABLE . '` (stream_id, type, event, metadata, occurred_on, version)
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
            );
            // The stream id is the same for every event, so bind it only once.
            $insertEvent->bindValue(':streamId', $streamId);
            foreach ($storedEvents as $storedEvent) {
                $insertEvent->bindValue(':type', $storedEvent->type());
                $insertEvent->bindValue(':event', $storedEvent->data());
                $insertEvent->bindValue(':metadata', $storedEvent->metadata());
                $insertEvent->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
                $insertEvent->bindValue(':version', $storedEvent->version());
                $insertEvent->execute();
            }

            // Optimistic concurrency check: after the inserts the stream must
            // hold exactly $expectedVersion + count($storedEvents) events.
            // Anything else means the stream was not at the expected version.
            $streamFinalVersion = $this->streamVersion($streamId);
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
                // Report the version that was actually compared rather than
                // querying the table a second time.
                throw ConcurrencyException::fromVersions(
                    $streamFinalVersion,
                    $expectedVersion
                );
            }
            $this->connection->commit();
        } catch (\Exception $e) {
            // Undo the partial write, then let the caller see the failure.
            $this->connection->rollBack();
            throw $e;
        }
    }
167
168
    /**
169
     * @param string $streamId
170
     * @return bool
171
     */
172 8 View Code Duplication
    protected function streamExists($streamId)
    {
        // Count the rows of the streams table that carry this id.
        $query = $this->connection->prepare(
            'SELECT COUNT(*) FROM streams WHERE id = :streamId'
        );
        $query->execute([':streamId' => $streamId]);

        $matches = $query->fetchColumn();

        // A count of zero casts to false, anything else to true.
        return (bool) $matches;
    }
180
181
    /**
182
     * @param string $streamId
183
     * @return int
184
     */
185 7 View Code Duplication
    protected function streamVersion($streamId)
    {
        // The version of a stream is the number of events stored for it.
        $query = $this->connection->prepare(
            'SELECT COUNT(*) FROM events WHERE stream_id = :streamId'
        );
        $query->execute([':streamId' => $streamId]);

        $eventCount = $query->fetchColumn();

        return (int) $eventCount;
    }
193
    /**
194
     * @param string $type
195
     * @param Version $version
196
     * @return EventStreamInterface
197
     */
198 View Code Duplication
    protected function readStoredEventsOfTypeAndVersion($type, $version)
    {
        // ORDER BY keeps the result in append order; without it MySQL may
        // return the matching rows in any order.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM `' . self::EVENTS_TABLE . '`
             WHERE type = :type
             AND version = :version
             ORDER BY id ASC'
        );
        $stmt->bindValue(':type', $type);
        $stmt->bindValue(':version', $version);
        $stmt->execute();

        // Only the named columns are read below, so skip the numeric duplicates.
        $rows = $stmt->fetchAll(\PDO::FETCH_ASSOC);

        // Turn each database row into a StoredEvent value object.
        $storedEvents = array_map(function ($row) {
            return new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                new \DateTimeImmutable($row['occurred_on']),
                Version::fromString($row['version'])
            );
        }, $rows);

        // Unlike the public read methods, the stored events are returned as
        // they are, wrapped directly in an EventStream.
        return new EventStream($storedEvents);
    }
225
226 11
    public function initialize()
    {
        try {
            $this->connection->beginTransaction();

            // Streams must exist before events: the events table references
            // it through a foreign key.
            $this->connection->exec(
                'CREATE TABLE `'.self::STREAMS_TABLE.'` (
                    `id` varchar(255) NOT NULL,
                    PRIMARY KEY (`id`)
                )'
            );

            $this->connection->exec(
                'CREATE TABLE `'.self::EVENTS_TABLE.'` (
                    `id` int(11) NOT NULL AUTO_INCREMENT,
                    `stream_id` varchar(255) NOT NULL,
                    `type` varchar(255) NOT NULL,
                    `event` json NOT NULL,
                    `metadata` json NOT NULL,
                    `occurred_on` datetime NOT NULL,
                    `version` varchar(255) NOT NULL,
                    PRIMARY KEY (`id`),
                    KEY `stream_id` (`stream_id`),
                    CONSTRAINT `events_ibfk_1` FOREIGN KEY (`stream_id`) REFERENCES `'.self::STREAMS_TABLE.'` (`id`)
                )'
            );

            // CREATE TABLE commits implicitly in MySQL, so the transaction
            // opened above may already be closed at this point. Committing
            // with no active transaction throws on PHP 8, hence the guard.
            if ($this->connection->inTransaction()) {
                $this->connection->commit();
            }
        } catch (\Exception $e) {
            // Same guard as above: an unguarded rollBack() would throw its own
            // "no active transaction" error and hide the original failure.
            // Tables created before the failure are not removed by a rollback.
            if ($this->connection->inTransaction()) {
                $this->connection->rollBack();
            }
            throw $e;
        }
    }
259
260
    /**
261
     * @return bool
262
     */
263 View Code Duplication
    public function initialized()
    {
        // Probe the streams table: the store counts as initialized when the
        // query can be issued at all, whether or not it returns a row.
        $probe = 'SELECT 1 FROM `'.self::STREAMS_TABLE.'` LIMIT 1';

        try {
            // query() yields false on failure when PDO is not in exception mode.
            return $this->connection->query($probe) !== false;
        } catch (\Exception $e) {
            // In exception mode a failing probe (e.g. missing table) ends up here.
            return false;
        }
    }
272
}
273