Failed Conditions
Pull Request — master (#41)
by
unknown
01:23
created

MasterSlavesConnection::close()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 3
nc 2
nop 0
1
<?php
2
3
namespace Ez\DbLinker\Driver\Connection;
4
5
use Exception;
6
use Doctrine\DBAL\DriverManager;
7
use Doctrine\DBAL\Driver\Connection;
8
use Doctrine\DBAL\Driver\PDOConnection;
9
10
class MasterSlavesConnection implements Connection, ConnectionWrapper
11
{
12
    use ConnectionWrapperTrait;
13
14
    private $master;
15
    private $slaves;
16
    private $currentConnectionParams;
17
    private $currentSlave;
18
    private $cache;
19
    private $forceMaster;
20
    private $maxSlaveDelay = 30;
21
    private $slaveStatusCacheTtl = 10;
22
23
    public function __construct(array $master, array $slaves, $cache = null)
24
    {
25
        $this->master = $master;
26
        $this->checkSlaves($slaves);
27
        $this->slaves = $slaves;
28
        $this->cache = $cache;
29
        $this->forceMaster = false;
30
    }
31
32
    public function disableCache() {
33
        return $this->cache->disableCache();
34
    }
35
36
    private function checkSlaves(array $slaves)
37
    {
38
        foreach ($slaves as $slave) {
39
            if ((int)$slave['weight'] < 0) {
40
                throw new Exception('Slave weight must be >= 0');
41
            }
42
        }
43
    }
44
45
    public function connectToMaster($forced = null)
46
    {
47
        if ($forced !== null) {
48
            $this->forceMaster = $forced;
49
        }
50
        if ($this->currentConnectionParams === $this->master) {
51
            return;
52
        }
53
        $this->currentConnectionParams = $this->master;
54
        $this->currentSlave = null;
55
        $this->wrappedConnection = null;
56
    }
57
58
    public function connectToSlave()
59
    {
60
        $this->forceMaster = false;
61
        if ($this->currentConnectionParams !== null && $this->currentConnectionParams !== $this->master) {
62
            return;
63
        }
64
        $this->currentConnectionParams = null;
65
        $this->currentSlave = null;
66
        $this->wrappedConnection = null;
67
        $this->wrap();
68
        while (!$this->isSlaveOk() && $this->currentSlave !== null) {
69
            $this->wrap();
70
        }
71
    }
72
73
    public function isConnectedToMaster()
74
    {
75
        return $this->currentSlave === null && $this->currentConnectionParams !== null;
76
    }
77
78
    /**
79
     * @inherit
80
     */
81
    public function getCurrentConnection()
82
    {
83
        return $this->wrappedConnection();
84
    }
85
86
    protected function wrap()
87
    {
88
        if ($this->wrappedConnection !== null) {
89
            return $this->wrappedConnection;
90
        }
91
        if ($this->currentConnectionParams === null) {
92
            $this->currentSlave = $this->chooseASlave();
93
            $this->currentConnectionParams = $this->currentSlave !== null ? $this->slaves[$this->currentSlave] : $this->master;
94
        }
95
        $connection = DriverManager::getConnection($this->currentConnectionParams);
96
        $this->wrappedConnection = $connection->getWrappedConnection();
97
        $this->wrappedDriver = $connection->getDriver();
98
    }
99
100
    private function chooseASlave()
101
    {
102
        $totalSlavesWeight = $this->totalSlavesWeight();
103
        if ($totalSlavesWeight < 1) {
104
            return null;
105
        }
106
        $weightTarget = mt_rand(1, $totalSlavesWeight);
107
        foreach ($this->slaves as $n => $slave) {
108
            $weightTarget -= $slave['weight'];
109
            if ($weightTarget <= 0) {
110
                return $n;
111
            }
112
        }
113
    }
114
115
    private function totalSlavesWeight()
116
    {
117
        $weight = 0;
118
        foreach ($this->slaves as $slave) {
119
            $weight += $slave['weight'];
120
        }
121
        return $weight;
122
    }
123
124
    public function disableCurrentSlave()
125
    {
126
        if ($this->currentSlave !== null) {
127
            array_splice($this->slaves, $this->currentSlave, 1);
128
            $this->currentSlave = null;
129
        }
130
        $this->currentConnectionParams = null;
131
        $this->wrappedConnection = null;
132
    }
133
134
    public function slaves()
135
    {
136
        return $this->slaves;
137
    }
138
139
    /**
140
     * Prepares a statement for execution and returns a Statement object.
141
     *
142
     * @param string $prepareString
143
     *
144
     * @return \Doctrine\DBAL\Driver\Statement
145
     */
146
    public function prepare($prepareString)
147
    {
148
        $this->connectToMaster(true);
149
        return $this->wrappedConnection()->prepare($prepareString);
150
    }
151
152
    /**
153
     * Executes an SQL statement, returning a result set as a Statement object.
154
     *
155
     * @return \Doctrine\DBAL\Driver\Statement
156
     */
157
    public function query()
158
    {
159
        if ($this->forceMaster !== true) {
160
            $this->connectToSlave();
161
        }
162
        return call_user_func_array([$this->wrappedConnection(), __FUNCTION__], func_get_args());
163
    }
164
165
    /**
166
     * Quotes a string for use in a query.
167
     *
168
     * @param string  $input
169
     * @param integer $type
170
     *
171
     * @return string
172
     */
173
    public function quote($input, $type = \PDO::PARAM_STR)
174
    {
175
        return $this->wrappedConnection()->quote($input, $type);
176
    }
177
178
    /**
179
     * Executes an SQL statement and return the number of affected rows.
180
     *
181
     * @param string $statement
182
     *
183
     * @return integer
184
     */
185
    public function exec($statement)
186
    {
187
        $this->connectToMaster();
188
        return $this->wrappedConnection()->exec($statement);
189
    }
190
191
    /**
192
     * Returns the ID of the last inserted row or sequence value.
193
     *
194
     * @param string|null $name
195
     *
196
     * @return string
197
     */
198
    public function lastInsertId($name = null)
199
    {
200
        $this->forceMaster = true;
201
        return $this->wrappedConnection()->lastInsertId($name);
202
    }
203
204
    /**
205
     * Initiates a transaction.
206
     *
207
     * @return boolean TRUE on success or FALSE on failure.
208
     */
209
    public function beginTransaction()
210
    {
211
        $this->connectToMaster(true);
212
        return $this->wrappedConnection()->beginTransaction();
213
    }
214
215
    /**
216
     * Commits a transaction.
217
     *
218
     * @return boolean TRUE on success or FALSE on failure.
219
     */
220
    public function commit()
221
    {
222
        $this->connectToMaster(false);
223
        return $this->wrappedConnection()->commit();
224
    }
225
226
    /**
227
     * Rolls back the current transaction, as initiated by beginTransaction().
228
     *
229
     * @return boolean TRUE on success or FALSE on failure.
230
     */
231
    public function rollBack()
232
    {
233
        $this->connectToMaster(false);
234
        return $this->wrappedConnection()->rollBack();
235
    }
236
237
    /**
238
     * Returns the error code associated with the last operation on the database handle.
239
     *
240
     * @return string|null The error code, or null if no operation has been run on the database handle.
241
     */
242
    public function errorCode()
243
    {
244
        return $this->wrappedConnection()->errorCode();
245
    }
246
247
    /**
248
     * Returns extended error information associated with the last operation on the database handle.
249
     *
250
     * @return array
251
     */
252
    public function errorInfo()
253
    {
254
        return $this->wrappedConnection()->errorInfo();
255
    }
256
257
    public function close()
258
    {
259
        if (!$this->wrappedConnection() instanceof PDOConnection) {
260
            return $this->wrappedConnection()->getWrappedResourceHandle()->close();
261
        }
262
    }
263
264
    private function hasCache() {
265
        return $this->cache !== null;
266
    }
267
268
    private function getCacheKey() {
269
        return "MasterSlavesConnection_".strtr(serialize($this->currentConnectionParams), '{}()/@:', '______|');
270
    }
271
272
    public function setSlaveStatus(bool $running, int $delay) {
273
        if ($this->hasCache()) {
274
            $this->cache->setCacheItem($this->getCacheKey(), ["running" => $running, "delay" => $delay], $this->slaveStatusCacheTtl);
275
        }
276
        return ['running' => $running, 'delay' => $delay];
277
    }
278
279
    private function getSlaveStatus() {
280
        try {
281
            $sss = $this->wrappedConnection()->query("SHOW SLAVE STATUS")->fetch();
282
            if ($sss['Slave_IO_Running'] === 'No' || $sss['Slave_SQL_Running'] === 'No') {
283
                // slave is STOPPED
284
                return $this->setSlaveStatus(false, INF);
285
            } else {
286
                return $this->setSlaveStatus(true, $sss['Seconds_Behind_Master']);
287
            }
288
        } catch (\Exception $e) {
289
            return $this->setSlaveStatus(true, 0);
290
        }
291
    }
292
293
    public function isSlaveOk($maxdelay = null) {
294
        if ($maxdelay === null) {
295
            $maxdelay = $this->maxSlaveDelay;
296
        }
297
        if ($this->hasCache()) {
298
            $status = $this->cache->getCacheItem($this->getCacheKey());
299
            if ($status === null) {
300
                $status = $this->getSlaveStatus();
301
            }
302
        } else {
303
            $status = $this->getSlaveStatus();
304
        }
305
        if (!$status['running'] || $status['delay'] >= $maxdelay) {
306
            $this->disableCurrentSlave();
307
            $this->wrap();
308
            return false;
309
        }
310
        return true;
311
    }
312
}
313