Passed
Pull Request — master (#47)
by GilWon
06:16
created

BinLogSocketConnect   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 325
Duplicated Lines 0 %

Test Coverage

Coverage 69.03%

Importance

Changes 0
Metric Value
eloc 134
dl 0
loc 325
ccs 107
cts 155
cp 0.6903
rs 10
c 0
b 0
f 0
wmc 27

13 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 12 1
A setBinLogDump() 0 27 4
A authenticate() 0 19 2
A getBinLogCurrent() 0 3 1
A setBinLogDumpGtid() 0 19 1
A registerSlave() 0 22 1
A isWriteSuccessful() 0 12 3
A execute() 0 4 1
A getBinlogStream() 0 21 5
A getCheckSum() 0 3 1
A setBinLogDumpMariaGtid() 0 8 1
A getCapabilities() 0 25 1
A getResponse() 0 27 5
1
<?php
2
3
namespace MySQLReplication\BinLog;
4
5
use MySQLReplication\BinaryDataReader\BinaryDataReader;
6
use MySQLReplication\Config\Config;
7
use MySQLReplication\Exception\MySQLReplicationException;
8
use MySQLReplication\Gtid\GtidException;
9
use MySQLReplication\Gtid\GtidFactory;
10
use MySQLReplication\Repository\RepositoryInterface;
11
use MySQLReplication\Socket\SocketInterface;
12
13
/**
14
 * Class BinLogSocketConnect
15
 * @package MySQLReplication\BinLog
16
 */
17
class BinLogSocketConnect
{
    const COM_BINLOG_DUMP = 0x12;
    const COM_REGISTER_SLAVE = 0x15;
    const COM_BINLOG_DUMP_GTID = 0x1e;

    /**
     * Value of RepositoryInterface::isCheckSum(), captured while the binlog stream is requested.
     *
     * @var bool
     */
    private $checkSum = false;
    /**
     * @var RepositoryInterface
     */
    private $repository;
    /**
     * First payload bytes that isWriteSuccessful() accepts as a successful answer (0x00 and 0xFE).
     *
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
     * @var array
     */
    private $packageOkHeader = [0, 254];
    /**
     * @var SocketInterface
     */
    private $socket;
    /**
     * 2^24 - 1 (16m). A packet whose declared length equals this value is followed by
     * a continuation packet, see getResponse().
     *
     * @var int
     */
    private $binaryDataMaxLength = 16777215;
    /**
     * Position / GTID information recorded when the dump command is sent.
     *
     * @var BinLogCurrent
     */
    private $binLogCurrent;

    /**
     * Connects the socket, parses the server greeting, authenticates and requests the binlog stream.
     *
     * @param RepositoryInterface $repository
     * @param SocketInterface $socket
     * @throws BinLogException
     * @throws \MySQLReplication\Gtid\GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    public function __construct(
        RepositoryInterface $repository,
        SocketInterface $socket
    ) {
        $this->repository = $repository;
        $this->socket = $socket;
        $this->binLogCurrent = new BinLogCurrent();

        $this->socket->connectToStream(Config::getHost(), Config::getPort());
        // The greeting is not an OK/ERR answer, so it is read without the status check.
        BinLogServerInfo::parsePackage($this->getResponse(false), $this->repository->getVersion());
        $this->authenticate();
        $this->getBinlogStream();
    }

    /**
     * Reads one logical packet from the socket, concatenating continuation packets.
     *
     * @param bool $checkResponse when true the first packet payload is validated with isWriteSuccessful()
     * @return string the payload, or '' when no header could be read
     * @throws \MySQLReplication\BinLog\BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    public function getResponse($checkResponse = true)
    {
        $header = $this->socket->readFromSocket(4);
        if ('' === $header) {
            return '';
        }

        $dataLength = self::decodePacketLength($header);
        $result = $this->socket->readFromSocket($dataLength);
        if (true === $checkResponse) {
            $this->isWriteSuccessful($result);
        }

        // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
        // A payload of exactly the maximum length means more data follows in the next packet.
        while ($dataLength === $this->binaryDataMaxLength) {
            $header = $this->socket->readFromSocket(4);
            if ('' === $header) {
                return $result;
            }
            $dataLength = self::decodePacketLength($header);
            $result .= $this->socket->readFromSocket($dataLength);
        }

        return $result;
    }

    /**
     * Decodes the 3-byte little-endian payload length at the start of a packet header.
     *
     * 'V' is used instead of 'L' so the result does not depend on the byte order of the host.
     *
     * @param string $header packet header as read from the socket (4 bytes requested)
     * @return int
     */
    private static function decodePacketLength($header)
    {
        return unpack('V', substr($header, 0, 3) . chr(0))[1];
    }

    /**
     * Throws when the first byte of a response payload is not one of the accepted OK headers.
     *
     * @param string $data
     *
     * @throws BinLogException
     */
    private function isWriteSuccessful($data)
    {
        if ('' === $data) {
            // No status byte to inspect. Reading offset 0 of an empty string used to raise an
            // "uninitialized string offset" notice and then evaluate to 0 (accepted); keep that
            // outcome without the notice.
            return;
        }

        if (in_array(ord($data[0]), $this->packageOkHeader, true)) {
            return;
        }

        // Bytes 1-2 carry the error code (little-endian); the message is read from offset 9 onwards.
        $errorCode = strlen($data) >= 3 ? unpack('v', substr($data, 1, 2))[1] : 0;
        // The cast covers PHP versions where substr() returns false for a start past the end.
        $errorMessage = (string)substr($data, 9);

        throw new BinLogException($errorMessage, $errorCode);
    }

    /**
     * Sends the handshake response (capabilities, max packet size, user, scrambled password)
     * and validates the server answer.
     *
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
     */
    private function authenticate()
    {
        $password = Config::getPassword();

        $data = pack('V', self::getCapabilities());
        $data .= pack('V', $this->binaryDataMaxLength);
        $data .= chr(33);
        // 23 zero bytes of filler
        $data .= str_repeat(chr(0), 23);

        // SHA1(password) XOR SHA1(salt . SHA1(SHA1(password)))
        $result = sha1($password, true) ^ sha1(
            BinLogServerInfo::getSalt() . sha1(sha1($password, true), true),
            true
        );

        $data .= Config::getUser() . chr(0) . chr(strlen($result)) . $result;

        // 3-byte little-endian payload length followed by the sequence byte 1
        $data = substr(pack('V', strlen($data)), 0, 3) . chr(1) . $data;

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Builds the capability bit mask sent during authentication.
     *
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
     * @return int
     */
    private static function getCapabilities()
    {
        // Flags not requested by this client, kept for reference only (name: bit number):
        //   foundRows: 1, connectWithDb: 3, compress: 5, odbc: 6, localFiles: 7,
        //   ignoreSpace: 8, interactive: 10, ssl: 11, ignoreSigPipe: 12,
        //   multiStatements: 16, multiResults: 17

        $longPassword = 1;
        $longFlag = 1 << 2;
        $noSchema = 1 << 4;
        $protocol41 = 1 << 9;
        $transactions = 1 << 13;
        $secureConnection = 1 << 15;

        return ($longPassword | $longFlag | $transactions | $protocol41 | $secureConnection | $noSchema);
    }

    /**
     * Prepares the session (checksum, heartbeat), registers as a slave and sends the dump command
     * matching the configured start point.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function getBinlogStream()
    {
        $this->checkSum = $this->repository->isCheckSum();
        if ($this->checkSum) {
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
        }

        if (0 !== Config::getHeartbeatPeriod()) {
            // master_heartbeat_period is in nanoseconds
            $this->execute('SET @master_heartbeat_period = ' . Config::getHeartbeatPeriod() * 1000000000);
        }

        $this->registerSlave();

        // The MariaDB GTID setup only sets session variables; a dump command is still sent below.
        if ('' !== Config::getMariaDbGtid()) {
            $this->setBinLogDumpMariaGtid();
        }
        if ('' !== Config::getGtid()) {
            $this->setBinLogDumpGtid();
        } else {
            $this->setBinLogDump();
        }
    }

    /**
     * Sends a query command (0x03) and validates the answer.
     *
     * @param string $sql
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function execute($sql)
    {
        // 4-byte little-endian header (payload length, sequence 0), command byte, statement
        $this->socket->writeToSocket(pack('VC', strlen($sql) + 1, 0x03) . $sql);
        $this->getResponse();
    }

    /**
     * Sends COM_REGISTER_SLAVE with the local host name and the configured credentials.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function registerSlave()
    {
        // gethostname() returns false on failure; treat that as an empty host name.
        $host = (string)gethostname();
        $user = Config::getUser();
        $password = Config::getPassword();

        $hostLength = strlen($host);
        $userLength = strlen($user);
        $passLength = strlen($password);

        // 18 = command (1) + slave id (4) + three length bytes (3) + port (2) + two trailing ints (8)
        $data = pack('V', 18 + $hostLength + $userLength + $passLength);
        $data .= chr(self::COM_REGISTER_SLAVE);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('C', $hostLength) . $host;
        $data .= pack('C', $userLength) . $user;
        $data .= pack('C', $passLength) . $password;
        $data .= pack('v', Config::getPort());
        $data .= pack('V', 0);
        $data .= pack('V', 0);

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Sets the session variables used for MariaDB GTID replication and records the GTID.
     *
     * @throws \MySQLReplication\Socket\SocketException
     * @throws \MySQLReplication\BinLog\BinLogException
     */
    private function setBinLogDumpMariaGtid()
    {
        $mariaDbGtid = Config::getMariaDbGtid();

        $this->execute('SET @mariadb_slave_capability = 4');
        $this->execute('SET @slave_connect_state = \'' . $mariaDbGtid . '\'');
        $this->execute('SET @slave_gtid_strict_mode = 0');
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');

        $this->binLogCurrent->setMariaDbGtid($mariaDbGtid);
    }

    /**
     * Sends COM_BINLOG_DUMP_GTID for the configured GTID set and records the GTID.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
     * @throws BinLogException
     * @throws GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function setBinLogDumpGtid()
    {
        $collection = GtidFactory::makeCollectionFromString(Config::getGtid());
        $encodedLength = $collection->getEncodedLength();

        // 26 = command (1) + 2 + 4 + 4 + 3 + 8 + 4 bytes of fixed fields below
        $data = pack('V', 26 + $encodedLength) . chr(self::COM_BINLOG_DUMP_GTID);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('V', 3);
        $data .= str_repeat(chr(0), 3);
        $data .= BinaryDataReader::pack64bit(4);
        $data .= pack('V', $encodedLength);
        $data .= $collection->getEncoded();

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setGtid(Config::getGtid());
    }

    /**
     * Sends COM_BINLOG_DUMP for the configured file/position, falling back to the master status
     * when neither is configured, and records the start point.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function setBinLogDump()
    {
        $binFilePos = Config::getBinLogPosition();
        $binFileName = Config::getBinLogFileName();
        if (0 === $binFilePos && '' === $binFileName) {
            $master = $this->repository->getMasterStatus();
            if ([] === $master) {
                throw new BinLogException(
                    MySQLReplicationException::BINLOG_NOT_ENABLED,
                    MySQLReplicationException::BINLOG_NOT_ENABLED_CODE
                );
            }
            $binFilePos = $master['Position'];
            $binFileName = $master['File'];
        }

        // 11 = command (1) + position (4) + 2-byte zero field + slave id (4)
        $data = pack('V', strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
        $data .= pack('V', $binFilePos);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= $binFileName;

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setBinLogPosition($binFilePos);
        $this->binLogCurrent->setBinFileName($binFileName);
    }

    /**
     * @return BinLogCurrent
     */
    public function getBinLogCurrent()
    {
        return $this->binLogCurrent;
    }

    /**
     * @return bool
     */
    public function getCheckSum()
    {
        return $this->checkSum;
    }
}
344