Completed
Push — master ( 19228d...34c2b4 )
by David
03:14
created

DBALEventStore::eventLog()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 17
ccs 9
cts 9
cp 1
rs 9.4286
cc 2
eloc 10
nc 2
nop 1
crap 2
1
<?php
2
3
namespace Rawkode\Eidetic\EventStore\DBALEventStore;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\DriverManager;
7
use Rawkode\Eidetic\EventStore\EventStore;
8
use Rawkode\Eidetic\EventStore\NoEventsFoundForKeyException;
9
use Rawkode\Eidetic\EventSourcing\EventSourcedEntity;
10
11
/**
 * Event store that keeps its event log in a relational table reached through
 * a Doctrine DBAL connection.
 *
 * Each event is one row, uniquely identified by the pair
 * (entity_identifier, serial_number); see createTable() for the full layout.
 */
final class DBALEventStore extends EventStore
{
    /**
     * Name of the table holding the event log.
     *
     * @var string
     */
    private $tableName;

    /**
     * Connection used for every query, insert and schema operation.
     *
     * @var Connection
     */
    private $connection;

    /**
     * Private on purpose: instances are built through createWithOptions().
     *
     * @param string     $tableName
     * @param Connection $connection
     */
    private function __construct($tableName, Connection $connection)
    {
        $this->tableName = $tableName;
        $this->connection = $connection;
    }

    /**
     * Builds a store whose connection is obtained from DriverManager using
     * the given connection options.
     *
     * @param string $tableName Table that holds (or will hold) the event log
     * @param array  $options   Passed unchanged to DriverManager::getConnection()
     *
     * @return self
     */
    public static function createWithOptions($tableName, array $options)
    {
        return new self($tableName, DriverManager::getConnection($options));
    }

    /**
     * Inserts one row per staged event of the entity.
     *
     * Serial numbers continue from the number of events already stored for
     * the entity. Each inserted event is also appended to $this->stagedEvents,
     * which completeTransaction() later publishes.
     *
     * @param EventSourcedEntity $eventSourcedEntity
     */
    protected function persist(EventSourcedEntity $eventSourcedEntity)
    {
        // These values are the same for every row, so resolve them once.
        $entityIdentifier = $eventSourcedEntity->identifier();
        $entityClass = get_class($eventSourcedEntity);
        $serialNumber = $this->countEntityEvents($entityIdentifier);

        foreach ($eventSourcedEntity->stagedEvents() as $event) {
            $this->connection->insert($this->tableName, [
                'entity_identifier' => $entityIdentifier,
                'serial_number' => ++$serialNumber,
                'entity_class' => $entityClass,
                // Timestamps are always written as UTC wall-clock time.
                'recorded_at' => new \DateTime('now', new \DateTimeZone('UTC')),
                'event_class' => get_class($event),
                'event' => $this->serialize($event),
            ], [
                // Positional types, in the same order as the columns above.
                \PDO::PARAM_STR,
                \PDO::PARAM_INT,
                \PDO::PARAM_STR,
                'datetime',
                \PDO::PARAM_STR,
                \PDO::PARAM_STR,
            ]);

            $this->stagedEvents[] = $event;
        }
    }

    /**
     * Loads the complete event log of an entity, ordered by serial number.
     *
     * Every returned row has its 'event' column unserialized and its
     * 'recorded_at' column converted to a \DateTime in UTC.
     *
     * @param string $entityIdentifier
     *
     * @throws NoEventsFoundForKeyException When no row exists for the identifier
     *
     * @return array
     */
    protected function eventLog($entityIdentifier)
    {
        // A single SELECT answers both "are there events?" and "which ones?",
        // so no separate COUNT query is needed.
        $rows = $this->eventLogQuery($entityIdentifier)->execute()->fetchAll();

        if ([] === $rows) {
            throw new NoEventsFoundForKeyException();
        }

        // persist() stores UTC, so the stored string must be read back as UTC
        // instead of being interpreted in PHP's default timezone.
        $utc = new \DateTimeZone('UTC');

        return array_map(function ($row) use ($utc) {
            $row['event'] = $this->unserialize($row['event']);
            $row['recorded_at'] = new \DateTime($row['recorded_at'], $utc);

            return $row;
        }, $rows);
    }

