Completed
Push — master ( 0157de...6fef55 )
by David
02:27
created

DBALEventStore::persist()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 22
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 1

Importance

Changes 3
Bugs 1 Features 0
Metric Value
c 3
b 1
f 0
dl 0
loc 22
ccs 19
cts 19
cp 1
rs 9.2
cc 1
eloc 16
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Rawkode\Eidetic\EventStore\DBALEventStore;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\DriverManager;
7
use Doctrine\DBAL\Query\QueryBuilder;
8
use Rawkode\Eidetic\EventStore\EventStore;
9
use Rawkode\Eidetic\EventStore\NoEventsFoundForKeyException;
10
use Rawkode\Eidetic\EventSourcing\EventSourcedEntity;
11
12
/**
 * Event store that keeps its event log in a single relational table accessed
 * through a Doctrine DBAL connection.
 *
 * Each row holds one event: the identifier and class of the entity it belongs
 * to, a per-entity serial number, the time it was recorded, the event class
 * and the serialized event payload.
 *
 * The helpers serialize(), unserialize() and verifyEventExistsForKey(), and the
 * $stagedEvents property, are not defined in this class; they are presumably
 * provided by the EventStore base class.
 */
final class DBALEventStore extends EventStore
{
    /**
     * Name of the table that holds the event log.
     *
     * @var string
     */
    private $tableName;

    /**
     * Connection every query of this store is executed on.
     *
     * @var Connection
     */
    private $connection;

    /**
     * Private: instances are obtained through createWithOptions().
     *
     * @param string     $tableName
     * @param Connection $connection
     */
    private function __construct($tableName, Connection $connection)
    {
        $this->tableName = $tableName;
        $this->connection = $connection;
    }

    /**
     * Builds a store on top of a new DBAL connection.
     *
     * @param string $tableName Table used for the event log
     * @param array  $options   Connection parameters, passed unchanged to
     *                          DriverManager::getConnection()
     *
     * @return self
     */
    public static function createWithOptions($tableName, array $options)
    {
        $connection = DriverManager::getConnection($options);

        return new self($tableName, $connection);
    }

    /**
     * Inserts one event row for the entity and adds the event to the staged list.
     *
     * The serial number is the entity's current event count plus one. When the
     * table was created by createTable(), (entity_identifier, serial_number) is
     * the primary key, so two writers that compute the same serial number
     * collide on insert instead of storing a duplicate.
     *
     * @param EventSourcedEntity $eventSourcedEntity Entity the event belongs to
     * @param object             $event              Event to store; its class name is
     *                                               recorded next to the serialized payload
     */
    protected function persist(EventSourcedEntity $eventSourcedEntity, $event)
    {
        $eventCount = $this->countEntityEvents($eventSourcedEntity->identifier());

        $this->connection->insert($this->tableName, [
            'entity_identifier' => $eventSourcedEntity->identifier(),
            'serial_number' => ++$eventCount,
            'entity_class' => get_class($eventSourcedEntity),
            // Always recorded in UTC; eventLog() reads the value back as UTC.
            'recorded_at' => new \DateTime('now', new \DateTimeZone('UTC')),
            'event_class' => get_class($event),
            'event' => $this->serialize($event),
        ], [
            // Binding types, in the same order as the columns above.
            \PDO::PARAM_STR,
            \PDO::PARAM_INT,
            \PDO::PARAM_STR,
            'datetime',
            \PDO::PARAM_STR,
            \PDO::PARAM_STR,
        ]);

        $this->stagedEvents[] = $event;
    }

    /**
     * Loads every stored event row of an entity, oldest first.
     *
     * In each returned row the 'event' column is replaced by the unserialized
     * event and 'recorded_at' by a \DateTime in UTC.
     *
     * @param string $entityIdentifier
     *
     * @throws NoEventsFoundForKeyException When no row exists for the identifier
     *
     * @return array
     */
    protected function eventLog($entityIdentifier)
    {
        $statement = $this->eventLogQuery($entityIdentifier)->execute();

        $eventLog = $statement->fetchAll();

        // Deciding on the fetched rows avoids a separate COUNT query and the
        // window between counting and reading.
        if (0 === count($eventLog)) {
            throw new NoEventsFoundForKeyException();
        }

        // persist() writes recorded_at in UTC through the timezone-less
        // "datetime" type, so the stored string must be parsed as UTC rather
        // than in the host's default timezone.
        $utc = new \DateTimeZone('UTC');

        return array_map(function ($eventLogEntry) use ($utc) {
            $eventLogEntry['event'] = $this->unserialize($eventLogEntry['event']);
            $eventLogEntry['recorded_at'] = new \DateTime($eventLogEntry['recorded_at'], $utc);

            return $eventLogEntry;
        }, $eventLog);
    }

