Passed
Push — master ( 180e3d...cef01d )
by kacper
03:39
created

BinLogSocketConnect::isWriteSuccessful()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 8.7414

Importance

Changes 0
Metric Value
cc 4
eloc 10
nc 4
nop 2
dl 0
loc 15
ccs 4
cts 12
cp 0.3333
crap 8.7414
rs 9.9332
c 0
b 0
f 0
1
<?php
2
3
namespace MySQLReplication\BinLog;
4
5
use MySQLReplication\BinaryDataReader\BinaryDataReader;
6
use MySQLReplication\Config\Config;
7
use MySQLReplication\Exception\MySQLReplicationException;
8
use MySQLReplication\Gtid\GtidException;
9
use MySQLReplication\Gtid\GtidFactory;
10
use MySQLReplication\Repository\RepositoryInterface;
11
use MySQLReplication\Socket\SocketInterface;
12
13
/**
 * Opens the replication connection: connects the socket, authenticates,
 * registers as a slave and requests the binary log stream.
 *
 * All multi-byte integers written to / read from the wire use the explicit
 * little-endian pack codes ('V' / 'v') so the produced bytes do not depend on
 * the byte order or integer width of the host running PHP.
 *
 * @package MySQLReplication\BinLog
 */
class BinLogSocketConnect
{
    const COM_BINLOG_DUMP = 0x12;
    const COM_REGISTER_SLAVE = 0x15;
    const COM_BINLOG_DUMP_GTID = 0x1e;

    /**
     * Value reported by the repository's isCheckSum() at connection time.
     *
     * @var bool
     */
    private $checkSum = false;
    /**
     * @var RepositoryInterface
     */
    private $repository;
    /**
     * First-byte values of a response payload that are treated as success.
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
     *
     * @var array
     */
    private $packageOkHeader = [0, 254];
    /**
     * @var SocketInterface
     */
    private $socket;
    /**
     * 2^24 - 1 (16m): a packet announcing exactly this payload length is
     * followed by at least one continuation packet.
     *
     * @var int
     */
    private $binaryDataMaxLength = 16777215;
    /**
     * @var BinLogCurrent
     */
    private $binLogCurrent;

    /**
     * Connects to the configured host/port, parses the server greeting,
     * authenticates and starts the binlog stream.
     *
     * @param RepositoryInterface $repository
     * @param SocketInterface $socket
     * @throws BinLogException
     * @throws \MySQLReplication\Gtid\GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    public function __construct(
        RepositoryInterface $repository,
        SocketInterface $socket
    ) {
        $this->repository = $repository;
        $this->socket = $socket;
        $this->binLogCurrent = new BinLogCurrent();

        $this->socket->connectToStream(Config::getHost(), Config::getPort());
        // The greeting is parsed as-is, so its first byte is not validated against the OK header.
        BinLogServerInfo::parsePackage($this->getResponse(false), $this->repository->getVersion());
        $this->authenticate();
        $this->getBinlogStream();
    }

    /**
     * Reads one logical response from the socket and returns its payload.
     *
     * A payload of exactly 2^24 - 1 bytes signals that more packets follow;
     * those continuation packets are read and appended until a shorter one
     * arrives (or the socket returns an empty header).
     *
     * Only the first packet carries the status byte, so the success check is
     * applied to it alone. Continuation packets hold raw payload bytes whose
     * first byte can be anything and must not be interpreted as a status.
     *
     * @param bool $checkResponse when true, throw if the payload's first byte is not an OK header
     * @return string the concatenated payload, or '' when no header could be read
     * @throws \MySQLReplication\BinLog\BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     * @link https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
     */
    public function getResponse($checkResponse = true)
    {
        $header = $this->socket->readFromSocket(4);
        if ('' === $header) {
            return '';
        }

        $dataLength = $this->unpackPayloadLength($header);
        $result = $this->socket->readFromSocket($dataLength);
        if (true === $checkResponse) {
            $this->isWriteSuccessful($result);
        }

        while ($dataLength === $this->binaryDataMaxLength) {
            $header = $this->socket->readFromSocket(4);
            if ('' === $header) {
                return $result;
            }

            $dataLength = $this->unpackPayloadLength($header);
            $result .= $this->socket->readFromSocket($dataLength);
        }

        return $result;
    }

