Issues (6)

MySQLReplication/BinLog/BinLogSocketConnect.php (1 issue)

Severity
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication\BinLog;
5
6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\Config\Config;
8
use MySQLReplication\Gtid\GtidCollection;
9
use MySQLReplication\Gtid\GtidException;
10
use MySQLReplication\Repository\RepositoryInterface;
11
use MySQLReplication\Socket\SocketException;
12
use MySQLReplication\Socket\SocketInterface;
13
14
/**
 * Opens a socket connection to a MySQL/MariaDB server, authenticates and
 * asks the server to start streaming its binary log.
 *
 * All work happens in the constructor; afterwards packets are pulled with
 * getResponse(). Every integer written to or read from the wire uses the
 * explicit little-endian pack codes ('V' / 'v') so the byte layout does not
 * depend on the host CPU.
 */
class BinLogSocketConnect
{
    private const COM_QUERY = 0x03;
    private const COM_BINLOG_DUMP = 0x12;
    private const COM_REGISTER_SLAVE = 0x15;
    private const COM_BINLOG_DUMP_GTID = 0x1e;

    /**
     * First-byte values that isWriteSuccessful() accepts as "not an error".
     *
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
     *
     * @var int[]
     */
    private $packageOkHeader = [0, 254];

    /**
     * Largest payload a single packet can carry (0xFFFFFF); a packet of exactly
     * this size means another packet with more of the same payload follows.
     *
     * @var int
     */
    private $binaryDataMaxLength = 16777215;

    /**
     * @var bool
     */
    private $checkSum = false;

    /**
     * @var RepositoryInterface
     */
    private $repository;

    /**
     * @var SocketInterface
     */
    private $socket;

    /**
     * @var BinLogCurrent
     */
    private $binLogCurrent;

    /**
     * Connects to the configured host/port, parses the server greeting,
     * authenticates and requests the binlog stream.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    public function __construct(
        RepositoryInterface $repository,
        SocketInterface $socket
    ) {
        $this->repository = $repository;
        $this->socket = $socket;
        $this->binLogCurrent = new BinLogCurrent();

        $this->socket->connectToStream(Config::getHost(), Config::getPort());
        // the greeting is handed to the parser as-is, without the OK/error check
        BinLogServerInfo::parsePackage($this->getResponse(false), $this->repository->getVersion());
        $this->authenticate();
        $this->getBinlogStream();
    }

    /**
     * Reads one logical packet from the socket and returns its payload.
     *
     * A payload of exactly $binaryDataMaxLength bytes is continued in the next
     * packet, so packets are read and concatenated until a shorter one arrives.
     * An empty header read ends the method early with whatever was collected.
     *
     * @param bool $checkResponse when true, the first packet is validated with
     *                            isWriteSuccessful() before anything else is read
     *
     * @throws BinLogException
     * @throws SocketException
     */
    public function getResponse(bool $checkResponse = true): string
    {
        $result = '';
        $isFirstPacket = true;

        // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
        do {
            $header = $this->socket->readFromSocket(4);
            if ('' === $header) {
                return $result;
            }

            // only the first three header bytes encode the payload length
            $dataLength = unpack('V', substr($header, 0, 3) . chr(0))[1];

            $packet = $this->socket->readFromSocket($dataLength);
            if ($isFirstPacket && true === $checkResponse) {
                $this->isWriteSuccessful($packet);
            }
            $isFirstPacket = false;

            $result .= $packet;
        } while ($dataLength === $this->binaryDataMaxLength);

        return $result;
    }

    /**
     * Throws when the first byte of a response payload is not one of the
     * accepted header values.
     *
     * The exception code is the 16-bit little-endian number at bytes 1-2 and
     * the exception message is everything from byte 9 onwards.
     *
     * @throws BinLogException
     */
    private function isWriteSuccessful(string $data): void
    {
        if ('' === $data) {
            // An empty payload has always been treated as "no error" (ord('') is 0);
            // returning explicitly avoids the uninitialized-offset notice.
            return;
        }

        if (in_array(ord($data[0]), $this->packageOkHeader, true)) {
            return;
        }

        // guard against truncated packets instead of reading missing offsets
        $errorCode = strlen($data) >= 3 ? unpack('v', substr($data, 1, 2))[1] : 0;
        // the cast covers PHP 7, where substr() yields false past the end of the string
        $errorMessage = (string)substr($data, 9);

        throw new BinLogException($errorMessage, $errorCode);
    }

