Completed
Push — master ( 8e5e55...6fad72 )
by Iqbal
10:57
created

MongoSnapshot   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 224
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Importance

Changes 1
Bugs 0 Features 1
Metric Value
wmc 21
c 1
b 0
f 1
lcom 1
cbo 4
dl 0
loc 224
rs 10

15 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A append() 0 22 3
A has() 0 4 1
A find() 0 18 2
A drop() 0 6 1
A getSnapshotPeriodical() 0 4 1
A setSnapshotPeriodical() 0 4 1
A performCommit() 0 4 1
A performRollback() 0 11 2
A isNeedToSnapshot() 0 6 3
A isEmpty() 0 4 1
A count() 0 8 1
A serialize() 0 9 1
A deserialize() 0 11 1
A getNamespace() 0 4 1
1
<?php
2
3
namespace Borobudur\EventSourcing\Snapshot;
4
5
use Borobudur\Cqrs\IdentifierInterface;
6
use Borobudur\EventSourcing\Entity\AggregateRootInterface;
7
use Borobudur\EventSourcing\Exception\LogicException;
8
use Borobudur\EventSourcing\Storage\Mongo\MongoConfig;
9
use Borobudur\EventSourcing\Storage\TransactionalTrait;
10
use DateTime;
11
use MongoDB\Driver\BulkWrite;
12
use MongoDB\Driver\Command;
13
use MongoDB\Driver\Manager;
14
use MongoDB\Driver\Query;
15
16
/**
17
 * @author      Iqbal Maulana <[email protected]>
18
 * @created     8/22/16
19
 */