    /**
     * Decodes the 3-byte little-endian payload length from a 4-byte packet header.
     * The fourth header byte is ignored, exactly as the length decoding always did.
     *
     * @param string $header
     * @return int
     */
    private function unpackPayloadLength($header)
    {
        return unpack('V', substr($header, 0, 3) . chr(0))[1];
    }

    /**
     * Throws when the payload does not start with one of the OK header bytes.
     *
     * For a failing payload the exception code is the little-endian 16-bit
     * value at bytes 1-2 and the message is everything from byte 9 onwards.
     * An empty payload is treated as success, which is what the previous
     * implementation effectively did (while emitting a string-offset warning).
     *
     * @param string $data
     *
     * @throws BinLogException
     */
    private function isWriteSuccessful($data)
    {
        if ('' === $data) {
            return;
        }

        if (in_array(ord($data[0]), $this->packageOkHeader, true)) {
            return;
        }

        // Guard against truncated error packets instead of indexing past the end of the string.
        $errorCode = strlen($data) >= 3 ? unpack('v', substr($data, 1, 2))[1] : 0;
        // The cast keeps '' (not false) on PHP versions where substr() returns false past the end.
        $errorMessage = (string)substr($data, 9);

        throw new BinLogException($errorMessage, $errorCode);
    }

    /**
     * Sends the authentication packet and validates the server's answer.
     *
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
     */
    private function authenticate()
    {
        // SHA1(password) XOR SHA1(salt . SHA1(SHA1(password)))
        $passwordHash = sha1(Config::getPassword(), true);
        $scramble = $passwordHash ^ sha1(BinLogServerInfo::getSalt() . sha1($passwordHash, true), true);

        $payload = pack('V', self::getCapabilities());
        $payload .= pack('V', $this->binaryDataMaxLength);
        // Byte 33 followed by 23 zero bytes - presumably the charset byte and
        // reserved filler of the linked packet layout; verify against the protocol docs.
        $payload .= chr(33) . str_repeat(chr(0), 23);
        $payload .= Config::getUser() . chr(0) . chr(strlen($scramble)) . $scramble;

        // Header: 3-byte little-endian payload length followed by the byte 1.
        $this->socket->writeToSocket(substr(pack('V', strlen($payload)), 0, 3) . chr(1) . $payload);
        $this->getResponse();
    }

    /**
     * Builds the capability bit mask sent during authentication.
     *
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
     * @return int
     */
    private static function getCapabilities()
    {
        /*
            Flags that are NOT sent, left only as information
            $foundRows = 1 << 1;
            $connectWithDb = 1 << 3;
            $compress = 1 << 5;
            $odbc = 1 << 6;
            $localFiles = 1 << 7;
            $ignoreSpace = 1 << 8;
            $multiStatements = 1 << 16;
            $multiResults = 1 << 17;
            $interactive = 1 << 10;
            $ssl = 1 << 11;
            $ignoreSigPipe = 1 << 12;
        */

        $longPassword = 1;
        $longFlag = 1 << 2;
        $noSchema = 1 << 4;
        $protocol41 = 1 << 9;
        $transactions = 1 << 13;
        $secureConnection = 1 << 15;

        return $longPassword | $longFlag | $noSchema | $protocol41 | $transactions | $secureConnection;
    }

