Completed
Push — master ( 78f2f3...060050 )
by David
16:07 queued 07:32
created

DBALEventStore::createWithConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 2
crap 2
1
<?php
2
3
namespace Rawkode\Eidetic\EventStore\DBALEventStore;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\DriverManager;
7
use Doctrine\DBAL\Query\QueryBuilder;
8
use Rawkode\Eidetic\EventStore\EventStore;
9
use Rawkode\Eidetic\EventStore\NoEventsFoundForKeyException;
10
use Rawkode\Eidetic\EventSourcing\EventSourcedEntity;
11
12
/**
 * Event store that persists events through a Doctrine DBAL connection.
 *
 * Every event is written as one row of the configured table, identified by
 * the entity identifier plus a per-entity serial number (see createTable()).
 */
final class DBALEventStore extends EventStore
{
    /**
     * Name of the table events are written to and read from.
     *
     * @var string
     */
    private $tableName;

    /**
     * Connection used for every query and schema operation of this store.
     *
     * @var Connection
     */
    private $connection;

    /**
     * Private on purpose: instances are built through the createWith*() factories.
     *
     * @param string     $tableName
     * @param Connection $connection
     */
    private function __construct($tableName, Connection $connection)
    {
        $this->tableName = $tableName;
        $this->connection = $connection;
    }

    /**
     * Builds a store on top of a new connection obtained from DriverManager.
     *
     * @param string $tableName
     * @param array  $options   Passed unchanged to DriverManager::getConnection()
     *
     * @return static
     */
    public static function createWithOptions($tableName, array $options)
    {
        return new static($tableName, DriverManager::getConnection($options));
    }

    /**
     * Builds a store on top of an already existing connection.
     *
     * @param string     $tableName
     * @param Connection $connection
     *
     * @return static
     */
    public static function createWithConnection($tableName, Connection $connection)
    {
        return new static($tableName, $connection);
    }

    /**
     * Inserts one row for the given event and adds the event to the staged events.
     *
     * The serial number of the new row is the entity's current event count plus one.
     *
     * @param EventSourcedEntity $eventSourcedEntity
     * @param object             $event              Passed to get_class(), so it must be an object
     */
    protected function persist(EventSourcedEntity $eventSourcedEntity, $event)
    {
        $entityIdentifier = $eventSourcedEntity->identifier();
        $serialNumber = $this->countEntityEvents($entityIdentifier) + 1;

        $row = [
            'entity_identifier' => $entityIdentifier,
            'serial_number' => $serialNumber,
            'entity_class' => get_class($eventSourcedEntity),
            'recorded_at' => new \DateTime('now', new \DateTimeZone('UTC')),
            'event_class' => get_class($event),
            'event' => $this->serialize($event),
        ];

        // Positional binding types: they must stay in the same order as the columns of $row.
        $types = [
            \PDO::PARAM_STR,
            \PDO::PARAM_INT,
            \PDO::PARAM_STR,
            'datetime',
            \PDO::PARAM_STR,
            \PDO::PARAM_STR,
        ];

        $this->connection->insert($this->tableName, $row, $types);

        $this->stagedEvents[] = $event;
    }

    /**
     * Returns every stored row of an entity, ordered by serial number.
     *
     * In each row the 'event' column is replaced by the result of unserialize()
     * and the 'recorded_at' column by a \DateTime instance.
     *
     * @param string $entityIdentifier
     *
     * @throws NoEventsFoundForKeyException When no row exists for the identifier
     *
     * @return array
     */
    protected function eventLog($entityIdentifier)
    {
        if (0 === $this->countEntityEvents($entityIdentifier)) {
            throw new NoEventsFoundForKeyException();
        }

        $rows = $this->eventLogQuery($entityIdentifier)->execute()->fetchAll();

        // persist() records 'recorded_at' in UTC, so the stored value has to be
        // parsed as UTC as well instead of in PHP's default time zone.
        $utc = new \DateTimeZone('UTC');

        return array_map(function ($row) use ($utc) {
            $row['event'] = $this->unserialize($row['event']);
            $row['recorded_at'] = new \DateTime($row['recorded_at'], $utc);

            return $row;
        }, $rows);
    }

