Completed
Push — master ( 0d942d...8e5e55 )
by Iqbal
03:04
created

MongoStorage::rollback()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 20
rs 9.2
cc 4
eloc 13
nc 4
nop 0
1
<?php
2
/*
3
 * This file is part of the Borobudur-Event-Sourcing package.
4
 *
5
 * (c) Hexacodelabs <http://hexacodelabs.com>
6
 *
7
 * For the full copyright and license information, please view the LICENSE
8
 * file that was distributed with this source code.
9
 */
10
11
namespace Borobudur\EventSourcing\Storage\Mongo;
12
13
use Borobudur\Cqrs\IdentifierInterface;
14
use Borobudur\Cqrs\Message\DomainEventInterface;
15
use Borobudur\EventSourcing\Domain\DomainEventStream;
16
use Borobudur\EventSourcing\Domain\DomainMessage;
17
use Borobudur\EventSourcing\Domain\DomainMessageInterface;
18
use Borobudur\EventSourcing\Exception\LogicException;
19
use Borobudur\EventSourcing\Storage\StorageInterface;
20
use MongoDB\Driver\BulkWrite;
21
use MongoDB\Driver\Manager;
22
use MongoDB\Driver\Query;
23
24
/**
 * Event storage backed by the MongoDB driver.
 *
 * Every appended domain message becomes one document in the "<database>.<table>"
 * namespace. While a transaction is open the inserted documents are remembered so
 * that rollback() can flag them with the "ROLLBACK" status; find() only reads
 * documents whose status is "OK".
 *
 * @author      Iqbal Maulana <[email protected]>
 * @created     8/20/15
 */
class MongoStorage implements StorageInterface
{
    /**
     * Driver manager used for every query and bulk write.
     *
     * @var Manager
     */
    private $conn;

    /**
     * Database name, combined with a table name to build the namespace.
     *
     * @var string
     */
    private $db;

    /**
     * Documents inserted while a transaction is open, grouped by table name.
     *
     * @var array
     */
    private $watched = [];

    /**
     * Whether a transaction is currently open.
     *
     * @var bool
     */
    private $transactional = false;

    /**
     * Number of beginTransaction() calls not yet matched by commit().
     *
     * @var int
     */
    private $transactionalLevel = 0;

    /**
     * Constructor.
     *
     * @param MongoConfig $config Supplies the connection DSN and the database name.
     */
    public function __construct(MongoConfig $config)
    {
        $this->conn = new Manager($config->dsn);
        $this->db = $config->database;
    }

    /**
     * {@inheritdoc}
     *
     * Loads the documents with status "OK" for the given identifier, ordered by
     * ascending version, and turns them into a domain event stream.
     *
     * @throws LogicException When no matching document exists.
     */
    public function find($table, IdentifierInterface $id)
    {
        $uuid = $id->toString();
        $query = new Query(
            ['uuid' => $uuid, 'status' => 'OK'],
            ['sort' => ['version' => 1]]
        );

        $cursor = $this->conn->executeQuery($this->getNamespace($table), $query);
        $cursor->setTypeMap(['document' => 'array']);
        $results = $cursor->toArray();

        if (0 === count($results)) {
            // Report exactly the value that was searched for.
            throw new LogicException(sprintf('Aggregate root with id "%s" is not found.', $uuid));
        }

        $events = [];
        foreach ($results as $record) {
            $events[] = $this->deserialize((array) $record);
        }

        return new DomainEventStream($events);
    }

    /**
     * {@inheritdoc}
     *
     * Inserts the serialized message together with its status and exception data.
     * Inside a transaction the inserted document (including its "_id") is also
     * remembered for a possible rollback().
     */
    public function append($table, DomainMessageInterface $message, $status, array $exception = null)
    {
        $record = array_merge(
            $this->serialize($message),
            [
                'status'    => $status,
                'exception' => $exception,
            ]
        );

        $bulk = new BulkWrite;
        // insert() hands back the document id; keep it so rollback() can target this document.
        $record['_id'] = $bulk->insert($record);

        if (true === $this->transactional) {
            // Track by value and only once the id is known, so a failing insert()
            // can never leave an entry without "_id" behind.
            $this->watched[$table][] = $record;
        }

        $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
    }

    /**
     * {@inheritdoc}
     *
     * Issues a delete with an empty filter against the table.
     */
    public function drop($table)
    {
        $bulk = new BulkWrite;
        $bulk->delete([]);
        $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
    }

