Passed
Pull Request — master (#4)
by Moln
03:27
created

BinLogSocketConnect::connect()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 9
rs 10
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication\BinLog;
5
6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\Config\Config;
8
use MySQLReplication\Gtid\GtidCollection;
9
use MySQLReplication\Gtid\GtidException;
10
use MySQLReplication\Repository\RepositoryInterface;
11
use MySQLReplication\Socket\SocketException;
12
use MySQLReplication\Socket\SocketInterface;
13
14
/**
 * Drives the client side of a binlog replication connection over a socket.
 *
 * connect() opens the stream, parses the server greeting, authenticates,
 * sets the replication session variables, registers this client as a slave
 * and finally requests a binlog dump (GTID based or file/position based).
 * Afterwards getResponse() returns the raw packets sent by the server.
 *
 * All multi-byte integers are packed/unpacked explicitly as little-endian
 * ('V' / 'v') so the produced packets do not depend on the host byte order.
 */
class BinLogSocketConnect
{
    private const COM_QUERY = 0x03;
    private const COM_BINLOG_DUMP = 0x12;
    private const COM_REGISTER_SLAVE = 0x15;
    private const COM_BINLOG_DUMP_GTID = 0x1e;

    /**
     * First-byte values of a response that isWriteSuccessful() accepts.
     *
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
     *
     * @var int[]
     */
    private $packageOkHeader = [0, 254];

    /**
     * Payload length (0xFFFFFF) that tells getResponse() another packet
     * follows; also sent as the max packet size during authentication.
     *
     * @var int
     */
    private $binaryDataMaxLength = 16777215;

    /**
     * @var bool
     */
    private $checkSum = false;

    /**
     * @var RepositoryInterface
     */
    private $repository;

    /**
     * @var SocketInterface
     */
    private $socket;

    /**
     * @var BinLogCurrent
     */
    private $binLogCurrent;

    /**
     * @var Config
     */
    private $config;

    /**
     * Set by connect(); not initialised before that call.
     *
     * @var BinLogServerInfo
     */
    private $binLogServerInfo;

    /**
     * Stores the collaborators only; no network traffic happens here.
     */
    public function __construct(
        Config $config,
        RepositoryInterface $repository,
        SocketInterface $socket
    ) {
        $this->config = $config;
        $this->repository = $repository;
        $this->socket = $socket;
        $this->binLogCurrent = new BinLogCurrent();
    }

    /**
     * Reports the connection state of the underlying socket.
     */
    public function isConnected(): bool
    {
        return $this->socket->isConnected();
    }

    /**
     * Opens the stream and runs the whole handshake up to the dump request.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    public function connect(): void
    {
        $this->socket->connectToStream($this->config->getHost(), $this->config->getPort());

        // The greeting is read unchecked: it is handed to the parser as-is.
        $this->binLogServerInfo = BinLogServerInfo::parsePackage(
            $this->getResponse(false),
            $this->repository->getVersion()
        );

        $this->authenticate();
        $this->getBinlogStream();
    }

    /**
     * Returns the server info parsed from the greeting during connect().
     */
    public function getBinLogServerInfo(): BinLogServerInfo
    {
        return $this->binLogServerInfo;
    }

    /**
     * Reads one logical response from the socket.
     *
     * A packet is a 4-byte header (3-byte little-endian payload length plus
     * one more byte that is ignored here) followed by the payload. When the
     * payload length equals $binaryDataMaxLength, further packets are read
     * and appended until a shorter one arrives.
     *
     * @see https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
     *
     * @param bool $checkResponse when true, the first payload is validated
     *                            with isWriteSuccessful()
     *
     * @return string the concatenated payload, or '' when no header was read
     *
     * @throws BinLogException
     * @throws SocketException
     */
    public function getResponse(bool $checkResponse = true): string
    {
        $header = $this->socket->readFromSocket(4);
        if ('' === $header) {
            return '';
        }

        $dataLength = $this->decodePayloadLength($header);
        $result = $this->socket->readFromSocket($dataLength);
        if (true === $checkResponse) {
            $this->isWriteSuccessful($result);
        }

        while ($dataLength === $this->binaryDataMaxLength) {
            $header = $this->socket->readFromSocket(4);
            if ('' === $header) {
                return $result;
            }

            $dataLength = $this->decodePayloadLength($header);
            $result .= $this->socket->readFromSocket($dataLength);
        }

        return $result;
    }

    /**
     * Decodes the 3-byte little-endian payload length of a packet header.
     */
    private function decodePayloadLength(string $header): int
    {
        // Pad the three length bytes to four so they can be read as uint32.
        return unpack('V', substr($header, 0, 3) . "\0")[1];
    }

    /**
     * Throws when the payload does not start with one of $packageOkHeader.
     *
     * The exception carries the 2-byte little-endian code found at offset 1
     * and everything from offset 9 onwards as its message.
     *
     * @throws BinLogException
     */
    private function isWriteSuccessful(string $data): void
    {
        if ('' === $data) {
            // An empty payload used to be read as first byte 0 (with an
            // "uninitialized string offset" warning) and so was accepted;
            // keep accepting it, without the warning.
            return;
        }

        if (in_array(ord($data[0]), $this->packageOkHeader, true)) {
            return;
        }

        // Pad so that a truncated error packet still yields a code (0).
        $errorCode = unpack('v', str_pad((string) substr($data, 1, 2), 2, "\0"))[1];
        $errorMessage = (string) substr($data, 9);

        throw new BinLogException($errorMessage, $errorCode);
    }

    /**
     * Sends the handshake response with a Native41 password scramble.
     *
     * @throws BinLogException
     * @throws SocketException
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
     */
    private function authenticate(): void
    {
        $password = $this->config->getPassword();

        $data = pack('V', self::getCapabilities());
        $data .= pack('V', $this->binaryDataMaxLength);
        $data .= chr(33); // character set id
        $data .= str_repeat("\0", 23);

        // An empty password must be sent as a zero-length auth response;
        // a scramble computed from '' is rejected for passwordless accounts.
        $result = '';
        if ('' !== $password) {
            $result = sha1($password, true) ^ sha1(
                $this->binLogServerInfo->getSalt() . sha1(sha1($password, true), true),
                true
            );
        }

        $data .= $this->config->getUser() . "\0" . chr(strlen($result)) . $result;

        // Header: 3-byte little-endian payload length + sequence id 1.
        $data = substr(pack('V', strlen($data)), 0, 3) . chr(1) . $data;

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Builds the capability bit mask sent in the handshake response.
     *
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
     */
    private static function getCapabilities(): int
    {
        $longPassword = 1;
        $longFlag = 1 << 2;
        $noSchema = 1 << 4;
        $protocol41 = 1 << 9;
        $transactions = 1 << 13;
        $secureConnection = 1 << 15;

        return $longPassword | $longFlag | $noSchema | $protocol41 | $transactions | $secureConnection;
    }

    /**
     * Prepares the session and requests the binlog stream.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    private function getBinlogStream(): void
    {
        $this->checkSum = $this->repository->isCheckSum();
        if ($this->checkSum) {
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
        }

        // NOTE(review): static analysis reported `0 !== getHeartbeatPeriod()`
        // as always true, i.e. the getter does not return an int (presumably
        // a float - confirm against Config). Compare as float so that a zero
        // period really skips the statement.
        $heartbeatPeriod = (float) $this->config->getHeartbeatPeriod();
        if (0.0 !== $heartbeatPeriod) {
            // master_heartbeat_period is in nanoseconds; number_format keeps
            // large values in plain digits instead of scientific notation.
            $this->execute(
                'SET @master_heartbeat_period = ' . number_format($heartbeatPeriod * 1000000000, 0, '', '')
            );
        }

        $this->registerSlave();

        // Deliberately not part of the if/else below: after the MariaDB
        // session variables are set, a dump request is still sent.
        if ('' !== $this->config->getMariaDbGtid()) {
            $this->setBinLogDumpMariaGtid();
        }

        if ('' !== $this->config->getGtid()) {
            $this->setBinLogDumpGtid();
        } else {
            $this->setBinLogDump();
        }
    }

    /**
     * Sends a SQL statement and validates the response.
     *
     * @throws BinLogException
     * @throws SocketException
     */
    private function execute(string $sql): void
    {
        // 4-byte little-endian length (command byte + SQL), then the command.
        $this->socket->writeToSocket(pack('VC', strlen($sql) + 1, self::COM_QUERY) . $sql);
        $this->getResponse();
    }

    /**
     * Registers this client as a slave using the configured id and credentials.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
     * @throws BinLogException
     * @throws SocketException
     */
    private function registerSlave(): void
    {
        // gethostname() returns false on failure; fall back to an empty name
        // instead of failing in strlen() under strict types.
        $host = (string) gethostname();
        $user = $this->config->getUser();
        $password = $this->config->getPassword();

        // 18 = command (1) + slave id (4) + three length bytes (3)
        //      + port (2) + two trailing 4-byte zero fields (8).
        $data = pack('V', 18 + strlen($host) + strlen($user) + strlen($password));
        $data .= chr(self::COM_REGISTER_SLAVE);
        $data .= pack('V', $this->config->getSlaveId());
        $data .= pack('C', strlen($host)) . $host;
        $data .= pack('C', strlen($user)) . $user;
        $data .= pack('C', strlen($password)) . $password;
        $data .= pack('v', $this->config->getPort());
        $data .= pack('V', 0);
        $data .= pack('V', 0);

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Sets the MariaDB GTID session variables and records the GTID as current.
     *
     * @throws SocketException
     * @throws BinLogException
     */
    private function setBinLogDumpMariaGtid(): void
    {
        $mariaDbGtid = $this->config->getMariaDbGtid();

        $this->execute('SET @mariadb_slave_capability = 4');
        $this->execute('SET @slave_connect_state = \'' . $mariaDbGtid . '\'');
        $this->execute('SET @slave_gtid_strict_mode = 0');
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');

        $this->binLogCurrent->setMariaDbGtid($mariaDbGtid);
    }

    /**
     * Requests a GTID based binlog dump and records the GTID as current.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    private function setBinLogDumpGtid(): void
    {
        $gtid = $this->config->getGtid();
        $collection = GtidCollection::makeCollectionFromString($gtid);
        $encodedLength = $collection->getEncodedLength();

        // 26 = command (1) + flags (2) + slave id (4) + name size (4)
        //      + name (3) + position (8) + data size (4).
        $data = pack('V', 26 + $encodedLength) . chr(self::COM_BINLOG_DUMP_GTID);
        $data .= pack('v', 0);
        $data .= pack('V', $this->config->getSlaveId());
        $data .= pack('V', 3);
        $data .= str_repeat("\0", 3);
        $data .= BinaryDataReader::pack64bit(4);
        $data .= pack('V', $encodedLength);
        $data .= $collection->getEncoded();

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setGtid($gtid);
    }

    /**
     * Requests a file/position based binlog dump.
     *
     * The position and file name come from the current state, then from the
     * config, and - when both are still empty - from the master status.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
     * @throws BinLogException
     * @throws SocketException
     */
    private function setBinLogDump(): void
    {
        $binLogCurrent = $this->binLogCurrent;

        // `?:` is intentional: a 0 position / '' file name means "not set".
        $binFilePos = $binLogCurrent->getBinLogPosition() ?: $this->config->getBinLogPosition();
        $binFileName = $binLogCurrent->getBinFileName() ?: $this->config->getBinLogFileName();
        if (0 === $binFilePos && '' === $binFileName) {
            $masterStatusDTO = $this->repository->getMasterStatus();
            $binFilePos = $masterStatusDTO->getPosition();
            $binFileName = $masterStatusDTO->getFile();
        }

        // 11 = command (1) + position (4) + flags (2) + slave id (4).
        $data = pack('V', strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
        $data .= pack('V', $binFilePos);
        $data .= pack('v', 0);
        $data .= pack('V', $this->config->getSlaveId());
        $data .= $binFileName;

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $binLogCurrent->setBinLogPosition($binFilePos);
        $binLogCurrent->setBinFileName($binFileName);
    }

    /**
     * Returns the object tracking the current binlog position / GTID.
     */
    public function getBinLogCurrent(): BinLogCurrent
    {
        return $this->binLogCurrent;
    }

    /**
     * Returns the checksum flag read from the repository while connecting
     * (false until then).
     */
    public function getCheckSum(): bool
    {
        return $this->checkSum;
    }
}
319