Passed
Push — master ( d20dcf...a35003 )
by Daniel
03:16
created

DoctrineDbalEventStore   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 284
Duplicated Lines 55.63 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 97.14%

Importance

Changes 0
Metric Value
wmc 18
lcom 1
cbo 11
dl 158
loc 284
ccs 136
cts 140
cp 0.9714
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
B readStreamEvents() 32 32 2
B readFullStream() 25 25 1
A readAllStreams() 0 4 1
A readAllEvents() 23 23 1
B readStoredEventsOfTypeAndVersion() 27 27 1
B appendStoredEvents() 19 31 4
A streamVersion() 8 8 1
A getStreamVersionAt() 16 16 2
A streamExists() 8 8 1
B initialize() 0 26 2
A initialized() 0 4 1

How to fix   Duplicated Code   

Duplicated Code

Duplicated code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
10
use DDDominio\EventSourcing\EventStore\StoredEvent;
11
use Doctrine\DBAL\Connection;
12
use DDDominio\EventSourcing\Serialization\SerializerInterface;
13
use DDDominio\EventSourcing\Versioning\EventUpgrader;
14
use DDDominio\EventSourcing\Versioning\Version;
15
use Doctrine\DBAL\Schema\Schema;
16
use DDDominio\EventSourcing\EventStore\InitializableInterface;
17
18
/**
 * Event store backed by a Doctrine DBAL connection.
 *
 * Events live in the "events" table and are linked to a row of the
 * "streams" table through their stream_id column (see initialize()).
 */
class DoctrineDbalEventStore extends AbstractEventStore implements InitializableInterface
{
    /**
     * LIMIT value used when the caller does not restrict the number of events.
     *
     * NOTE: despite its name this is 2^63 - 1, i.e. the largest *signed*
     * 64-bit integer. The name is kept because the constant is public.
     */
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
    const STREAMS_TABLE = 'streams';
    const EVENTS_TABLE = 'events';

    /**
     * @var Connection
     */
    private $connection;

    /**
     * @param Connection $connection
     * @param SerializerInterface $serializer
     * @param EventUpgrader $eventUpgrader
     */
    public function __construct($connection, $serializer, $eventUpgrader)
    {
        parent::__construct($serializer, $eventUpgrader);
        $this->connection = $connection;
    }

