MongoStorage::query()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 17
rs 9.4285
cc 3
eloc 10
nc 3
nop 2
1
<?php
2
/**
3
 * This file is part of the Borobudur-Event-Sourcing package.
4
 *
5
 * (c) Hexacodelabs <http://hexacodelabs.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 */
10
11
namespace Borobudur\EventSourcing\Storage\Mongo;
12
13
use Borobudur\Cqrs\IdentifierInterface;
14
use Borobudur\Cqrs\Message\DomainEventInterface;
15
use Borobudur\EventSourcing\Domain\DomainEventStream;
16
use Borobudur\EventSourcing\Domain\DomainMessage;
17
use Borobudur\EventSourcing\Domain\DomainMessageInterface;
18
use Borobudur\EventSourcing\Exception\LogicException;
19
use Borobudur\EventSourcing\Snapshot\SnapshotInterface;
20
use Borobudur\EventSourcing\Storage\StorageInterface;
21
use Borobudur\EventSourcing\Storage\TransactionalTrait;
22
use MongoDB\Driver\BulkWrite;
23
use MongoDB\Driver\Manager;
24
use MongoDB\Driver\Query;
25
26
/**
27
 * @author      Iqbal Maulana <[email protected]>
28
 * @created     8/20/15
29
 */
30
class MongoStorage implements StorageInterface
31
{
32
    use TransactionalTrait {
33
        TransactionalTrait::beginTransaction as protected parentBeginTransaction;
34
        TransactionalTrait::commit as protected parentCommit;
35
        TransactionalTrait::rollback as protected parentRollback;
36
    }
37
38
    /**
39
     * @var Manager
40
     */
41
    private $conn;
42
43
    /**
44
     * @var string
45
     */
46
    private $db;
47
48
    /**
49
     * @var SnapshotInterface
50
     */
51
    private $snapshot;
52
    
53
    /**
54
     * Constructor.
55
     *
56
     * @param MongoConfig       $config
57
     * @param SnapshotInterface $snapshot
58
     */
59
    public function __construct(MongoConfig $config, SnapshotInterface $snapshot = null)
60
    {
61
        $this->conn = new Manager($config->dsn);
62
        $this->db = $config->database;
63
        $this->snapshot = $snapshot;
64
    }
65
66
    /**
67
     * @param SnapshotInterface $snapshot
68
     */
69
    public function attachSnapshot(SnapshotInterface $snapshot)
70
    {
71
        $this->snapshot = $snapshot;
72
    }
73
74
    /**
75
     * @return SnapshotInterface
76
     */
77
    public function getSnapshot()
78
    {
79
        return $this->snapshot;
80
    }
81
82
    /**
83
     * {@inheritdoc}
84
     */
85
    public function findEventFromBeginning($table, IdentifierInterface $id)
86
    {
87
        $query = new Query(
88
            array('uuid' => $id->toString(), 'status' => 'OK'),
89
            array('sort' => array('version' => 1))
90
        );
91
        if (null !== $events = $this->query($query, $this->getNamespace($table))) {
92
            return $events;
93
        }
94
95
        throw new LogicException(sprintf('Aggregate root with id "%s" is not found.', (string) $id));
96
    }
97
98
    /**
99
     * {@inheritdoc}
100
     */
101
    public function findEventFromVersion($table, IdentifierInterface $id, $version)
102
    {
103
        $query = new Query(
104
            array('uuid' => $id->toString(), 'status' => 'OK', 'version' => array('$gt' => $version)),
105
            array('sort' => array('version' => 1))
106
        );
107
108
        return $this->query($query, $this->getNamespace($table));
109
    }
110
111
    /**
112
     * {@inheritdoc}
113
     */
114
    public function append($table, DomainMessageInterface $message, $status, array $exception = null)
115
    {
116
        $serialized = $this->serialize($message);
117
        $record = array_merge(
118
            $serialized,
119
            array(
120
                'status'    => $status,
121
                'exception' => $exception,
122
            )
123
        );
124
        $this->watch($table, $record);
125
126
        $bulk = new BulkWrite;
127
        $record['_id'] = $bulk->insert($record);
128
        $this->conn->executeBulkWrite(sprintf('%s.%s', $this->db, $table), $bulk);
129
    }
130
131
    /**
132
     * {@inheritdoc}
133
     */
134
    public function drop($table)
135
    {
136
        $bulk = new BulkWrite;
137
        $bulk->delete(array());
138
        $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
139
    }
140
141
    /**
142
     * {@inheritdoc}
143
     */
144
    public function beginTransaction()
145
    {
146
        if (null !== $this->snapshot) {
147
            $this->snapshot->beginTransaction();
148
        }
149
150
        $this->parentBeginTransaction();
151
    }
152
153
    /**
154
     * {@inheritdoc}
155
     */
156
    public function rollback()
157
    {
158
        if (null !== $this->snapshot) {
159
            $this->snapshot->rollback();
160
        }
161
162
        $this->parentRollback();
163
    }
164
165
    /**
166
     * {@inheritdoc}
167
     */
168
    public function commit()
169
    {
170
        if (null !== $this->snapshot) {
171
            $this->snapshot->commit();
172
        }
173
174
        $this->parentCommit();
175
    }
176
177
    /**
178
     * {@inheritdoc}
179
     */
180
    protected function performCommit($table, array $records)
0 ignored issues
show
Unused Code introduced by
The parameter $table is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $records is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
181
    {
182
        // Do nothing (handle by the trait)
183
    }
184
185
    /**
186
     * {@inheritdoc}
187
     */
188
    protected function performRollback($table, array $records)
189
    {
190
        $bulk = new BulkWrite;
191
        foreach ($records as $record) {
192
            $id = $record['_id'];
193
            unset($record['_id']);
194
            $record['status'] = 'ROLLBACK';
195
            $bulk->update(array('_id' => $id), array('$set' => $record));
196
        }
197
198
        $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
199
    }
200
201
    /**
202
     * Serialize domain message.
203
     *
204
     * @param DomainMessageInterface $domain
205
     *
206
     * @return array
207
     */
208
    private function serialize(DomainMessageInterface $domain)
209
    {
210
        $payload = $domain->getPayload();
211
212
        return array(
213
            'uuid'    => (string) $domain->getId(),
214
            'version' => $domain->getVersion(),
215
            'type'    => $domain->getType(),
216
            'created' => $domain->getCreated()->format('Y-m-d\TH:i:s.uP'),
217
            'payload' => array('class' => get_class($payload), 'body' => $payload->serialize()),
218
        );
219
    }
220
221
    /**
222
     * Deserialize data to domain message.
223
     *
224
     * @param array $record
225
     *
226
     * @return DomainMessage
227
     */
228
    private function deserialize(array $record)
229
    {
230
        return new DomainMessage(
231
            $record['uuid'],
232
            $record['version'],
233
            new \DateTime($record['created']),
234
            $this->build((array) $record['payload'])
235
        );
236
    }
237
238
    /**
239
     * Build an object from serialized data
240
     *
241
     * @param array $serializedObject
242
     *
243
     * @return DomainEventInterface
244
     */
245
    private function build(array $serializedObject)
246
    {
247
        $this->assertKeyExists($serializedObject, 'class');
248
        $this->assertKeyExists($serializedObject, 'body');
249
250
        if (!in_array(
251
            'Borobudur\Serialization\DeserializableInterface',
252
            class_implements($serializedObject['class'])
253
        )
254
        ) {
255
            throw new \RuntimeException(
256
                sprintf(
257
                    'Class "%s" does not implement "\Borobudur\Serialization\DeserializableInterface"',
258
                    $serializedObject['class']
259
                )
260
            );
261
        }
262
263
        return $serializedObject['class']::{'deserialize'}((array) $serializedObject['body']);
264
    }
265
266
    /**
267
     * Assert array key
268
     *
269
     * @param array  $serializeObject
270
     * @param string $key
271
     */
272
    private function assertKeyExists(array $serializeObject, $key)
273
    {
274
        if (!array_key_exists($key, $serializeObject)) {
275
            throw new \RuntimeException(sprintf('Key "%s" should be set', $key));
276
        }
277
    }
278
279
    /**
280
     * Get namespace.
281
     *
282
     * @param string $table
283
     *
284
     * @return string
285
     */
286
    private function getNamespace($table)
287
    {
288
        return sprintf('%s.%s', $this->db, $table);
289
    }
290
291
    /**
292
     * Execute mongodb query.
293
     *
294
     * @param Query  $query
295
     * @param string $namespace
296
     *
297
     * @return DomainEventStream|null
298
     */
299
    private function query(Query $query, $namespace)
300
    {
301
        $cursor = $this->conn->executeQuery($namespace, $query);
302
        $cursor->setTypeMap(array('document' => 'array'));
303
        $results = $cursor->toArray();
304
        $events = array();
305
306
        if (count($results)) {
307
            foreach ($results as $record) {
308
                $events[] = $this->deserialize((array) $record);
309
            }
310
311
            return new DomainEventStream($events);
312
        }
313
314
        return null;
315
    }
316
}
317