Completed
Push — master ( a35003...7b49c3 )
by Daniel
03:21
created

DoctrineDbalEventStore::readAllStreams()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 8

Duplication

Lines 14
Ratio 100 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
dl 14
loc 14
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 8
nc 2
nop 0
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
10
use DDDominio\EventSourcing\EventStore\StoredEvent;
11
use Doctrine\DBAL\Connection;
12
use DDDominio\EventSourcing\Serialization\SerializerInterface;
13
use DDDominio\EventSourcing\Versioning\EventUpgrader;
14
use DDDominio\EventSourcing\Versioning\Version;
15
use Doctrine\DBAL\Schema\Schema;
16
use DDDominio\EventSourcing\EventStore\InitializableInterface;
17
18
/**
 * Event store that persists streams and events through a Doctrine DBAL connection.
 *
 * Two tables are used: one row per stream in STREAMS_TABLE and one row per
 * event in EVENTS_TABLE. Events carry an auto-increment "id" column, which is
 * used as the read order for every query that returns events.
 */
class DoctrineDbalEventStore extends AbstractEventStore implements InitializableInterface
{
    /**
     * Row limit applied when the caller does not restrict the number of events.
     *
     * NOTE: despite its name this is the largest *signed* 64-bit integer; the
     * constant name is kept so existing references keep working.
     */
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;

    /** Name of the table holding one row per stream. */
    const STREAMS_TABLE = 'streams';

    /** Name of the table holding one row per stored event. */
    const EVENTS_TABLE = 'events';

    /**
     * @var Connection
     */
    private $connection;

    /**
     * @param Connection $connection
     * @param SerializerInterface $serializer
     * @param EventUpgrader $eventUpgrader
     */
    public function __construct($connection, $serializer, $eventUpgrader)
    {
        parent::__construct($serializer, $eventUpgrader);
        $this->connection = $connection;
    }

    /**
     * Read a slice of the events of a single stream, in insertion order.
     *
     * @param string $streamId
     * @param int $start 1-based position of the first event to read
     * @param int|null $count Maximum number of events; null reads up to MAX_UNSIGNED_BIG_INT rows
     * @return EventStreamInterface
     */
    public function readStreamEvents($streamId, $start = 1, $count = null)
    {
        if ($count === null) {
            $count = self::MAX_UNSIGNED_BIG_INT;
        }
        // $start is 1-based while OFFSET is 0-based; clamp so that a start
        // below 1 never produces a negative OFFSET.
        $offset = max(0, (int) $start - 1);

        // ORDER BY is required: without it the rows selected by LIMIT/OFFSET
        // are not guaranteed to be the same (or in the same order) between calls.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM ' . self::EVENTS_TABLE . '
             WHERE stream_id = :streamId
             ORDER BY id ASC
             LIMIT :limit
             OFFSET :offset'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->bindValue(':offset', $offset, \PDO::PARAM_INT);
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
        $stmt->execute();

        return $this->domainEventStreamFromStoredEvents(
            $this->storedEventsFromRows($stmt->fetchAll())
        );
    }

    /**
     * Read every event of a single stream, in insertion order.
     *
     * @param string $streamId
     * @return EventStreamInterface
     */
    public function readFullStream($streamId)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM ' . self::EVENTS_TABLE . '
             WHERE stream_id = :streamId
             ORDER BY id ASC'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();

