Completed
Push — master ( efb36f...19228d )
by David
02:15
created

DBALEventStore::getClassForKey()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 9.4286
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Rawkode\Eidetic\EventStore\DBALEventStore;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\DriverManager;
7
use Doctrine\DBAL\Schema\Table;
8
use Rawkode\Eidetic\EventStore\InvalidEventException;
9
use Rawkode\Eidetic\EventStore\EventStore;
10
use Rawkode\Eidetic\EventStore\EventPublisherMixin;
11
use Rawkode\Eidetic\EventStore\NoEventsFoundForKeyException;
12
use Rawkode\Eidetic\EventStore\Subscriber;
13
use Rawkode\Eidetic\EventStore\VerifyEventIsAClassTrait;
14
15
/**
 * Event store that persists events through a Doctrine DBAL connection.
 *
 * Every event becomes one row holding the key it was stored under, the UTC
 * time it was recorded, the event's class name and the serialized,
 * base64-encoded event object. After a batch has been committed, each event
 * of the batch is handed to publish() together with self::EVENT_STORED.
 */
final class DBALEventStore implements EventStore
{
    use EventPublisherMixin;
    use VerifyEventIsAClassTrait;

    /**
     * Name of the table the events are written to and read from.
     *
     * @var string
     */
    private $tableName;

    /**
     * @var Connection
     */
    private $connection;

    /**
     * Events persisted in the current transaction that still have to be
     * published once the transaction is committed.
     *
     * @var array
     */
    private $stagedEvents = [];

    /**
     * Private on purpose: instances are built through createWithOptions().
     *
     * @param string     $tableName
     * @param Connection $connection
     */
    private function __construct($tableName, Connection $connection)
    {
        $this->tableName = $tableName;
        $this->connection = $connection;
    }

    /**
     * Builds a store on top of a new DBAL connection.
     *
     * @param string $tableName Table used for the event log
     * @param array  $options   Passed unchanged to DriverManager::getConnection()
     *
     * @return self
     */
    public static function createWithOptions($tableName, array $options)
    {
        return new self($tableName, DriverManager::getConnection($options));
    }

    /**
     * Persists a batch of events for a key inside a single transaction.
     *
     * If anything fails while the events are being persisted the transaction
     * is rolled back and the failure is rethrown, so no event of the batch is
     * left behind. Events are only published after a successful commit.
     *
     * @param string $key
     * @param array  $events
     *
     * @throws InvalidEventException presumably raised by verifyEventIsAClass() for a rejected event
     * @throws \Exception            any other failure raised while persisting, rethrown after the rollback
     */
    public function store($key, array $events)
    {
        // Started outside the try block: when the transaction cannot even be
        // opened there is nothing to roll back.
        $this->startTransaction();

        try {
            foreach ($events as $event) {
                $this->persistEvent($key, $event);
            }
        } catch (\Exception $failure) {
            // Covers InvalidEventException as well as driver/database errors,
            // which previously left the transaction open.
            $this->abortTransaction();

            throw $failure;
        }

        $this->completeTransaction();
    }

    /**
     * Returns the events stored for a key, oldest first.
     *
     * @param string $key
     *
     * @return array The unserialized event objects
     *
     * @throws NoEventsFoundForKeyException when nothing is stored for the key
     */
    public function retrieve($key)
    {
        // NOTE(review): unserialize() instantiates whatever classes the stored
        // payload names. That is only safe while nothing but this store writes
        // to the table - confirm.
        return array_map(function ($eventLog) {
            return unserialize(base64_decode($eventLog['event']));
        }, $this->eventLogs($key));
    }

    /**
     * Returns the stored rows for a key, oldest first, without unserializing
     * the events. The recorded_at column is converted to a \DateTime.
     *
     * @param string $key
     *
     * @return array
     *
     * @throws NoEventsFoundForKeyException when nothing is stored for the key
     */
    public function retrieveLogs($key)
    {
        return $this->eventLogs($key);
    }

