Completed
Pull Request — master (#255)
by Luc
04:47
created

AggregateAwareDBALEventStore::load()   A

Complexity

Conditions 4
Paths 6

Size

Total Lines 20
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 20
rs 9.2
c 0
b 0
f 0
cc 4
eloc 12
nc 6
nop 1
1
<?php
2
3
namespace CultuurNet\UDB3\EventSourcing\DBAL;
4
5
use Broadway\Domain\DateTime as BroadwayDateTime;
6
use Broadway\Domain\DomainEventStream;
7
use Broadway\Domain\DomainEventStreamInterface;
8
use Broadway\Domain\DomainMessage;
9
use Broadway\EventStore\DBALEventStoreException;
10
use Broadway\EventStore\EventStoreInterface;
11
use Broadway\EventStore\EventStreamNotFoundException;
12
use Broadway\EventStore\Exception\InvalidIdentifierException;
13
use Broadway\Serializer\SerializerInterface;
14
use Doctrine\DBAL\Connection;
15
use Doctrine\DBAL\DBALException;
16
use Doctrine\DBAL\Schema\Schema;
17
use Doctrine\DBAL\Schema\Table;
18
use Doctrine\DBAL\Version;
19
use Rhumsaa\Uuid\Uuid;
20
21
/**
 * Event store backed by a Doctrine DBAL connection.
 *
 * Every appended domain message is written as one row of the configured table,
 * together with the fixed aggregate type this store was constructed with.
 * Identifiers are stored either as-is or, in binary mode, as the 16 raw bytes
 * of a UUID.
 */
class AggregateAwareDBALEventStore implements EventStoreInterface
{
    /**
     * @var Connection
     */
    private $connection;

    /**
     * @var SerializerInterface
     */
    private $payloadSerializer;

    /**
     * @var SerializerInterface
     */
    private $metadataSerializer;

    /**
     * Prepared SELECT statement used by load(); created on first use and
     * reused afterwards.
     *
     * @var \Doctrine\DBAL\Driver\Statement|null
     */
    private $loadStatement = null;

    /**
     * @var string
     */
    private $tableName;

    /**
     * Value written to the "aggregate_type" column of every inserted row.
     *
     * @var string
     */
    private $aggregateType;

    /**
     * Whether identifiers are stored as binary UUID bytes instead of strings.
     *
     * @var bool
     */
    private $useBinary;

    /**
     * @param Connection $connection
     * @param SerializerInterface $payloadSerializer
     * @param SerializerInterface $metadataSerializer
     * @param string $tableName
     * @param string $aggregateType
     * @param bool $useBinary
     *
     * @throws \InvalidArgumentException
     *   When binary storage is requested but the installed Doctrine DBAL is
     *   older than 2.5.0.
     */
    public function __construct(
        Connection $connection,
        SerializerInterface $payloadSerializer,
        SerializerInterface $metadataSerializer,
        $tableName,
        $aggregateType,
        $useBinary = false
    ) {
        $this->connection         = $connection;
        $this->payloadSerializer  = $payloadSerializer;
        $this->metadataSerializer = $metadataSerializer;
        $this->tableName          = $tableName;
        $this->aggregateType      = $aggregateType;
        $this->useBinary          = (bool) $useBinary;

        // Version::compare() yields 1 only when the given version is newer
        // than the installed one, i.e. the installed DBAL is older than 2.5.0.
        // A result of 0 means exactly 2.5.0, which is supported.
        if ($this->useBinary && Version::compare('2.5.0') > 0) {
            throw new \InvalidArgumentException(
                'The Binary storage is only available with Doctrine DBAL >= 2.5.0'
            );
        }
    }

    /**
     * {@inheritDoc}
     *
     * Fetches all rows for the given identifier ordered by playhead and turns
     * them into a domain event stream.
     *
     * @throws EventStreamNotFoundException
     *   When no rows exist for the given identifier.
     */
    public function load($id)
    {
        $statement = $this->prepareLoadStatement();
        $statement->bindValue('uuid', $this->convertIdentifierToStorageValue($id));
        $statement->execute();

        $events = array();
        while ($row = $statement->fetch()) {
            if ($this->useBinary) {
                // Rows hold raw UUID bytes in binary mode; restore the string form.
                $row['uuid'] = $this->convertStorageValueToIdentifier($row['uuid']);
            }
            $events[] = $this->deserializeEvent($row);
        }

        if (empty($events)) {
            throw new EventStreamNotFoundException(sprintf('EventStream not found for aggregate with id %s', $id));
        }

        return new DomainEventStream($events);
    }

    /**
     * {@inheritDoc}
     *
     * Inserts every message of the stream inside a single transaction. The
     * transaction is rolled back when any insert fails.
     *
     * @throws DBALEventStoreException
     *   When the database layer reports an error.
     */
    public function append($id, DomainEventStreamInterface $eventStream)
    {
        // noop to ensure that an error will be thrown early if the ID
        // is not something that can be converted to a string. If we
        // let this move on without doing this DBAL will eventually
        // give us a hard time but the true reason for the problem
        // will be obfuscated.
        // The result is intentionally unused: each row takes its identifier
        // from the domain message itself.
        $id = (string) $id;

        $this->connection->beginTransaction();

        try {
            foreach ($eventStream as $domainMessage) {
                $this->insertMessage($this->connection, $domainMessage);
            }

            $this->connection->commit();
        } catch (DBALException $exception) {
            $this->connection->rollback();

            throw DBALEventStoreException::create($exception);
        } catch (\Exception $exception) {
            // Any other failure (for example while serializing a message)
            // must not leave the transaction open: undo it and rethrow the
            // original exception unchanged.
            $this->connection->rollback();

            throw $exception;
        }
    }

    /**
     * Writes a single domain message as one row of the event table.
     *
     * Metadata and payload are serialized and then JSON-encoded.
     *
     * @param Connection $connection
     * @param DomainMessage $domainMessage
     */
    private function insertMessage(Connection $connection, DomainMessage $domainMessage)
    {
        $data = array(
            'uuid'           => $this->convertIdentifierToStorageValue((string) $domainMessage->getId()),
            'playhead'       => $domainMessage->getPlayhead(),
            'metadata'       => json_encode($this->metadataSerializer->serialize($domainMessage->getMetadata())),
            'payload'        => json_encode($this->payloadSerializer->serialize($domainMessage->getPayload())),
            'recorded_on'    => $domainMessage->getRecordedOn()->toString(),
            'type'           => $domainMessage->getType(),
            'aggregate_type' => $this->aggregateType
        );

        $connection->insert($this->tableName, $data);
    }

    /**
     * Returns the table definition for this store, unless the given schema
     * already contains a table with the configured name.
     *
     * @param Schema $schema
     * @return Table|null
     */
    public function configureSchema(Schema $schema)
    {
        if ($schema->hasTable($this->tableName)) {
            return null;
        }

        return $this->configureTable();
    }

    /**
     * Builds the table definition for the event table on a fresh schema.
     *
     * The identifier column is a 36-character guid by default, or a fixed
     * 16-byte binary column in binary mode.
     *
     * @return Table
     */
    public function configureTable()
    {
        $schema = new Schema();

        $uuidColumnDefinition = array(
            'type'   => 'guid',
            'params' => array(
                'length' => 36,
            ),
        );

        if ($this->useBinary) {
            $uuidColumnDefinition['type']   = 'binary';
            $uuidColumnDefinition['params'] = array(
                'length' => 16,
                'fixed'  => true,
            );
        }

        $table = $schema->createTable($this->tableName);

        $table->addColumn('id', 'integer', array('autoincrement' => true));
        $table->addColumn('uuid', $uuidColumnDefinition['type'], $uuidColumnDefinition['params']);
        $table->addColumn('playhead', 'integer', array('unsigned' => true));
        $table->addColumn('payload', 'text');
        $table->addColumn('metadata', 'text');
        $table->addColumn('recorded_on', 'string', array('length' => 32));
        $table->addColumn('type', 'string', array('length' => 128));
        $table->addColumn('aggregate_type', 'string', array('length' => 128));

        $table->setPrimaryKey(array('id'));
        // A given identifier may hold each playhead value only once.
        $table->addUniqueIndex(array('uuid', 'playhead'));

        return $table;
    }

    /**
     * Returns the prepared SELECT statement used by load(), building it on
     * the first call and reusing it on later calls.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function prepareLoadStatement()
    {
        if (null === $this->loadStatement) {
            $queryBuilder = $this->connection->createQueryBuilder();

            $queryBuilder->select(
                ['uuid', 'playhead', 'metadata', 'payload', 'recorded_on']
            )
                ->from($this->tableName)
                ->where('uuid = :uuid')
                ->orderBy('playhead', 'ASC');

            $this->loadStatement = $this->connection->prepare(
                $queryBuilder->getSQL()
            );
        }

        return $this->loadStatement;
    }

    /**
     * Rebuilds a domain message from a fetched table row.
     *
     * NOTE(review): static analysis flagged this method as duplicated
     * elsewhere in the project; consider sharing one implementation.
     *
     * @param array $row
     *   Row with the keys uuid, playhead, metadata, payload and recorded_on.
     * @return DomainMessage
     */
    private function deserializeEvent($row)
    {
        return new DomainMessage(
            $row['uuid'],
            $row['playhead'],
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
            $this->payloadSerializer->deserialize(json_decode($row['payload'], true)),
            BroadwayDateTime::fromString($row['recorded_on'])
        );
    }

    /**
     * Converts an identifier to the value stored in the "uuid" column.
     *
     * In binary mode the identifier is parsed as a UUID and its raw bytes are
     * returned; otherwise the identifier is returned untouched.
     *
     * @param mixed $id
     * @return mixed
     *
     * @throws InvalidIdentifierException
     *   When binary mode is enabled and the identifier cannot be converted.
     */
    private function convertIdentifierToStorageValue($id)
    {
        if ($this->useBinary) {
            try {
                return Uuid::fromString($id)->getBytes();
            } catch (\Exception $e) {
                throw new InvalidIdentifierException(
                    'Only valid UUIDs are allowed to by used with the binary storage mode.'
                );
            }
        }

        return $id;
    }

    /**
     * Converts a value read from the "uuid" column back to an identifier.
     *
     * In binary mode the raw bytes are turned into the UUID string form;
     * otherwise the value is returned untouched.
     *
     * @param mixed $id
     * @return mixed
     *
     * @throws InvalidIdentifierException
     *   When binary mode is enabled and the stored value cannot be converted.
     */
    private function convertStorageValueToIdentifier($id)
    {
        if ($this->useBinary) {
            try {
                return Uuid::fromBytes($id)->toString();
            } catch (\Exception $e) {
                throw new InvalidIdentifierException(
                    'Could not convert binary storage value to UUID.'
                );
            }
        }

        return $id;
    }
}
286