Completed
Push — php7 ( 0d7c93 )
by Alex
03:34
created

OutboxPersister::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 5
cts 5
cp 1
rs 9.6666
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 3
crap 1
1
<?php
2
namespace PSB\Persistence\Doctrine2\Outbox;
3
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\DBAL\ConnectionException;
7
use PSB\Core\Exception\CriticalErrorException;
8
9
class OutboxPersister
10
{
11
    /**
12
     * @var Connection
13
     */
14
    private $connection;
15
16
    /**
17
     * @var string
18
     */
19
    private $endpointsTableName;
20
21
    /**
22
     * @var string
23
     *
24
     */
25
    private $messagesTableName;
26
27
    /**
28
     * @param Connection $connection
29
     * @param string     $messagesTableName
30
     * @param string     $endpointsTableName
31
     */
32 15
    public function __construct(
33
        Connection $connection,
34
        $messagesTableName,
35
        $endpointsTableName
36
    ) {
37 15
        $this->connection = $connection;
38 15
        $this->endpointsTableName = $endpointsTableName;
39 15
        $this->messagesTableName = $messagesTableName;
40 15
    }
41
42
    /**
43
     * @param int    $endpointId
44
     * @param string $messageId
45
     *
46
     * @return array
47
     * @throws \Throwable
48
     */
49 7
    public function get($endpointId, $messageId)
50
    {
51
        try {
52 7
            $result = $this->connection->executeQuery(
53 7
                "SELECT * FROM {$this->messagesTableName} WHERE endpoint_id = ? AND message_id = ?",
54 7
                [$endpointId, hex2bin($this->stripDashes($messageId))]
55 6
            )->fetch(\PDO::FETCH_ASSOC);
56 1
        } catch (\Throwable $t) {
57 1
            throw $this->attemptToReconnectPresumedLostConnection($t);
58
        }
59
60 6
        if (!$result) {
61 2
            return null;
62
        }
63
64 4
        unset($result['id']);
65 4
        unset($result['dispatched_at']);
66 4
        unset($result['endpoint_id']);
67 4
        $result['message_id'] = bin2hex($result['message_id']);
68
69 4
        return $result;
70
    }
71
72
    /**
73
     * @param int   $endpointId
74
     * @param array $outboxRecord
75
     *
76
     * @throws \Exception
77
     */
78 6
    public function store($endpointId, array $outboxRecord)
79
    {
80 6
        $outboxRecord['message_id'] = hex2bin($this->stripDashes($outboxRecord['message_id']));
81 6
        $outboxRecord['endpoint_id'] = $endpointId;
82 6
        $outboxRecord['is_dispatched'] = 0;
83
84 6
        $this->connection->transactional(
85 6
            function (Connection $connection) use ($outboxRecord) {
86 6
                $connection->insert($this->messagesTableName, $outboxRecord);
87 6
            }
88
        );
89 6
    }
90
91
    /**
92
     * @param int    $endpointId
93
     * @param string $messageId
94
     *
95
     * @throws \Throwable
96
     */
97 4
    public function markAsDispatched($endpointId, $messageId)
98
    {
99
        try {
100 4
            $this->connection->executeUpdate(
101 4
                "UPDATE {$this->messagesTableName}
102
                     SET is_dispatched = 1, dispatched_at = ?, transport_operations = ''
103
                     WHERE endpoint_id = ? AND message_id = ? AND is_dispatched = 0",
104
                [
105 4
                    (new \DateTime('now', new \DateTimeZone('UTC')))->format('Y-m-d H:i:s'),
106 4
                    $endpointId,
107 4
                    hex2bin($this->stripDashes($messageId))
108
                ]
109
            );
110 1
        } catch (\Throwable $t) {
111 1
            throw $this->attemptToReconnectPresumedLostConnection($t);
112
        }
113 3
    }
114
115
    /**
116
     * Initiates the transaction
117
     * Attempts to reconnect once if disconnected.
118
     *
119
     * @return void
120
     * @throws \Throwable
121
     */
122 1
    public function beginTransaction()
123
    {
124
        try {
125 1
            $this->connection->beginTransaction();
126 1
        } catch (\Throwable $t) {
127 1
            throw $this->attemptToReconnectPresumedLostConnection($t);
128
        }
129
    }
130
131
    /**
132
     * Commits the transaction
133
     *
134
     * Does not attempt to reconnect if disconnected because the transaction would be broken anyway.
135
     * Reconnection should be done by rollBack.
136
     *
137
     * @return void
138
     * @throws ConnectionException
139
     */
140
    public function commit()
141
    {
142
        $this->connection->commit();
143
    }
144
145
    /**
146
     * Rolls back the transaction.
147
     * It makes sure that the connection is in the correct state regardless of what happened before.
148
     * Correct state means that connection is not rollback only and does not have a transaction nesting level > 0
149
     *
150
     * @throws \Throwable
151
     */
152 5
    public function rollBack()
153
    {
154
        try {
155
            /**
156
             * Roll back all the way as this is supposed to be the top level transaction and we want to reset
157
             * the nesting level
158
             */
159 5
            $transactionNestingLevel = $this->connection->getTransactionNestingLevel();
160 5
            for ($i = 0; $i < $transactionNestingLevel - 1; $i++) {
161 1
                $this->connection->rollBack();
162
            }
163 5
            $this->connection->rollBack();
164 4
        } catch (\Throwable $t) {
165 4
            $rethrowable = $this->attemptToReconnectPresumedLostConnection($t);
166
            /**
167
             * If connection is functional we need to make sure the connection is not rollback only.
168
             * This can only be achieved by starting a transaction and rolling it back (the "why" is found in
169
             * lines 1277-1279 of Doctrine\DBAL\Connection).
170
             */
171 4
            if ($rethrowable === $t) {
172 4
                $this->connection->beginTransaction();
173 4
                $this->connection->rollBack();
174
            }
175 4
            throw $rethrowable;
176
        }
177 1
    }
178
179
    /**
180
     * Attempts to reconnect once if disconnected.
181
     *
182
     * @param \DateTime $dateTime
183
     *
184
     * @throws \Exception
185
     */
186 2
    public function removeEntriesOlderThan(\DateTime $dateTime)
187
    {
188 2
        $dateTime->setTimezone(new \DateTimeZone('UTC'));
189 2
        $this->connection->executeUpdate(
190 2
            "DELETE FROM {$this->messagesTableName} WHERE is_dispatched = 1 AND dispatched_at <= ?",
191 2
            [$dateTime->format('Y-m-d H:i:s')]
192
        );
193 2
    }
194
195
    /**
196
     * @param string $endpointName
197
     *
198
     * @return int
199
     * @throws \Exception
200
     */
201 2
    public function fetchOrGenerateEndpointId($endpointName)
202
    {
203 2
        $endpointId = 0;
204 2
        $this->connection->transactional(
205 2
            function (Connection $connection) use ($endpointName, &$endpointId) {
206 2
                $lookupHash = md5($endpointName);
207 2
                $endpointRecord = $connection->executeQuery(
208 2
                    "SELECT * FROM {$this->endpointsTableName} WHERE lookup_hash = ?",
209 2
                    [hex2bin($lookupHash)]
210 2
                )->fetch(\PDO::FETCH_ASSOC);
211 2
                if (!$endpointRecord) {
212 2
                    $connection->insert(
213 2
                        $this->endpointsTableName,
214 2
                        ['lookup_hash' => hex2bin($lookupHash), 'name' => $endpointName]
215
                    );
216 2
                    $endpointId = (int)$connection->lastInsertId();
217
                } else {
218 1
                    $endpointId = (int)$endpointRecord['id'];
219
                }
220
221 2
            }
222
        );
223
224 2
        return $endpointId;
225
    }
226
227
    /**
228
     * @param string $messageId
229
     *
230
     * @return string
231
     */
232 9
    private function stripDashes($messageId)
233
    {
234 9
        return str_replace('-', '', $messageId);
235
    }
236
237
    /**
238
     * It attempts to reconnect if connection is non responsive. Failing to reconnect triggers a critical error.
239
     * If connection is responsive or successfully reconnected it rethrows, relying on the bus retries
240
     * to re-execute everything from the beginning.
241
     *
242
     * @param \Throwable $t
243
     *
244
     * @return \Throwable|CriticalErrorException
245
     */
246 5
    private function attemptToReconnectPresumedLostConnection(\Throwable $t)
247
    {
248
        // presumably, any exception caught here is related to some connection error
249 5
        if (!$this->connection->ping()) {
250
            // if pinging fails, we try to reconnect
251
            try {
252 5
                $this->connection->close();
253 5
                $this->connection->connect();
254
            } catch (\Throwable $t) {
255
                // if reconnecting fails, there is no way that the bus can continue to function
256
                return new CriticalErrorException("Database connection failed.", 0, $t);
257
            }
258
        }
259
260 5
        return $t;
261
    }
262
}
263