MySqlEventStore::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
namespace Domain\Eventing;
4
5
use Domain\Identity\Identity;
6
use Domain\Tools\ClassToString;
7
use Doctrine\DBAL\DriverManager;
8
use Doctrine\DBAL\Connection;
9
use Doctrine\DBAL\Schema\Schema;
10
11
/**
 * Event store that persists domain events as rows of the MySQL table
 * `aggregate_events` (one row per event) and rebuilds an aggregate's history
 * from those rows.
 *
 * @author Sebastiaan Hilbers <[email protected]>
 */
class MySqlEventStore implements EventStore
{
    /**
     * Doctrine connection; queries are issued through its wrapped driver connection.
     *
     * @var Connection
     */
    private $conn;

    /**
     * @param Connection $conn Connection to the database holding `aggregate_events`.
     */
    public function __construct(Connection $conn)
    {
        $this->conn = $conn;
    }

    /**
     * Creates every table described by getSchema() on the given connection.
     *
     * @param Connection $conn Connection on which the tables are created.
     *
     * @return void
     */
    public static function createSchema(Connection $conn)
    {
        $schemaManager = $conn->getSchemaManager();

        // static:: so that a subclass overriding getSchema() is honoured.
        foreach (static::getSchema()->getTables() as $table) {
            $schemaManager->createTable($table);
        }
    }

    /**
     * Describes the storage layout: a single `aggregate_events` table with an
     * auto-incrementing `id` primary key and the non-null columns `identity`,
     * `event` and `data`.
     *
     * @return Schema
     */
    public static function getSchema()
    {
        $schema = new Schema();

        $streamTable = $schema->createTable('aggregate_events');

        $streamTable->addColumn('id', 'integer')
            ->setAutoincrement(true);
        $streamTable->addColumn('identity', 'string')
            ->setNotnull(true);
        $streamTable->addColumn('event', 'text')
            ->setNotnull(true);
        $streamTable->addColumn('data', 'text')
            ->setNotnull(true);
        $streamTable->setPrimaryKey(['id']);

        return $schema;
    }

    /**
     * Persists all given events atomically.
     *
     * Every event is inserted inside one transaction: either the whole batch
     * is stored or, when any insert (or the serialization of an event) fails,
     * nothing is stored and the original exception is rethrown.
     *
     * @param UncommittedEvents $events Events to append to the store.
     *
     * @return void
     *
     * @throws \Exception Whatever the driver or serialize() raised; rethrown after rollback.
     */
    public function commit(UncommittedEvents $events)
    {
        $query = '
            INSERT INTO `aggregate_events` (`identity`, `event`, `data`)
            VALUES (:identity, :event, :data)';

        $pdo = $this->conn->getWrappedConnection();

        // Prepared once and reused for every event of the batch.
        $stmt = $pdo->prepare($query);

        // Started outside the try block: if this call fails there is no
        // transaction to roll back, and a rollBack() attempt would only mask
        // the real error.
        $pdo->beginTransaction();

        try {
            foreach ($events as $event) {
                $stmt->execute([
                    ':identity' => (string) $event->getAggregateIdentity(),
                    ':event'    => (string) ClassToString::fqcn($event),
                    ':data'     => (string) serialize($event)
                ]);
            }

            $pdo->commit();
        } catch (\Exception $ex) {
            // Catch every exception, not only \PDOException: serialize() can
            // throw as well and must not leave the transaction open.
            $pdo->rollBack();
            throw $ex;
        }
    }

    /**
     * Loads the stored events of one aggregate in insertion order.
     *
     * @param Identity $id     Identity of the aggregate; matched by its string form.
     * @param int      $offset Number of leading events to skip; values <= 0 skip nothing.
     * @param int|null $max    Maximum number of events to return, or null for no limit.
     *
     * @return CommittedEvents
     *
     * @throws \InvalidArgumentException When $max is negative.
     */
    public function getAggregateHistoryFor(Identity $id, $offset = 0, $max = null)
    {
        // Both values end up in the SQL text, so force them to integers first.
        $offset = (int) $offset;

        if ($max !== null) {
            $max = (int) $max;

            if ($max < 0) {
                throw new \InvalidArgumentException('$max must be null or a non-negative integer.');
            }
        }

        // Order by the auto-increment key so events come back in the order
        // they were committed; only `data` is needed to rebuild them.
        $query = 'SELECT `data` FROM `aggregate_events` WHERE identity = :identity ORDER BY `id` ASC';

        if ($max !== null || $offset > 0) {
            // MySQL only accepts OFFSET after a LIMIT clause. When the caller
            // wants an offset without a limit, use the largest row count MySQL
            // accepts (kept as a string because it exceeds PHP's integer range).
            $query .= ' LIMIT ' . ($max !== null ? $max : '18446744073709551615');

            if ($offset > 0) {
                $query .= ' OFFSET ' . $offset;
            }
        }

        $pdo = $this->conn->getWrappedConnection();

        $stmt = $pdo->prepare($query);
        $stmt->execute([':identity' => (string) $id]);

        // NOTE(review): unserialize() can instantiate arbitrary classes. This
        // is only safe while `aggregate_events.data` is written exclusively by
        // commit(); confirm nothing else can write to that column.
        $events = array_map(static function ($row) {
            return unserialize($row['data']);
        }, $stmt->fetchAll(\PDO::FETCH_ASSOC));

        return new CommittedEvents($id, $events);
    }
}
108