Completed
Push — release-1.x ( c1f7bf...3be149 )
by Boy
09:15 queued 05:00
created

DBALEventHydrator::prepareLoadStatement()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 20
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 20
rs 9.4285
c 1
b 0
f 0
cc 2
eloc 8
nc 2
nop 0
1
<?php
2
3
/**
4
 * Copyright 2014 SURFnet bv
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 */
18
19
namespace Surfnet\StepupMiddleware\MiddlewareBundle\EventSourcing;
20
21
use Broadway\Domain\DateTime;
22
use Broadway\Domain\DomainEventStream;
23
use Broadway\Domain\DomainMessage;
24
use Broadway\Serializer\SerializerInterface;
25
use Doctrine\DBAL\Connection;
26
use PDO;
27
use Surfnet\StepupMiddleware\CommandHandlingBundle\SensitiveData\Forgettable;
28
use Surfnet\StepupMiddleware\CommandHandlingBundle\SensitiveData\SensitiveData;
29
30
class DBALEventHydrator
31
{
32
    /**
33
     * @var Connection
34
     */
35
    private $connection;
36
37
    /**
38
     * @var \Broadway\Serializer\SerializerInterface
39
     */
40
    private $payloadSerializer;
41
42
    /**
43
     * @var \Broadway\Serializer\SerializerInterface
44
     */
45
    private $metadataSerializer;
46
47
    /**
48
     * @var string
49
     */
50
    private $eventStreamTableName;
51
52
    /**
53
     * @var string
54
     */
55
    private $sensitiveDataTable;
56
57
    /**
58
     * @var \Doctrine\DBAL\Driver\Statement
59
     */
60
    private $loadStatement = null;
61
62
    /**
63
     * @param Connection          $connection
64
     * @param SerializerInterface $payloadSerializer
65
     * @param SerializerInterface $metadataSerializer
66
     * @param string              $eventStreamTable
67
     * @param string              $sensitiveDataTable
68
     */
69
    public function __construct(
70
        Connection $connection,
71
        SerializerInterface $payloadSerializer,
72
        SerializerInterface $metadataSerializer,
73
        $eventStreamTable,
74
        $sensitiveDataTable
75
    ) {
76
        $this->connection         = $connection;
77
        $this->payloadSerializer  = $payloadSerializer;
78
        $this->metadataSerializer = $metadataSerializer;
79
        $this->eventStreamTableName = $eventStreamTable;
80
        $this->sensitiveDataTable = $sensitiveDataTable;
81
    }
82
83
    /**
84
     * @return string
85
     * @throws \Doctrine\DBAL\DBALException
86
     */
87
    public function getCount()
88
    {
89
        $statement = $this->connection->prepare('SELECT COUNT(1) AS cnt FROM ' . $this->eventStreamTableName);
90
        $statement->execute();
91
92
        $row = $statement->fetch();
93
94
        return $row['cnt'];
95
    }
96
97
    /**
98
     * @param int $limit
99
     * @param int $offset
100
     * @return DomainEventStream
101
     */
102
    public function getFromTill($limit, $offset)
103
    {
104
        $statement = $this->prepareLoadStatement();
105
        $statement->bindValue('limit', $limit, PDO::PARAM_INT);
106
        $statement->bindValue('offset', $offset, PDO::PARAM_INT);
107
108
        $statement->execute();
109
110
        $events = array();
111
        while ($row = $statement->fetch()) {
112
            $events[] = $this->deserializeEvent($row);
113
        }
114
115
        return new DomainEventStream($events);
116
    }
117
118
    private function deserializeEvent($row)
119
    {
120
        $event = $this->payloadSerializer->deserialize(json_decode($row['payload'], true));
121
122
        if ($event instanceof Forgettable) {
123
            $event->setSensitiveData(SensitiveData::deserialize(json_decode($row['sensitive_data'], true)));
124
        }
125
126
        return new DomainMessage(
127
            $row['uuid'],
128
            $row['playhead'],
129
            $this->metadataSerializer->deserialize(json_decode($row['metadata'], true)),
130
            $event,
131
            DateTime::fromString($row['recorded_on'])
132
        );
133
    }
134
135
    private function prepareLoadStatement()
136
    {
137
        if ($this->loadStatement === null) {
138
            $query = str_replace(
139
                ['%es%', '%sd%'],
140
                [$this->eventStreamTableName, $this->sensitiveDataTable],
141
                'SELECT %es%.uuid, %es%.playhead, %es%.metadata, %es%.payload, %es%.recorded_on, %sd%.sensitive_data
142
                FROM %es%
143
                LEFT JOIN %sd%
144
                    ON %es%.uuid = %sd%.identity_id
145
                        AND %es%.playhead = %sd%.playhead
146
                ORDER BY recorded_on ASC
147
                LIMIT :limit OFFSET :offset'
148
            );
149
150
            $this->loadStatement = $this->connection->prepare($query);
151
        }
152
153
        return $this->loadStatement;
154
    }
155
}
156