    /**
     * Tell whether a transaction is currently open.
     *
     * @return boolean
     */
    public function isTransactional()
    {
        return $this->transactional;
    }

    /**
     * Get the current transaction nesting depth.
     *
     * @return int
     */
    public function getTransactionalLevel()
    {
        return $this->transactionalLevel;
    }

    /**
     * Begin transaction.
     *
     * The outermost call switches the storage into transactional mode; every
     * call increases the nesting depth by one.
     */
    public function beginTransaction()
    {
        if (0 === $this->transactionalLevel) {
            $this->transactional = true;
        }

        $this->transactionalLevel += 1;
    }

    /**
     * Rollback transaction.
     *
     * Marks every document inserted since the transaction was opened with the
     * "ROLLBACK" status, then resets the transactional state whatever the
     * nesting depth was. Does nothing when no transaction is open.
     */
    public function rollback()
    {
        if (true !== $this->transactional) {
            return;
        }

        foreach ($this->watched as $table => $records) {
            $bulk = new BulkWrite;

            foreach ($records as $record) {
                $id = $record['_id'];
                // "_id" is the update filter and must not be part of the $set payload.
                unset($record['_id']);
                $record['status'] = 'ROLLBACK';
                $bulk->update(['_id' => $id], ['$set' => $record]);
            }

            $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
        }

        $this->transactionalLevel = 0;
        $this->transactional = false;
        $this->watched = [];
    }

    /**
     * Commit transaction.
     *
     * Decreases the nesting depth; once the outermost level is committed the
     * transactional mode ends and the remembered documents are forgotten.
     * Does nothing when no transaction is open.
     */
    public function commit()
    {
        if (true !== $this->transactional) {
            return;
        }

        $this->transactionalLevel -= 1;

        if (0 === $this->transactionalLevel) {
            $this->transactional = false;
            $this->watched = [];
        }
    }

    /**
     * Build the "<database>.<table>" namespace used by the driver.
     *
     * @param string $table
     *
     * @return string
     */
    private function getNamespace($table)
    {
        return sprintf('%s.%s', $this->db, $table);
    }

    /**
     * Serialize domain message.
     *
     * @param DomainMessageInterface $domain
     *
     * @return array Keys: uuid, version, type, created, payload (class and body).
     */
    private function serialize(DomainMessageInterface $domain)
    {
        $payload = $domain->getPayload();

        return [
            'uuid'    => (string) $domain->getId(),
            'version' => $domain->getVersion(),
            'type'    => $domain->getType(),
            'created' => $domain->getCreated()->format('Y-m-d\TH:i:s.uP'),
            'payload' => ['class' => get_class($payload), 'body' => $payload->serialize()],
        ];
    }

    /**
     * Deserialize data to domain message.
     *
     * @param array $record Stored document with uuid, version, created and payload keys.
     *
     * @return DomainMessage
     */
    private function deserialize(array $record)
    {
        $created = new \DateTime($record['created']);
        $payload = $this->build((array) $record['payload']);

        return new DomainMessage($record['uuid'], $record['version'], $created, $payload);
    }

    /**
     * Build an object from serialized data.
     *
     * @param array $serializedObject Must contain the "class" and "body" keys.
     *
     * @return DomainEventInterface
     *
     * @throws \RuntimeException When a key is missing, or when the class is unknown
     *                           or does not implement DeserializableInterface.
     */
    private function build(array $serializedObject)
    {
        $this->assertKeyExists($serializedObject, 'class');
        $this->assertKeyExists($serializedObject, 'body');

        $class = $serializedObject['class'];

        // is_subclass_of() returns false for unknown classes, so a stale or bogus
        // class name ends up in the exception below instead of a type error.
        if (!is_string($class) || !is_subclass_of($class, 'Borobudur\Serialization\DeserializableInterface')) {
            throw new \RuntimeException(
                sprintf(
                    'Class "%s" does not implement "\Borobudur\Serialization\DeserializableInterface"',
                    is_string($class) ? $class : gettype($class)
                )
            );
        }

        return $class::deserialize((array) $serializedObject['body']);
    }

    /**
     * Assert array key.
     *
     * @param array  $serializeObject
     * @param string $key
     *
     * @throws \RuntimeException When the key is absent (a null value still counts as present).
     */
    private function assertKeyExists(array $serializeObject, $key)
    {
        if (array_key_exists($key, $serializeObject)) {
            return;
        }

        throw new \RuntimeException(sprintf('Key "%s" should be set', $key));
    }
}
272