    /**
     * Opens a database transaction and clears the staged events.
     *
     * @param EventSourcedEntity $eventSourcedEntity Not used by this implementation
     */
    protected function startTransaction(EventSourcedEntity $eventSourcedEntity)
    {
        $this->connection->beginTransaction();

        $this->stagedEvents = [];
    }

    /**
     * Rolls the database transaction back and clears the staged events.
     *
     * @param EventSourcedEntity $eventSourcedEntity Not used by this implementation
     */
    protected function abortTransaction(EventSourcedEntity $eventSourcedEntity)
    {
        $this->connection->rollBack();
        $this->stagedEvents = [];
    }

    /**
     * Commits the database transaction and clears the staged events.
     *
     * @param EventSourcedEntity $eventSourcedEntity Not used by this implementation
     */
    protected function completeTransaction(EventSourcedEntity $eventSourcedEntity)
    {
        $this->connection->commit();

        $this->stagedEvents = [];
    }

    /**
     * Creates the event log table.
     *
     * Columns: entity_identifier, serial_number (together the primary key),
     * entity_class, recorded_at, event_class and event. The three columns
     * entity_class, recorded_at and event_class each get their own index.
     *
     * @throws TableAlreadyExistsException When the schema already contains the table
     */
    public function createTable()
    {
        $schemaManager = $this->connection->getSchemaManager();
        $schema = $schemaManager->createSchema();

        if ($schema->hasTable($this->tableName)) {
            throw new TableAlreadyExistsException();
        }

        $table = $schema->createTable($this->tableName);

        $table->addColumn('entity_identifier', 'string', ['length' => 255]);
        $table->addColumn('serial_number', 'integer');

        $table->setPrimaryKey(['entity_identifier', 'serial_number']);

        $table->addColumn('entity_class', 'string', ['length' => 255]);
        $table->addColumn('recorded_at', 'datetime');
        $table->addColumn('event_class', 'string', ['length' => 255]);
        $table->addColumn('event', 'text');

        $table->addIndex(['entity_class']);
        $table->addIndex(['recorded_at']);
        $table->addIndex(['event_class']);

        $schemaManager->createTable($table);
    }

    /**
     * Drops the event log table.
     */
    public function dropTable()
    {
        $this->connection->getSchemaManager()->dropTable($this->tableName);
    }

    /**
     * Counts the rows stored for an entity.
     *
     * @param string $entityIdentifier
     *
     * @return int
     */
    protected function countEntityEvents($entityIdentifier)
    {
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('COUNT(entity_identifier)');
        $queryBuilder->from($this->tableName);
        $queryBuilder->where('entity_identifier = :entity_identifier');

        $queryBuilder->setParameter('entity_identifier', $entityIdentifier);

        return (int) $queryBuilder->execute()->fetchColumn(0);
    }

    /**
     * Builds, without executing it, the query that selects all rows of an
     * entity ordered by ascending serial number.
     *
     * @param string $entityIdentifier
     *
     * @return \Doctrine\DBAL\Query\QueryBuilder
     */
    protected function eventLogQuery($entityIdentifier)
    {
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('*');
        $queryBuilder->from($this->tableName);
        $queryBuilder->where('entity_identifier = :entity_identifier');
        $queryBuilder->orderBy('serial_number', 'ASC');

        $queryBuilder->setParameter('entity_identifier', $entityIdentifier);

        return $queryBuilder;
    }

    /**
     * Returns the entity class stored on the entity's first event row
     * (lowest serial number).
     *
     * @param string $entityIdentifier
     *
     * @throws NoEventsFoundForKeyException Presumably raised by
     *                                      verifyEventExistsForKey(), which is
     *                                      defined outside this class
     *
     * @return string
     */
    public function entityClass($entityIdentifier)
    {
        $this->verifyEventExistsForKey($entityIdentifier);

        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('entity_class');
        $queryBuilder->from($this->tableName);
        $queryBuilder->where('entity_identifier = :entity_identifier');
        $queryBuilder->orderBy('serial_number', 'ASC');
        $queryBuilder->setMaxResults(1);

        $queryBuilder->setParameter('entity_identifier', $entityIdentifier);

        return $queryBuilder->execute()->fetchColumn(0);
    }
}
225