Failed Conditions
Pull Request — master (#46)
by
unknown
02:00
created

MasterSlavesConnection::getSlaveStatus()   C

Complexity

Conditions 7
Paths 13

Size

Total Lines 25
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 25
rs 6.7272
c 0
b 0
f 0
cc 7
eloc 18
nc 13
nop 0
1
<?php
2
3
namespace Ez\DbLinker\Driver\Connection;
4
5
use Exception;
6
use Doctrine\DBAL\DriverManager;
7
use Doctrine\DBAL\Driver\Connection;
8
use Doctrine\DBAL\Driver\PDOConnection;
9
10
class MasterSlavesConnection implements Connection, ConnectionWrapper
11
{
12
    use ConnectionWrapperTrait;
13
14
    private $master;
15
    private $slaves;
16
    private $currentConnectionParams;
17
    private $currentSlave;
18
    private $cache;
19
    private $forceMaster;
20
    private $maxSlaveDelay = 30;
21
    private $slaveStatusCacheTtl = 10;
22
23
    public function __construct(array $master, array $slaves, $cache = null)
24
    {
25
        $this->master = $master;
26
        $this->checkSlaves($slaves);
27
        $this->slaves = $slaves;
28
        $this->cache = $cache;
29
        $this->forceMaster = false;
30
    }
31
32
    public function disableCache() {
33
        return $this->cache->disableCache();
34
    }
35
36
    private function checkSlaves(array $slaves)
37
    {
38
        foreach ($slaves as $slave) {
39
            if ((int)$slave['weight'] < 0) {
40
                throw new Exception('Slave weight must be >= 0');
41
            }
42
        }
43
    }
44
45
    public function connectToMaster($forced = null)
46
    {
47
        if ($forced !== null) {
48
            $this->forceMaster = $forced;
49
        }
50
        if ($this->currentConnectionParams === $this->master) {
51
            return;
52
        }
53
        $this->currentConnectionParams = $this->master;
54
        $this->currentSlave = null;
55
        $this->wrappedConnection = null;
56
    }
57
58
    public function connectToSlave()
59
    {
60
        $this->forceMaster = false;
61
        if ($this->currentConnectionParams !== null // if connection exists
62
            && (
63
                $this->currentConnectionParams !== $this->master // and is not a master
64
                || $this->currentConnectionParams === $this->currentSlave // or is current slave
65
                )
66
            ) {
67
            return;
68
        }
69
        $this->currentConnectionParams = null;
70
        $this->currentSlave = null;
71
        $this->wrappedConnection = null;
72
        $this->wrap();
73
        while (!$this->isSlaveOk() && $this->currentSlave !== null) {
74
            $this->wrap();
75
        }
76
    }
77
78
    public function isConnectedToMaster()
79
    {
80
        return $this->currentSlave === null && $this->currentConnectionParams !== null;
81
    }
82
83
    /**
84
     * @inherit
85
     */
86
    public function getCurrentConnection()
87
    {
88
        return $this->wrappedConnection();
89
    }
90
91
    protected function wrap()
92
    {
93
        if ($this->wrappedConnection !== null) {
94
            return $this->wrappedConnection;
95
        }
96
        if ($this->currentConnectionParams === null) {
97
            $this->currentSlave = $this->chooseASlave();
98
            $this->currentConnectionParams = $this->currentSlave !== null ? $this->slaves[$this->currentSlave] : $this->master;
99
        }
100
        $connection = DriverManager::getConnection($this->currentConnectionParams);
101
        $this->wrappedConnection = $connection->getWrappedConnection();
102
        $this->wrappedDriver = $connection->getDriver();
103
    }
104
105
    private function chooseASlave()
106
    {
107
        $totalSlavesWeight = $this->totalSlavesWeight();
108
        if ($totalSlavesWeight < 1) {
109
            return null;
110
        }
111
        $weightTarget = mt_rand(1, $totalSlavesWeight);
112
        foreach ($this->slaves as $n => $slave) {
113
            if ($slave['weight'] <= 0) {
114
                continue;
115
            }
116
            $weightTarget -= $slave['weight'];
117
            if ($weightTarget <= 0) {
118
                return $n;
119
            }
120
        }
121
    }
122
123
    private function totalSlavesWeight()
124
    {
125
        $weight = 0;
126
        foreach ($this->slaves as $slave) {
127
            $weight += $slave['weight'];
128
        }
129
        return $weight;
130
    }
131
132
    public function disableCurrentSlave()
133
    {
134
        if ($this->currentSlave !== null) {
135
            array_splice($this->slaves, $this->currentSlave, 1);
136
            $this->currentSlave = null;
137
        }
138
        $this->currentConnectionParams = null;
139
        $this->wrappedConnection = null;
140
    }
141
142
    public function slaves()
143
    {
144
        return $this->slaves;
145
    }
146
147
    /**
148
     * Prepares a statement for execution and returns a Statement object.
149
     *
150
     * @param string $prepareString
151
     *
152
     * @return \Doctrine\DBAL\Driver\Statement
153
     */
154
    public function prepare($prepareString)
155
    {
156
        $this->connectToMaster(true);
157
        return $this->wrappedConnection()->prepare($prepareString);
158
    }
159
160
    /**
161
     * Executes an SQL statement, returning a result set as a Statement object.
162
     *
163
     * @return \Doctrine\DBAL\Driver\Statement
164
     */
165
    public function query()
166
    {
167
        if ($this->forceMaster !== true) {
168
            $this->connectToSlave();
169
        }
170
        return call_user_func_array([$this->wrappedConnection(), __FUNCTION__], func_get_args());
171
    }
172
173
    /**
174
     * Quotes a string for use in a query.
175
     *
176
     * @param string  $input
177
     * @param integer $type
178
     *
179
     * @return string
180
     */
181
    public function quote($input, $type = \PDO::PARAM_STR)
182
    {
183
        return $this->wrappedConnection()->quote($input, $type);
184
    }
185
186
    /**
187
     * Executes an SQL statement and return the number of affected rows.
188
     *
189
     * @param string $statement
190
     *
191
     * @return integer
192
     */
193
    public function exec($statement)
194
    {
195
        $this->connectToMaster();
196
        return $this->wrappedConnection()->exec($statement);
197
    }
198
199
    /**
200
     * Returns the ID of the last inserted row or sequence value.
201
     *
202
     * @param string|null $name
203
     *
204
     * @return string
205
     */
206
    public function lastInsertId($name = null)
207
    {
208
        $this->forceMaster = true;
209
        return $this->wrappedConnection()->lastInsertId($name);
210
    }
211
212
    /**
213
     * Initiates a transaction.
214
     *
215
     * @return boolean TRUE on success or FALSE on failure.
216
     */
217
    public function beginTransaction()
218
    {
219
        $this->connectToMaster(true);
220
        return $this->wrappedConnection()->beginTransaction();
221
    }
222
223
    /**
224
     * Commits a transaction.
225
     *
226
     * @return boolean TRUE on success or FALSE on failure.
227
     */
228
    public function commit()
229
    {
230
        $this->connectToMaster(false);
231
        return $this->wrappedConnection()->commit();
232
    }
233
234
    /**
235
     * Rolls back the current transaction, as initiated by beginTransaction().
236
     *
237
     * @return boolean TRUE on success or FALSE on failure.
238
     */
239
    public function rollBack()
240
    {
241
        $this->connectToMaster(false);
242
        return $this->wrappedConnection()->rollBack();
243
    }
244
245
    /**
246
     * Returns the error code associated with the last operation on the database handle.
247
     *
248
     * @return string|null The error code, or null if no operation has been run on the database handle.
249
     */
250
    public function errorCode()
251
    {
252
        return $this->wrappedConnection()->errorCode();
253
    }
254
255
    /**
256
     * Returns extended error information associated with the last operation on the database handle.
257
     *
258
     * @return array
259
     */
260
    public function errorInfo()
261
    {
262
        return $this->wrappedConnection()->errorInfo();
263
    }
264
265
    public function close()
266
    {
267
        if (!$this->wrappedConnection() instanceof PDOConnection) {
268
            return $this->wrappedConnection()->getWrappedResourceHandle()->close();
269
        }
270
    }
271
272
    private function hasCache() {
273
        return $this->cache !== null;
274
    }
275
276
    private function getCacheKey() {
277
        return "MasterSlavesConnection_".strtr(serialize($this->currentConnectionParams), '{}()/@:', '______|');
278
    }
279
280
    public function setSlaveStatus(bool $running, ?int $delay) {
281
        if ($this->hasCache()) {
282
            $this->cache->setCacheItem($this->getCacheKey(), ["running" => $running, "delay" => $delay], $this->slaveStatusCacheTtl);
283
        }
284
        return ['running' => $running, 'delay' => $delay];
285
    }
286
287
    private function getSlaveStatus() {
288
        if (stripos($this->wrappedDriver->getName(), 'pgsql') !== false) {
289
            try {
290
                $sss = $this->wrappedConnection()->query("SELECT CASE WHEN pg_last_xlog_receive_location() IS NULL THEN NULL WHEN pg_last_xlog_receive_location() = pg_last_xlog_replay_location() THEN '00:00:00' ELSE now() - pg_last_xact_replay_timestamp() END AS replication_lag")->fetch();
291
                return $this->setSlaveStatus(true, $sss['replication_lag']);
292
            } catch (\Exception $e) {
293
                return $this->setSlaveStatus(false, null);
294
            }
295
        } else {
296
            try {
297
                $sss = $this->wrappedConnection()->query("SHOW SLAVE STATUS")->fetch();
298
                if ($sss['Slave_IO_Running'] === 'No' || $sss['Slave_SQL_Running'] === 'No') {
299
                    // slave is STOPPED
300
                    return $this->setSlaveStatus(false, null);
301
                } else {
302
                    return $this->setSlaveStatus(true, $sss['Seconds_Behind_Master']);
303
                }
304
            } catch (\Exception $e) {
305
                if (stripos($e->getMessage(), "Access denied") !== false) {
306
                    return $this->setSlaveStatus(true, 0);
307
                }
308
                return $this->setSlaveStatus(false, null);
309
            }
310
        }
311
    }
312
313
    public function isSlaveOk($maxdelay = null) {
314
        if ($maxdelay === null) {
315
            $maxdelay = $this->maxSlaveDelay;
316
        }
317
        if ($this->hasCache()) {
318
            $status = $this->cache->getCacheItem($this->getCacheKey());
319
            if ($status === null) {
320
                $status = $this->getSlaveStatus();
321
            }
322
        } else {
323
            $status = $this->getSlaveStatus();
324
        }
325
        if (!$status['running'] || $status['delay'] >= $maxdelay) {
326
            $this->disableCurrentSlave();
327
            $this->wrap();
328
            return false;
329
        }
330
        return true;
331
    }
332
}
333