Completed
Push — master ( 8e5e55...6fad72 )
by Iqbal
10:57
created

MongoStorage::performRollback()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 12
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 12
rs 9.4285
cc 2
eloc 8
nc 2
nop 2
1
<?php
2
/**
3
 * This file is part of the Borobudur-Event-Sourcing package.
4
 *
5
 * (c) Hexacodelabs <http://hexacodelabs.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 */
10
11
namespace Borobudur\EventSourcing\Storage\Mongo;
12
13
use Borobudur\Cqrs\IdentifierInterface;
14
use Borobudur\Cqrs\Message\DomainEventInterface;
15
use Borobudur\EventSourcing\Domain\DomainEventStream;
16
use Borobudur\EventSourcing\Domain\DomainMessage;
17
use Borobudur\EventSourcing\Domain\DomainMessageInterface;
18
use Borobudur\EventSourcing\Exception\LogicException;
19
use Borobudur\EventSourcing\Snapshot\SnapshotInterface;
20
use Borobudur\EventSourcing\Storage\StorageInterface;
21
use Borobudur\EventSourcing\Storage\TransactionalTrait;
22
use MongoDB\Driver\BulkWrite;
23
use MongoDB\Driver\Manager;
24
use MongoDB\Driver\Query;
25
26
/**
27
 * @author      Iqbal Maulana <[email protected]>
28
 * @created     8/20/15
29
 */