    /**
     * Read a slice of a stream, in append order.
     *
     * @param string $streamId
     * @param int $start 1-based position of the first event to read
     * @param int $count Maximum number of events; null means "no limit"
     * @return EventStreamInterface
     */
    public function readStreamEvents($streamId, $start = 1, $count = null)
    {
        if (!isset($count)) {
            $count = self::MAX_UNSIGNED_BIG_INT;
        }

        // ORDER BY is required: without it LIMIT/OFFSET may return an
        // arbitrary, non-repeatable subset of the stream.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id
             LIMIT :limit
             OFFSET :offset'
        );
        $stmt->bindValue(':streamId', $streamId);
        // $start is 1-based while OFFSET is 0-based.
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);

        return $this->domainEventStreamFromStoredEvents(
            $this->fetchStoredEvents($stmt)
        );
    }

    /**
     * Read every event of a stream, in append order.
     *
     * @param string $streamId
     * @return EventStreamInterface
     */
    public function readFullStream($streamId)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id'
        );
        $stmt->bindValue(':streamId', $streamId);

        return $this->domainEventStreamFromStoredEvents(
            $this->fetchStoredEvents($stmt)
        );
    }

    /**
     * NOTE: not implemented yet. The method body is empty, so it currently
     * returns null instead of the declared array.
     *
     * @return EventStreamInterface[]
     */
    public function readAllStreams()
    {
        // TODO: Implement readAllStreams() method.
    }

    /**
     * Read every stored event of every stream, in append order.
     *
     * @return EventStreamInterface
     */
    public function readAllEvents()
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             ORDER BY id'
        );

        return $this->domainEventStreamFromStoredEvents(
            $this->fetchStoredEvents($stmt)
        );
    }

    /**
     * Read the raw stored events matching a type and version, in append order.
     *
     * Unlike the public read methods, the result wraps StoredEvent instances
     * directly instead of going through domainEventStreamFromStoredEvents().
     *
     * @param string $type
     * @param Version $version
     * @return EventStreamInterface
     */
    protected function readStoredEventsOfTypeAndVersion($type, $version)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE type = :type
             AND version = :version
             ORDER BY id'
        );
        $stmt->bindValue(':type', $type);
        $stmt->bindValue(':version', $version);

        return new EventStream($this->fetchStoredEvents($stmt));
    }

    /**
     * Append events to a stream, creating the stream row when missing.
     *
     * Everything runs in one transaction. After inserting, the stream size is
     * compared with $expectedVersion; a mismatch raises a ConcurrencyException
     * from inside the transactional callback, which makes Doctrine roll the
     * inserts back before the exception reaches the caller.
     *
     * @param string $streamId
     * @param StoredEvent[] $storedEvents
     * @param int $expectedVersion
     * @throws ConcurrencyException
     */
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
    {
        $this->connection->transactional(function () use ($streamId, $storedEvents, $expectedVersion) {
            if (!$this->streamExists($streamId)) {
                $insertStream = $this->connection
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
                $insertStream->bindValue(':streamId', $streamId);
                $insertStream->execute();
            }

            // The statement is loop-invariant: prepare it once and re-bind
            // the values for each event.
            $insertEvent = $this->connection->prepare(
                'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
            );
            foreach ($storedEvents as $storedEvent) {
                $insertEvent->bindValue(':streamId', $streamId);
                $insertEvent->bindValue(':type', $storedEvent->type());
                $insertEvent->bindValue(':event', $storedEvent->data());
                $insertEvent->bindValue(':metadata', $storedEvent->metadata());
                $insertEvent->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
                $insertEvent->bindValue(':version', $storedEvent->version());
                $insertEvent->execute();
            }

            $streamFinalVersion = $this->streamVersion($streamId);
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
                // Reuse the version just read instead of querying it again.
                throw ConcurrencyException::fromVersions(
                    $streamFinalVersion,
                    $expectedVersion
                );
            }
        });
    }

    /**
     * Current version of a stream, i.e. the number of events stored for it.
     *
     * @param string $streamId
     * @return int
     */
    protected function streamVersion($streamId)
    {
        return intval($this->fetchScalar(
            'SELECT COUNT(*) FROM events WHERE stream_id = :streamId',
            [':streamId' => $streamId]
        ));
    }

    /**
     * Version a stream had at a given moment: the number of its events whose
     * occurred_on is not later than $datetime.
     *
     * @param string $streamId
     * @param \DateTimeImmutable $datetime
     * @return int
     * @throws EventStreamDoesNotExistException
     */
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
    {
        if (!$this->streamExists($streamId)) {
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
        }

        return intval($this->fetchScalar(
            'SELECT COUNT(*)
             FROM events
             WHERE stream_id = :streamId
             AND occurred_on <= :occurred_on',
            [
                ':streamId' => $streamId,
                ':occurred_on' => $datetime->format('Y-m-d H:i:s'),
            ]
        ));
    }

    /**
     * Whether a row for the given stream id exists in the streams table.
     *
     * @param string $streamId
     * @return bool
     */
    protected function streamExists($streamId)
    {
        return boolval($this->fetchScalar(
            'SELECT COUNT(*) FROM streams WHERE id = :streamId',
            [':streamId' => $streamId]
        ));
    }

    /**
     * Initialize the Event Store
     *
     * Builds the schema for the streams and events tables and executes the
     * platform-specific SQL generated for it.
     */
    public function initialize()
    {
        $schema = new Schema();

        $streamTable = $schema->createTable(self::STREAMS_TABLE);
        $streamTable->addColumn('id', 'string');
        $streamTable->setPrimaryKey(['id']);

        $eventsTable = $schema->createTable(self::EVENTS_TABLE);
        $eventsTable->addColumn('id', 'integer', ['autoincrement' => true]);
        $eventsTable->addColumn('stream_id', 'string');
        $eventsTable->addColumn('type', 'string');
        $eventsTable->addColumn('event', 'text');
        $eventsTable->addColumn('metadata', 'text');
        $eventsTable->addColumn('occurred_on', 'datetime');
        $eventsTable->addColumn('version', 'string');
        $eventsTable->setPrimaryKey(['id']);
        $eventsTable->addForeignKeyConstraint($streamTable, ['stream_id'], ['id']);

        $queries = $schema->toSql($this->connection->getDatabasePlatform());
        $this->connection->transactional(function (Connection $connection) use ($queries) {
            foreach ($queries as $query) {
                $connection->exec($query);
            }
        });
    }

    /**
     * Check if the Event Store has been initialized
     *
     * Only the presence of the streams table is checked.
     *
     * @return bool
     */
    public function initialized()
    {
        return $this->connection->getSchemaManager()->tablesExist([self::STREAMS_TABLE]);
    }

    /**
     * Execute a prepared events query and turn each row into a StoredEvent.
     *
     * @param object $stmt Prepared statement with all of its values bound
     * @return StoredEvent[]
     */
    private function fetchStoredEvents($stmt)
    {
        $stmt->execute();

        return array_map(function ($row) {
            return new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                new \DateTimeImmutable($row['occurred_on']),
                Version::fromString($row['version'])
            );
        }, $stmt->fetchAll());
    }

    /**
     * Run a query and return the first column of its first row.
     *
     * @param string $sql
     * @param array $parameters Placeholder name => value
     * @return mixed Raw value as returned by the statement's fetchColumn()
     */
    private function fetchScalar($sql, array $parameters)
    {
        $stmt = $this->connection->prepare($sql);
        foreach ($parameters as $placeholder => $value) {
            $stmt->bindValue($placeholder, $value);
        }
        $stmt->execute();

        return $stmt->fetchColumn();
    }
}
302