Completed
Push — master ( 34c2b4...7f9f5c )
by David
02:05
created

DBALEventStore::entityClass()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 17
ccs 10
cts 10
cp 1
rs 9.4286
cc 1
eloc 10
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Rawkode\Eidetic\EventStore\DBALEventStore;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\DriverManager;
7
use Doctrine\DBAL\Query\QueryBuilder;
8
use Rawkode\Eidetic\EventStore\EventStore;
9
use Rawkode\Eidetic\EventStore\NoEventsFoundForKeyException;
10
use Rawkode\Eidetic\EventSourcing\EventSourcedEntity;
11
12
final class DBALEventStore extends EventStore
13
{
14
    /**
15
     * @var string
16
     */
17
    private $tableName;
18
19
    /**
20
     * @var Connection
21
     */
22
    private $connection;
23
24
    /**
25
     * @param string     $tableName
26
     * @param Connection $connection
27
     */
28 10
    private function __construct($tableName, Connection $connection)
29
    {
30 10
        $this->tableName = $tableName;
31 10
        $this->connection = $connection;
32 10
    }
33
34
    /**
35
     * @param string $tableName
36
     * @param array  $options
37
     *
38
     * @return self
39
     */
40 10
    public static function createWithOptions($tableName, array $options)
41
    {
42 10
        $connection = DriverManager::getConnection($options);
43
44 10
        return new self($tableName, $connection);
45
    }
46
47
    /**
48
     * @param EventSourcedEntity $eventSourcedEntity
49
     */
50 7
    protected function persist(EventSourcedEntity $eventSourcedEntity)
51
    {
52 7
        $eventCount = $this->countEntityEvents($eventSourcedEntity->identifier());
53
54 7
        foreach ($eventSourcedEntity->stagedEvents() as $event) {
55 7
            $this->connection->insert($this->tableName, [
56 7
                'entity_identifier' => $eventSourcedEntity->identifier(),
57 7
                'serial_number' => ++$eventCount,
58 7
                'entity_class' => get_class($eventSourcedEntity),
59 7
                'recorded_at' => new \DateTime('now', new \DateTimeZone('UTC')),
60 7
                'event_class' => get_class($event),
61 7
                'event' => $this->serialize($event),
62 7
            ], [
63 7
                \PDO::PARAM_STR,
64 7
                \PDO::PARAM_INT,
65 7
                \PDO::PARAM_STR,
66 7
                'datetime',
67 7
                \PDO::PARAM_STR,
68 7
                \PDO::PARAM_STR,
69 7
            ]);
70
71 7
            array_push($this->stagedEvents, $event);
72 7
        }
73 7
    }
74
75
    /**
76
     * @param string $entityIdentifier
77
     *
78
     * @throws NoEventsFoundForKeyException
79
     *
80
     * @return array
81
     */
82 4
    protected function eventLog($entityIdentifier)
83
    {
84 4
        if (0 === $this->countEntityEvents($entityIdentifier)) {
85 1
            throw new NoEventsFoundForKeyException();
86
        }
87
88 3
        $statement = $this->eventLogQuery($entityIdentifier)->execute();
89
90 3
        $eventLog = $statement->fetchAll();
91
92
        return array_map(function ($eventLogEntry) {
93 3
            $eventLogEntry['event'] = $this->unserialize($eventLogEntry['event']);
94 3
            $eventLogEntry['recorded_at'] = new \DateTime($eventLogEntry['recorded_at']);
95
96 3
            return $eventLogEntry;
97 3
        }, $eventLog);
98
    }
99
100
    /**
101
     */
102 7
    protected function startTransaction()
103
    {
104 7
        $this->connection->beginTransaction();
105 7
        $this->stagedEvents = [];
106 7
    }
107
108
    /**
109
     */
110 1
    protected function abortTransaction()
111
    {
112 1
        $this->connection->rollBack();
113 1
        $this->stagedEvents = [];
114 1
    }
115
116
    /**
117
     */
118 7
    protected function completeTransaction()
119
    {
120 7
        $this->connection->commit();
121
122 7
        array_map(function ($event) {
123 7
            $this->publish(self::EVENT_STORED, $event);
124 7
        }, $this->stagedEvents);
125
126 7
        $this->stagedEvents = [];
127 7
    }
128
129
    /**
130
     */
131 10
    public function createTable()
132
    {
133 10
        $schemaManager = $this->connection->getSchemaManager();
134 10
        $schema = $schemaManager->createSchema();
135
136 10
        if ($schema->hasTable($this->tableName)) {
137 1
            throw new TableAlreadyExistsException();
138 1
        }
139
140 10
        $table = $schema->createTable($this->tableName);
141
142 10
        $table->addColumn('entity_identifier', 'string', ['length' => 255]);
143 10
        $table->addColumn('serial_number', 'integer');
144
145 10
        $table->setPrimaryKey(['entity_identifier', 'serial_number']);
146
147 10
        $table->addColumn('entity_class', 'string', ['length' => 255]);
148 10
        $table->addColumn('recorded_at', 'datetime');
149 10
        $table->addColumn('event_class', 'string', ['length' => 255]);
150 10
        $table->addColumn('event', 'text');
151
152 10
        $table->addIndex(['entity_class']);
153 10
        $table->addIndex(['recorded_at']);
154 10
        $table->addIndex(['event_class']);
155
156 10
        $schemaManager->createTable($table);
157 10
    }
158
159
    /**
160
     */
161 10
    public function dropTable()
162
    {
163 10
        $this->connection->getSchemaManager()->dropTable($this->tableName);
164 10
    }
165
166
    /**
167
     * @param string $entityIdentifier
168
     *
169
     * @return int
170
     */
171 9
    protected function countEntityEvents($entityIdentifier)
172 1
    {
173
        /* @var QueryBuilder $queryBuilder */
174 9
        $queryBuilder = $this->connection->createQueryBuilder();
175
176 9
        $queryBuilder->select('COUNT(entity_identifier)');
177 9
        $queryBuilder->from($this->tableName);
178 9
        $queryBuilder->where('entity_identifier = :entity_identifier');
179
180 9
        $queryBuilder->setParameter('entity_identifier', $entityIdentifier);
181
182 9
        return (int) $queryBuilder->execute()->fetchColumn(0);
183
    }
184
185
    /**
186
     * @param string $entityIdentifier
187
     *
188
     * @return \Doctrine\DBAL\Query\QueryBuilder
189
     */
190 3
    protected function eventLogQuery($entityIdentifier)
191
    {
192
        /* @var QueryBuilder $queryBuilder */
193 3
        $queryBuilder = $this->connection->createQueryBuilder();
194
195 3
        $queryBuilder->select('*');
196 3
        $queryBuilder->from($this->tableName);
197 3
        $queryBuilder->where('entity_identifier = :entity_identifier');
198 3
        $queryBuilder->orderBy('serial_number', 'ASC');
199
200 3
        $queryBuilder->setParameter('entity_identifier', $entityIdentifier);
201
202 3
        return $queryBuilder;
203
    }
204
205
    /**
206
     * @param string $entityIdentifier
207
     *
208
     * @throws NoEventsFoundForKeyException
209
     *
210
     * @return string
211
     */
212 2
    protected function entityClass($entityIdentifier)
213
    {
214 2
        $this->verifyEventExistsForKey($entityIdentifier);
215
216
        /* @var QueryBuilder $queryBuilder */
217 1
        $queryBuilder = $this->connection->createQueryBuilder();
218
219 1
        $queryBuilder->select('entity_class');
220 1
        $queryBuilder->from($this->tableName);
221 1
        $queryBuilder->where('entity_identifier = :entity_identifier');
222 1
        $queryBuilder->orderBy('serial_number', 'ASC');
223 1
        $queryBuilder->setMaxResults(1);
224
225 1
        $queryBuilder->setParameter('entity_identifier', $entityIdentifier);
226
227 1
        return $queryBuilder->execute()->fetchColumn(0);
228
    }
229
}
230