    /**
     * Opens a database transaction and clears the staged events.
     *
     * @param EventSourcedEntity $eventSourcedEntity Not used by this implementation
     */
    protected function startTransaction(EventSourcedEntity $eventSourcedEntity)
    {
        $this->connection->beginTransaction();

        $this->stagedEvents = [];
    }

    /**
     * Rolls the database transaction back and clears the staged events.
     *
     * @param EventSourcedEntity $eventSourcedEntity Not used by this implementation
     */
    protected function abortTransaction(EventSourcedEntity $eventSourcedEntity)
    {
        $this->connection->rollBack();

        $this->stagedEvents = [];
    }

    /**
     * Commits the database transaction and clears the staged events.
     *
     * @param EventSourcedEntity $eventSourcedEntity Not used by this implementation
     */
    protected function completeTransaction(EventSourcedEntity $eventSourcedEntity)
    {
        $this->connection->commit();

        $this->stagedEvents = [];
    }

    /**
     * Creates the event table with its primary key and secondary indexes.
     *
     * @throws TableAlreadyExistsException When the schema already contains the table
     */
    public function createTable()
    {
        $schemaManager = $this->connection->getSchemaManager();
        $schema = $schemaManager->createSchema();

        if ($schema->hasTable($this->tableName)) {
            throw new TableAlreadyExistsException();
        }

        $table = $schema->createTable($this->tableName);

        // An event is identified by its entity plus its position in that entity's log.
        $table->addColumn('entity_identifier', 'string', ['length' => 255]);
        $table->addColumn('serial_number', 'integer');
        $table->setPrimaryKey(['entity_identifier', 'serial_number']);

        $table->addColumn('entity_class', 'string', ['length' => 255]);
        $table->addColumn('recorded_at', 'datetime');
        $table->addColumn('event_class', 'string', ['length' => 255]);
        $table->addColumn('event', 'text');

        $table->addIndex(['entity_class']);
        $table->addIndex(['recorded_at']);
        $table->addIndex(['event_class']);

        $schemaManager->createTable($table);
    }

    /**
     * Drops the event table.
     */
    public function dropTable()
    {
        $this->connection->getSchemaManager()->dropTable($this->tableName);
    }

    /**
     * Counts the rows stored for an entity.
     *
     * @param string $entityIdentifier
     *
     * @return int
     */
    protected function countEntityEvents($entityIdentifier)
    {
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder
            ->select('COUNT(entity_identifier)')
            ->from($this->tableName)
            ->where('entity_identifier = :entity_identifier')
            ->setParameter('entity_identifier', $entityIdentifier);

        return (int) $queryBuilder->execute()->fetchColumn(0);
    }

    /**
     * Builds, without executing it, the query selecting all rows of an entity
     * in ascending serial number order.
     *
     * @param string $entityIdentifier
     *
     * @return \Doctrine\DBAL\Query\QueryBuilder
     */
    protected function eventLogQuery($entityIdentifier)
    {
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $this->connection->createQueryBuilder();

        return $queryBuilder
            ->select('*')
            ->from($this->tableName)
            ->where('entity_identifier = :entity_identifier')
            ->orderBy('serial_number', 'ASC')
            ->setParameter('entity_identifier', $entityIdentifier);
    }

    /**
     * Returns the entity class stored with the entity's first event, i.e. the
     * row with the lowest serial number.
     *
     * @param string $entityIdentifier
     *
     * @throws NoEventsFoundForKeyException Presumably raised by verifyEventExistsForKey(),
     *                                      which is not defined in this class
     *
     * @return string
     */
    public function entityClass($entityIdentifier)
    {
        $this->verifyEventExistsForKey($entityIdentifier);

        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder
            ->select('entity_class')
            ->from($this->tableName)
            ->where('entity_identifier = :entity_identifier')
            ->orderBy('serial_number', 'ASC')
            ->setMaxResults(1)
            ->setParameter('entity_identifier', $entityIdentifier);

        return $queryBuilder->execute()->fetchColumn(0);
    }
}
236