Completed
Pull Request — master (#255)
by Luc
73:37 queued 43:08
created

AggregateAwareDBALEventStore::deserializeEvent()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 7

Duplication

Lines 10
Ratio 100 %

Importance

Changes 0
Metric Value
dl 10
loc 10
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 1
1
<?php
2
3
namespace CultuurNet\UDB3\EventSourcing\DBAL;
4
5
use Broadway\Domain\DateTime as BroadwayDateTime;
6
use Broadway\Domain\DomainEventStream;
7
use Broadway\Domain\DomainEventStreamInterface;
8
use Broadway\Domain\DomainMessage;
9
use Broadway\EventStore\DBALEventStoreException;
10
use Broadway\EventStore\EventStoreInterface;
11
use Broadway\EventStore\EventStreamNotFoundException;
12
use Broadway\EventStore\Exception\InvalidIdentifierException;
13
use Broadway\Serializer\SerializerInterface;
14
use Doctrine\DBAL\Connection;
15
use Doctrine\DBAL\DBALException;
16
use Doctrine\DBAL\Schema\Schema;
17
use Doctrine\DBAL\Schema\Table;
18
use Doctrine\DBAL\Version;
19
use Rhumsaa\Uuid\Uuid;
20
21
class AggregateAwareDBALEventStore implements EventStoreInterface
22
{
23
    /**
24
     * @var Connection
25
     */
26
    private $connection;
27
28
    /**
29
     * @var SerializerInterface
30
     */
31
    private $payloadSerializer;
32
33
    /**
34
     * @var SerializerInterface
35
     */
36
    private $metadataSerializer;
37
38
    /**
39
     * @var null
40
     */
41
    private $loadStatement = null;
42
43
    /**
44
     * @var string
45
     */
46
    private $tableName;
47
48
    /**
49
     * @var string
50
     */
51
    private $aggregateType;
52
53
    /**
54
     * @var bool
55
     */
56
    private $useBinary;
57
58
    /**
59
     * @param Connection $connection
60
     * @param SerializerInterface $payloadSerializer
61
     * @param SerializerInterface $metadataSerializer
62
     * @param string $tableName
63
     * @param string $aggregateType
64
     * @param bool $useBinary
65
     */
66
    public function __construct(
67
        Connection $connection,
68
        SerializerInterface $payloadSerializer,
69
        SerializerInterface $metadataSerializer,
70
        $tableName,
71
        $aggregateType,
72
        $useBinary = false
73
    ) {
74
        $this->connection         = $connection;
75
        $this->payloadSerializer  = $payloadSerializer;
76
        $this->metadataSerializer = $metadataSerializer;
77
        $this->tableName          = $tableName;
78
        $this->aggregateType      = $aggregateType;
79
        $this->useBinary          = (bool) $useBinary;
80
81
        if ($this->useBinary && Version::compare('2.5.0') >= 0) {
82
            throw new \InvalidArgumentException(
83
                'The Binary storage is only available with Doctrine DBAL >= 2.5.0'
84
            );
85
        }
86
    }
87
88
    /**
89
     * {@inheritDoc}
90
     */
91
    public function load($id)
92
    {
93
        $statement = $this->prepareLoadStatement();
94
        $statement->bindValue('uuid', $this->convertIdentifierToStorageValue($id));
95
        $statement->execute();
96
97
        $events = array();
98
        while ($row = $statement->fetch()) {
99
            if ($this->useBinary) {
100
                $row['uuid'] = $this->convertStorageValueToIdentifier($row['uuid']);
101
            }
102
            $events[] = $this->deserializeEvent($row);
103
        }
104
105
        if (empty($events)) {
106
            throw new EventStreamNotFoundException(sprintf('EventStream not found for aggregate with id %s', $id));
107
        }
108
109
        return new DomainEventStream($events);
110
    }
111
112
    /**
113
     * {@inheritDoc}
114
     */
115
    public function append($id, DomainEventStreamInterface $eventStream)
116
    {
117
        // noop to ensure that an error will be thrown early if the ID
118
        // is not something that can be converted to a string. If we
119
        // let this move on without doing this DBAL will eventually
120
        // give us a hard time but the true reason for the problem
121
        // will be obfuscated.
122
        $id = (string) $id;
0 ignored issues
show
Unused Code introduced by
$id is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
123
124
        $this->connection->beginTransaction();
125
126
        try {
127
            foreach ($eventStream as $domainMessage) {
128
                $this->insertMessage($this->connection, $domainMessage);
129
            }
130
131
            $this->connection->commit();
132
        } catch (DBALException $exception) {
133
            $this->connection->rollback();
134
135
            throw DBALEventStoreException::create($exception);
136
        }
137
    }
138
139
    /**
140
     * @param Connection $connection
141
     * @param DomainMessage $domainMessage
142
     */
143
    private function insertMessage(Connection $connection, DomainMessage $domainMessage)
144
    {
145
        $data = array(
146
            'uuid'           => $this->convertIdentifierToStorageValue((string) $domainMessage->getId()),
147
            'playhead'       => $domainMessage->getPlayhead(),
148
            'metadata'       => json_encode($this->metadataSerializer->serialize($domainMessage->getMetadata())),
149
            'payload'        => json_encode($this->payloadSerializer->serialize($domainMessage->getPayload())),
150
            'recorded_on'    => $domainMessage->getRecordedOn()->toString(),
151
            'type'           => $domainMessage->getType(),
152
            'aggregate_type' => $this->aggregateType
153
        );
154
155
        $connection->insert($this->tableName, $data);
156
    }
157
158
    /**
159
     * @param Schema $schema
160
     * @return Table|null
161
     */
162
    public function configureSchema(Schema $schema)
163
    {
164
        if ($schema->hasTable($this->tableName)) {
165
            return null;
166
        }
167
168
        return $this->configureTable();
169
    }
170
171
    /**
172
     * @return mixed
173
     */
174
    public function configureTable()
175
    {
176
        $schema = new Schema();
177
178
        $uuidColumnDefinition = array(
179
            'type'   => 'guid',
180
            'params' => array(
181
                'length' => 36,
182
            ),
183
        );
184
185
        if ($this->useBinary) {
186
            $uuidColumnDefinition['type']   = 'binary';
187
            $uuidColumnDefinition['params'] = array(
188
                'length' => 16,
189
                'fixed'  => true,
190
            );
191
        }
192
193
        $table = $schema->createTable($this->tableName);
194
195
        $table->addColumn('id', 'integer', array('autoincrement' => true));
196
        $table->addColumn('uuid', $uuidColumnDefinition['type'], $uuidColumnDefinition['params']);
197
        $table->addColumn('playhead', 'integer', array('unsigned' => true));
198
        $table->addColumn('payload', 'text');
199
        $table->addColumn('metadata', 'text');
200
        $table->addColumn('recorded_on', 'string', array('length' => 32));
201
        $table->addColumn('type', 'string', array('length' => 128));
202
        $table->addColumn('aggregate_type', 'string', array('length' => 128));
203
204
        $table->setPrimaryKey(array('id'));
205
        $table->addUniqueIndex(array('uuid', 'playhead'));
206
207
        return $table;
208
    }
209
210
    /**
211
     * @return \Doctrine\DBAL\Driver\Statement|null
212
     */
213
    private function prepareLoadStatement()
214
    {
215
        if (null === $this->loadStatement) {
216
            $query = 'SELECT uuid, playhead, metadata, payload, recorded_on
217
                FROM ' . $this->tableName . '
218
                WHERE uuid = :uuid
219
                ORDER BY playhead ASC';
220
            $this->loadStatement = $this->connection->prepare($query);
0 ignored issues
show
Documentation Bug introduced by
It seems like $this->connection->prepare($query) of type object<Doctrine\DBAL\Driver\Statement> is incompatible with the declared type null of property $loadStatement.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
221
        }
222
223
        return $this->loadStatement;
224
    }
225
226
    /**
227
     * @param $row
228
     * @return DomainMessage
229
     */
230 View Code Duplication
    private function deserializeEvent($row)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
231
    {
232
        return new DomainMessage(
233
            $row['uuid'],
234
            $row['playhead'],
235
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
236
            $this->payloadSerializer->deserialize(json_decode($row['payload'], true)),
237
            BroadwayDateTime::fromString($row['recorded_on'])
238
        );
239
    }
240
241
    /**
242
     * @param $id
243
     * @return mixed
244
     */
245
    private function convertIdentifierToStorageValue($id)
246
    {
247
        if ($this->useBinary) {
248
            try {
249
                return Uuid::fromString($id)->getBytes();
250
            } catch (\Exception $e) {
251
                throw new InvalidIdentifierException(
252
                    'Only valid UUIDs are allowed to by used with the binary storage mode.'
253
                );
254
            }
255
        }
256
257
        return $id;
258
    }
259
260
    /**
261
     * @param $id
262
     * @return mixed
263
     */
264
    private function convertStorageValueToIdentifier($id)
265
    {
266
        if ($this->useBinary) {
267
            try {
268
                return Uuid::fromBytes($id)->toString();
269
            } catch (\Exception $e) {
270
                throw new InvalidIdentifierException(
271
                    'Could not convert binary storage value to UUID.'
272
                );
273
            }
274
        }
275
276
        return $id;
277
    }
278
}