        return $this->domainEventStreamFromStoredEvents(
            $this->storedEventsFromRows($stmt->fetchAll())
        );
    }

    /**
     * Read the full event stream of every stream registered in the streams table.
     *
     * Issues one additional query per stream (via readFullStream()).
     *
     * @return EventStreamInterface[]
     */
    public function readAllStreams()
    {
        $stmt = $this->connection->prepare(
            'SELECT id
             FROM ' . self::STREAMS_TABLE
        );
        $stmt->execute();

        $streams = [];
        foreach ($stmt->fetchAll() as $result) {
            $streams[] = $this->readFullStream($result['id']);
        }
        return $streams;
    }

    /**
     * Read every stored event, across all streams, in insertion order.
     *
     * @return EventStreamInterface
     */
    public function readAllEvents()
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM ' . self::EVENTS_TABLE . '
             ORDER BY id ASC'
        );
        $stmt->execute();

        return $this->domainEventStreamFromStoredEvents(
            $this->storedEventsFromRows($stmt->fetchAll())
        );
    }

    /**
     * Read the stored (not yet deserialized) events matching a type and version.
     *
     * @param string $type
     * @param Version $version Bound as-is to the query parameter
     * @return EventStreamInterface Stream of StoredEvent instances
     */
    protected function readStoredEventsOfTypeAndVersion($type, $version)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM ' . self::EVENTS_TABLE . '
             WHERE type = :type
             AND version = :version
             ORDER BY id ASC'
        );
        $stmt->bindValue(':type', $type);
        $stmt->bindValue(':version', $version);
        $stmt->execute();

        return new EventStream($this->storedEventsFromRows($stmt->fetchAll()));
    }

    /**
     * Append events to a stream inside a single transaction.
     *
     * The stream row is created on first use. After inserting, the number of
     * events now stored for the stream is compared with $expectedVersion; if
     * the difference is not exactly the number of appended events a
     * ConcurrencyException is thrown from inside the transactional callback.
     *
     * @param string $streamId
     * @param StoredEvent[] $storedEvents
     * @param int $expectedVersion
     * @throws ConcurrencyException
     */
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
    {
        $this->connection->transactional(function () use ($streamId, $storedEvents, $expectedVersion) {
            if (!$this->streamExists($streamId)) {
                $createStream = $this->connection->prepare(
                    'INSERT INTO ' . self::STREAMS_TABLE . ' (id) VALUES (:streamId)'
                );
                $createStream->bindValue(':streamId', $streamId);
                $createStream->execute();
            }

            // The statement is identical for every event: prepare it once and
            // only re-bind the values inside the loop.
            $insertEvent = $this->connection->prepare(
                'INSERT INTO ' . self::EVENTS_TABLE . ' (stream_id, type, event, metadata, occurred_on, version)
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
            );
            foreach ($storedEvents as $storedEvent) {
                $insertEvent->bindValue(':streamId', $streamId);
                $insertEvent->bindValue(':type', $storedEvent->type());
                $insertEvent->bindValue(':event', $storedEvent->data());
                $insertEvent->bindValue(':metadata', $storedEvent->metadata());
                $insertEvent->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
                $insertEvent->bindValue(':version', $storedEvent->version());
                $insertEvent->execute();
            }

            $streamFinalVersion = $this->streamVersion($streamId);
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
                // Report the version that actually failed the check instead
                // of querying the database a second time.
                throw ConcurrencyException::fromVersions(
                    $streamFinalVersion,
                    $expectedVersion
                );
            }
        });
    }

    /**
     * Current version of a stream, i.e. the number of events stored for it.
     *
     * @param string $streamId
     * @return int
     */
    protected function streamVersion($streamId)
    {
        $stmt = $this->connection->prepare(
            'SELECT COUNT(*) FROM ' . self::EVENTS_TABLE . ' WHERE stream_id = :streamId'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();

        return intval($stmt->fetchColumn());
    }

    /**
     * Version of a stream at a point in time: the number of its events whose
     * occurred_on is not later than $datetime (compared with second precision).
     *
     * @param string $streamId
     * @param \DateTimeImmutable $datetime
     * @return int
     * @throws EventStreamDoesNotExistException
     */
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
    {
        if (!$this->streamExists($streamId)) {
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
        }

        $stmt = $this->connection->prepare(
            'SELECT COUNT(*)
             FROM ' . self::EVENTS_TABLE . '
             WHERE stream_id = :streamId
             AND occurred_on <= :occurred_on'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
        $stmt->execute();

        return intval($stmt->fetchColumn());
    }

    /**
     * Whether a row for the given stream id exists in the streams table.
     *
     * @param string $streamId
     * @return bool
     */
    protected function streamExists($streamId)
    {
        $stmt = $this->connection->prepare(
            'SELECT COUNT(*) FROM ' . self::STREAMS_TABLE . ' WHERE id = :streamId'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();

        return boolval($stmt->fetchColumn());
    }

    /**
     * Initialize the Event Store
     *
     * Builds the schema for the streams and events tables (events reference
     * streams through a foreign key on stream_id) and executes the resulting
     * platform-specific SQL inside one transaction.
     */
    public function initialize()
    {
        $schema = new Schema();

        $streamTable = $schema->createTable(self::STREAMS_TABLE);
        $streamTable->addColumn('id', 'string');
        $streamTable->setPrimaryKey(['id']);

        $eventsTable = $schema->createTable(self::EVENTS_TABLE);
        $eventsTable->addColumn('id', 'integer', ['autoincrement' => true]);
        $eventsTable->addColumn('stream_id', 'string');
        $eventsTable->addColumn('type', 'string');
        $eventsTable->addColumn('event', 'text');
        $eventsTable->addColumn('metadata', 'text');
        $eventsTable->addColumn('occurred_on', 'datetime');
        $eventsTable->addColumn('version', 'string');
        $eventsTable->setPrimaryKey(['id']);
        $eventsTable->addForeignKeyConstraint($streamTable, ['stream_id'], ['id']);

        $queries = $schema->toSql($this->connection->getDatabasePlatform());
        $this->connection->transactional(function (Connection $connection) use ($queries) {
            foreach ($queries as $query) {
                $connection->exec($query);
            }
        });
    }

    /**
     * Check if the Event Store has been initialized
     *
     * Both tables created by initialize() must be present.
     *
     * @return bool
     */
    public function initialized()
    {
        return $this->connection->getSchemaManager()->tablesExist([
            self::STREAMS_TABLE,
            self::EVENTS_TABLE,
        ]);
    }

    /**
     * Convert rows fetched from the events table into StoredEvent instances.
     *
     * Each row must expose the columns id, stream_id, type, event, metadata,
     * occurred_on and version.
     *
     * @param array $rows
     * @return StoredEvent[]
     */
    private function storedEventsFromRows($rows)
    {
        return array_map(function ($row) {
            return new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                new \DateTimeImmutable($row['occurred_on']),
                Version::fromString($row['version'])
            );
        }, $rows);
    }
}
312