    /**
     * Loads every row stored for a key and converts recorded_at to \DateTime.
     *
     * @param string $key
     *
     * @return array
     *
     * @throws NoEventsFoundForKeyException when nothing is stored for the key
     */
    private function eventLogs($key)
    {
        // A single query is enough: an empty result already tells us that no
        // event exists for the key, so no separate existence check is needed.
        $eventLogs = $this->eventLogQuery($key)->execute()->fetchAll();

        if (0 === count($eventLogs)) {
            throw new NoEventsFoundForKeyException();
        }

        // persistEvent() records the time in UTC and the column carries no
        // offset, so the value has to be read back as UTC rather than in
        // PHP's default timezone.
        $utc = new \DateTimeZone('UTC');

        return array_map(function ($eventLog) use ($utc) {
            if (array_key_exists('recorded_at', $eventLog)) {
                $eventLog['recorded_at'] = new \DateTime($eventLog['recorded_at'], $utc);
            }

            return $eventLog;
        }, $eventLogs);
    }

    /**
     * Opens a database transaction and forgets previously staged events.
     */
    private function startTransaction()
    {
        $this->connection->beginTransaction();
        $this->stagedEvents = [];
    }

    /**
     * Rolls the database transaction back and discards the staged events so
     * that none of them is published.
     */
    private function abortTransaction()
    {
        $this->connection->rollBack();
        $this->stagedEvents = [];
    }

    /**
     * Commits the database transaction, then publishes every staged event
     * with self::EVENT_STORED and clears the staging list.
     */
    private function completeTransaction()
    {
        $this->connection->commit();

        foreach ($this->stagedEvents as $event) {
            $this->publish(self::EVENT_STORED, $event);
        }

        $this->stagedEvents = [];
    }

    /**
     * Inserts one event row and stages the event for publication.
     *
     * @param string $key
     * @param object $event
     */
    private function persistEvent($key, $event)
    {
        $this->verifyEventIsAClass($event);

        // NOTE(review): the column name `key` is used unquoted here and in
        // eventLogQuery(); it is a reserved word on some database platforms
        // (MySQL, for one) - confirm the supported platforms.
        $this->connection->insert($this->tableName, [
            'key' => $key,
            'recorded_at' => new \DateTime('now', new \DateTimeZone('UTC')),
            'event_class' => get_class($event),
            'event' => base64_encode(serialize($event)),
        ], [
            // Positional binding types, in the same order as the columns above.
            \PDO::PARAM_STR,
            'datetime',
            \PDO::PARAM_STR,
            \PDO::PARAM_STR,
        ]);

        $this->stagedEvents[] = $event;
    }

    /**
     * Creates the event table unless the schema already contains it.
     *
     * Columns: serial_number (auto-increment primary key), key, recorded_at,
     * event_class and event; key, recorded_at and event_class are indexed.
     */
    public function createTable()
    {
        $schemaManager = $this->connection->getSchemaManager();
        $schema = $schemaManager->createSchema();

        if ($schema->hasTable($this->tableName)) {
            return;
        }

        $table = $schema->createTable($this->tableName);

        // eventLogQuery() orders by this column to return events in the
        // order in which they were inserted.
        $table->addColumn('serial_number', 'integer')->setAutoincrement(true);
        $table->setPrimaryKey(['serial_number']);

        $table->addColumn('key', 'string', ['length' => 255]);
        $table->addColumn('recorded_at', 'datetime');
        $table->addColumn('event_class', 'string', ['length' => 255]);
        $table->addColumn('event', 'text');

        $table->addIndex(['key']);
        $table->addIndex(['recorded_at']);
        $table->addIndex(['event_class']);

        $schemaManager->createTable($table);
    }

    /**
     * Drops the event table. Unlike createTable() there is no existence check
     * before the schema manager is asked to drop it.
     */
    public function dropTable()
    {
        $this->connection->getSchemaManager()->dropTable($this->tableName);
    }

    /**
     * Builds the query selecting every row of a key ordered by serial_number.
     *
     * @param string $key
     *
     * @return \Doctrine\DBAL\Query\QueryBuilder
     */
    private function eventLogQuery($key)
    {
        return $this->connection->createQueryBuilder()
            ->select('*')
            ->from($this->tableName)
            ->where('key = :key')
            ->orderBy('serial_number', 'ASC')
            ->setParameter('key', $key);
    }
}
259