    /**
     * Opens a database transaction and clears the list of events collected
     * for publishing.
     */
    protected function startTransaction()
    {
        $this->connection->beginTransaction();
        $this->stagedEvents = [];
    }

    /**
     * Rolls the database transaction back and discards the events collected
     * for publishing.
     */
    protected function abortTransaction()
    {
        $this->connection->rollBack();
        $this->stagedEvents = [];
    }

    /**
     * Commits the database transaction, then publishes EVENT_STORED once for
     * every event collected by persist(), and finally clears that list.
     */
    protected function completeTransaction()
    {
        $this->connection->commit();

        foreach ($this->stagedEvents as $event) {
            $this->publish(self::EVENT_STORED, $event);
        }

        $this->stagedEvents = [];
    }

    /**
     * Creates the event log table unless a table with that name already exists.
     *
     * Layout: primary key (entity_identifier, serial_number), plus the
     * entity_class, recorded_at, event_class and event columns, with
     * single-column indexes on entity_class, recorded_at and event_class.
     */
    public function createTable()
    {
        $schemaManager = $this->connection->getSchemaManager();
        $schema = $schemaManager->createSchema();

        if ($schema->hasTable($this->tableName)) {
            return;
        }

        $table = $schema->createTable($this->tableName);

        $table->addColumn('entity_identifier', 'string', ['length' => 255]);
        $table->addColumn('serial_number', 'integer');

        $table->setPrimaryKey(['entity_identifier', 'serial_number']);

        $table->addColumn('entity_class', 'string', ['length' => 255]);
        $table->addColumn('recorded_at', 'datetime');
        $table->addColumn('event_class', 'string', ['length' => 255]);
        $table->addColumn('event', 'text');

        $table->addIndex(['entity_class']);
        $table->addIndex(['recorded_at']);
        $table->addIndex(['event_class']);

        $schemaManager->createTable($table);
    }

    /**
     * Drops the event log table through the connection's schema manager.
     */
    public function dropTable()
    {
        $this->connection->getSchemaManager()->dropTable($this->tableName);
    }

    /**
     * Counts the rows stored for the given entity identifier.
     *
     * @param string $entityIdentifier
     *
     * @return int
     */
    private function countEntityEvents($entityIdentifier)
    {
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('COUNT(entity_identifier)');
        $queryBuilder->from($this->tableName);
        $queryBuilder->where('entity_identifier = :entity_identifier');

        $queryBuilder->setParameter('entity_identifier', $entityIdentifier);

        return (int) $queryBuilder->execute()->fetchColumn(0);
    }

    /**
     * Prepares, without executing, the query that selects every column of an
     * entity's rows in ascending serial number order.
     *
     * @param string $entityIdentifier
     *
     * @return \Doctrine\DBAL\Query\QueryBuilder
     */
    protected function eventLogQuery($entityIdentifier)
    {
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select('*');
        $queryBuilder->from($this->tableName);
        $queryBuilder->where('entity_identifier = :entity_identifier');
        $queryBuilder->orderBy('serial_number', 'ASC');

        $queryBuilder->setParameter('entity_identifier', $entityIdentifier);

        return $queryBuilder;
    }

    /**
     * Fetches at most one raw row (the one with the lowest serial number) for
     * the given entity identifier.
     *
     * Unlike eventLog(), the row is returned exactly as fetched: nothing is
     * unserialized or converted, and an empty array is returned when the
     * entity has no rows.
     *
     * @param string $entityIdentifier
     *
     * @return array
     */
    protected function singleLogForKey($entityIdentifier)
    {
        $queryBuilder = $this->eventLogQuery($entityIdentifier);
        $queryBuilder->setMaxResults(1);

        return $queryBuilder->execute()->fetchAll();
    }
}
220