DoctrineDbalEventStore (rating: A)

Complexity

Total Complexity 20

Size/Duplication

Total Lines 296
Duplicated Lines 59.8 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 100%

Importance

Changes 0

Metric  Value
wmc     20
lcom    1
cbo     11
dl      177
loc     296
ccs     151
cts     151
cp      1
rs      10
c       0
b       0
f       0

12 Methods

Rating  Name                                Duplication  Size  Complexity
A       __construct()                       0            5     1
A       readStreamEvents()                  32           32    2
A       readFullStream()                    25           25    1
A       readAllStreams()                    14           14    2
A       readAllEvents()                     23           23    1
A       readStoredEventsOfTypeAndVersion()  27           27    1
A       appendStoredEvents()                24           34    5
A       streamVersion()                     8            8     1
A       getStreamVersionAt()                16           16    2
A       streamExists()                      8            8     1
A       initialize()                        0            26    2
A       initialized()                       0            4     1

Duplicated Code

Duplicated code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:
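One solution that applies to the class listed below: four of its read methods (readStreamEvents(), readFullStream(), readAllEvents() and readStoredEventsOfTypeAndVersion()) repeat the same array_map closure that turns a result row into a StoredEvent. The following is a minimal sketch of moving that mapping into one place, not the project's actual code. StoredEventStub and storedEventsFromRows() are hypothetical names introduced here so the snippet runs without the library; only the column names come from the listing.

```php
<?php

// Hypothetical stand-in for the library's StoredEvent, so the sketch runs on its own.
final class StoredEventStub
{
    public $id;
    public $streamId;
    public $type;
    public $data;
    public $metadata;
    public $occurredOn;
    public $version;

    public function __construct($id, $streamId, $type, $data, $metadata, \DateTimeImmutable $occurredOn, $version)
    {
        $this->id = $id;
        $this->streamId = $streamId;
        $this->type = $type;
        $this->data = $data;
        $this->metadata = $metadata;
        $this->occurredOn = $occurredOn;
        $this->version = $version;
    }
}

/**
 * Hypothetical helper: the one place where result rows become event objects.
 *
 * @param array[] $rows rows keyed by the column names of the events table
 * @return StoredEventStub[]
 */
function storedEventsFromRows(array $rows)
{
    return array_map(function (array $row) {
        return new StoredEventStub(
            $row['id'],
            $row['stream_id'],
            $row['type'],
            $row['event'],
            $row['metadata'],
            new \DateTimeImmutable($row['occurred_on']),
            $row['version'] // the real class would wrap this in Version::fromString()
        );
    }, $rows);
}

// Usage with one hand-written row instead of a database result.
$events = storedEventsFromRows([
    [
        'id' => 1,
        'stream_id' => 'stream-1',
        'type' => 'UserRegistered',
        'event' => '{}',
        'metadata' => '{}',
        'occurred_on' => '2017-01-01 10:00:00',
        'version' => '1.0',
    ],
]);
echo $events[0]->type, PHP_EOL;
```

In the class itself such a helper would more likely be a private method, so that each read method shrinks to its query plus a single call.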

<?php

namespace DDDominio\EventSourcing\EventStore\Vendor;

use DDDominio\EventSourcing\Common\EventStream;
use DDDominio\EventSourcing\Common\EventStreamInterface;
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
use DDDominio\EventSourcing\EventStore\StoredEvent;
use Doctrine\DBAL\Connection;
use DDDominio\EventSourcing\Serialization\SerializerInterface;
use DDDominio\EventSourcing\Versioning\EventUpgrader;
use DDDominio\EventSourcing\Versioning\Version;
use Doctrine\DBAL\Schema\Schema;
use DDDominio\EventSourcing\EventStore\InitializableInterface;

class DoctrineDbalEventStore extends AbstractEventStore implements InitializableInterface
{
    const STREAMS_TABLE = 'streams';
    const EVENTS_TABLE = 'events';

    /**
     * @var Connection
     */
    private $connection;

    /**
     * @param Connection $connection
     * @param SerializerInterface $serializer
     * @param EventUpgrader $eventUpgrader
     */
    public function __construct($connection, $serializer, $eventUpgrader)
    {
        parent::__construct($serializer, $eventUpgrader);
        $this->connection = $connection;
    }

    /**
     * @param string $streamId
     * @param int $start
     * @param int $count
     * @return EventStreamInterface
     */
    public function readStreamEvents($streamId, $start = 1, $count = null)
    {
        // Flagged (duplication): this method seems to be duplicated in your project.
        if (!isset($count)) {
            $count = PHP_INT_MAX;
        }
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             LIMIT :limit
             OFFSET :offset'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
        $stmt->execute();
        // Flagged (deprecated): Doctrine\DBAL\Statement::fetchAll() is deprecated.
        // Use fetchAllNumeric(), fetchAllAssociative() or fetchFirstColumn() instead.
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($result) {
            return new StoredEvent(
                $result['id'],
                $result['stream_id'],
                $result['type'],
                $result['event'],
                $result['metadata'],
                new \DateTimeImmutable($result['occurred_on']),
                Version::fromString($result['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents(new EventStream($storedEvents));
    }

    /**
     * @param string $streamId
     * @return EventStreamInterface
     */
    public function readFullStream($streamId)
    {
        // Flagged (duplication): this method seems to be duplicated in your project.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        // Flagged (deprecated): Doctrine\DBAL\Statement::fetchAll() is deprecated.
        // Use fetchAllNumeric(), fetchAllAssociative() or fetchFirstColumn() instead.
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($result) {
            return new StoredEvent(
                $result['id'],
                $result['stream_id'],
                $result['type'],
                $result['event'],
                $result['metadata'],
                new \DateTimeImmutable($result['occurred_on']),
                Version::fromString($result['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents(new EventStream($storedEvents));
    }

    /**
     * @return EventStreamInterface[]
     */
    public function readAllStreams()
    {
        // Flagged (duplication): this method seems to be duplicated in your project.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM streams'
        );
        $stmt->execute();

        $streams = [];
        // Flagged (deprecated): Doctrine\DBAL\Statement::fetchAll() is deprecated.
        // Use fetchAllNumeric(), fetchAllAssociative() or fetchFirstColumn() instead.
        foreach ($stmt->fetchAll() as $result) {
            $streams[] = $this->readFullStream($result['id']);
        }
        return $streams;
    }

    /**
     * @return EventStreamInterface
     */
    public function readAllEvents()
    {
        // Flagged (duplication): this method seems to be duplicated in your project.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events'
        );
        $stmt->execute();
        // Flagged (deprecated): Doctrine\DBAL\Statement::fetchAll() is deprecated.
        // Use fetchAllNumeric(), fetchAllAssociative() or fetchFirstColumn() instead.
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($result) {
            return new StoredEvent(
                $result['id'],
                $result['stream_id'],
                $result['type'],
                $result['event'],
                $result['metadata'],
                new \DateTimeImmutable($result['occurred_on']),
                Version::fromString($result['version'])
            );
        }, $results);

        return $this->domainEventStreamFromStoredEvents(new EventStream($storedEvents));
    }

    /**
     * @param string $type
     * @param Version $version
     * @return EventStreamInterface
     */
    protected function readStoredEventsOfTypeAndVersion($type, $version)
    {
        // Flagged (duplication): this method seems to be duplicated in your project.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE type = :type
             AND version = :version'
        );
        $stmt->bindValue(':type', $type);
        $stmt->bindValue(':version', $version);
        $stmt->execute();
        // Flagged (deprecated): Doctrine\DBAL\Statement::fetchAll() is deprecated.
        // Use fetchAllNumeric(), fetchAllAssociative() or fetchFirstColumn() instead.
        $results = $stmt->fetchAll();

        $storedEvents = array_map(function($result) {
            return new StoredEvent(
                $result['id'],
                $result['stream_id'],
                $result['type'],
                $result['event'],
                $result['metadata'],
                new \DateTimeImmutable($result['occurred_on']),
                Version::fromString($result['version'])
            );
        }, $results);

        return new EventStream($storedEvents);
    }

    /**
     * @param string $streamId
     * @param StoredEvent[] $storedEvents
     * @param int $expectedVersion
     */
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
    {
        $this->connection->transactional(function() use ($streamId, $storedEvents, $expectedVersion) {
            // Flagged (duplication): this code seems to be duplicated across your project.
            if (!$this->streamExists($streamId)) {
                $stmt = $this->connection
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
                $stmt->bindValue(':streamId', $streamId);
                $stmt->execute();
            }
            $stmt = $this->connection->prepare(
                'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
            );
            // Flagged (duplication): this code seems to be duplicated across your project.
            foreach ($storedEvents as $storedEvent) {
                $stmt->bindValue(':streamId', $streamId);
                $stmt->bindValue(':type', $storedEvent->type());
                $stmt->bindValue(':event', $storedEvent->data());
                $stmt->bindValue(':metadata', $storedEvent->metadata());
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
                $stmt->bindValue(':version', $storedEvent->version());
                $stmt->execute();
            }

            // Flagged (duplication): this code seems to be duplicated across your project.
            if ($expectedVersion !== self::EXPECTED_VERSION_ANY) {
                $streamFinalVersion = $this->streamVersion($streamId);
                if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
                    throw ConcurrencyException::fromVersions(
                        $this->streamVersion($streamId),
                        $expectedVersion
                    );
                }
            }
        });
    }
    /**
     * @param string $streamId
     * @return int
     */
    protected function streamVersion($streamId)
    {
        // Flagged (duplication): this method seems to be duplicated in your project.
        $stmt = $this->connection
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        // Flagged (deprecated): Doctrine\DBAL\Statement::fetchColumn() is deprecated. Use fetchOne() instead.
        return intval($stmt->fetchColumn());
    }

    /**
     * @param string $streamId
     * @param \DateTimeImmutable $datetime
     * @return int
     * @throws EventStreamDoesNotExistException
     */
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
    {
        // Flagged (duplication): this method seems to be duplicated in your project.
        if (!$this->streamExists($streamId)) {
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
        }
        $stmt = $this->connection->prepare(
            'SELECT COUNT(*)
             FROM events
             WHERE stream_id = :streamId
             AND occurred_on <= :occurred_on'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
        $stmt->execute();
        // Flagged (deprecated): Doctrine\DBAL\Statement::fetchColumn() is deprecated. Use fetchOne() instead.
        return intval($stmt->fetchColumn());
    }

    /**
     * @param string $streamId
     * @return bool
     */
    protected function streamExists($streamId)
    {
        // Flagged (duplication): this method seems to be duplicated in your project.
        $stmt = $this->connection
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();
        // Flagged (deprecated): Doctrine\DBAL\Statement::fetchColumn() is deprecated. Use fetchOne() instead.
        return boolval($stmt->fetchColumn());
    }

    /**
     * Initialize the Event Store
     */
    public function initialize()
    {
        $schema = new Schema();

        $streamTable = $schema->createTable(self::STREAMS_TABLE);
        $streamTable->addColumn('id', 'string');
        $streamTable->setPrimaryKey(array("id"));

        $eventsTable = $schema->createTable(self::EVENTS_TABLE);
        $eventsTable->addColumn('id', 'integer', ['autoincrement' => true]);
        $eventsTable->addColumn('stream_id', 'string');
        $eventsTable->addColumn('type', 'string');
        $eventsTable->addColumn('event', 'text');
        $eventsTable->addColumn('metadata', 'text');
        $eventsTable->addColumn('occurred_on', 'datetime');
        $eventsTable->addColumn('version', 'string');
        $eventsTable->setPrimaryKey(['id']);
        $eventsTable->addForeignKeyConstraint($streamTable, ['stream_id'], ['id']);

        $queries = $schema->toSql($this->connection->getDatabasePlatform());
        $this->connection->transactional(function(Connection $connection) use ($queries) {
            foreach ($queries as $query) {
                // Flagged (deprecated): Doctrine\DBAL\Connection::exec() is deprecated. Use executeStatement() instead.
                $connection->exec($query);
            }
        });
    }

    /**
     * Check if the Event Store has been initialized
     *
     * @return bool
     */
    public function initialized()
    {
        return $this->connection->getSchemaManager()->tablesExist([self::STREAMS_TABLE]);
    }
}
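
The version check at the end of appendStoredEvents() can be read in isolation. After the inserts, the number of events in the stream must equal the expected version plus the number of events just appended; any other result suggests that another writer appended to the same stream in the meantime. The sketch below restates that arithmetic as a pure function. hasConcurrencyConflict() is a hypothetical name used only for illustration and is not part of the library; the listing additionally skips the check when the expected version is EXPECTED_VERSION_ANY.

```php
<?php

/**
 * Hypothetical helper mirroring the comparison in appendStoredEvents():
 * count($storedEvents) !== $streamFinalVersion - $expectedVersion
 */
function hasConcurrencyConflict(int $appendedCount, int $streamFinalVersion, int $expectedVersion): bool
{
    return $appendedCount !== $streamFinalVersion - $expectedVersion;
}

// Expected version 3, two events appended, stream now holds 5 events: no conflict.
var_dump(hasConcurrencyConflict(2, 5, 3)); // bool(false)

// Another writer added one event in between, so the stream holds 6: conflict.
var_dump(hasConcurrencyConflict(2, 6, 3)); // bool(true)
```

Because the listing performs this check inside the transactional() callback, the ConcurrencyException it throws should also cause the inserts made in the same callback to be rolled back.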