BinLogSocketConnect::connect()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 6
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 9
ccs 8
cts 8
cp 1
crap 1
rs 10
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication\BinLog;
5
6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\Config\Config;
8
use MySQLReplication\Gtid\GtidCollection;
9
use MySQLReplication\Gtid\GtidException;
10
use MySQLReplication\Repository\RepositoryInterface;
11
use MySQLReplication\Socket\SocketException;
12
use MySQLReplication\Socket\SocketInterface;
13
14
/**
 * Opens a replication session over a socket: reads the server greeting,
 * authenticates, registers as a slave and requests the binary log stream.
 *
 * All integers written to or read from the socket use the explicit
 * little-endian pack codes ("v"/"V") so that the bytes on the wire do not
 * depend on the byte order of the machine running PHP.
 */
class BinLogSocketConnect
{
    private const COM_BINLOG_DUMP = 0x12;
    private const COM_REGISTER_SLAVE = 0x15;
    private const COM_BINLOG_DUMP_GTID = 0x1e;

    /**
     * First-byte values of a response that are accepted as "not an error".
     *
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
     *
     * @var int[]
     */
    private $packageOkHeader = [0, 254];

    /**
     * Largest payload a single packet can carry (0xFFFFFF); a packet of
     * exactly this size is followed by a continuation packet.
     *
     * @var int
     */
    private $binaryDataMaxLength = 16777215;

    /**
     * Value reported by the repository's isCheckSum() during connect().
     *
     * @var bool
     */
    private $checkSum = false;

    /**
     * @var RepositoryInterface
     */
    private $repository;

    /**
     * @var SocketInterface
     */
    private $socket;

    /**
     * @var BinLogCurrent
     */
    private $binLogCurrent;

    /**
     * @var Config
     */
    private $config;

    /**
     * Populated by connect() from the server greeting.
     *
     * @var BinLogServerInfo
     */
    private $binLogServerInfo;

    /**
     * Stores the collaborators and creates an empty BinLogCurrent.
     * No network activity happens here; call connect() to open the session.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    public function __construct(
        Config $config,
        RepositoryInterface $repository,
        SocketInterface $socket
    ) {
        $this->config = $config;
        $this->repository = $repository;
        $this->socket = $socket;
        $this->binLogCurrent = new BinLogCurrent();
    }

    /**
     * Reports the connection state of the underlying socket.
     */
    public function isConnected(): bool
    {
        return $this->socket->isConnected();
    }

    /**
     * Connects to the configured host/port, parses the server greeting,
     * authenticates and then requests the binary log stream.
     */
    public function connect(): void
    {
        $this->socket->connectToStream($this->config->getHost(), $this->config->getPort());

        // The greeting is the first packet the server sends; it is not an
        // OK/ERR response, so it is read without the response check.
        $greeting = $this->getResponse(false);
        $this->binLogServerInfo = BinLogServerInfo::parsePackage(
            $greeting,
            $this->repository->getVersion()
        );

        $this->authenticate();
        $this->getBinlogStream();
    }

    /**
     * Returns the server info parsed by connect().
     */
    public function getBinLogServerInfo(): BinLogServerInfo
    {
        return $this->binLogServerInfo;
    }

    /**
     * Reads one logical response from the socket.
     *
     * A packet starts with a 4-byte header whose first three bytes are the
     * little-endian payload length. When a payload is exactly the maximum
     * packet size, further packets are read and appended until a shorter
     * one arrives.
     *
     * @param bool $checkResponse when true, the first packet's payload is
     *                            validated and an error packet raises
     *                            BinLogException
     *
     * @return string the concatenated payload, or '' when no header was read
     *
     * @throws BinLogException
     * @throws SocketException
     */
    public function getResponse(bool $checkResponse = true): string
    {
        $header = $this->socket->readFromSocket(4);
        if ('' === $header) {
            return '';
        }

        $dataLength = self::decodePacketLength($header);
        $result = $this->socket->readFromSocket($dataLength);
        if ($checkResponse) {
            $this->isWriteSuccessful($result);
        }

        // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
        while ($dataLength === $this->binaryDataMaxLength) {
            $header = $this->socket->readFromSocket(4);
            if ('' === $header) {
                return $result;
            }

            $dataLength = self::decodePacketLength($header);
            $result .= $this->socket->readFromSocket($dataLength);
        }

        return $result;
    }