    /**
     * Prepares the session (checksum, heartbeat), registers as a slave and
     * requests the binlog dump.
     *
     * When a MariaDB GTID is configured its session variables are set first;
     * the dump itself is then still requested through the GTID or the
     * file/position command below.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function getBinlogStream()
    {
        $this->checkSum = $this->repository->isCheckSum();
        if ($this->checkSum) {
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
        }

        if (0 !== Config::getHeartbeatPeriod()) {
            // master_heartbeat_period is in nanoseconds
            $this->execute('SET @master_heartbeat_period = ' . (Config::getHeartbeatPeriod() * 1000000000));
        }

        $this->registerSlave();

        if ('' !== Config::getMariaDbGtid()) {
            $this->setBinLogDumpMariaGtid();
        }

        if ('' !== Config::getGtid()) {
            $this->setBinLogDumpGtid();
        } else {
            $this->setBinLogDump();
        }
    }

    /**
     * Sends a statement to the server and validates the response.
     *
     * @param string $sql
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function execute($sql)
    {
        // 4 header bytes (length of command byte + statement, little-endian) followed by
        // the command byte 0x03 - presumably COM_QUERY; verify against the protocol docs.
        $this->socket->writeToSocket(pack('VC', strlen($sql) + 1, 0x03) . $sql);
        $this->getResponse();
    }

    /**
     * Registers this client as a slave using the local host name and the
     * configured user, password, port and slave id.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function registerSlave()
    {
        $host = gethostname();
        $user = Config::getUser();
        $password = Config::getPassword();

        $hostLength = strlen($host);
        $userLength = strlen($user);
        $passLength = strlen($password);

        // 18 = bytes of the fixed-size fields below (command, ids, length prefixes, port, trailing ints).
        $data = pack('V', 18 + $hostLength + $userLength + $passLength);
        $data .= chr(self::COM_REGISTER_SLAVE);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('C', $hostLength) . $host;
        $data .= pack('C', $userLength) . $user;
        $data .= pack('C', $passLength) . $password;
        $data .= pack('v', Config::getPort());
        $data .= pack('V', 0);
        $data .= pack('V', 0);

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Sets the session variables used for MariaDB GTID replication and
     * records the configured GTID as the current position.
     *
     * NOTE(review): the configured GTID is concatenated into the statement
     * unescaped - confirm Config validates it before it reaches this point.
     *
     * @throws \MySQLReplication\Socket\SocketException
     * @throws \MySQLReplication\BinLog\BinLogException
     */
    private function setBinLogDumpMariaGtid()
    {
        $this->execute('SET @mariadb_slave_capability = 4');
        $this->execute('SET @slave_connect_state = \'' . Config::getMariaDbGtid() . '\'');
        $this->execute('SET @slave_gtid_strict_mode = 0');
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');

        $this->binLogCurrent->setMariaDbGtid(Config::getMariaDbGtid());
    }

    /**
     * Requests the binlog stream starting from the configured GTID set and
     * records that GTID as the current position.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
     * @throws BinLogException
     * @throws GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function setBinLogDumpGtid()
    {
        $collection = GtidFactory::makeCollectionFromString(Config::getGtid());
        $encodedLength = $collection->getEncodedLength();

        $data = pack('V', 26 + $encodedLength) . chr(self::COM_BINLOG_DUMP_GTID);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('V', 3);
        $data .= chr(0) . chr(0) . chr(0);
        $data .= BinaryDataReader::pack64bit(4);
        $data .= pack('V', $encodedLength);
        $data .= $collection->getEncoded();

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setGtid(Config::getGtid());
    }

    /**
     * Requests the binlog stream from a file name / position.
     *
     * When neither is configured, the position reported by the repository's
     * master status is used; an empty master status means binary logging is
     * not enabled.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function setBinLogDump()
    {
        $binFilePos = Config::getBinLogPosition();
        $binFileName = Config::getBinLogFileName();
        if (0 === $binFilePos && '' === $binFileName) {
            $master = $this->repository->getMasterStatus();
            if ([] === $master) {
                throw new BinLogException(
                    MySQLReplicationException::BINLOG_NOT_ENABLED,
                    MySQLReplicationException::BINLOG_NOT_ENABLED_CODE
                );
            }
            $binFilePos = $master['Position'];
            $binFileName = $master['File'];
        }

        // 11 = command byte + position (4) + flags (2) + slave id (4).
        $data = pack('V', strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
        $data .= pack('V', $binFilePos);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= $binFileName;

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setBinLogPosition($binFilePos);
        $this->binLogCurrent->setBinFileName($binFileName);
    }

    /**
     * @return BinLogCurrent position information recorded when the dump was requested
     */
    public function getBinLogCurrent()
    {
        return $this->binLogCurrent;
    }

    /**
     * @return bool the checksum flag read from the repository while starting the stream
     */
    public function getCheckSum()
    {
        return $this->checkSum;
    }
}
352