Failed Conditions
Pull Request — master (#44)
by
unknown
02:00
created

MasterSlavesConnection::chooseASlave()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 17
rs 8.8571
c 0
b 0
f 0
cc 5
eloc 11
nc 5
nop 0
1
<?php
2
3
namespace Ez\DbLinker\Driver\Connection;
4
5
use Exception;
6
use Doctrine\DBAL\DriverManager;
7
use Doctrine\DBAL\Driver\Connection;
8
use Doctrine\DBAL\Driver\PDOConnection;
9
10
/**
 * Driver-level connection that routes work between one master server and a
 * weighted pool of slave servers.
 *
 * Writes, prepared statements and transactions are sent to the master;
 * plain query() calls are sent to a slave picked at random in proportion to
 * its 'weight', unless the master has been forced. Slaves whose replication
 * is stopped or lagging too far behind are removed from the pool.
 */
class MasterSlavesConnection implements Connection, ConnectionWrapper
{
    use ConnectionWrapperTrait;

    /** @var array Connection parameters of the master server. */
    private $master;

    /** @var array[] Connection parameters of each slave; each entry may carry a 'weight'. */
    private $slaves;

    /** @var array|null Parameters of the server currently targeted, null when none is chosen yet. */
    private $currentConnectionParams;

    /** @var int|string|null Key in $slaves of the slave in use, null when no slave is in use. */
    private $currentSlave;

    /** @var object|null Optional cache used to remember slave replication status. */
    private $cache;

    /** @var bool When true, query() keeps the current connection instead of switching to a slave. */
    private $forceMaster;

    /** @var int Default replication delay at or above which a slave is rejected. */
    private $maxSlaveDelay = 30;

    /** @var int TTL value handed to the cache when a slave status is stored. */
    private $slaveStatusCacheTtl = 10;

    /**
     * @param array       $master Connection parameters of the master.
     * @param array[]     $slaves Connection parameters of the slaves.
     * @param object|null $cache  Optional slave-status cache.
     *
     * @throws Exception When a slave has a negative weight.
     */
    public function __construct(array $master, array $slaves, $cache = null)
    {
        $this->master = $master;
        $this->checkSlaves($slaves);
        $this->slaves = $slaves;
        $this->cache = $cache;
        $this->forceMaster = false;
    }

    /**
     * Asks the cache to disable itself.
     *
     * @return mixed Whatever the cache returns, or null when no cache was configured.
     */
    public function disableCache()
    {
        // The cache is optional (constructor default is null): nothing to disable then.
        if (!$this->hasCache()) {
            return null;
        }

        return $this->cache->disableCache();
    }

    /**
     * Validates the slave definitions.
     *
     * @param array[] $slaves
     *
     * @throws Exception When a slave weight is negative.
     */
    private function checkSlaves(array $slaves)
    {
        foreach ($slaves as $slave) {
            // A missing weight counts as 0, exactly like an explicit 0.
            if ((int) ($slave['weight'] ?? 0) < 0) {
                throw new Exception('Slave weight must be >= 0');
            }
        }
    }

    /**
     * Targets the master for the next operations.
     *
     * @param bool|null $forced When not null, replaces the "force master" flag;
     *                          null leaves the flag untouched.
     */
    public function connectToMaster($forced = null)
    {
        if ($forced !== null) {
            $this->forceMaster = $forced;
        }
        if ($this->currentConnectionParams === $this->master) {
            return;
        }
        $this->currentConnectionParams = $this->master;
        $this->currentSlave = null;
        // Drop the wrapped connection so the next wrap() opens one on the master.
        $this->wrappedConnection = null;
    }

    /**
     * Targets a healthy slave, falling back to the master when none is usable.
     *
     * Clears the "force master" flag. Does nothing more when a non-master
     * server is already targeted.
     */
    public function connectToSlave()
    {
        $this->forceMaster = false;
        if ($this->currentConnectionParams !== null && $this->currentConnectionParams !== $this->master) {
            return;
        }
        $this->currentConnectionParams = null;
        $this->currentSlave = null;
        $this->wrappedConnection = null;
        $this->wrap();
        // Only probe replication while a slave is selected: once wrap() has
        // fallen back to the master there is nothing left to check.
        // isSlaveOk() itself discards a bad slave and selects the next server.
        while ($this->currentSlave !== null && !$this->isSlaveOk()) {
            $this->wrap();
        }
    }

    /**
     * @return bool True when a server has been selected and it is not a slave.
     */
    public function isConnectedToMaster()
    {
        return $this->currentSlave === null && $this->currentConnectionParams !== null;
    }

    /**
     * {@inheritdoc}
     */
    public function getCurrentConnection()
    {
        return $this->wrappedConnection();
    }

    /**
     * Opens the wrapped connection if there is none yet.
     *
     * When no server is selected, a slave is chosen by weight and the master
     * is used if no slave is eligible.
     *
     * @return mixed The wrapped driver connection.
     */
    protected function wrap()
    {
        if ($this->wrappedConnection !== null) {
            return $this->wrappedConnection;
        }
        if ($this->currentConnectionParams === null) {
            $this->currentSlave = $this->chooseASlave();
            $this->currentConnectionParams = $this->currentSlave !== null
                ? $this->slaves[$this->currentSlave]
                : $this->master;
        }
        $connection = DriverManager::getConnection($this->currentConnectionParams);
        $this->wrappedConnection = $connection->getWrappedConnection();
        $this->wrappedDriver = $connection->getDriver();

        // Same return value as the early-exit path above.
        return $this->wrappedConnection;
    }

    /**
     * Picks a slave at random, with a probability proportional to its weight.
     *
     * @return int|string|null Key of the chosen slave in $slaves, or null
     *                         when the total weight is below 1.
     */
    private function chooseASlave()
    {
        $totalSlavesWeight = $this->totalSlavesWeight();
        if ($totalSlavesWeight < 1) {
            return null;
        }
        $weightTarget = mt_rand(1, $totalSlavesWeight);
        foreach ($this->slaves as $n => $slave) {
            $weight = $slave['weight'] ?? 0;
            if ($weight <= 0) {
                // Slaves with no weight are never selected.
                continue;
            }
            $weightTarget -= $weight;
            if ($weightTarget <= 0) {
                return $n;
            }
        }

        return null;
    }

    /**
     * @return int|float Sum of the weights of all slaves still in the pool.
     */
    private function totalSlavesWeight()
    {
        $weight = 0;
        foreach ($this->slaves as $slave) {
            $weight += $slave['weight'] ?? 0;
        }

        return $weight;
    }

    /**
     * Removes the slave in use from the pool and clears the current selection,
     * so that the next wrap() chooses a server again.
     */
    public function disableCurrentSlave()
    {
        if ($this->currentSlave !== null) {
            // array_splice() works with positions, not keys: translate the
            // key first so that non-sequential or string keys remove the
            // right entry. For a plain list the offset equals the key.
            $offset = array_search($this->currentSlave, array_keys($this->slaves), true);
            if ($offset !== false) {
                array_splice($this->slaves, $offset, 1);
            }
            $this->currentSlave = null;
        }
        $this->currentConnectionParams = null;
        $this->wrappedConnection = null;
    }

    /**
     * @return array[] The slaves still in the pool.
     */
    public function slaves()
    {
        return $this->slaves;
    }

    /**
     * Prepares a statement for execution and returns a Statement object.
     *
     * Switches to the master and forces it for subsequent queries.
     *
     * @param string $prepareString
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    public function prepare($prepareString)
    {
        $this->connectToMaster(true);

        return $this->wrappedConnection()->prepare($prepareString);
    }

    /**
     * Executes an SQL statement, returning a result set as a Statement object.
     *
     * Runs on a slave unless the master is currently forced. All arguments
     * are forwarded unchanged to the wrapped connection.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    public function query()
    {
        if ($this->forceMaster !== true) {
            $this->connectToSlave();
        }

        return $this->wrappedConnection()->query(...func_get_args());
    }

    /**
     * Quotes a string for use in a query.
     *
     * @param string  $input
     * @param integer $type
     *
     * @return string
     */
    public function quote($input, $type = \PDO::PARAM_STR)
    {
        return $this->wrappedConnection()->quote($input, $type);
    }

    /**
     * Executes an SQL statement and return the number of affected rows.
     *
     * Switches to the master but leaves the "force master" flag untouched.
     *
     * @param string $statement
     *
     * @return integer
     */
    public function exec($statement)
    {
        $this->connectToMaster();

        return $this->wrappedConnection()->exec($statement);
    }

    /**
     * Returns the ID of the last inserted row or sequence value.
     *
     * Sets the "force master" flag; the value is read from whichever
     * connection is current.
     *
     * @param string|null $name
     *
     * @return string
     */
    public function lastInsertId($name = null)
    {
        $this->forceMaster = true;

        return $this->wrappedConnection()->lastInsertId($name);
    }

    /**
     * Initiates a transaction on the master and forces the master.
     *
     * @return boolean TRUE on success or FALSE on failure.
     */
    public function beginTransaction()
    {
        $this->connectToMaster(true);

        return $this->wrappedConnection()->beginTransaction();
    }

    /**
     * Commits a transaction and clears the "force master" flag.
     *
     * @return boolean TRUE on success or FALSE on failure.
     */
    public function commit()
    {
        $this->connectToMaster(false);

        return $this->wrappedConnection()->commit();
    }

    /**
     * Rolls back the current transaction, as initiated by beginTransaction(),
     * and clears the "force master" flag.
     *
     * @return boolean TRUE on success or FALSE on failure.
     */
    public function rollBack()
    {
        $this->connectToMaster(false);

        return $this->wrappedConnection()->rollBack();
    }

    /**
     * Returns the error code associated with the last operation on the database handle.
     *
     * @return string|null The error code, or null if no operation has been run on the database handle.
     */
    public function errorCode()
    {
        return $this->wrappedConnection()->errorCode();
    }

    /**
     * Returns extended error information associated with the last operation on the database handle.
     *
     * @return array
     */
    public function errorInfo()
    {
        return $this->wrappedConnection()->errorInfo();
    }

    /**
     * Closes the underlying resource handle of the wrapped connection.
     *
     * Nothing is done when the wrapped connection is a PDOConnection.
     *
     * @return mixed Result of the handle's close(), or null for PDO connections.
     */
    public function close()
    {
        $connection = $this->wrappedConnection();
        if (!$connection instanceof PDOConnection) {
            return $connection->getWrappedResourceHandle()->close();
        }
    }

    /**
     * @return bool True when a cache was given to the constructor.
     */
    private function hasCache()
    {
        return $this->cache !== null;
    }

    /**
     * @return string Cache key derived from the current connection parameters.
     */
    private function getCacheKey()
    {
        return "MasterSlavesConnection_".strtr(serialize($this->currentConnectionParams), '{}()/@:', '______|');
    }

    /**
     * Builds a slave status and stores it in the cache when one is configured.
     *
     * @param bool     $running Whether replication is running.
     * @param int|null $delay   Replication delay, null when unknown.
     *
     * @return array Status with the keys 'running' and 'delay'.
     */
    public function setSlaveStatus(bool $running, ?int $delay)
    {
        if ($this->hasCache()) {
            $this->cache->setCacheItem($this->getCacheKey(), ["running" => $running, "delay" => $delay], $this->slaveStatusCacheTtl);
        }

        return ['running' => $running, 'delay' => $delay];
    }

    /**
     * Reads the replication status of the current connection with
     * "SHOW SLAVE STATUS" and records it through setSlaveStatus().
     *
     * A server whose status cannot be read is reported as running.
     *
     * @return array Status with the keys 'running' and 'delay'.
     */
    private function getSlaveStatus()
    {
        try {
            $sss = $this->wrappedConnection()->query("SHOW SLAVE STATUS")->fetch();
            if (!is_array($sss)) {
                // No status row: treated as running with an unknown delay.
                return $this->setSlaveStatus(true, null);
            }

            $ioRunning = $sss['Slave_IO_Running'] ?? null;
            $sqlRunning = $sss['Slave_SQL_Running'] ?? null;
            if ($ioRunning === 'No' || $sqlRunning === 'No') {
                // Slave is STOPPED. The delay parameter is typed ?int and an
                // infinite float is rejected with a TypeError, so the largest
                // integer stands in for "infinitely late".
                return $this->setSlaveStatus(false, PHP_INT_MAX);
            }

            $delay = $sss['Seconds_Behind_Master'] ?? null;

            return $this->setSlaveStatus(true, $delay === null ? null : (int) $delay);
        } catch (\Exception $e) {
            return $this->setSlaveStatus(true, 0);
        }
    }

    /**
     * Tells whether the current server is fit to serve reads.
     *
     * When replication is not running, or the delay is at or above the limit,
     * the current slave is removed from the pool, another server is selected
     * and false is returned.
     *
     * @param int|null $maxdelay Delay limit; null uses the class default.
     *
     * @return bool
     */
    public function isSlaveOk($maxdelay = null)
    {
        if ($maxdelay === null) {
            $maxdelay = $this->maxSlaveDelay;
        }

        $status = null;
        if ($this->hasCache()) {
            $status = $this->cache->getCacheItem($this->getCacheKey());
        }
        if ($status === null) {
            // No cache, or nothing cached for this server: ask the server.
            $status = $this->getSlaveStatus();
        }

        if (!$status['running'] || $status['delay'] >= $maxdelay) {
            $this->disableCurrentSlave();
            $this->wrap();

            return false;
        }

        return true;
    }
}
316