Completed
Push — master ( 351bce...59f7a1 )
by Daniel
03:11
created

DoctrineDbalEventStore::initialized()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
10
use DDDominio\EventSourcing\EventStore\StoredEvent;
11
use Doctrine\DBAL\Connection;
12
use DDDominio\EventSourcing\Serialization\SerializerInterface;
13
use DDDominio\EventSourcing\Versioning\EventUpgrader;
14
use DDDominio\EventSourcing\Versioning\Version;
15
use Doctrine\DBAL\Schema\Schema;
16
use DDDominio\EventSourcing\EventStore\InitializableInterface;
17
18
class DoctrineDbalEventStore extends AbstractEventStore implements InitializableInterface
19
{
20
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
21
    const STREAMS_TABLE = 'streams';
22
    const EVENTS_TABLE = 'events';
23
24
    /**
25
     * @var Connection
26
     */
27
    private $connection;
28
29
    /**
30
     * @param Connection $connection
31
     * @param SerializerInterface $serializer
32
     * @param EventUpgrader $eventUpgrader
33
     */
34 17
    public function __construct($connection, $serializer, $eventUpgrader)
35
    {
36 17
        parent::__construct($serializer, $eventUpgrader);
37 17
        $this->connection = $connection;
38 17
    }
39
40
    /**
41
     * @param string $streamId
42
     * @param int $start
43
     * @param int $count
44
     * @return EventStreamInterface
45
     */
46 4 View Code Duplication
    public function readStreamEvents($streamId, $start = 1, $count = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
47
    {
48 4
        if (!isset($count)) {
49 3
            $count = self::MAX_UNSIGNED_BIG_INT;
50
        }
51 4
        $stmt = $this->connection->prepare(
52
            'SELECT *
53
             FROM events
54
             WHERE stream_id = :streamId
55
             LIMIT :limit
56 4
             OFFSET :offset'
57
        );
58 4
        $stmt->bindValue(':streamId', $streamId);
59 4
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
60 4
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
61 4
        $stmt->execute();
62 4
        $results = $stmt->fetchAll();
63
64
        $storedEvents = array_map(function($result) {
65 3
            return new StoredEvent(
66 3
                $result['id'],
67 3
                $result['stream_id'],
68 3
                $result['type'],
69 3
                $result['event'],
70 3
                $result['metadata'],
71 3
                new \DateTimeImmutable($result['occurred_on']),
72 3
                Version::fromString($result['version'])
73
            );
74 4
        }, $results);
75
76 4
        return $this->domainEventStreamFromStoredEvents($storedEvents);
77
    }
78
79
    /**
80
     * @param string $streamId
81
     * @return EventStreamInterface
82
     */
83 6 View Code Duplication
    public function readFullStream($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
84
    {
85 6
        $stmt = $this->connection->prepare(
86
            'SELECT *
87
             FROM events
88 6
             WHERE stream_id = :streamId'
89
        );
90 6
        $stmt->bindValue(':streamId', $streamId);
91 6
        $stmt->execute();
92 6
        $results = $stmt->fetchAll();
93
94
        $storedEvents = array_map(function($result) {
95 5
            return new StoredEvent(
96 5
                $result['id'],
97 5
                $result['stream_id'],
98 5
                $result['type'],
99 5
                $result['event'],
100 5
                $result['metadata'],
101 5
                new \DateTimeImmutable($result['occurred_on']),
102 5
                Version::fromString($result['version'])
103
            );
104 6
        }, $results);
105
106 6
        return $this->domainEventStreamFromStoredEvents($storedEvents);
107
    }
108
109
    /**
110
     * @return EventStreamInterface[]
111
     */
112
    public function readAllStreams()
113
    {
114
        // TODO: Implement readAllStreams() method.
115
    }
116
117
    /**
118
     * @return EventStreamInterface
119
     */
120
    public function readAllEvents()
121
    {
122
        // TODO: Implement readAllEvents() method.
123
    }
124
125
    /**
126
     * @param string $type
127
     * @param Version $version
128
     * @return EventStreamInterface
129
     */
130 1 View Code Duplication
    protected function readStoredEventsOfTypeAndVersion($type, $version)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
131
    {
132 1
        $stmt = $this->connection->prepare(
133
            'SELECT *
134
             FROM events
135
             WHERE type = :type
136 1
             AND version = :version'
137
        );
138 1
        $stmt->bindValue(':type', $type);
139 1
        $stmt->bindValue(':version', $version);
140 1
        $stmt->execute();
141 1
        $results = $stmt->fetchAll();
142
143
        $storedEvents = array_map(function($result) {
144 1
            return new StoredEvent(
145 1
                $result['id'],
146 1
                $result['stream_id'],
147 1
                $result['type'],
148 1
                $result['event'],
149 1
                $result['metadata'],
150 1
                new \DateTimeImmutable($result['occurred_on']),
151 1
                Version::fromString($result['version'])
152
            );
153 1
        }, $results);
154
155 1
        return new EventStream($storedEvents);
156
    }
157
158
    /**
159
     * @param string $streamId
160
     * @param StoredEvent[] $storedEvents
161
     * @param int $expectedVersion
162
     */
163 9
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
164
    {
165
        $this->connection->transactional(function() use ($streamId, $storedEvents, $expectedVersion) {
166 9 View Code Duplication
            if (!$this->streamExists($streamId)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
167 9
                $stmt = $this->connection
168 9
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
169 9
                $stmt->bindValue(':streamId', $streamId);
170 9
                $stmt->execute();
171
            }
172 9 View Code Duplication
            foreach ($storedEvents as $storedEvent) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
173 9
                $stmt = $this->connection->prepare(
174
                    'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
175 9
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
176
                );
177 9
                $stmt->bindValue(':streamId', $streamId);
178 9
                $stmt->bindValue(':type', $storedEvent->type());
179 9
                $stmt->bindValue(':event', $storedEvent->data());
180 9
                $stmt->bindValue(':metadata', $storedEvent->metadata());
181 9
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
182 9
                $stmt->bindValue(':version', $storedEvent->version());
183 9
                $stmt->execute();
184
            }
185 9
            $streamFinalVersion = $this->streamVersion($streamId);
186 9
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
187
                throw ConcurrencyException::fromVersions(
188
                    $this->streamVersion($streamId),
189
                    $expectedVersion
190
                );
191
            }
192 9
        });
193 9
    }
194
    /**
195
     * @param string $streamId
196
     * @return int
197
     */
198 9 View Code Duplication
    protected function streamVersion($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
199
    {
200 9
        $stmt = $this->connection
201 9
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
202 9
        $stmt->bindValue(':streamId', $streamId);
203 9
        $stmt->execute();
204 9
        return intval($stmt->fetchColumn());
205
    }
206
207
    /**
208
     * @param string $streamId
209
     * @param \DateTimeImmutable $datetime
210
     * @return int
211
     * @throws EventStreamDoesNotExistException
212
     */
213 3 View Code Duplication
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
214
    {
215 3
        if (!$this->streamExists($streamId)) {
216 1
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
217
        }
218 2
        $stmt = $this->connection->prepare(
219
            'SELECT COUNT(*)
220
             FROM events
221
             WHERE stream_id = :streamId
222 2
             AND occurred_on <= :occurred_on'
223
        );
224 2
        $stmt->bindValue(':streamId', $streamId);
225 2
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
226 2
        $stmt->execute();
227 2
        return intval($stmt->fetchColumn());
228
    }
229
230
    /**
231
     * @param string $streamId
232
     * @return bool
233
     */
234 11 View Code Duplication
    protected function streamExists($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
235
    {
236 11
        $stmt = $this->connection
237 11
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
238 11
        $stmt->bindValue(':streamId', $streamId);
239 11
        $stmt->execute();
240 11
        return boolval($stmt->fetchColumn());
241
    }
242
243
    /**
244
     * Initialize the Event Store
245
     */
246 17
    public function initialize()
247
    {
248 17
        $schema = new Schema();
249
250 17
        $streamTable = $schema->createTable(self::STREAMS_TABLE);
251 17
        $streamTable->addColumn('id', 'string');
252 17
        $streamTable->setPrimaryKey(array("id"));
253
254 17
        $eventsTable = $schema->createTable(self::EVENTS_TABLE);
255 17
        $eventsTable->addColumn('id', 'integer', ['autoincrement' => true]);
256 17
        $eventsTable->addColumn('stream_id', 'string');
257 17
        $eventsTable->addColumn('type', 'string');
258 17
        $eventsTable->addColumn('event', 'text');
259 17
        $eventsTable->addColumn('metadata', 'text');
260 17
        $eventsTable->addColumn('occurred_on', 'datetime');
261 17
        $eventsTable->addColumn('version', 'string');
262 17
        $eventsTable->setPrimaryKey(['id']);
263 17
        $eventsTable->addForeignKeyConstraint($streamTable, ['stream_id'], ['id']);
264
265 17
        $queries = $schema->toSql($this->connection->getDatabasePlatform());
266 17
        $this->connection->transactional(function(Connection $connection) use ($queries) {
267 17
            foreach ($queries as $query) {
268 17
                $connection->exec($query);
269
            }
270 17
        });
271 17
    }
272
273
    /**
274
     * Check if the Event Store has been initialized
275
     *
276
     * @return bool
277
     */
278 2
    public function initialized()
279
    {
280 2
        return $this->connection->getSchemaManager()->tablesExist([self::STREAMS_TABLE]);
281
    }
282
}
283