Passed
Push — master ( 7b49c3...eb0ddf )
by Daniel
03:30
created

DoctrineDbalEventStore::streamVersion()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 6

Duplication

Lines 8
Ratio 100 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
dl 8
loc 8
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 6
nc 1
nop 1
crap 1
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
10
use DDDominio\EventSourcing\EventStore\StoredEvent;
11
use Doctrine\DBAL\Connection;
12
use DDDominio\EventSourcing\Serialization\SerializerInterface;
13
use DDDominio\EventSourcing\Versioning\EventUpgrader;
14
use DDDominio\EventSourcing\Versioning\Version;
15
use Doctrine\DBAL\Schema\Schema;
16
use DDDominio\EventSourcing\EventStore\InitializableInterface;
17
18
class DoctrineDbalEventStore extends AbstractEventStore implements InitializableInterface
19
{
20
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
21
    const STREAMS_TABLE = 'streams';
22
    const EVENTS_TABLE = 'events';
23
24
    /**
25
     * @var Connection
26
     */
27
    private $connection;
28
29
    /**
30
     * @param Connection $connection
31
     * @param SerializerInterface $serializer
32
     * @param EventUpgrader $eventUpgrader
33
     */
34 20
    public function __construct($connection, $serializer, $eventUpgrader)
35
    {
36 20
        parent::__construct($serializer, $eventUpgrader);
37 20
        $this->connection = $connection;
38 20
    }
39
40
    /**
41
     * @param string $streamId
42
     * @param int $start
43
     * @param int $count
44
     * @return EventStreamInterface
45
     */
46 4 View Code Duplication
    public function readStreamEvents($streamId, $start = 1, $count = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
47
    {
48 4
        if (!isset($count)) {
49 3
            $count = self::MAX_UNSIGNED_BIG_INT;
50
        }
51 4
        $stmt = $this->connection->prepare(
52
            'SELECT *
53
             FROM events
54
             WHERE stream_id = :streamId
55
             LIMIT :limit
56 4
             OFFSET :offset'
57
        );
58 4
        $stmt->bindValue(':streamId', $streamId);
59 4
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
60 4
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
61 4
        $stmt->execute();
62 4
        $results = $stmt->fetchAll();
63
64
        $storedEvents = array_map(function($result) {
65 3
            return new StoredEvent(
66 3
                $result['id'],
67 3
                $result['stream_id'],
68 3
                $result['type'],
69 3
                $result['event'],
70 3
                $result['metadata'],
71 3
                new \DateTimeImmutable($result['occurred_on']),
72 3
                Version::fromString($result['version'])
73
            );
74 4
        }, $results);
75
76 4
        return $this->domainEventStreamFromStoredEvents($storedEvents);
77
    }
78
79
    /**
80
     * @param string $streamId
81
     * @return EventStreamInterface
82
     */
83 7 View Code Duplication
    public function readFullStream($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
84
    {
85 7
        $stmt = $this->connection->prepare(
86
            'SELECT *
87
             FROM events
88 7
             WHERE stream_id = :streamId'
89
        );
90 7
        $stmt->bindValue(':streamId', $streamId);
91 7
        $stmt->execute();
92 7
        $results = $stmt->fetchAll();
93
94
        $storedEvents = array_map(function($result) {
95 6
            return new StoredEvent(
96 6
                $result['id'],
97 6
                $result['stream_id'],
98 6
                $result['type'],
99 6
                $result['event'],
100 6
                $result['metadata'],
101 6
                new \DateTimeImmutable($result['occurred_on']),
102 6
                Version::fromString($result['version'])
103
            );
104 7
        }, $results);
105
106 7
        return $this->domainEventStreamFromStoredEvents($storedEvents);
107
    }
108
109
    /**
110
     * @return EventStreamInterface[]
111
     */
112 1 View Code Duplication
    public function readAllStreams()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
113
    {
114 1
        $stmt = $this->connection->prepare(
115
            'SELECT *
116 1
             FROM streams'
117
        );
118 1
        $stmt->execute();
119
120 1
        $streams = [];
121 1
        foreach ($stmt->fetchAll() as $result) {
122 1
            $streams[] = $this->readFullStream($result['id']);
123
        }
124 1
        return $streams;
125
    }
126
127
    /**
128
     * @return EventStreamInterface
129
     */
130 1 View Code Duplication
    public function readAllEvents()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
131
    {
132 1
        $stmt = $this->connection->prepare(
133
            'SELECT *
134 1
             FROM events'
135
        );
136 1
        $stmt->execute();
137 1
        $results = $stmt->fetchAll();
138
139
        $storedEvents = array_map(function($result) {
140 1
            return new StoredEvent(
141 1
                $result['id'],
142 1
                $result['stream_id'],
143 1
                $result['type'],
144 1
                $result['event'],
145 1
                $result['metadata'],
146 1
                new \DateTimeImmutable($result['occurred_on']),
147 1
                Version::fromString($result['version'])
148
            );
149 1
        }, $results);
150
151 1
        return $this->domainEventStreamFromStoredEvents($storedEvents);
152
    }
153
154
    /**
155
     * @param string $type
156
     * @param Version $version
157
     * @return EventStreamInterface
158
     */
159 1 View Code Duplication
    protected function readStoredEventsOfTypeAndVersion($type, $version)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
160
    {
161 1
        $stmt = $this->connection->prepare(
162
            'SELECT *
163
             FROM events
164
             WHERE type = :type
165 1
             AND version = :version'
166
        );
167 1
        $stmt->bindValue(':type', $type);
168 1
        $stmt->bindValue(':version', $version);
169 1
        $stmt->execute();
170 1
        $results = $stmt->fetchAll();
171
172
        $storedEvents = array_map(function($result) {
173 1
            return new StoredEvent(
174 1
                $result['id'],
175 1
                $result['stream_id'],
176 1
                $result['type'],
177 1
                $result['event'],
178 1
                $result['metadata'],
179 1
                new \DateTimeImmutable($result['occurred_on']),
180 1
                Version::fromString($result['version'])
181
            );
182 1
        }, $results);
183
184 1
        return new EventStream($storedEvents);
185
    }
186
187
    /**
188
     * @param string $streamId
189
     * @param StoredEvent[] $storedEvents
190
     * @param int $expectedVersion
191
     */
192 12
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
193
    {
194
        $this->connection->transactional(function() use ($streamId, $storedEvents, $expectedVersion) {
195 12 View Code Duplication
            if (!$this->streamExists($streamId)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
196 12
                $stmt = $this->connection
197 12
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
198 12
                $stmt->bindValue(':streamId', $streamId);
199 12
                $stmt->execute();
200
            }
201 12 View Code Duplication
            foreach ($storedEvents as $storedEvent) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
202 12
                $stmt = $this->connection->prepare(
203
                    'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
204 12
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
205
                );
206 12
                $stmt->bindValue(':streamId', $streamId);
207 12
                $stmt->bindValue(':type', $storedEvent->type());
208 12
                $stmt->bindValue(':event', $storedEvent->data());
209 12
                $stmt->bindValue(':metadata', $storedEvent->metadata());
210 12
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
211 12
                $stmt->bindValue(':version', $storedEvent->version());
212 12
                $stmt->execute();
213
            }
214 12
            $streamFinalVersion = $this->streamVersion($streamId);
215 12
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
216 1
                throw ConcurrencyException::fromVersions(
217 1
                    $this->streamVersion($streamId),
218
                    $expectedVersion
219
                );
220
            }
221 12
        });
222 11
    }
223
    /**
224
     * @param string $streamId
225
     * @return int
226
     */
227 11 View Code Duplication
    protected function streamVersion($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
228
    {
229 11
        $stmt = $this->connection
230 11
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
231 11
        $stmt->bindValue(':streamId', $streamId);
232 11
        $stmt->execute();
233 11
        return intval($stmt->fetchColumn());
234
    }
235
236
    /**
237
     * @param string $streamId
238
     * @param \DateTimeImmutable $datetime
239
     * @return int
240
     * @throws EventStreamDoesNotExistException
241
     */
242 3 View Code Duplication
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
243
    {
244 3
        if (!$this->streamExists($streamId)) {
245 1
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
246
        }
247 2
        $stmt = $this->connection->prepare(
248
            'SELECT COUNT(*)
249
             FROM events
250
             WHERE stream_id = :streamId
251 2
             AND occurred_on <= :occurred_on'
252
        );
253 2
        $stmt->bindValue(':streamId', $streamId);
254 2
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
255 2
        $stmt->execute();
256 2
        return intval($stmt->fetchColumn());
257
    }
258
259
    /**
260
     * @param string $streamId
261
     * @return bool
262
     */
263 14 View Code Duplication
    protected function streamExists($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
264
    {
265 14
        $stmt = $this->connection
266 14
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
267 14
        $stmt->bindValue(':streamId', $streamId);
268 14
        $stmt->execute();
269 14
        return boolval($stmt->fetchColumn());
270
    }
271
272
    /**
273
     * Initialize the Event Store
274
     */
275 20
    public function initialize()
276
    {
277 20
        $schema = new Schema();
278
279 20
        $streamTable = $schema->createTable(self::STREAMS_TABLE);
280 20
        $streamTable->addColumn('id', 'string');
281 20
        $streamTable->setPrimaryKey(array("id"));
282
283 20
        $eventsTable = $schema->createTable(self::EVENTS_TABLE);
284 20
        $eventsTable->addColumn('id', 'integer', ['autoincrement' => true]);
285 20
        $eventsTable->addColumn('stream_id', 'string');
286 20
        $eventsTable->addColumn('type', 'string');
287 20
        $eventsTable->addColumn('event', 'text');
288 20
        $eventsTable->addColumn('metadata', 'text');
289 20
        $eventsTable->addColumn('occurred_on', 'datetime');
290 20
        $eventsTable->addColumn('version', 'string');
291 20
        $eventsTable->setPrimaryKey(['id']);
292 20
        $eventsTable->addForeignKeyConstraint($streamTable, ['stream_id'], ['id']);
293
294 20
        $queries = $schema->toSql($this->connection->getDatabasePlatform());
295 20
        $this->connection->transactional(function(Connection $connection) use ($queries) {
296 20
            foreach ($queries as $query) {
297 20
                $connection->exec($query);
298
            }
299 20
        });
300 20
    }
301
302
    /**
303
     * Check if the Event Store has been initialized
304
     *
305
     * @return bool
306
     */
307 2
    public function initialized()
308
    {
309 2
        return $this->connection->getSchemaManager()->tablesExist([self::STREAMS_TABLE]);
310
    }
311
}
312