Passed
Push — master ( e1f7c1...81f74c )
by kacper
04:36
created

BinLogSocketConnect::execute()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 1
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace MySQLReplication\BinLog;
4
5
use MySQLReplication\BinaryDataReader\BinaryDataReader;
6
use MySQLReplication\Config\Config;
7
use MySQLReplication\Exception\MySQLReplicationException;
8
use MySQLReplication\Gtid\GtidException;
9
use MySQLReplication\Gtid\GtidFactory;
10
use MySQLReplication\Repository\RepositoryInterface;
11
use MySQLReplication\Socket\SocketInterface;
12
13
/**
14
 * Class BinLogSocketConnect
15
 * @package MySQLReplication\BinLog
16
 */
17
class BinLogSocketConnect
18
{
19
    const COM_BINLOG_DUMP = 0x12;
    const COM_REGISTER_SLAVE = 0x15;
    const COM_BINLOG_DUMP_GTID = 0x1e;

    /**
     * Checksum flag reported by the repository; assigned in getBinlogStream().
     *
     * @var bool
     */
    private $checkSum = false;

    /**
     * Source of server metadata (version, checksum flag, master status).
     *
     * @var RepositoryInterface
     */
    private $repository;

    /**
     * First-byte values that isWriteSuccessful() accepts as a non-error reply (00 and FE).
     *
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html
     * @var array
     */
    private $packageOkHeader = [0x00, 0xFE];

    /**
     * Stream used for every read from and write to the server.
     *
     * @var SocketInterface
     */
    private $socket;

    /**
     * 2^24 - 1 (16m): advertised in the handshake and used by getResponse()
     * to recognise a payload that continues in the next packet.
     *
     * @var int
     */
    private $binaryDataMaxLength = 0xFFFFFF;

    /**
     * Position/GTID state recorded when the dump request is sent.
     *
     * @var BinLogCurrent
     */
    private $binLogCurrent;
49
50
    /**
     * Opens the connection and starts the binlog stream.
     *
     * The constructor performs the whole handshake: it connects the socket to
     * the configured host and port, hands the first server packet to
     * BinLogServerInfo, authenticates and then requests the binlog dump.
     *
     * @param RepositoryInterface $repository
     * @param SocketInterface $socket
     * @throws BinLogException
     * @throws \MySQLReplication\Gtid\GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    public function __construct(
        RepositoryInterface $repository,
        SocketInterface $socket
    ) {
        $this->binLogCurrent = new BinLogCurrent();
        $this->socket = $socket;
        $this->repository = $repository;

        $socket->connectToStream(Config::getHost(), Config::getPort());

        // The first packet is read without the OK/ERR check and parsed as server info.
        $serverPacket = $this->getResponse(false);
        BinLogServerInfo::parsePackage($serverPacket, $repository->getVersion());

        $this->authenticate();
        $this->getBinlogStream();
    }
70
71
    /**
     * Reads one logical response from the socket.
     *
     * Each packet starts with a 4-byte header whose first three bytes hold the
     * payload length in little-endian order. A payload of exactly
     * $binaryDataMaxLength bytes means the data continues in the next packet,
     * so following packets are appended until a shorter one arrives.
     *
     * @param bool $checkResponse when true the first payload is validated with isWriteSuccessful()
     * @return string the concatenated payload, or '' when no header could be read
     * @throws \MySQLReplication\BinLog\BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    public function getResponse($checkResponse = true)
    {
        $header = $this->socket->readFromSocket(4);
        if ('' === $header) {
            return '';
        }

        // 'V' (explicit little endian) instead of 'L' (machine byte order): the
        // length bytes are padded on the high side, which is only correct for LE.
        $dataLength = unpack('V', $header[0] . $header[1] . $header[2] . chr(0))[1];

        $result = $this->socket->readFromSocket($dataLength);
        if (true === $checkResponse) {
            $this->isWriteSuccessful($result);
        }

        //https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
        while ($dataLength === $this->binaryDataMaxLength) {
            $header = $this->socket->readFromSocket(4);
            if ('' === $header) {
                return $result;
            }
            $dataLength = unpack('V', $header[0] . $header[1] . $header[2] . chr(0))[1];
            $result .= $this->socket->readFromSocket($dataLength);
        }

        return $result;
    }
105
106
    /**
     * Throws when a response payload does not start with an accepted header byte.
     *
     * The first byte is compared against $packageOkHeader. On a mismatch the
     * two bytes after it are decoded as a little-endian 16-bit error code and
     * everything from offset 9 onwards is used as the error message (bytes 3-8
     * are skipped; presumably the SQL-state part of the error packet).
     *
     * @param string $data
     *
     * @throws BinLogException
     */
    private function isWriteSuccessful($data)
    {
        if (in_array(ord($data[0]), $this->packageOkHeader, true)) {
            return;
        }

        $errorCode = unpack('v', $data[1] . $data[2])[1];
        // The cast keeps the message an empty string when the packet is shorter than 9 bytes.
        $errorMessage = (string)substr($data, 9);

        throw new BinLogException($errorMessage, $errorCode);
    }
125
126
    /**
     * Sends the handshake response (native 4.1 password authentication).
     *
     * Payload layout as written below: capability flags (4 bytes), max packet
     * size (4 bytes), one byte with value 33, 23 zero bytes, the NUL-terminated
     * user name, then a length-prefixed auth response. The packet header is the
     * 3-byte little-endian payload length followed by sequence id 1.
     *
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
     */
    private function authenticate()
    {
        $password = (string)Config::getPassword();

        // 'V' pins little-endian order; 'L' would follow the host byte order.
        $data = pack('V', self::getCapabilities());
        $data .= pack('V', $this->binaryDataMaxLength);
        $data .= chr(33);
        $data .= str_repeat(chr(0), 23);

        // An account without a password expects an empty auth response rather
        // than the scramble of an empty string.
        $result = '';
        if ('' !== $password) {
            $passwordHash = sha1($password, true);
            $result = $passwordHash ^ sha1(BinLogServerInfo::getSalt() . sha1($passwordHash, true), true);
        }

        $data = $data . Config::getUser() . chr(0) . chr(strlen($result)) . $result;

        // Header: low three bytes of the payload length, then the sequence id.
        $header = substr(pack('V', strlen($data)), 0, 3) . chr(1);

        $this->socket->writeToSocket($header . $data);
        $this->getResponse();
    }
151
152
    /**
     * Builds the capability bit mask sent in the handshake response.
     *
     * Only the flags listed in the body are set. For reference, these bits are
     * known but intentionally not sent:
     *   foundRows 1 << 1, connectWithDb 1 << 3, compress 1 << 5, odbc 1 << 6,
     *   localFiles 1 << 7, ignoreSpace 1 << 8, interactive 1 << 10, ssl 1 << 11,
     *   ignoreSigPipe 1 << 12, multiStatements 1 << 16, multiResults 1 << 17.
     *
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
     * @return int
     */
    private static function getCapabilities()
    {
        $enabledFlags = [
            'longPassword' => 1,
            'longFlag' => 1 << 2,
            'noSchema' => 1 << 4,
            'protocol41' => 1 << 9,
            'transactions' => 1 << 13,
            'secureConnection' => 1 << 15,
        ];

        $capabilities = 0;
        foreach ($enabledFlags as $bit) {
            $capabilities |= $bit;
        }

        return $capabilities;
    }
183
184
    /**
     * Prepares the session and asks the server to start sending binlog events.
     *
     * Order of operations: optional checksum and heartbeat session variables,
     * slave registration, optional MariaDB GTID session setup, then either a
     * GTID based dump (when a GTID is configured) or a file/position dump.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function getBinlogStream()
    {
        $this->checkSum = $this->repository->isCheckSum();
        if ($this->checkSum) {
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
        }

        $heartbeatPeriod = Config::getHeartbeatPeriod();
        if (0 !== $heartbeatPeriod) {
            // master_heartbeat_period is in nanoseconds
            $this->execute('SET @master_heartbeat_period = ' . ($heartbeatPeriod * 1000000000));
        }

        $this->registerSlave();

        // The MariaDB GTID setup only sets session variables; a dump request still follows.
        if ('' !== Config::getMariaDbGtid()) {
            $this->setBinLogDumpMariaGtid();
        }

        if ('' === Config::getGtid()) {
            $this->setBinLogDump();

            return;
        }

        $this->setBinLogDumpGtid();
    }
212
213
    /**
     * Sends a SQL statement to the server and validates the reply.
     *
     * The packet is a 4-byte little-endian value (payload length; the high
     * byte doubles as sequence id 0 for payloads below 16 MB), the command
     * byte 0x03 and the statement text.
     *
     * @param string $sql
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function execute($sql)
    {
        // 'V' fixes the byte order to little endian; 'L' depended on the host CPU.
        $packet = pack('VC', strlen($sql) + 1, 0x03) . $sql;

        $this->socket->writeToSocket($packet);
        $this->getResponse();
    }
223
224
    /**
     * Registers this client as a replication slave.
     *
     * Payload as written below: command byte, slave id (4 bytes), then host,
     * user and password each prefixed by a one-byte length, the configured
     * port (2 bytes) and two 4-byte zero fields. The constant 18 is the size
     * of all fixed-width parts of that payload.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function registerSlave()
    {
        // gethostname() returns false on failure; treat that as an empty name.
        $host = (string)gethostname();
        $user = Config::getUser();
        $password = Config::getPassword();

        $hostLength = strlen($host);
        $userLength = strlen($user);
        $passLength = strlen($password);

        // 'V' instead of 'l': the header must be little endian on every platform.
        $data = pack('V', 18 + $hostLength + $userLength + $passLength);
        $data .= chr(self::COM_REGISTER_SLAVE);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('C', $hostLength) . $host;
        $data .= pack('C', $userLength) . $user;
        $data .= pack('C', $passLength) . $password;
        $data .= pack('v', Config::getPort());
        $data .= pack('V', 0);
        $data .= pack('V', 0);

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }
252
253
    /**
     * Sets the session variables for a MariaDB GTID based stream and records
     * the configured GTID on the current binlog state.
     *
     * @throws \MySQLReplication\Socket\SocketException
     * @throws \MySQLReplication\BinLog\BinLogException
     */
    private function setBinLogDumpMariaGtid()
    {
        $mariaDbGtid = Config::getMariaDbGtid();

        $statements = [
            'SET @mariadb_slave_capability = 4',
            'SET @slave_connect_state = \'' . $mariaDbGtid . '\'',
            'SET @slave_gtid_strict_mode = 0',
            'SET @slave_gtid_ignore_duplicates = 0',
        ];
        foreach ($statements as $statement) {
            $this->execute($statement);
        }

        $this->binLogCurrent->setMariaDbGtid($mariaDbGtid);
    }
266
267
    /**
     * Requests a GTID based binlog dump and records the configured GTID.
     *
     * Payload as written below: command byte, 2-byte zero, slave id (4 bytes),
     * the value 3 (4 bytes) followed by three zero bytes, the 64-bit value 4,
     * the encoded GTID set length (4 bytes) and the encoded GTID set. The
     * constant 26 is the size of everything except the encoded set.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
     * @throws BinLogException
     * @throws GtidException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function setBinLogDumpGtid()
    {
        $gtid = Config::getGtid();
        $collection = GtidFactory::makeCollectionFromString($gtid);
        $encodedLength = $collection->getEncodedLength();

        // 'V'/'v' pin little-endian order and width; 'l', 'S' and 'I' follow the host machine.
        $data = pack('V', 26 + $encodedLength) . chr(self::COM_BINLOG_DUMP_GTID);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('V', 3);
        $data .= str_repeat(chr(0), 3);
        $data .= BinaryDataReader::pack64bit(4);
        $data .= pack('V', $encodedLength);
        $data .= $collection->getEncoded();

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setGtid($gtid);
    }
293
294
    /**
     * Requests a file/position based binlog dump.
     *
     * When neither a position nor a file name is configured, both are taken
     * from the repository's master status; an empty status raises
     * BINLOG_NOT_ENABLED. The values actually requested are stored on the
     * current binlog state after the server has acknowledged the command.
     *
     * Payload as written below: command byte, position (4 bytes), 2-byte zero,
     * slave id (4 bytes) and the file name; 11 is the size of the fixed part.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
     * @throws BinLogException
     * @throws \MySQLReplication\Socket\SocketException
     */
    private function setBinLogDump()
    {
        $binFilePos = Config::getBinLogPosition();
        $binFileName = Config::getBinLogFileName();
        if (0 === $binFilePos && '' === $binFileName) {
            $master = $this->repository->getMasterStatus();
            if ([] === $master) {
                throw new BinLogException(
                    MySQLReplicationException::BINLOG_NOT_ENABLED,
                    MySQLReplicationException::BINLOG_NOT_ENABLED_CODE
                );
            }
            $binFilePos = $master['Position'];
            $binFileName = $master['File'];
        }

        // 'V' instead of 'i'/'I': those codes use the host's byte order and integer size.
        $data = pack('V', strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
        $data .= pack('V', $binFilePos);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= $binFileName;

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setBinLogPosition($binFilePos);
        $this->binLogCurrent->setBinFileName($binFileName);
    }
327
328
    /**
     * Returns the binlog state object created in the constructor and filled
     * in by the dump request methods.
     *
     * @return BinLogCurrent
     */
    public function getBinLogCurrent()
    {
        return $this->binLogCurrent;
    }
335
336
    /**
     * Returns the checksum flag obtained from the repository while the
     * binlog stream was being set up (false until then).
     *
     * @return bool
     */
    public function getCheckSum()
    {
        return $this->checkSum;
    }
343
}
344