Completed
Push — master ( ab1322...e55add )
by Daniel
03:00
created

DoctrineDbalEventStore   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 242
Duplicated Lines 49.17 %

Coupling/Cohesion

Components 1
Dependencies 10

Test Coverage

Coverage 93.22%

Importance

Changes 0
Metric Value
dl 119
loc 242
c 0
b 0
f 0
wmc 16
lcom 1
cbo 10
ccs 110
cts 118
cp 0.9322
rs 10

11 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
B readStreamEventsForward() 32 32 2
B readFullStream() 25 25 1
A readAllStreams() 0 4 1
A readAllEvents() 0 4 1
B readStoredEventsOfTypeAndVersion() 27 27 1
B appendStoredEvents() 19 31 4
A streamVersion() 8 8 1
A streamExists() 8 8 1
B initialize() 0 26 2
A initialized() 0 4 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\StoredEvent;
10
use Doctrine\DBAL\Connection;
11
use DDDominio\EventSourcing\Serialization\SerializerInterface;
12
use DDDominio\EventSourcing\Versioning\EventUpgrader;
13
use DDDominio\EventSourcing\Versioning\Version;
14
use Doctrine\DBAL\Schema\Schema;
15
use DDDominio\EventSourcing\EventStore\InitializableInterface;
16
17
class DoctrineDbalEventStore extends AbstractEventStore implements InitializableInterface
18
{
19
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
20
    const STREAMS_TABLE = 'streams';
21
    const EVENTS_TABLE = 'events';
22
23
    /**
24
     * @var Connection
25
     */
26
    private $connection;
27
28
    /**
29
     * @param Connection $connection
30
     * @param SerializerInterface $serializer
31
     * @param EventUpgrader $eventUpgrader
32
     */
33 12
    public function __construct($connection, $serializer, $eventUpgrader)
34
    {
35 12
        parent::__construct($serializer, $eventUpgrader);
36 12
        $this->connection = $connection;
37 12
    }
38
39
    /**
40
     * @param string $streamId
41
     * @param int $start
42
     * @param int $count
43
     * @return EventStreamInterface
44
     */
45 4 View Code Duplication
    public function readStreamEventsForward($streamId, $start = 1, $count = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
46
    {
47 4
        if (!isset($count)) {
48 3
            $count = self::MAX_UNSIGNED_BIG_INT;
49
        }
50 4
        $stmt = $this->connection->prepare(
51
            'SELECT *
52
             FROM events
53
             WHERE stream_id = :streamId
54
             LIMIT :limit
55 4
             OFFSET :offset'
56
        );
57 4
        $stmt->bindValue(':streamId', $streamId);
58 4
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
59 4
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
60 4
        $stmt->execute();
61 4
        $results = $stmt->fetchAll();
62
63
        $storedEvents = array_map(function($result) {
64 3
            return new StoredEvent(
65 3
                $result['id'],
66 3
                $result['stream_id'],
67 3
                $result['type'],
68 3
                $result['event'],
69 3
                $result['metadata'],
70 3
                new \DateTimeImmutable($result['occurred_on']),
71 3
                Version::fromString($result['version'])
72
            );
73 4
        }, $results);
74
75 4
        return $this->domainEventStreamFromStoredEvents($storedEvents);
76
    }
77
78
    /**
79
     * @param string $streamId
80
     * @return EventStreamInterface
81
     */
82 6 View Code Duplication
    public function readFullStream($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
83
    {
84 6
        $stmt = $this->connection->prepare(
85
            'SELECT *
86
             FROM events
87 6
             WHERE stream_id = :streamId'
88
        );
89 6
        $stmt->bindValue(':streamId', $streamId);
90 6
        $stmt->execute();
91 6
        $results = $stmt->fetchAll();
92
93
        $storedEvents = array_map(function($result) {
94 5
            return new StoredEvent(
95 5
                $result['id'],
96 5
                $result['stream_id'],
97 5
                $result['type'],
98 5
                $result['event'],
99 5
                $result['metadata'],
100 5
                new \DateTimeImmutable($result['occurred_on']),
101 5
                Version::fromString($result['version'])
102
            );
103 6
        }, $results);
104
105 6
        return $this->domainEventStreamFromStoredEvents($storedEvents);
106
    }
107
108
    /**
109
     * @return EventStreamInterface[]
110
     */
111
    public function readAllStreams()
112
    {
113
        // TODO: Implement readAllStreams() method.
114
    }
115
116
    /**
117
     * @return EventStreamInterface
118
     */
119
    public function readAllEvents()
120
    {
121
        // TODO: Implement readAllEvents() method.
122
    }
123
124
    /**
125
     * @param string $type
126
     * @param Version $version
127
     * @return EventStreamInterface
128
     */
129 1 View Code Duplication
    protected function readStoredEventsOfTypeAndVersion($type, $version)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
130
    {
131 1
        $stmt = $this->connection->prepare(
132
            'SELECT *
133
             FROM events
134
             WHERE type = :type
135 1
             AND version = :version'
136
        );
137 1
        $stmt->bindValue(':type', $type);
138 1
        $stmt->bindValue(':version', $version);
139 1
        $stmt->execute();
140 1
        $results = $stmt->fetchAll();
141
142
        $storedEvents = array_map(function($result) {
143 1
            return new StoredEvent(
144 1
                $result['id'],
145 1
                $result['stream_id'],
146 1
                $result['type'],
147 1
                $result['event'],
148 1
                $result['metadata'],
149 1
                new \DateTimeImmutable($result['occurred_on']),
150 1
                Version::fromString($result['version'])
151
            );
152 1
        }, $results);
153
154 1
        return new EventStream($storedEvents);
155
    }
156
157
    /**
158
     * @param string $streamId
159
     * @param StoredEvent[] $storedEvents
160
     * @param int $expectedVersion
161
     */
162 7
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
163
    {
164
        $this->connection->transactional(function() use ($streamId, $storedEvents, $expectedVersion) {
165 7 View Code Duplication
            if (!$this->streamExists($streamId)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
166 7
                $stmt = $this->connection
167 7
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
168 7
                $stmt->bindValue(':streamId', $streamId);
169 7
                $stmt->execute();
170
            }
171 7 View Code Duplication
            foreach ($storedEvents as $storedEvent) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
172 7
                $stmt = $this->connection->prepare(
173
                    'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
174 7
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
175
                );
176 7
                $stmt->bindValue(':streamId', $streamId);
177 7
                $stmt->bindValue(':type', $storedEvent->type());
178 7
                $stmt->bindValue(':event', $storedEvent->data());
179 7
                $stmt->bindValue(':metadata', $storedEvent->metadata());
180 7
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
181 7
                $stmt->bindValue(':version', $storedEvent->version());
182 7
                $stmt->execute();
183
            }
184 7
            $streamFinalVersion = $this->streamVersion($streamId);
185 7
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
186
                throw ConcurrencyException::fromVersions(
187
                    $this->streamVersion($streamId),
188
                    $expectedVersion
189
                );
190
            }
191 7
        });
192 7
    }
193
194
    /**
195
     * @param string $streamId
196
     * @return int
197
     */
198 7 View Code Duplication
    protected function streamVersion($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
199
    {
200 7
        $stmt = $this->connection
201 7
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
202 7
        $stmt->bindValue(':streamId', $streamId);
203 7
        $stmt->execute();
204 7
        return intval($stmt->fetchColumn());
205
    }
206
    /**
207
     * @param string $streamId
208
     * @return bool
209
     */
210 8 View Code Duplication
    protected function streamExists($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
211
    {
212 8
        $stmt = $this->connection
213 8
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
214 8
        $stmt->bindValue(':streamId', $streamId);
215 8
        $stmt->execute();
216 8
        return boolval($stmt->fetchColumn());
217
    }
218
219
    /**
220
     * Initialize the Event Store
221
     */
222 12
    public function initialize()
223
    {
224 12
        $schema = new Schema();
225
226 12
        $streamTable = $schema->createTable(self::STREAMS_TABLE);
227 12
        $streamTable->addColumn('id', 'string');
228 12
        $streamTable->setPrimaryKey(array("id"));
229
230 12
        $eventsTable = $schema->createTable(self::EVENTS_TABLE);
231 12
        $eventsTable->addColumn('id', 'integer', ['autoincrement' => true]);
232 12
        $eventsTable->addColumn('stream_id', 'string');
233 12
        $eventsTable->addColumn('type', 'string');
234 12
        $eventsTable->addColumn('event', 'text');
235 12
        $eventsTable->addColumn('metadata', 'text');
236 12
        $eventsTable->addColumn('occurred_on', 'datetime');
237 12
        $eventsTable->addColumn('version', 'string');
238 12
        $eventsTable->setPrimaryKey(['id']);
239 12
        $eventsTable->addForeignKeyConstraint($streamTable, ['stream_id'], ['id']);
240
241 12
        $queries = $schema->toSql($this->connection->getDatabasePlatform());
242 12
        $this->connection->transactional(function(Connection $connection) use ($queries) {
243 12
            foreach ($queries as $query) {
244 12
                $connection->exec($query);
245
            }
246 12
        });
247 12
    }
248
249
    /**
250
     * Check if the Event Store has been initialized
251
     *
252
     * @return bool
253
     */
254
    public function initialized()
255
    {
256
        return $this->connection->getSchemaManager()->tablesExist([self::STREAMS_TABLE]);
257
    }
258
}
259