30
class MongoStorage implements StorageInterface
31
{
32
    use TransactionalTrait {
33
        TransactionalTrait::beginTransaction as protected parentBeginTransaction;
34
        TransactionalTrait::commit as protected parentCommit;
35
        TransactionalTrait::rollback as protected parentRollback;
36
    }
37
38
    /**
39
     * @var Manager
40
     */
41
    private $conn;
42
43
    /**
44
     * @var string
45
     */
46
    private $db;
47
48
    /**
49
     * @var SnapshotInterface
50
     */
51
    private $snapshot;
52
53
    /**
54
     * @var array
55
     */
56
    private $watched = array();
0 ignored issues
show
Unused Code introduced by
The property $watched is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
57
58
    /**
59
     * @var bool
60
     */
61
    private $transactional = false;
0 ignored issues
show
Unused Code introduced by
The property $transactional is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
62
63
    /**
64
     * @var int
65
     */
66
    private $transactionalLevel = 0;
0 ignored issues
show
Unused Code introduced by
The property $transactionalLevel is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
67
68
    /**
     * Create a storage bound to the MongoDB server and database named in the config.
     *
     * @param MongoConfig            $config   Supplies the connection DSN and the database name.
     * @param SnapshotInterface|null $snapshot Optional snapshot store; may also be attached later.
     */
    public function __construct(MongoConfig $config, SnapshotInterface $snapshot = null)
    {
        $dsn = $config->dsn;
        $database = $config->database;

        $this->conn = new Manager($dsn);
        $this->db = $database;
        $this->snapshot = $snapshot;
    }
80
81
    /**
     * Attach (or replace) the snapshot store used by this storage.
     *
     * @param SnapshotInterface $snapshot Snapshot store that takes part in this storage's transactions.
     */
    public function attachSnapshot(SnapshotInterface $snapshot)
    {
        $this->snapshot = $snapshot;
    }
88
89
    /**
     * Get the currently attached snapshot store.
     *
     * @return SnapshotInterface|null Null when no snapshot was passed to the constructor or attached.
     */
    public function getSnapshot()
    {
        return $this->snapshot;
    }
96
97
    /**
     * {@inheritdoc}
     *
     * Loads every event with status "OK" for the given id, ordered by ascending version.
     *
     * @throws LogicException When no matching event exists.
     */
    public function findEventFromBeginning($table, IdentifierInterface $id)
    {
        $filter = array('uuid' => $id->toString(), 'status' => 'OK');
        $options = array('sort' => array('version' => 1));

        $events = $this->query(new Query($filter, $options), $this->getNamespace($table));
        if (null === $events) {
            throw new LogicException(sprintf('Aggregate root with id "%s" is not found.', (string) $id));
        }

        return $events;
    }
112
113
    /**
     * {@inheritdoc}
     *
     * Loads the events with status "OK" whose version is strictly greater than
     * $version, ordered by ascending version. Returns null when none match.
     */
    public function findEventFromVersion($table, IdentifierInterface $id, $version)
    {
        $filter = array(
            'uuid'    => $id->toString(),
            'status'  => 'OK',
            'version' => array('$gt' => $version),
        );
        $options = array('sort' => array('version' => 1));

        return $this->query(new Query($filter, $options), $this->getNamespace($table));
    }
125
126
    /**
     * {@inheritdoc}
     *
     * Serializes the message, adds the status and exception fields, and inserts
     * the resulting document into the given collection.
     *
     * @param string                 $table     Collection name (without the database prefix).
     * @param DomainMessageInterface $message   Message to persist.
     * @param mixed                  $status    Value stored in the "status" field.
     * @param array|null             $exception Value stored in the "exception" field.
     */
    public function append($table, DomainMessageInterface $message, $status, array $exception = null)
    {
        $record = array_merge(
            $this->serialize($message),
            array(
                'status'    => $status,
                'exception' => $exception,
            )
        );

        $bulk = new BulkWrite;

        // BulkWrite::insert() returns the _id it will use for the document. It
        // must be on the record *before* the record is handed to watch():
        // performRollback() later looks records up by $record['_id'], and an
        // array passed by value would otherwise never carry that key.
        $record['_id'] = $bulk->insert($record);
        $this->watch($table, $record);

        $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
    }
145
146
    /**
     * {@inheritdoc}
     *
     * Issues a single delete with an empty filter against the given collection.
     */
    public function drop($table)
    {
        $namespace = $this->getNamespace($table);

        $removeAll = new BulkWrite;
        $removeAll->delete(array());

        $this->conn->executeBulkWrite($namespace, $removeAll);
    }
155
156
    /**
     * {@inheritdoc}
     *
     * Begins a transaction on the attached snapshot first (when one is set),
     * then runs the trait's own beginTransaction().
     */
    public function beginTransaction()
    {
        $snapshot = $this->snapshot;
        if ($snapshot !== null) {
            $snapshot->beginTransaction();
        }

        $this->parentBeginTransaction();
    }
167
168
    /**
     * {@inheritdoc}
     *
     * Rolls back the attached snapshot first (when one is set), then runs the
     * trait's own rollback().
     */
    public function rollback()
    {
        $snapshot = $this->snapshot;
        if ($snapshot !== null) {
            $snapshot->rollback();
        }

        $this->parentRollback();
    }
179
180
    /**
     * {@inheritdoc}
     *
     * Commits the attached snapshot first (when one is set), then runs the
     * trait's own commit().
     */
    public function commit()
    {
        $snapshot = $this->snapshot;
        if ($snapshot !== null) {
            $snapshot->commit();
        }

        $this->parentCommit();
    }
191
192
    /**
193
     * {@inheritdoc}
194
     */
195
    protected function performCommit($table, array $records)
0 ignored issues
show
Unused Code introduced by
The parameter $table is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $records is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
196
    {
197
        // Intentionally a no-op: committing is handled by the trait, so both parameters are unused here.
198
    }
199
200
    /**
     * {@inheritdoc}
     *
     * Flags every given record as rolled back by re-writing it with its
     * "status" field set to "ROLLBACK". Records are matched by their "_id".
     *
     * @param string $table   Collection name (without the database prefix).
     * @param array  $records Records to flag; each is expected to carry an "_id" key.
     */
    protected function performRollback($table, array $records)
    {
        $bulk = new BulkWrite;
        foreach ($records as $record) {
            // Without an _id there is no document to match, so skip the record
            // rather than reading an undefined index.
            if (!isset($record['_id'])) {
                continue;
            }

            $id = $record['_id'];
            unset($record['_id']);
            $record['status'] = 'ROLLBACK';
            $bulk->update(array('_id' => $id), array('$set' => $record));
        }

        // The driver rejects an empty bulk write with an exception, so a
        // rollback that has nothing to update must not reach the server call.
        if (0 === count($bulk)) {
            return;
        }

        $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
    }
215
216
    /**
     * Turn a domain message into the array document that gets stored.
     *
     * The payload is stored as its class name plus its own serialized body;
     * the creation time is formatted as 'Y-m-d\TH:i:s.uP'.
     *
     * @param DomainMessageInterface $domain
     *
     * @return array Document with the keys uuid, version, type, created and payload.
     */
    private function serialize(DomainMessageInterface $domain)
    {
        $payload = $domain->getPayload();

        $document = array();
        $document['uuid'] = (string) $domain->getId();
        $document['version'] = $domain->getVersion();
        $document['type'] = $domain->getType();
        $document['created'] = $domain->getCreated()->format('Y-m-d\TH:i:s.uP');
        $document['payload'] = array(
            'class' => get_class($payload),
            'body'  => $payload->serialize(),
        );

        return $document;
    }
235
236
    /**
     * Rebuild a domain message from a stored record.
     *
     * @param array $record Stored document; the keys uuid, version, created and payload are read.
     *
     * @return DomainMessage
     */
    private function deserialize(array $record)
    {
        $uuid = $record['uuid'];
        $version = $record['version'];
        $created = new \DateTime($record['created']);
        $event = $this->build((array) $record['payload']);

        return new DomainMessage($uuid, $version, $created, $event);
    }
252
253
    /**
     * Build an object from serialized data.
     *
     * The class named under "class" must implement
     * \Borobudur\Serialization\DeserializableInterface; its static
     * deserialize() is then called with the "body" cast to an array.
     *
     * @param array $serializedObject Array with the keys "class" and "body".
     *
     * @return DomainEventInterface
     *
     * @throws \RuntimeException When a key is missing, or when the class is unknown
     *                           or does not implement the deserializable interface.
     */
    private function build(array $serializedObject)
    {
        $this->assertKeyExists($serializedObject, 'class');
        $this->assertKeyExists($serializedObject, 'body');

        $class = $serializedObject['class'];

        // is_subclass_of() simply returns false for a class that cannot be
        // loaded, whereas class_implements() returns false there and would
        // break the in_array() check instead of reaching the exception below.
        if (!is_string($class)
            || !is_subclass_of($class, 'Borobudur\Serialization\DeserializableInterface')
        ) {
            throw new \RuntimeException(
                sprintf(
                    'Class "%s" does not implement "\Borobudur\Serialization\DeserializableInterface"',
                    is_string($class) ? $class : gettype($class)
                )
            );
        }

        return $class::deserialize((array) $serializedObject['body']);
    }
280
281
    /**
     * Ensure that the given key is present in the array (a null value still counts as present).
     *
     * @param array  $serializeObject Array to inspect.
     * @param string $key             Key that must exist.
     *
     * @throws \RuntimeException When the key is absent.
     */
    private function assertKeyExists(array $serializeObject, $key)
    {
        if (array_key_exists($key, $serializeObject)) {
            return;
        }

        throw new \RuntimeException(sprintf('Key "%s" should be set', $key));
    }
293
294
    /**
     * Build the "<database>.<collection>" namespace string for a collection.
     *
     * @param string $table Collection name.
     *
     * @return string The configured database name and $table joined by a dot.
     */
    private function getNamespace($table)
    {
        return $this->db . '.' . $table;
    }
305
306
    /**
     * Run a query and convert the matching records into an event stream.
     *
     * The cursor's type map sets "document" to "array", and each returned
     * record is cast to an array before being deserialized.
     *
     * @param Query  $query
     * @param string $namespace Namespace in "<database>.<collection>" form.
     *
     * @return DomainEventStream|null Null when the query matches nothing.
     */
    private function query(Query $query, $namespace)
    {
        $cursor = $this->conn->executeQuery($namespace, $query);
        $cursor->setTypeMap(array('document' => 'array'));

        $rows = $cursor->toArray();
        if (0 === count($rows)) {
            return null;
        }

        $events = array();
        foreach ($rows as $row) {
            $events[] = $this->deserialize((array) $row);
        }

        return new DomainEventStream($events);
    }
331
}
332