    /**
     * Decodes the 3-byte little-endian payload length at the start of a
     * packet header. The fourth header byte is ignored here.
     */
    private static function decodePacketLength(string $header): int
    {
        // Pad the three length bytes to four so they can be read as uint32.
        return unpack('V', substr($header, 0, 3) . chr(0))[1];
    }

    /**
     * Throws when the payload's first byte is not one of the accepted
     * header values.
     *
     * The error code is read from bytes 1-2 (little-endian) and the message
     * from byte 9 onwards. An empty payload has no header byte to inspect
     * and is accepted without raising.
     *
     * @throws BinLogException
     */
    private function isWriteSuccessful(string $data): void
    {
        if ('' === $data) {
            return;
        }

        if (in_array(ord($data[0]), $this->packageOkHeader, true)) {
            return;
        }

        // Guard against a truncated error packet instead of indexing past
        // the end of the string.
        $errorCode = strlen($data) >= 3 ? unpack('v', substr($data, 1, 2))[1] : 0;
        // substr() may return false on older PHP versions when the offset is
        // past the end; the cast keeps the message a string.
        $errorMessage = (string) substr($data, 9);

        throw new BinLogException($errorMessage, $errorCode);
    }

    /**
     * Sends the handshake response with the native-password scramble and
     * reads the server's reply.
     *
     * @throws BinLogException
     * @throws SocketException
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
     */
    private function authenticate(): void
    {
        $password = $this->config->getPassword();

        // An account without a password must send a zero-length auth
        // response; a scramble is only computed for a non-empty password:
        // SHA1(password) XOR SHA1(salt . SHA1(SHA1(password)))
        $scramble = '';
        if ('' !== $password) {
            $passwordHash = sha1($password, true);
            $scramble = $passwordHash ^ sha1(
                $this->binLogServerInfo->getSalt() . sha1($passwordHash, true),
                true
            );
        }

        $payload = pack('V', self::getCapabilities());
        $payload .= pack('V', $this->binaryDataMaxLength);
        // Character set id 33, followed by 23 zero filler bytes.
        $payload .= chr(33);
        $payload .= str_repeat(chr(0), 23);
        $payload .= $this->config->getUser() . chr(0);
        $payload .= chr(strlen($scramble)) . $scramble;

        // Packet header: 3-byte little-endian length, then sequence id 1.
        $header = substr(pack('V', strlen($payload)), 0, 3) . chr(1);

        $this->socket->writeToSocket($header . $payload);
        $this->getResponse();
    }

    /**
     * Builds the client capability bit mask sent during authentication.
     *
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
     */
    private static function getCapabilities(): int
    {
        $flags = [
            1,       // long password
            1 << 2,  // long flag
            1 << 4,  // no schema
            1 << 9,  // protocol 4.1
            1 << 13, // transactions
            1 << 15, // secure connection
        ];

        $capabilities = 0;
        foreach ($flags as $flag) {
            $capabilities |= $flag;
        }

        return $capabilities;
    }

    /**
     * Prepares the session and asks the server to start streaming the
     * binary log.
     *
     * Sets the checksum and heartbeat session variables when applicable,
     * registers this client as a slave, applies the MariaDB GTID session
     * variables when a MariaDB GTID is configured, and finally sends either
     * the GTID dump command (when a GTID is configured) or the
     * file/position dump command.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    private function getBinlogStream(): void
    {
        $this->checkSum = $this->repository->isCheckSum();
        if ($this->checkSum) {
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
        }

        // A strict comparison against the integer 0 never matches a float
        // period, so "greater than zero" is used to skip a disabled heartbeat.
        $heartbeatPeriod = $this->config->getHeartbeatPeriod();
        if ($heartbeatPeriod > 0) {
            // master_heartbeat_period is in nanoseconds
            $this->execute('SET @master_heartbeat_period = ' . $heartbeatPeriod * 1000000000);
        }

        $this->registerSlave();

        if ('' !== $this->config->getMariaDbGtid()) {
            $this->setBinLogDumpMariaGtid();
        }

        if ('' !== $this->config->getGtid()) {
            $this->setBinLogDumpGtid();
        } else {
            $this->setBinLogDump();
        }
    }

    /**
     * Sends a query command (0x03) with the given SQL and reads the reply.
     *
     * @throws BinLogException
     * @throws SocketException
     */
    private function execute(string $sql): void
    {
        // Four header bytes (length of command byte + SQL, little-endian)
        // followed by the command byte and the statement text.
        $this->socket->writeToSocket(pack('VC', strlen($sql) + 1, 0x03) . $sql);
        $this->getResponse();
    }

