Completed
Push — master ( ba82d8...306a50 )
by Daniel
03:11
created

DoctrineDbalEventStore::readAllEvents()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
ccs 0
cts 0
cp 0
rs 10
cc 1
eloc 1
nc 1
nop 0
crap 2
1
<?php
2
3
namespace DDDominio\EventSourcing\EventStore\Vendor;
4
5
use DDDominio\EventSourcing\Common\EventStream;
6
use DDDominio\EventSourcing\Common\EventStreamInterface;
7
use DDDominio\EventSourcing\EventStore\AbstractEventStore;
8
use DDDominio\EventSourcing\EventStore\ConcurrencyException;
9
use DDDominio\EventSourcing\EventStore\StoredEvent;
10
use Doctrine\DBAL\Connection;
11
use DDDominio\EventSourcing\Serialization\SerializerInterface;
12
use DDDominio\EventSourcing\Versioning\EventUpgrader;
13
use DDDominio\EventSourcing\Versioning\Version;
14
15
/**
 * Event store backed by a Doctrine DBAL connection.
 *
 * Events are read from and written to an `events` table; the set of known
 * stream ids is tracked in a `streams` table.
 */
class DoctrineDbalEventStore extends AbstractEventStore
{
    /**
     * LIMIT used when the caller does not restrict the number of events.
     *
     * NOTE: despite its name this is the largest *signed* 64-bit integer
     * (2^63 - 1, i.e. PHP_INT_MAX on 64-bit builds). The name is kept because
     * the constant is part of the public API.
     */
    const MAX_UNSIGNED_BIG_INT = 9223372036854775807;

    /**
     * @var Connection
     */
    private $connection;

    /**
     * @param Connection $connection
     * @param SerializerInterface $serializer
     * @param EventUpgrader $eventUpgrader
     */
    public function __construct($connection, $serializer, $eventUpgrader)
    {
        parent::__construct($serializer, $eventUpgrader);
        $this->connection = $connection;
    }

    /**
     * Reads a window of a stream, oldest event first.
     *
     * @param string $streamId
     * @param int $start 1-based position of the first event to read; values
     *                   below 1 are treated as 1
     * @param int $count Maximum number of events to read; null reads every
     *                   remaining event
     * @return EventStreamInterface
     */
    public function readStreamEventsForward($streamId, $start = 1, $count = null)
    {
        if (!isset($count)) {
            $count = self::MAX_UNSIGNED_BIG_INT;
        }

        // Without ORDER BY the rows selected by LIMIT/OFFSET are undefined.
        // Assumes events.id is generated in insertion order (the INSERT in
        // appendStoredEvents() never supplies it) -- confirm against the schema.
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id
             LIMIT :limit
             OFFSET :offset'
        );
        $stmt->bindValue(':streamId', $streamId);
        // Clamp so that $start < 1 cannot produce a negative OFFSET.
        $stmt->bindValue(':offset', max(0, (int) $start - 1), \PDO::PARAM_INT);
        $stmt->bindValue(':limit', $count, \PDO::PARAM_INT);
        $stmt->execute();

