Completed
Pull Request — master (#34)
by kacper
04:27
created

BinLogSocketConnect::setBinLogDump()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 4.0466

Importance

Changes 0
Metric Value
cc 4
eloc 19
nc 3
nop 0
dl 0
loc 27
ccs 18
cts 21
cp 0.8571
crap 4.0466
rs 8.5806
c 0
b 0
f 0
1
<?php
2
3
namespace MySQLReplication\BinLog;
4
5
use MySQLReplication\BinaryDataReader\BinaryDataReader;
6
use MySQLReplication\Config\Config;
7
use MySQLReplication\Definitions\ConstCapabilityFlags;
8
use MySQLReplication\Definitions\ConstCommand;
9
use MySQLReplication\Exception\MySQLReplicationException;
10
use MySQLReplication\Gtid\GtidException;
11
use MySQLReplication\Gtid\GtidFactory;
12
use MySQLReplication\Repository\RepositoryInterface;
13
use MySQLReplication\Socket\SocketInterface;
14
15
/**
16
 * Class BinLogSocketConnect
17
 * @package MySQLReplication\BinLog
18
 */
19
class BinLogSocketConnect
20
{
21
    /**
22
     * @var bool
23
     */
24
    private $checkSum = false;
25
    /**
26
     * @var RepositoryInterface
27
     */
28
    private $repository;
29
    /**
30
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
31
     * @var array
32
     */
33
    private $packageOkHeader = [0, 254];
34
    /**
35
     * @var SocketInterface
36
     */
37
    private $socket;
38
    /**
39
     * 2^24 - 1 16m
40
     * @var int
41
     */
42
    private $binaryDataMaxLength = 16777215;
43
    /**
44
     * @var BinLogCurrent
45
     */
46
    private $binLogCurrent;
47
48
    /**
49
     * @param RepositoryInterface $repository
50
     * @param SocketInterface $socket
51
     * @throws BinLogException
52
     * @throws \MySQLReplication\Gtid\GtidException
53
     * @throws \MySQLReplication\Socket\SocketException
54
     */
55 54
    public function __construct(
56
        RepositoryInterface $repository,
57
        SocketInterface $socket
58
    ) {
59 54
        $this->repository = $repository;
60 54
        $this->socket = $socket;
61 54
        $this->binLogCurrent = new BinLogCurrent();
62
63 54
        $this->socket->connectToStream(Config::getHost(), Config::getPort());
64 54
        BinLogServerInfo::parsePackage($this->getResponse(false), $this->repository->getVersion());
65 54
        $this->authenticate();
66 54
        $this->getBinlogStream();
67 54
    }
68
69
    /**
70
     * @param bool $checkResponse
71
     * @return string
72
     * @throws \MySQLReplication\BinLog\BinLogException
73
     * @throws \MySQLReplication\Socket\SocketException
74
     */
75 54
    public function getResponse($checkResponse = true)
76
    {
77 54
        $header = $this->socket->readFromSocket(4);
78 54
        if ('' === $header) {
79
            return '';
80
        }
81 54
        $dataLength = unpack('L', $header[0] . $header[1] . $header[2] . chr(0))[1];
82
83 54
        $result = $this->socket->readFromSocket($dataLength);
84 54
        if (true === $checkResponse) {
85 54
            $this->isWriteSuccessful($result);
86 54
        }
87
88 54
        return $result;
89
    }
90
91
    /**
92
     * @param string $data
93
     * @throws BinLogException
94
     */
95 54
    private function isWriteSuccessful($data)
96
    {
97 54
        $head = ord($data[0]);
98 54
        if (!in_array($head, $this->packageOkHeader, true)) {
99
            $errorCode = unpack('v', $data[1] . $data[2])[1];
100
            $errorMessage = '';
101
            $packetLength = strlen($data);
102
            for ($i = 9; $i < $packetLength; ++$i) {
103
                $errorMessage .= $data[$i];
104
            }
105
106
            throw new BinLogException($errorMessage, $errorCode);
107
        }
108 54
    }
109
110
    /**
111
     * @throws BinLogException
112
     * @throws \MySQLReplication\Socket\SocketException
113
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
114
     */
115 54
    private function authenticate()
116
    {
117 54
        $data = pack('L', ConstCapabilityFlags::getCapabilities());
118 54
        $data .= pack('L', $this->binaryDataMaxLength);
119 54
        $data .= chr(33);
120 54
        for ($i = 0; $i < 23; $i++) {
121 54
            $data .= chr(0);
122 54
        }
123 54
        $result = sha1(Config::getPassword(), true) ^ sha1(
124 54
                BinLogServerInfo::getSalt() . sha1(sha1(Config::getPassword(), true), true), true
125 54
            );
126
127 54
        $data = $data . Config::getUser() . chr(0) . chr(strlen($result)) . $result;
128 54
        $str = pack('L', strlen($data));
129 54
        $s = $str[0] . $str[1] . $str[2];
130 54
        $data = $s . chr(1) . $data;
131
132 54
        $this->socket->writeToSocket($data);
133 54
        $this->getResponse();
134 54
    }
135
136
    /**
137
     * @throws BinLogException
138
     * @throws GtidException
139
     * @throws \MySQLReplication\Socket\SocketException
140
     */
141 54
    private function getBinlogStream()
142
    {
143 54
        $this->checkSum = $this->repository->isCheckSum();
144 54
        if ($this->checkSum) {
145 54
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
146 54
        }
147
148 54
        if (0 !== Config::getHeartbeatPeriod()) {
149
            // master_heartbeat_period is in nanoseconds
150
            $this->execute('SET @master_heartbeat_period = ' . Config::getHeartbeatPeriod() * 1000000000);
151
        }
152
153 54
        $this->registerSlave();
154
155 54
        if ('' !== Config::getMariaDbGtid()) {
156
            $this->setBinLogDumpMariaGtid();
157
        }
158 54
        if ('' !== Config::getGtid()) {
159
            $this->setBinLogDumpGtid();
160
        } else {
161 54
            $this->setBinLogDump();
162
        }
163 54
    }
164
165
    /**
166
     * @param string $sql
167
     * @throws BinLogException
168
     * @throws \MySQLReplication\Socket\SocketException
169
     */
170 54
    private function execute($sql)
171
    {
172 54
        $this->socket->writeToSocket(pack('LC', strlen($sql) + 1, 0x03) . $sql);
173 54
        $this->getResponse();
174 54
    }
175
176
    /**
177
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
178
     * @throws BinLogException
179
     * @throws \MySQLReplication\Socket\SocketException
180
     */
181 54
    private function registerSlave()
182
    {
183 54
        $host = gethostname();
184 54
        $hostLength = strlen($host);
185 54
        $userLength = strlen(Config::getUser());
186 54
        $passLength = strlen(Config::getPassword());
187
188 54
        $data = pack('l', 18 + $hostLength + $userLength + $passLength);
189 54
        $data .= chr(ConstCommand::COM_REGISTER_SLAVE);
190 54
        $data .= pack('V', Config::getSlaveId());
191 54
        $data .= pack('C', $hostLength);
192 54
        $data .= $host;
193 54
        $data .= pack('C', $userLength);
194 54
        $data .= Config::getUser();
195 54
        $data .= pack('C', $passLength);
196 54
        $data .= Config::getPassword();
197 54
        $data .= pack('v', Config::getPort());
198 54
        $data .= pack('V', 0);
199 54
        $data .= pack('V', 0);
200
201 54
        $this->socket->writeToSocket($data);
202 54
        $this->getResponse();
203 54
    }
204
205
    /**
206
     * @throws \MySQLReplication\Socket\SocketException
207
     * @throws \MySQLReplication\BinLog\BinLogException
208
     */
209
    private function setBinLogDumpMariaGtid()
210
    {
211
        $this->execute('SET @mariadb_slave_capability = 4');
212
        $this->execute('SET @slave_connect_state = \'' . Config::getMariaDbGtid() . '\'');
213
        $this->execute('SET @slave_gtid_strict_mode = 0');
214
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');
215
216
        $this->binLogCurrent->setMariaDbGtid(Config::getMariaDbGtid());
217
    }
218
219
    /**
220
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
221
     * @throws BinLogException
222
     * @throws GtidException
223
     * @throws \MySQLReplication\Socket\SocketException
224
     */
225
    private function setBinLogDumpGtid()
226
    {
227
        $collection = GtidFactory::makeCollectionFromString(Config::getGtid());
228
229
        $data = pack('l', 26 + $collection->getEncodedLength()) . chr(ConstCommand::COM_BINLOG_DUMP_GTID);
230
        $data .= pack('S', 0);
231
        $data .= pack('I', Config::getSlaveId());
232
        $data .= pack('I', 3);
233
        $data .= chr(0);
234
        $data .= chr(0);
235
        $data .= chr(0);
236
        $data .= BinaryDataReader::pack64bit(4);
237
        $data .= pack('I', $collection->getEncodedLength());
238
        $data .= $collection->getEncoded();
239
240
        $this->socket->writeToSocket($data);
241
        $this->getResponse();
242
243
        $this->binLogCurrent->setGtid(Config::getGtid());
244
    }
245
246
    /**
247
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
248
     * @throws BinLogException
249
     * @throws \MySQLReplication\Socket\SocketException
250
     */
251 54
    private function setBinLogDump()
252
    {
253 54
        $binFilePos = Config::getBinLogPosition();
254 54
        $binFileName = Config::getBinLogFileName();
255 54
        if (0 === $binFilePos && '' === $binFileName) {
256 54
            $master = $this->repository->getMasterStatus();
257 54
            if ([] === $master) {
258
                throw new BinLogException(
259
                    MySQLReplicationException::BINLOG_NOT_ENABLED,
260
                    MySQLReplicationException::BINLOG_NOT_ENABLED_CODE
261
                );
262
            }
263 54
            $binFilePos = $master['Position'];
264 54
            $binFileName = $master['File'];
265 54
        }
266
267 54
        $data = pack('i', strlen($binFileName) + 11) . chr(ConstCommand::COM_BINLOG_DUMP);
268 54
        $data .= pack('I', $binFilePos);
269 54
        $data .= pack('v', 0);
270 54
        $data .= pack('I', Config::getSlaveId());
271 54
        $data .= $binFileName;
272
273 54
        $this->socket->writeToSocket($data);
274 54
        $this->getResponse();
275
276 54
        $this->binLogCurrent->setBinLogPosition($binFilePos);
277 54
        $this->binLogCurrent->setBinFileName($binFileName);
278 54
    }
279
280
    /**
281
     * @return BinLogCurrent
282
     */
283 54
    public function getBinLogCurrent()
284
    {
285 54
        return $this->binLogCurrent;
286
    }
287
288
    /**
289
     * @return bool
290
     */
291 54
    public function getCheckSum()
292
    {
293 54
        return $this->checkSum;
294
    }
295
}
296