Passed
Push — master ( 80ac3a...2487f9 )
by kacper
05:49
created

BinLogSocketConnect::getCapabilities()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 25
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 7
nc 1
nop 0
dl 0
loc 25
ccs 8
cts 8
cp 1
crap 1
rs 8.8571
c 0
b 0
f 0
1
<?php
2
3
namespace MySQLReplication\BinLog;
4
5
use MySQLReplication\BinaryDataReader\BinaryDataReader;
6
use MySQLReplication\Config\Config;
7
use MySQLReplication\Exception\MySQLReplicationException;
8
use MySQLReplication\Gtid\GtidException;
9
use MySQLReplication\Gtid\GtidFactory;
10
use MySQLReplication\Repository\RepositoryInterface;
11
use MySQLReplication\Socket\SocketInterface;
12
13
/**
14
 * Class BinLogSocketConnect
15
 * @package MySQLReplication\BinLog
16
 */
17
class BinLogSocketConnect
18
{
19
    const COM_BINLOG_DUMP = 0x12;
20
    const COM_REGISTER_SLAVE = 0x15;
21
    const COM_BINLOG_DUMP_GTID = 0x1e;
22
23
    /**
24
     * @var bool
25
     */
26
    private $checkSum = false;
27
    /**
28
     * @var RepositoryInterface
29
     */
30
    private $repository;
31
    /**
32
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
33
     * @var array
34
     */
35
    private $packageOkHeader = [0, 254];
36
    /**
37
     * @var SocketInterface
38
     */
39
    private $socket;
40
    /**
41
     * 2^24 - 1 16m
42
     * @var int
43
     */
44
    private $binaryDataMaxLength = 16777215;
45
    /**
46
     * @var BinLogCurrent
47
     */
48
    private $binLogCurrent;
49
50
    /**
51
     * @param RepositoryInterface $repository
52
     * @param SocketInterface $socket
53
     * @throws BinLogException
54
     * @throws \MySQLReplication\Gtid\GtidException
55
     * @throws \MySQLReplication\Socket\SocketException
56
     */
57 54
    public function __construct(
58
        RepositoryInterface $repository,
59
        SocketInterface $socket
60
    ) {
61 54
        $this->repository = $repository;
62 54
        $this->socket = $socket;
63 54
        $this->binLogCurrent = new BinLogCurrent();
64
65 54
        $this->socket->connectToStream(Config::getHost(), Config::getPort());
66 54
        BinLogServerInfo::parsePackage($this->getResponse(false), $this->repository->getVersion());
67 54
        $this->authenticate();
68 54
        $this->getBinlogStream();
69 54
    }
70
71
    /**
72
     * @param bool $checkResponse
73
     * @return string
74
     * @throws \MySQLReplication\BinLog\BinLogException
75
     * @throws \MySQLReplication\Socket\SocketException
76
     */
77 54
    public function getResponse($checkResponse = true)
78
    {
79 54
        $header = $this->socket->readFromSocket(4);
80 54
        if ('' === $header) {
81
            return '';
82
        }
83 54
        $dataLength = unpack('L', $header[0] . $header[1] . $header[2] . chr(0))[1];
84
85 54
        $result = $this->socket->readFromSocket($dataLength);
86 54
        if (true === $checkResponse) {
87 54
            $this->isWriteSuccessful($result);
88 54
        }
89
90 54
        return $result;
91
    }
92
93
    /**
94
     * @param string $data
95
     * @throws BinLogException
96
     */
97 54
    private function isWriteSuccessful($data)
98
    {
99 54
        $head = ord($data[0]);
100 54
        if (!in_array($head, $this->packageOkHeader, true)) {
101
            $errorCode = unpack('v', $data[1] . $data[2])[1];
102
            $errorMessage = '';
103
            $packetLength = strlen($data);
104
            for ($i = 9; $i < $packetLength; ++$i) {
105
                $errorMessage .= $data[$i];
106
            }
107
108
            throw new BinLogException($errorMessage, $errorCode);
109
        }
110 54
    }
111
112
    /**
113
     * @throws BinLogException
114
     * @throws \MySQLReplication\Socket\SocketException
115
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
116
     */
117 54
    private function authenticate()
118
    {
119 54
        $data = pack('L', self::getCapabilities());
120 54
        $data .= pack('L', $this->binaryDataMaxLength);
121 54
        $data .= chr(33);
122 54
        for ($i = 0; $i < 23; $i++) {
123 54
            $data .= chr(0);
124 54
        }
125 54
        $result = sha1(Config::getPassword(), true) ^ sha1(
126 54
                BinLogServerInfo::getSalt() . sha1(sha1(Config::getPassword(), true), true), true
127 54
            );
128
129 54
        $data = $data . Config::getUser() . chr(0) . chr(strlen($result)) . $result;
130 54
        $str = pack('L', strlen($data));
131 54
        $s = $str[0] . $str[1] . $str[2];
132 54
        $data = $s . chr(1) . $data;
133
134 54
        $this->socket->writeToSocket($data);
135 54
        $this->getResponse();
136 54
    }
137
138
    /**
139
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
140
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
141
     * @return int
142
     */
143 54
    private static function getCapabilities()
144
    {
145
        /*
146
            Left only as information
147
            $foundRows = 1 << 1;
148
            $connectWithDb = 1 << 3;
149
            $compress = 1 << 5;
150
            $odbc = 1 << 6;
151
            $localFiles = 1 << 7;
152
            $ignoreSpace = 1 << 8;
153
            $multiStatements = 1 << 16;
154
            $multiResults = 1 << 17;
155
            $interactive = 1 << 10;
156
            $ssl = 1 << 11;
157
            $ignoreSigPipe = 1 << 12;
158
        */
159
160 54
        $noSchema = 1 << 4;
161 54
        $longPassword = 1;
162 54
        $longFlag = 1 << 2;
163 54
        $transactions = 1 << 13;
164 54
        $secureConnection = 1 << 15;
165 54
        $protocol41 = 1 << 9;
166
167 54
        return ($longPassword | $longFlag | $transactions | $protocol41 | $secureConnection | $noSchema);
168
    }
169
170
    /**
171
     * @throws BinLogException
172
     * @throws GtidException
173
     * @throws \MySQLReplication\Socket\SocketException
174
     */
175 54
    private function getBinlogStream()
176
    {
177 54
        $this->checkSum = $this->repository->isCheckSum();
178 54
        if ($this->checkSum) {
179 54
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
180 54
        }
181
182 54
        if (0 !== Config::getHeartbeatPeriod()) {
183
            // master_heartbeat_period is in nanoseconds
184
            $this->execute('SET @master_heartbeat_period = ' . Config::getHeartbeatPeriod() * 1000000000);
185
        }
186
187 54
        $this->registerSlave();
188
189 54
        if ('' !== Config::getMariaDbGtid()) {
190
            $this->setBinLogDumpMariaGtid();
191
        }
192 54
        if ('' !== Config::getGtid()) {
193
            $this->setBinLogDumpGtid();
194
        } else {
195 54
            $this->setBinLogDump();
196
        }
197 54
    }
198
199
    /**
200
     * @param string $sql
201
     * @throws BinLogException
202
     * @throws \MySQLReplication\Socket\SocketException
203
     */
204 54
    private function execute($sql)
205
    {
206 54
        $this->socket->writeToSocket(pack('LC', strlen($sql) + 1, 0x03) . $sql);
207 54
        $this->getResponse();
208 54
    }
209
210
    /**
211
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
212
     * @throws BinLogException
213
     * @throws \MySQLReplication\Socket\SocketException
214
     */
215 54
    private function registerSlave()
216
    {
217 54
        $host = gethostname();
218 54
        $hostLength = strlen($host);
219 54
        $userLength = strlen(Config::getUser());
220 54
        $passLength = strlen(Config::getPassword());
221
222 54
        $data = pack('l', 18 + $hostLength + $userLength + $passLength);
223 54
        $data .= chr(self::COM_REGISTER_SLAVE);
224 54
        $data .= pack('V', Config::getSlaveId());
225 54
        $data .= pack('C', $hostLength);
226 54
        $data .= $host;
227 54
        $data .= pack('C', $userLength);
228 54
        $data .= Config::getUser();
229 54
        $data .= pack('C', $passLength);
230 54
        $data .= Config::getPassword();
231 54
        $data .= pack('v', Config::getPort());
232 54
        $data .= pack('V', 0);
233 54
        $data .= pack('V', 0);
234
235 54
        $this->socket->writeToSocket($data);
236 54
        $this->getResponse();
237 54
    }
238
239
    /**
240
     * @throws \MySQLReplication\Socket\SocketException
241
     * @throws \MySQLReplication\BinLog\BinLogException
242
     */
243
    private function setBinLogDumpMariaGtid()
244
    {
245
        $this->execute('SET @mariadb_slave_capability = 4');
246
        $this->execute('SET @slave_connect_state = \'' . Config::getMariaDbGtid() . '\'');
247
        $this->execute('SET @slave_gtid_strict_mode = 0');
248
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');
249
250
        $this->binLogCurrent->setMariaDbGtid(Config::getMariaDbGtid());
251
    }
252
253
    /**
254
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
255
     * @throws BinLogException
256
     * @throws GtidException
257
     * @throws \MySQLReplication\Socket\SocketException
258
     */
259
    private function setBinLogDumpGtid()
260
    {
261
        $collection = GtidFactory::makeCollectionFromString(Config::getGtid());
262
263
        $data = pack('l', 26 + $collection->getEncodedLength()) . chr(self::COM_BINLOG_DUMP_GTID);
264
        $data .= pack('S', 0);
265
        $data .= pack('I', Config::getSlaveId());
266
        $data .= pack('I', 3);
267
        $data .= chr(0);
268
        $data .= chr(0);
269
        $data .= chr(0);
270
        $data .= BinaryDataReader::pack64bit(4);
271
        $data .= pack('I', $collection->getEncodedLength());
272
        $data .= $collection->getEncoded();
273
274
        $this->socket->writeToSocket($data);
275
        $this->getResponse();
276
277
        $this->binLogCurrent->setGtid(Config::getGtid());
278
    }
279
280
    /**
281
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
282
     * @throws BinLogException
283
     * @throws \MySQLReplication\Socket\SocketException
284
     */
285 54
    private function setBinLogDump()
286
    {
287 54
        $binFilePos = Config::getBinLogPosition();
288 54
        $binFileName = Config::getBinLogFileName();
289 54
        if (0 === $binFilePos && '' === $binFileName) {
290 54
            $master = $this->repository->getMasterStatus();
291 54
            if ([] === $master) {
292
                throw new BinLogException(
293
                    MySQLReplicationException::BINLOG_NOT_ENABLED,
294
                    MySQLReplicationException::BINLOG_NOT_ENABLED_CODE
295
                );
296
            }
297 54
            $binFilePos = $master['Position'];
298 54
            $binFileName = $master['File'];
299 54
        }
300
301 54
        $data = pack('i', strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
302 54
        $data .= pack('I', $binFilePos);
303 54
        $data .= pack('v', 0);
304 54
        $data .= pack('I', Config::getSlaveId());
305 54
        $data .= $binFileName;
306
307 54
        $this->socket->writeToSocket($data);
308 54
        $this->getResponse();
309
310 54
        $this->binLogCurrent->setBinLogPosition($binFilePos);
311 54
        $this->binLogCurrent->setBinFileName($binFileName);
312 54
    }
313
314
    /**
315
     * @return BinLogCurrent
316
     */
317 54
    public function getBinLogCurrent()
318
    {
319 54
        return $this->binLogCurrent;
320
    }
321
322
    /**
323
     * @return bool
324
     */
325 54
    public function getCheckSum()
326
    {
327 54
        return $this->checkSum;
328
    }
329
}
330