    /**
     * Sends the handshake response (capabilities, max packet size, charset,
     * user name and scrambled password) and validates the server's answer.
     *
     * @throws BinLogException
     * @throws SocketException
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
     */
    private function authenticate(): void
    {
        $data = pack('V', self::getCapabilities());
        $data .= pack('V', $this->binaryDataMaxLength);
        // character set byte
        $data .= chr(33);
        // 23 zero bytes of filler
        $data .= str_repeat(chr(0), 23);

        // SHA1(password) XOR SHA1(salt . SHA1(SHA1(password)))
        $passwordHash = sha1(Config::getPassword(), true);
        $result = $passwordHash ^ sha1(BinLogServerInfo::getSalt() . sha1($passwordHash, true), true);

        $data .= Config::getUser() . chr(0) . chr(strlen($result)) . $result;

        // packet header: 3-byte payload length followed by sequence number 1
        $data = substr(pack('V', strlen($data)), 0, 3) . chr(1) . $data;

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Builds the capability bit mask announced to the server.
     *
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
     */
    private static function getCapabilities(): int
    {
        $longPassword = 1;
        $longFlag = 1 << 2;
        $noSchema = 1 << 4;
        $protocol41 = 1 << 9;
        $transactions = 1 << 13;
        $secureConnection = 1 << 15;

        return $longPassword | $longFlag | $transactions | $protocol41 | $secureConnection | $noSchema;
    }

    /**
     * Prepares the session (checksum, heartbeat), registers as a slave and
     * sends the dump command that matches the configuration.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    private function getBinlogStream(): void
    {
        $this->checkSum = $this->repository->isCheckSum();
        if ($this->checkSum) {
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
        }

        // Compared as float: a strict `0 !== $period` check can never be false
        // for a float value, so a disabled heartbeat would not be detected.
        $heartbeatPeriod = Config::getHeartbeatPeriod();
        if (0.0 !== (float)$heartbeatPeriod) {
            // master_heartbeat_period is in nanoseconds
            $this->execute('SET @master_heartbeat_period = ' . ($heartbeatPeriod * 1000000000));
        }

        $this->registerSlave();

        // The MariaDB branch only sets session variables; the dump command itself
        // is still issued by one of the two branches that follow.
        if ('' !== Config::getMariaDbGtid()) {
            $this->setBinLogDumpMariaGtid();
        }

        if ('' !== Config::getGtid()) {
            $this->setBinLogDumpGtid();
        } else {
            $this->setBinLogDump();
        }
    }

    /**
     * Sends a SQL statement as a query command and validates the response.
     *
     * @throws BinLogException
     * @throws SocketException
     */
    private function execute(string $sql): void
    {
        // 4 header bytes (payload length, i.e. statement + command byte), then the command byte
        $this->socket->writeToSocket(pack('VC', strlen($sql) + 1, self::COM_QUERY) . $sql);
        $this->getResponse();
    }

    /**
     * Announces this client to the server as a replication slave.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
     * @throws BinLogException
     * @throws SocketException
     */
    private function registerSlave(): void
    {
        $host = gethostname();
        if (false === $host) {
            // gethostname() failed; send an empty host name rather than a non-string
            $host = '';
        }
        $user = Config::getUser();
        $password = Config::getPassword();

        $hostLength = strlen($host);
        $userLength = strlen($user);
        $passLength = strlen($password);

        // 18 = command (1) + slave id (4) + three length bytes (3) + port (2) + two trailing 32-bit zeros (8)
        $data = pack('V', 18 + $hostLength + $userLength + $passLength);
        $data .= chr(self::COM_REGISTER_SLAVE);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('C', $hostLength) . $host;
        $data .= pack('C', $userLength) . $user;
        $data .= pack('C', $passLength) . $password;
        $data .= pack('v', Config::getPort());
        $data .= pack('V', 0);
        $data .= pack('V', 0);

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Sets the session variables used for MariaDB GTID replication and records
     * the configured GTID as the current position.
     *
     * @throws SocketException
     * @throws BinLogException
     */
    private function setBinLogDumpMariaGtid(): void
    {
        $mariaDbGtid = Config::getMariaDbGtid();

        $this->execute('SET @mariadb_slave_capability = 4');
        $this->execute('SET @slave_connect_state = \'' . $mariaDbGtid . '\'');
        $this->execute('SET @slave_gtid_strict_mode = 0');
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');

        $this->binLogCurrent->setMariaDbGtid($mariaDbGtid);
    }

    /**
     * Requests the binlog stream starting from the configured GTID set.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    private function setBinLogDumpGtid(): void
    {
        $gtid = Config::getGtid();
        $collection = GtidCollection::makeCollectionFromString($gtid);
        $encodedLength = $collection->getEncodedLength();

        // 26 = command (1) + flags (2) + slave id (4) + name length (4) + name (3) + position (8) + data size (4)
        $data = pack('V', 26 + $encodedLength) . chr(self::COM_BINLOG_DUMP_GTID);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        // a three-byte, all-zero file name
        $data .= pack('V', 3);
        $data .= str_repeat(chr(0), 3);
        $data .= BinaryDataReader::pack64bit(4);
        $data .= pack('V', $encodedLength);
        $data .= $collection->getEncoded();

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setGtid($gtid);
    }

    /**
     * Requests the binlog stream from a file name and position.
     *
     * When neither is configured, the values reported by
     * RepositoryInterface::getMasterStatus() are used instead.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
     * @throws BinLogException
     * @throws SocketException
     */
    private function setBinLogDump(): void
    {
        $binFilePos = Config::getBinLogPosition();
        $binFileName = Config::getBinLogFileName();
        if (0 === $binFilePos && '' === $binFileName) {
            $masterStatusDTO = $this->repository->getMasterStatus();
            $binFilePos = $masterStatusDTO->getPosition();
            $binFileName = $masterStatusDTO->getFile();
        }

        // 11 = command (1) + position (4) + flags (2) + slave id (4)
        $data = pack('V', strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
        $data .= pack('V', $binFilePos);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= $binFileName;

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setBinLogPosition($binFilePos);
        $this->binLogCurrent->setBinFileName($binFileName);
    }

    /**
     * Returns the position object filled in while the dump was requested.
     */
    public function getBinLogCurrent(): BinLogCurrent
    {
        return $this->binLogCurrent;
    }

    /**
     * Returns the checksum flag obtained from the repository during start-up.
     */
    public function getCheckSum(): bool
    {
        return $this->checkSum;
    }
}
290