20
class MongoSnapshot implements SnapshotInterface
21
{
22
    use TransactionalTrait;
23
24
    /**
25
     * @var Manager
26
     */
27
    private $conn;
28
29
    /**
30
     * @var string
31
     */
32
    private $db;
33
34
    /**
35
     * @var int
36
     */
37
    private $snapshotPeriodical;
38
39
    /**
40
     * Constructor.
41
     *
42
     * @param MongoConfig $config
43
     * @param int         $snapshotPeriodical
44
     */
45
    public function __construct(MongoConfig $config, $snapshotPeriodical = SnapshotInterface::SNAPSHOT_PERIODICAL)
46
    {
47
        $this->conn = new Manager($config->dsn);
48
        $this->db = $config->database;
49
        $this->snapshotPeriodical = $snapshotPeriodical;
50
    }
51
52
    /**
53
     * {@inheritdoc}
54
     */
55
    public function append(AggregateRootInterface $aggregate, $table)
56
    {
57
        if ($this->isNeedToSnapshot($table, $aggregate->getVersion())) {
58
            if ($this->has($aggregate->getId(), $aggregate->getVersion(), $table)) {
59
                throw new LogicException(
60
                    sprintf(
61
                        'Snapshot for "%s" with id "%s" and version "%d" already exist.',
62
                        $table,
63
                        (string) $aggregate->getId(),
64
                        $aggregate->getVersion()
65
                    )
66
                );
67
            }
68
69
            $record = $this->serialize($aggregate);
70
            $this->watch($table, $record);
71
72
            $bulk = new BulkWrite;
73
            $record['_id'] = $bulk->insert($record);
74
            $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
75
        }
76
    }
77
78
    /**
79
     * {@inheritdoc}
80
     */
81
    public function has(IdentifierInterface $id, $version, $table)
82
    {
83
        return 0 !== $this->count($table, array('uuid' => $id, 'version' => $version));
84
    }
85
86
    /**
87
     * {@inheritdoc}
88
     */
89
    public function find(IdentifierInterface $id, $class, $table)
90
    {
91
        $query = new Query(
92
            array('uuid' => (string) $id),
93
            array('sort' => array('version' => -1), 'limit' => 1)
94
        );
95
        $cursor = $this->conn->executeQuery($this->getNamespace($table), $query);
96
        $cursor->setTypeMap(array('document' => 'array'));
97
        $results = $cursor->toArray();
98
        
99
        if (count($results)) {
100
            $results = current($results);
101
102
            return $this->deserialize($class, (array) $results);
103
        }
104
105
        return null;
106
    }
107
108
    /**
109
     * {@inheritdoc}
110
     */
111
    public function drop($table)
112
    {
113
        $bulk = new BulkWrite;
114
        $bulk->delete(array());
115
        $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
116
    }
117
    
118
    /**
119
     * {@inheritdoc}
120
     */
121
    public function getSnapshotPeriodical()
122
    {
123
        return $this->snapshotPeriodical;
124
    }
125
126
    /**
127
     * {@inheritdoc}
128
     */
129
    public function setSnapshotPeriodical($snapshotPeriodical)
130
    {
131
        $this->snapshotPeriodical = $snapshotPeriodical;
132
    }
133
134
    /**
135
     * {@inheritdoc}
136
     */
137
    protected function performCommit($table, array $records)
0 ignored issues
show
Unused Code introduced by
The parameter $table is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $records is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
138
    {
139
        // Do nothing (handle by the trait)
140
    }
141
142
    /**
143
     * {@inheritdoc}
144
     */
145
    protected function performRollback($table, array $records)
146
    {
147
        $bulk = new BulkWrite;
148
        foreach ($records as $record) {
149
            $id = $record['_id'];
150
            unset($record['_id']);
151
            $bulk->delete(array('_id' => $id));
152
        }
153
        
154
        $this->conn->executeBulkWrite($this->getNamespace($table), $bulk);
155
    }
156
157
    /**
158
     * @param string $table
159
     * @param int    $version
160
     *
161
     * @return bool
162
     */
163
    private function isNeedToSnapshot($table, $version)
164
    {
165
        return $version >= $this->snapshotPeriodical
166
        && (0 === $version % $this->snapshotPeriodical
167
            || $this->isEmpty($table));
168
    }
169
170
    /**
171
     * @param string $table
172
     *
173
     * @return bool
174
     */
175
    private function isEmpty($table)
176
    {
177
        return 0 === $this->count($table);
178
    }
179
180
    /**
181
     * Count collection.
182
     *
183
     * @param string $table
184
     * @param array  $query
185
     *
186
     * @return int
187
     */
188
    private function count($table, array $query = array())
189
    {
190
        $command = new Command(array('count' => $table, 'query' => $query));
191
        $cursor = $this->conn->executeCommand($this->db, $command);
192
        $results = (array) current($cursor->toArray());
193
194
        return (int) $results['n'];
195
    }
196
    
197
    /**
198
     * Serialize aggregate root with metadata.
199
     *
200
     * @param AggregateRootInterface $aggregate
201
     *
202
     * @return array
203
     */
204
    private function serialize(AggregateRootInterface $aggregate)
205
    {
206
        return [
207
            'uuid'    => (string) $aggregate->getId(),
208
            'version' => $aggregate->getVersion(),
209
            'created' => (new DateTime())->format('Y-m-d\TH:i:s.uP'),
210
            'payload' => array('class' => get_class($aggregate), 'body' => $aggregate->serialize()),
211
        ];
212
    }
213
214
    /**
215
     * Deserialize from array to aggregate root.
216
     *
217
     * @param string $class
218
     * @param array  $serialized
219
     *
220
     * @return AggregateRootInterface
221
     */
222
    private function deserialize($class, array $serialized)
223
    {
224
        return $class::{'deserialize'}(
225
            array_merge(
226
                $serialized['payload']['body'],
227
                array(
228
                    'version' => $serialized['version'],
229
                )
230
            )
231
        );
232
    }
233
234
    /**
235
     * @param string $table
236
     *
237
     * @return string
238
     */
239
    private function getNamespace($table)
240
    {
241
        return sprintf('%s.%s', $this->db, $table);
242
    }
243
}
244