Passed
Push — master ( 65e262...351bce )
by Daniel
03:26
created

DoctrineDbalEventStore::getStreamVersionAt()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 16
c 0
b 0
f 0
ccs 9
cts 9
cp 1
rs 9.4285
cc 2
eloc 9
nc 2
nop 2
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\EventStreamDoesNotExistException;
10
use DDDominio\EventSourcing\EventStore\StoredEvent;
11
use Doctrine\DBAL\Connection;
12
use DDDominio\EventSourcing\Serialization\SerializerInterface;
13
use DDDominio\EventSourcing\Versioning\EventUpgrader;
14
use DDDominio\EventSourcing\Versioning\Version;
15
use Doctrine\DBAL\Schema\Schema;
16
use DDDominio\EventSourcing\EventStore\InitializableInterface;
17
18
class DoctrineDbalEventStore extends AbstractEventStore implements InitializableInterface
19
{
20
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;
21
    const STREAMS_TABLE = 'streams';
22
    const EVENTS_TABLE = 'events';
23
24
    /**
25
     * @var Connection
26
     */
27
    private $connection;
28
29
    /**
30
     * @param Connection $connection
31
     * @param SerializerInterface $serializer
32
     * @param EventUpgrader $eventUpgrader
33
     */
34 15
    public function __construct($connection, $serializer, $eventUpgrader)
35
    {
36 15
        parent::__construct($serializer, $eventUpgrader);
37 15
        $this->connection = $connection;
38 15
    }
39
40
    /**
41
     * @param string $streamId
42
     * @param int $start
43
     * @param int $count
44
     * @return EventStreamInterface
45
     */
46 4 View Code Duplication
    public function readStreamEvents($streamId, $start = 1, $count = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
47
    {
48 4
        if (!isset($count)) {
49 3
            $count = self::MAX_UNSIGNED_BIG_INT;
50
        }
51 4
        $stmt = $this->connection->prepare(
52
            'SELECT *
53
             FROM events
54
             WHERE stream_id = :streamId
55
             LIMIT :limit
56 4
             OFFSET :offset'
57
        );
58 4
        $stmt->bindValue(':streamId', $streamId);
59 4
        $stmt->bindValue(':offset', (int) $start - 1, \PDO::PARAM_INT);
60 4
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
61 4
        $stmt->execute();
62 4
        $results = $stmt->fetchAll();
63
64
        $storedEvents = array_map(function($result) {
65 3
            return new StoredEvent(
66 3
                $result['id'],
67 3
                $result['stream_id'],
68 3
                $result['type'],
69 3
                $result['event'],
70 3
                $result['metadata'],
71 3
                new \DateTimeImmutable($result['occurred_on']),
72 3
                Version::fromString($result['version'])
73
            );
74 4
        }, $results);
75
76 4
        return $this->domainEventStreamFromStoredEvents($storedEvents);
77
    }
78
79
    /**
80
     * @param string $streamId
81
     * @param \DateTimeImmutable $datetime
82
     * @param int $start
83
     * @return EventStreamInterface
84
     */
85
    public function readStreamEventsUntil($streamId, $datetime, $start = 1)
86
    {
87
        // TODO: Implement readStreamEventsUntil() method.
88
    }
89
90
    /**
91
     * @param string $streamId
92
     * @return EventStreamInterface
93
     */
94 6 View Code Duplication
    public function readFullStream($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
95
    {
96 6
        $stmt = $this->connection->prepare(
97
            'SELECT *
98
             FROM events
99 6
             WHERE stream_id = :streamId'
100
        );
101 6
        $stmt->bindValue(':streamId', $streamId);
102 6
        $stmt->execute();
103 6
        $results = $stmt->fetchAll();
104
105
        $storedEvents = array_map(function($result) {
106 5
            return new StoredEvent(
107 5
                $result['id'],
108 5
                $result['stream_id'],
109 5
                $result['type'],
110 5
                $result['event'],
111 5
                $result['metadata'],
112 5
                new \DateTimeImmutable($result['occurred_on']),
113 5
                Version::fromString($result['version'])
114
            );
115 6
        }, $results);
116
117 6
        return $this->domainEventStreamFromStoredEvents($storedEvents);
118
    }
119
120
    /**
121
     * @return EventStreamInterface[]
122
     */
123
    public function readAllStreams()
124
    {
125
        // TODO: Implement readAllStreams() method.
126
    }
127
128
    /**
129
     * @return EventStreamInterface
130
     */
131
    public function readAllEvents()
132
    {
133
        // TODO: Implement readAllEvents() method.
134
    }
135
136
    /**
137
     * @param string $type
138
     * @param Version $version
139
     * @return EventStreamInterface
140
     */
141 1 View Code Duplication
    protected function readStoredEventsOfTypeAndVersion($type, $version)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
142
    {
143 1
        $stmt = $this->connection->prepare(
144
            'SELECT *
145
             FROM events
146
             WHERE type = :type
147 1
             AND version = :version'
148
        );
149 1
        $stmt->bindValue(':type', $type);
150 1
        $stmt->bindValue(':version', $version);
151 1
        $stmt->execute();
152 1
        $results = $stmt->fetchAll();
153
154
        $storedEvents = array_map(function($result) {
155 1
            return new StoredEvent(
156 1
                $result['id'],
157 1
                $result['stream_id'],
158 1
                $result['type'],
159 1
                $result['event'],
160 1
                $result['metadata'],
161 1
                new \DateTimeImmutable($result['occurred_on']),
162 1
                Version::fromString($result['version'])
163
            );
164 1
        }, $results);
165
166 1
        return new EventStream($storedEvents);
167
    }
168
169
    /**
170
     * @param string $streamId
171
     * @param StoredEvent[] $storedEvents
172
     * @param int $expectedVersion
173
     */
174 9
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
175
    {
176
        $this->connection->transactional(function() use ($streamId, $storedEvents, $expectedVersion) {
177 9 View Code Duplication
            if (!$this->streamExists($streamId)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
178 9
                $stmt = $this->connection
179 9
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
180 9
                $stmt->bindValue(':streamId', $streamId);
181 9
                $stmt->execute();
182
            }
183 9 View Code Duplication
            foreach ($storedEvents as $storedEvent) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
184 9
                $stmt = $this->connection->prepare(
185
                    'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
186 9
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
187
                );
188 9
                $stmt->bindValue(':streamId', $streamId);
189 9
                $stmt->bindValue(':type', $storedEvent->type());
190 9
                $stmt->bindValue(':event', $storedEvent->data());
191 9
                $stmt->bindValue(':metadata', $storedEvent->metadata());
192 9
                $stmt->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
193 9
                $stmt->bindValue(':version', $storedEvent->version());
194 9
                $stmt->execute();
195
            }
196 9
            $streamFinalVersion = $this->streamVersion($streamId);
197 9
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
198
                throw ConcurrencyException::fromVersions(
199
                    $this->streamVersion($streamId),
200
                    $expectedVersion
201
                );
202
            }
203 9
        });
204 9
    }
205
    /**
206
     * @param string $streamId
207
     * @return int
208
     */
209 9 View Code Duplication
    protected function streamVersion($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
210
    {
211 9
        $stmt = $this->connection
212 9
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
213 9
        $stmt->bindValue(':streamId', $streamId);
214 9
        $stmt->execute();
215 9
        return intval($stmt->fetchColumn());
216
    }
217
218
    /**
219
     * @param string $streamId
220
     * @param \DateTimeImmutable $datetime
221
     * @return int
222
     * @throws EventStreamDoesNotExistException
223
     */
224 3
    public function getStreamVersionAt($streamId, \DateTimeImmutable $datetime)
225
    {
226 3
        if (!$this->streamExists($streamId)) {
227 1
            throw EventStreamDoesNotExistException::fromStreamId($streamId);
228
        }
229 2
        $stmt = $this->connection->prepare(
230
            'SELECT COUNT(*)
231
             FROM events
232
             WHERE stream_id = :streamId
233 2
             AND occurred_on <= :occurred_on'
234
        );
235 2
        $stmt->bindValue(':streamId', $streamId);
236 2
        $stmt->bindValue(':occurred_on', $datetime->format('Y-m-d H:i:s'));
237 2
        $stmt->execute();
238 2
        return intval($stmt->fetchColumn());
239
    }
240
241
    /**
242
     * @param string $streamId
243
     * @return bool
244
     */
245 11 View Code Duplication
    protected function streamExists($streamId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
246
    {
247 11
        $stmt = $this->connection
248 11
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
249 11
        $stmt->bindValue(':streamId', $streamId);
250 11
        $stmt->execute();
251 11
        return boolval($stmt->fetchColumn());
252
    }
253
254
    /**
255
     * Initialize the Event Store
256
     */
257 15
    public function initialize()
258
    {
259 15
        $schema = new Schema();
260
261 15
        $streamTable = $schema->createTable(self::STREAMS_TABLE);
262 15
        $streamTable->addColumn('id', 'string');
263 15
        $streamTable->setPrimaryKey(array("id"));
264
265 15
        $eventsTable = $schema->createTable(self::EVENTS_TABLE);
266 15
        $eventsTable->addColumn('id', 'integer', ['autoincrement' => true]);
267 15
        $eventsTable->addColumn('stream_id', 'string');
268 15
        $eventsTable->addColumn('type', 'string');
269 15
        $eventsTable->addColumn('event', 'text');
270 15
        $eventsTable->addColumn('metadata', 'text');
271 15
        $eventsTable->addColumn('occurred_on', 'datetime');
272 15
        $eventsTable->addColumn('version', 'string');
273 15
        $eventsTable->setPrimaryKey(['id']);
274 15
        $eventsTable->addForeignKeyConstraint($streamTable, ['stream_id'], ['id']);
275
276 15
        $queries = $schema->toSql($this->connection->getDatabasePlatform());
277 15
        $this->connection->transactional(function(Connection $connection) use ($queries) {
278 15
            foreach ($queries as $query) {
279 15
                $connection->exec($query);
280
            }
281 15
        });
282 15
    }
283
284
    /**
285
     * Check if the Event Store has been initialized
286
     *
287
     * @return bool
288
     */
289
    public function initialized()
290
    {
291
        return $this->connection->getSchemaManager()->tablesExist([self::STREAMS_TABLE]);
292
    }
293
}
294