OutboxPersister   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 209
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Test Coverage

Coverage 93.83%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 16
c 1
b 0
f 0
lcom 1
cbo 1
dl 0
loc 209
ccs 76
cts 81
cp 0.9383
rs 10

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 9 1
A get() 0 21 3
A store() 0 12 1
A markAsDispatched() 0 17 2
A beginTransaction() 0 8 2
A commit() 0 4 1
A rollBack() 0 8 2
A removeEntriesOlderThan() 0 8 1
B fetchOrGenerateEndpointId() 0 25 2
A stripDashes() 0 4 1
1
<?php
2
namespace PSB\Persistence\Doctrine1\Outbox;
3
4
5
use PSB\Persistence\Doctrine1\LogicalConnection;
6
7
class OutboxPersister
8
{
9
    /**
10
     * @var LogicalConnection
11
     */
12
    private $connection;
13
14
    /**
15
     * @var string
16
     */
17
    private $endpointsTableName;
18
19
    /**
20
     * @var string
21
     *
22
     */
23
    private $messagesTableName;
24
25
    /**
     * Wires the persister to its connection and to the two tables it works with.
     *
     * Both table names end up inside the SQL statements built by this class,
     * so they must be trusted identifiers and never come from user input.
     *
     * @param LogicalConnection $connection         connection every statement is executed on
     * @param string            $messagesTableName  name of the table holding outbox message rows
     * @param string            $endpointsTableName name of the table holding endpoint lookup rows
     */
    public function __construct(LogicalConnection $connection, $messagesTableName, $endpointsTableName)
    {
        $this->messagesTableName = $messagesTableName;
        $this->endpointsTableName = $endpointsTableName;
        $this->connection = $connection;
    }
39
40
    /**
     * Loads the outbox row stored for the given endpoint and message.
     *
     * Dashes are removed from the message id before the lookup. The columns
     * `id`, `dispatched_at` and `endpoint_id` are dropped from the row before
     * it is returned.
     *
     * Any exception raised while querying is handed to the connection's
     * reconnectIfNeeded() and the exception it returns is thrown instead.
     *
     * @param int    $endpointId
     * @param string $messageId
     *
     * @return array|null the remaining columns of the row, or null when no row was fetched
     * @throws \Exception
     */
    public function get($endpointId, $messageId)
    {
        try {
            $sql = "SELECT * FROM {$this->messagesTableName} WHERE endpoint_id = ? AND message_id = ?";
            $params = [$endpointId, $this->stripDashes($messageId)];
            $row = $this->connection->executeQuery($sql, $params)->fetch(\PDO::FETCH_ASSOC);
        } catch (\Exception $exception) {
            throw $this->connection->reconnectIfNeeded($exception);
        }

        if (!$row) {
            return null;
        }

        // Bookkeeping columns are not part of the record handed back to the caller.
        unset($row['id'], $row['dispatched_at'], $row['endpoint_id']);

        return $row;
    }
70
71
    /**
     * Inserts an outbox record into the messages table.
     *
     * Before inserting, the record's `message_id` has its dashes removed,
     * `endpoint_id` is set to the given endpoint and `is_dispatched` is
     * forced to 0. The insert runs through the connection's transactional().
     *
     * @param int   $endpointId
     * @param array $outboxRecord column => value map; must contain `message_id`
     *
     * @throws \Exception
     */
    public function store($endpointId, array $outboxRecord)
    {
        // array_replace keeps existing keys in place and appends new ones in the
        // listed order, exactly like assigning the three keys one after another.
        $row = array_replace(
            $outboxRecord,
            [
                'message_id'    => $this->stripDashes($outboxRecord['message_id']),
                'endpoint_id'   => $endpointId,
                'is_dispatched' => 0,
            ]
        );
        $table = $this->messagesTableName;

        $this->connection->transactional(
            function (LogicalConnection $transaction) use ($table, $row) {
                $transaction->insert($table, $row);
            }
        );
    }
89
90
    /**
     * Flags a not-yet-dispatched outbox row as dispatched.
     *
     * Sets `is_dispatched` to 1, stamps `dispatched_at` with the current UTC
     * time and empties `transport_operations`. Only rows that still have
     * `is_dispatched = 0` are touched, so repeating the call changes nothing.
     *
     * Any exception raised by the update is handed to the connection's
     * reconnectIfNeeded() and the exception it returns is thrown instead.
     *
     * @param int    $endpointId
     * @param string $messageId
     *
     * @throws \Exception
     */
    public function markAsDispatched($endpointId, $messageId)
    {
        try {
            $sql = "UPDATE {$this->messagesTableName}
                     SET is_dispatched = 1, dispatched_at = ?, transport_operations = ''
                     WHERE endpoint_id = ? AND message_id = ? AND is_dispatched = 0";

            $utcNow = new \DateTime('now', new \DateTimeZone('UTC'));
            $params = [
                $utcNow->format('Y-m-d H:i:s'),
                $endpointId,
                $this->stripDashes($messageId)
            ];

            $this->connection->executeUpdate($sql, $params);
        } catch (\Exception $exception) {
            throw $this->connection->reconnectIfNeeded($exception);
        }
    }
115
116
    /**
     * Opens a transaction on the underlying connection.
     *
     * If starting the transaction raises an exception, it is handed to the
     * connection's reconnectIfNeeded() and the exception it returns is thrown.
     *
     * @return void
     * @throws \Exception
     */
    public function beginTransaction()
    {
        try {
            $this->connection->beginTransaction();
        } catch (\Exception $exception) {
            throw $this->connection->reconnectIfNeeded($exception);
        }
    }
130
131
    /**
     * Commits the current transaction on the underlying connection.
     *
     * No reconnection is attempted here on purpose: a transaction interrupted
     * by a lost connection is broken anyway, and rollBack() is the place where
     * reconnection is handled.
     *
     * @return void
     */
    public function commit()
    {
        $this->connection->commit();
    }
143
144
    /**
     * Rolls back the current transaction on the underlying connection.
     *
     * If the rollback raises an exception, it is handed to the connection's
     * reconnectIfNeeded() and the exception it returns is thrown.
     *
     * @return void
     * @throws \Exception
     */
    public function rollBack()
    {
        try {
            $this->connection->rollBack();
        } catch (\Exception $exception) {
            throw $this->connection->reconnectIfNeeded($exception);
        }
    }
158
159
    /**
     * Deletes dispatched outbox rows whose `dispatched_at` is at or before the given moment.
     *
     * The cut-off is converted to UTC on a copy of the argument, so the caller's
     * \DateTime instance is left untouched.
     *
     * Any exception raised by the delete is handed to the connection's
     * reconnectIfNeeded() and the exception it returns is thrown instead,
     * matching the other statement-executing methods of this class.
     *
     * @param \DateTime $dateTime cut-off moment, in any timezone
     *
     * @return void
     * @throws \Exception
     */
    public function removeEntriesOlderThan(\DateTime $dateTime)
    {
        // setTimezone() mutates the object it is called on; work on a clone so the
        // caller does not get its instance silently switched to UTC.
        $cutOff = clone $dateTime;
        $cutOff->setTimezone(new \DateTimeZone('UTC'));

        try {
            $this->connection->executeUpdate(
                "DELETE FROM {$this->messagesTableName} WHERE is_dispatched = 1 AND dispatched_at <= ?",
                [$cutOff->format('Y-m-d H:i:s')]
            );
        } catch (\Exception $exception) {
            throw $this->connection->reconnectIfNeeded($exception);
        }
    }
174
175
    /**
     * Resolves the numeric id registered for an endpoint name, registering the name first if needed.
     *
     * The endpoints table is searched by the md5 hash of the name (used as a
     * lookup key, not for security). When no row exists, one is inserted and
     * the connection's last insert id is used. Lookup and insert both run
     * inside the connection's transactional().
     *
     * @param string $endpointName
     *
     * @return int id of the existing or newly inserted endpoint row
     */
    public function fetchOrGenerateEndpointId($endpointName)
    {
        // Written by reference from inside the closure below.
        $endpointId = 0;

        $this->connection->transactional(
            function (LogicalConnection $transaction) use ($endpointName, &$endpointId) {
                $hash = md5($endpointName);
                $statement = $transaction->executeQuery(
                    "SELECT * FROM {$this->endpointsTableName} WHERE lookup_hash = ?",
                    [$hash]
                );
                $existing = $statement->fetch(\PDO::FETCH_ASSOC);

                if ($existing) {
                    $endpointId = (int)$existing['id'];

                    return;
                }

                $transaction->insert(
                    $this->endpointsTableName,
                    ['lookup_hash' => $hash, 'name' => $endpointName]
                );
                $endpointId = (int)$transaction->lastInsertId();
            }
        );

        return $endpointId;
    }
205
206
    /**
     * Removes every `-` character from a message id.
     *
     * Used on message ids before they are written to or compared against the
     * `message_id` column.
     *
     * @param string $messageId
     *
     * @return string the id with all dashes removed
     */
    private function stripDashes($messageId)
    {
        $dash = '-';
        $nothing = '';

        return str_replace($dash, $nothing, $messageId);
    }
215
}
216