Completed
Push — master ( 14e29f...efb36f )
by David
02:04
created

DBALEventStore::eventLogQuery()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 13
ccs 8
cts 8
cp 1
rs 9.4286
cc 1
eloc 8
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Rawkode\Eidetic\EventStore\DBALEventStore;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\DriverManager;
7
use Doctrine\DBAL\Schema\Table;
8
use Rawkode\Eidetic\EventStore\InvalidEventException;
9
use Rawkode\Eidetic\EventStore\EventStore;
10
use Rawkode\Eidetic\EventStore\EventPublisherMixin;
11
use Rawkode\Eidetic\EventStore\NoEventsFoundForKeyException;
12
use Rawkode\Eidetic\EventStore\Subscriber;
13
use Rawkode\Eidetic\EventStore\VerifyEventIsAClassTrait;
14
15
final class DBALEventStore implements EventStore
16
{
17
    use EventPublisherMixin;
18
    use VerifyEventIsAClassTrait;
19
20
    /**
21
     * @var string
22
     */
23
    private $tableName;
24
25
    /**
26
     * @var Connection
27
     */
28
    private $connection;
29
30
    /** @var array */
31
    private $stagedEvents = [ ];
32
33
    /**
34
     * @param string     $tableName
35
     * @param Connection $connection
36
     */
37 7
    private function __construct($tableName, Connection $connection)
38
    {
39 7
        $this->tableName = $tableName;
40 7
        $this->connection = $connection;
41 7
    }
42
43
    /**
44
     * @param string $tableName
45
     * @param array  $options
46
     *
47
     * @return self
48
     */
49 7
    public static function createWithOptions($tableName, array $options)
50
    {
51 7
        $connection = DriverManager::getConnection($options);
52
53 7
        return new self($tableName, $connection);
54
    }
55
56
    /**
57
     * @param string $key
58
     * @param array  $events
59
     */
60 6
    public function store($key, array $events)
61
    {
62
        try {
63 6
            $this->startTransaction();
64
65 6
            foreach ($events as $event) {
66 6
                $this->persistEvent($key, $event);
67 6
            }
68 6
        } catch (InvalidEventException $invalidEventException) {
69 2
            $this->abortTransaction();
70
71 2
            throw $invalidEventException;
72
        }
73
74 5
        $this->completeTransaction();
75 5
    }
76
77
    /**
78
     * @param string $key
79
     *
80
     * @return array
81
     */
82 2
    public function retrieve($key)
83
    {
84 2
        $results = $this->eventLogs($key);
85
86
        return array_map(function ($eventLog) {
87 1
            return unserialize(base64_decode($eventLog['event']));
88 1
        }, $results);
89
    }
90
91
    /**
92
     * @param string $key
93
     *
94
     * @return array
95
     */
96 1
    public function retrieveLogs($key)
97
    {
98 1
        return $this->eventLogs($key);
99
    }
100
101
    /**
102
     * @param string $key
103
     *
104
     * @return array
105
     */
106 3
    private function eventLogs($key)
107
    {
108 3
        $this->verifyEventExistsForKey($key);
109
110 2
        $statement = $this->eventLogQuery($key)->execute();
111
112 2
        $results = $statement->fetchAll();
113
114 2
        return array_map(function ($eventLog) {
115 2
            if (true === array_key_exists('recorded_at', $eventLog)) {
116 2
                $eventLog['recorded_at'] = new \DateTime($eventLog['recorded_at']);
117 2
            }
118
119 2
            return $eventLog;
120 2
        }, $results);
121
    }
122
123
    /**
124
     */
125 6
    private function startTransaction()
126
    {
127 6
        $this->connection->beginTransaction();
128 6
        $this->stagedEvents = [ ];
129 6
    }
130
131
    /**
132
     */
133 2
    private function abortTransaction()
134
    {
135 2
        $this->connection->rollBack();
136 2
        $this->stagedEvents = [ ];
137 2
    }
138
139
    /**
140
     */
141 5
    private function completeTransaction()
142
    {
143 5
        $this->connection->commit();
144
145 5
        foreach ($this->stagedEvents as $event) {
146 5
            $this->publish(self::EVENT_STORED, $event);
147 5
        }
148
149 5
        $this->stagedEvents = [ ];
150 5
    }
151
152
    /**
153
     * @param string $key
154
     * @param object $event
155
     */
156 7
    private function persistEvent($key, $event)
157
    {
158 6
        $this->verifyEventIsAClass($event);
159
160 6
        $this->connection->insert($this->tableName, [
161 7
            'key' => $key,
162 6
            'recorded_at' => new \DateTime('now', new \DateTimeZone('UTC')),
163 6
            'event_class' => get_class($event),
164 6
            'event' => base64_encode(serialize($event)),
165 6
        ], [
166 6
            \PDO::PARAM_STR,
167 6
            'datetime',
168 6
            \PDO::PARAM_STR,
169 6
            \PDO::PARAM_STR,
170 6
        ]);
171
172 7
        array_push($this->stagedEvents, $event);
173 6
    }
174
175
    /**
176
     */
177 7
    public function createTable()
178
    {
179 7
        $schemaManager = $this->connection->getSchemaManager();
180 7
        $schema = $schemaManager->createSchema();
181
182 7
        if ($schema->hasTable($this->tableName)) {
183
            return;
184
        }
185
186 7
        $table = $schema->createTable($this->tableName);
187
188 7
        $serialNumberColumn = $table->addColumn('serial_number', 'integer');
189 7
        $serialNumberColumn->setAutoincrement(true);
190 7
        $table->setPrimaryKey(['serial_number']);
191
192 7
        $table->addColumn('key', 'string', ['length' => 255]);
193 7
        $table->addColumn('recorded_at', 'datetime');
194 7
        $table->addColumn('event_class', 'string', ['length' => 255]);
195 7
        $table->addColumn('event', 'text');
196
197 7
        $table->addIndex(['key']);
198 7
        $table->addIndex(['recorded_at']);
199 7
        $table->addIndex(['event_class']);
200
201 7
        $schemaManager->createTable($table);
202 7
    }
203
204
    /**
205
     */
206 7
    public function dropTable()
207
    {
208 7
        $this->connection->getSchemaManager()->dropTable($this->tableName);
209 7
    }
210
211
    /**
212
     * @param $key
213
     * @return string
214
     */
215 1
    public function getClassForKey($key)
216
    {
217 1
        $log = $this->singleLogForKey($key);
218
219 1
        return $log[0]['event_class'];
220
    }
221
222
    /**
223
     * @param $key
224
     * @throws NoEventsFoundForKeyException
225
     */
226 3
    private function verifyEventExistsForKey($key)
227
    {
228 3
        $results = $this->singleLogForKey($key);
229
230 3
        if (count($results) === 0) {
231 1
            throw new NoEventsFoundForKeyException();
232
        }
233 2
    }
234
235
    /**
236
     * @param $key
237
     * @return \Doctrine\DBAL\Query\QueryBuilder
238
     */
239 4
    private function eventLogQuery($key)
240
    {
241 4
        $queryBuilder = $this->connection->createQueryBuilder();
242
243 4
        $queryBuilder->select('*');
244 4
        $queryBuilder->from($this->tableName);
245 4
        $queryBuilder->where('key = :key');
246 4
        $queryBuilder->orderBy('serial_number', 'ASC');
247
248 4
        $queryBuilder->setParameter('key', $key);
249
250 4
        return $queryBuilder;
251
    }
252
253
    /**
254
     * @param $key
255
     * @return array
256
     */
257 4
    private function singleLogForKey($key)
258
    {
259 4
        $queryBuilder = $this->eventLogQuery($key);
260
261 4
        $queryBuilder->setMaxResults(1);
262
263 4
        $statement = $queryBuilder->execute();
264
265 4
        $results = $statement->fetchAll();
266
267 4
        return $results;
268
    }
269
}
270