    /**
     * Sends COM_REGISTER_SLAVE with this host's name, the configured user,
     * password, port and slave id, then reads the reply.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
     * @throws BinLogException
     * @throws SocketException
     */
    private function registerSlave(): void
    {
        // gethostname() returns false on failure; fall back to an empty
        // name rather than passing false to strlen() under strict types.
        $host = gethostname();
        if (false === $host) {
            $host = '';
        }
        $user = $this->config->getUser();
        $password = $this->config->getPassword();

        $hostLength = strlen($host);
        $userLength = strlen($user);
        $passLength = strlen($password);

        // 18 fixed bytes: command (1) + slave id (4) + three length
        // prefixes (3) + port (2) + two trailing 32-bit zero fields (8).
        $data = pack('V', 18 + $hostLength + $userLength + $passLength);
        $data .= chr(self::COM_REGISTER_SLAVE);
        $data .= pack('V', $this->config->getSlaveId());
        $data .= pack('C', $hostLength) . $host;
        $data .= pack('C', $userLength) . $user;
        $data .= pack('C', $passLength) . $password;
        $data .= pack('v', $this->config->getPort());
        $data .= pack('V', 0);
        $data .= pack('V', 0);

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Sets the MariaDB replication session variables for the configured
     * GTID and records that GTID as the current position.
     *
     * @throws SocketException
     * @throws BinLogException
     */
    private function setBinLogDumpMariaGtid(): void
    {
        $mariaDbGtid = $this->config->getMariaDbGtid();

        $this->execute('SET @mariadb_slave_capability = 4');
        $this->execute('SET @slave_connect_state = \'' . $mariaDbGtid . '\'');
        $this->execute('SET @slave_gtid_strict_mode = 0');
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');

        $this->binLogCurrent->setMariaDbGtid($mariaDbGtid);
    }

    /**
     * Sends COM_BINLOG_DUMP_GTID for the configured GTID set, reads the
     * reply and records the GTID as the current position.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    private function setBinLogDumpGtid(): void
    {
        $gtid = $this->config->getGtid();
        $collection = GtidCollection::makeCollectionFromString($gtid);
        $encodedLength = $collection->getEncodedLength();

        // 26 fixed bytes: command (1) + flags (2) + slave id (4) +
        // name length (4) + name (3) + position (8) + data length (4).
        $data = pack('V', 26 + $encodedLength) . chr(self::COM_BINLOG_DUMP_GTID);
        $data .= pack('v', 0);
        $data .= pack('V', $this->config->getSlaveId());
        // Three-byte name field, all zero.
        $data .= pack('V', 3);
        $data .= str_repeat(chr(0), 3);
        $data .= BinaryDataReader::pack64bit(4);
        $data .= pack('V', $encodedLength);
        $data .= $collection->getEncoded();

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setGtid($gtid);
    }

    /**
     * Sends COM_BINLOG_DUMP for a file name and position, reads the reply
     * and records both as the current position.
     *
     * The position and file name come from the current state when set,
     * otherwise from the configuration; when both are still empty they are
     * taken from the repository's master status.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
     * @throws BinLogException
     * @throws SocketException
     */
    private function setBinLogDump(): void
    {
        $binLogCurrent = $this->binLogCurrent;

        $binFilePos = $binLogCurrent->getBinLogPosition() ?: $this->config->getBinLogPosition();
        $binFileName = $binLogCurrent->getBinFileName() ?: $this->config->getBinLogFileName();
        if (0 === $binFilePos && '' === $binFileName) {
            $masterStatusDTO = $this->repository->getMasterStatus();
            $binFilePos = $masterStatusDTO->getPosition();
            $binFileName = $masterStatusDTO->getFile();
        }

        // 11 fixed bytes: command (1) + position (4) + flags (2) + slave id (4).
        $data = pack('V', strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
        $data .= pack('V', $binFilePos);
        $data .= pack('v', 0);
        $data .= pack('V', $this->config->getSlaveId());
        $data .= $binFileName;

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $binLogCurrent->setBinLogPosition($binFilePos);
        $binLogCurrent->setBinFileName($binFileName);
    }

    /**
     * Returns the object tracking the current binary log position.
     */
    public function getBinLogCurrent(): BinLogCurrent
    {
        return $this->binLogCurrent;
    }

    /**
     * Returns the checksum flag captured from the repository during connect().
     */
    public function getCheckSum(): bool
    {
        return $this->checkSum;
    }
}
319