        return $this->domainEventStreamFromStoredEvents(
            $this->storedEventsFromRows($stmt->fetchAll())
        );
    }

    /**
     * Reads every event of a stream, oldest event first.
     *
     * @param string $streamId
     * @return EventStreamInterface
     */
    public function readFullStream($streamId)
    {
        // Ordering assumes events.id grows with insertion order -- see
        // readStreamEventsForward().
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE stream_id = :streamId
             ORDER BY id'
        );
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();

        return $this->domainEventStreamFromStoredEvents(
            $this->storedEventsFromRows($stmt->fetchAll())
        );
    }

    /**
     * Reads the full content of every stream registered in the `streams` table.
     *
     * The order of the returned streams is whatever the database yields.
     *
     * @return EventStreamInterface[]
     */
    public function readAllStreams()
    {
        $stmt = $this->connection->prepare('SELECT id FROM streams');
        $stmt->execute();

        $streams = [];
        foreach ($stmt->fetchAll() as $row) {
            $streams[] = $this->readFullStream($row['id']);
        }

        return $streams;
    }

    /**
     * Reads every stored event, regardless of the stream it belongs to.
     *
     * @return EventStreamInterface
     */
    public function readAllEvents()
    {
        // Ordering assumes events.id grows with insertion order -- see
        // readStreamEventsForward().
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             ORDER BY id'
        );
        $stmt->execute();

        return $this->domainEventStreamFromStoredEvents(
            $this->storedEventsFromRows($stmt->fetchAll())
        );
    }

    /**
     * Reads the raw stored events that have the given type and version.
     *
     * Unlike the public readers, the result wraps the StoredEvent objects
     * directly instead of passing them through
     * domainEventStreamFromStoredEvents().
     *
     * @param string $type
     * @param Version $version
     * @return EventStreamInterface
     */
    protected function readStoredEventsOfTypeAndVersion($type, $version)
    {
        $stmt = $this->connection->prepare(
            'SELECT *
             FROM events
             WHERE type = :type
             AND version = :version
             ORDER BY id'
        );
        $stmt->bindValue(':type', $type);
        $stmt->bindValue(':version', $version);
        $stmt->execute();

        return new EventStream($this->storedEventsFromRows($stmt->fetchAll()));
    }

    /**
     * Appends events to a stream inside a single transaction.
     *
     * The stream is registered in the `streams` table on first use. After the
     * inserts, the stream must contain exactly $expectedVersion plus the
     * number of appended events; otherwise a ConcurrencyException is thrown
     * from inside the transactional closure (Doctrine's transactional() is
     * documented to roll back when the closure throws).
     *
     * @param string $streamId
     * @param StoredEvent[] $storedEvents
     * @param int $expectedVersion
     * @throws ConcurrencyException
     */
    protected function appendStoredEvents($streamId, $storedEvents, $expectedVersion)
    {
        $this->connection->transactional(function () use ($streamId, $storedEvents, $expectedVersion) {
            if (!$this->streamExists($streamId)) {
                $createStream = $this->connection
                    ->prepare('INSERT INTO streams (id) VALUES (:streamId)');
                $createStream->bindValue(':streamId', $streamId);
                $createStream->execute();
            }

            // The statement is identical for every event: prepare it once and
            // re-bind the values on each iteration.
            $insertEvent = $this->connection->prepare(
                'INSERT INTO events (stream_id, type, event, metadata, occurred_on, version)
                 VALUES (:streamId, :type, :event, :metadata, :occurredOn, :version)'
            );
            foreach ($storedEvents as $storedEvent) {
                $insertEvent->bindValue(':streamId', $streamId);
                $insertEvent->bindValue(':type', $storedEvent->type());
                $insertEvent->bindValue(':event', $storedEvent->data());
                $insertEvent->bindValue(':metadata', $storedEvent->metadata());
                $insertEvent->bindValue(':occurredOn', $storedEvent->occurredOn()->format('Y-m-d H:i:s'));
                $insertEvent->bindValue(':version', $storedEvent->version());
                $insertEvent->execute();
            }

            $streamFinalVersion = $this->streamVersion($streamId);
            if (count($storedEvents) !== $streamFinalVersion - $expectedVersion) {
                // Reuse the version that was just read instead of querying again.
                throw ConcurrencyException::fromVersions($streamFinalVersion, $expectedVersion);
            }
        });
    }

    /**
     * Returns the number of events stored for the stream, which this store
     * uses as the stream version.
     *
     * @param string $streamId
     * @return int
     */
    protected function streamVersion($streamId)
    {
        $stmt = $this->connection
            ->prepare('SELECT COUNT(*) FROM events WHERE stream_id = :streamId');
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();

        return (int) $stmt->fetchColumn();
    }

    /**
     * Tells whether the stream id is registered in the `streams` table.
     *
     * @param string $streamId
     * @return bool
     */
    protected function streamExists($streamId)
    {
        $stmt = $this->connection
            ->prepare('SELECT COUNT(*) FROM streams WHERE id = :streamId');
        $stmt->bindValue(':streamId', $streamId);
        $stmt->execute();

        return (bool) $stmt->fetchColumn();
    }

    /**
     * Builds StoredEvent objects from rows of the `events` table.
     *
     * @param array $rows Rows as returned by fetchAll(), each exposing the
     *                    columns id, stream_id, type, event, metadata,
     *                    occurred_on and version
     * @return StoredEvent[]
     */
    private function storedEventsFromRows(array $rows)
    {
        return array_map(static function ($row) {
            return new StoredEvent(
                $row['id'],
                $row['stream_id'],
                $row['type'],
                $row['event'],
                $row['metadata'],
                new \DateTimeImmutable($row['occurred_on']),
                Version::fromString($row['version'])
            );
        }, $rows);
    }
}
215