1 | <?php |
||
2 | declare(strict_types=1); |
||
3 | |||
4 | namespace MySQLReplication\BinLog; |
||
5 | |||
6 | use MySQLReplication\BinaryDataReader\BinaryDataReader; |
||
7 | use MySQLReplication\Config\Config; |
||
8 | use MySQLReplication\Gtid\GtidCollection; |
||
9 | use MySQLReplication\Gtid\GtidException; |
||
10 | use MySQLReplication\Repository\RepositoryInterface; |
||
11 | use MySQLReplication\Socket\SocketException; |
||
12 | use MySQLReplication\Socket\SocketInterface; |
||
13 | |||
/**
 * Establishes a replication session over a raw socket.
 *
 * The constructor performs the whole start-up sequence: it connects the socket,
 * parses the server greeting, authenticates, registers this client as a slave and
 * finally asks the server to start streaming the binary log. Afterwards packets are
 * pulled one at a time with {@see getResponse()}.
 *
 * All multi-byte integers written to or read from the wire use explicit
 * little-endian pack codes ("V" = 32 bit, "v" = 16 bit) so that the produced bytes
 * do not depend on the byte order or integer size of the machine running PHP.
 */
class BinLogSocketConnect
{
    private const COM_BINLOG_DUMP = 0x12;
    private const COM_REGISTER_SLAVE = 0x15;
    private const COM_BINLOG_DUMP_GTID = 0x1e;

    /**
     * First payload bytes that are accepted as a non-error response.
     *
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
     */
    private $packageOkHeader = [0, 254];

    /**
     * Largest payload a single packet can carry (0xFFFFFF). A packet of exactly
     * this size is followed by at least one continuation packet.
     */
    private $binaryDataMaxLength = 16777215;

    /** Whether the repository reported that binlog checksums are enabled. */
    private $checkSum = false;

    private $repository;
    private $socket;
    private $binLogCurrent;

    /**
     * Connects, authenticates and starts the binlog stream.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    public function __construct(
        RepositoryInterface $repository,
        SocketInterface $socket
    ) {
        $this->repository = $repository;
        $this->socket = $socket;
        $this->binLogCurrent = new BinLogCurrent();

        $this->socket->connectToStream(Config::getHost(), Config::getPort());
        // The greeting is not an OK/ERR packet, so it must not be checked as one.
        BinLogServerInfo::parsePackage($this->getResponse(false), $this->repository->getVersion());
        $this->authenticate();
        $this->getBinlogStream();
    }

    /**
     * Reads one logical packet from the socket and returns its payload.
     *
     * Payloads split over several physical packets are concatenated. Only the
     * first physical packet is inspected for an error marker.
     *
     * @param bool $checkResponse when true, an error payload raises BinLogException
     *
     * @return string the payload, or '' when no header could be read
     *
     * @throws BinLogException
     * @throws SocketException
     */
    public function getResponse(bool $checkResponse = true): string
    {
        $header = $this->socket->readFromSocket(4);
        if ('' === $header) {
            return '';
        }

        $dataLength = self::decodePacketLength($header);
        $result = $this->socket->readFromSocket($dataLength);
        if (true === $checkResponse) {
            $this->isWriteSuccessful($result);
        }

        // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
        while ($dataLength === $this->binaryDataMaxLength) {
            $header = $this->socket->readFromSocket(4);
            if ('' === $header) {
                return $result;
            }

            $dataLength = self::decodePacketLength($header);
            $result .= $this->socket->readFromSocket($dataLength);
        }

        return $result;
    }

    /**
     * Extracts the 3-byte little-endian payload length from a 4-byte packet header.
     *
     * The fourth header byte (sequence id) is ignored.
     */
    private static function decodePacketLength(string $header): int
    {
        return unpack('V', substr($header, 0, 3) . chr(0))[1];
    }

    /**
     * Builds a 4-byte packet header: 3-byte little-endian payload length followed
     * by the sequence id.
     */
    private static function packetHeader(int $payloadLength, int $sequenceId = 0): string
    {
        return substr(pack('V', $payloadLength), 0, 3) . chr($sequenceId);
    }

    /**
     * Raises an exception when the payload does not start with an accepted marker.
     *
     * For an error payload the code is taken from bytes 1-2 and the message from
     * byte 9 onwards (bytes 3-8 are skipped).
     *
     * @throws BinLogException
     */
    private function isWriteSuccessful(string $data): void
    {
        if ('' === $data) {
            // An empty payload has no marker byte to inspect. It has always been
            // treated as "no error"; keep that, but without indexing past the end.
            return;
        }

        if (in_array(ord($data[0]), $this->packageOkHeader, true)) {
            return;
        }

        // Guard against truncated error payloads instead of reading missing offsets.
        $errorCode = strlen($data) >= 3 ? unpack('v', $data[1] . $data[2])[1] : 0;
        $errorMessage = (string)substr($data, 9);

        throw new BinLogException($errorMessage, $errorCode);
    }

    /**
     * Sends the handshake response and waits for the server's verdict.
     *
     * @throws BinLogException
     * @throws SocketException
     * @link http://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
     */
    private function authenticate(): void
    {
        $password = Config::getPassword();

        $data = pack('V', self::getCapabilities());
        $data .= pack('V', $this->binaryDataMaxLength);
        $data .= chr(33); // character set id
        $data .= str_repeat(chr(0), 23); // reserved filler

        // SHA1(password) XOR SHA1(salt . SHA1(SHA1(password))).
        // An account without a password must send an empty auth response; a
        // 20-byte scramble derived from '' is rejected by the server.
        $scramble = '';
        if ('' !== $password) {
            $scramble = sha1($password, true) ^ sha1(
                BinLogServerInfo::getSalt() . sha1(sha1($password, true), true),
                true
            );
        }

        $data .= Config::getUser() . chr(0) . chr(strlen($scramble)) . $scramble;

        // The handshake response is the second packet of the exchange: sequence id 1.
        $this->socket->writeToSocket(self::packetHeader(strlen($data), 1) . $data);
        $this->getResponse();
    }

    /**
     * Returns the client capability bit mask sent during authentication.
     *
     * http://dev.mysql.com/doc/internals/en/capability-flags.html#packet-protocol::capabilityflags
     * https://github.com/siddontang/mixer/blob/master/doc/protocol.txt
     */
    private static function getCapabilities(): int
    {
        $longPassword = 1;
        $longFlag = 1 << 2;
        $noSchema = 1 << 4;
        $protocol41 = 1 << 9;
        $transactions = 1 << 13;
        $secureConnection = 1 << 15;

        return $longPassword | $longFlag | $transactions | $protocol41 | $secureConnection | $noSchema;
    }

    /**
     * Prepares the session and requests the binlog stream.
     *
     * Session variables (checksum, heartbeat, MariaDB GTID state) are set first,
     * then the slave is registered, then the dump command is issued. A MariaDB
     * GTID only sets session variables; the dump command that follows is chosen
     * solely by whether Config::getGtid() is empty.
     *
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    private function getBinlogStream(): void
    {
        $this->checkSum = $this->repository->isCheckSum();
        if ($this->checkSum) {
            $this->execute('SET @master_binlog_checksum = @@global.binlog_checksum');
        }

        // Compare as float so that both an int 0 and a float 0.0 mean "disabled";
        // a strict comparison against int 0 is always true for a float value.
        if (0.0 !== (float)Config::getHeartbeatPeriod()) {
            // master_heartbeat_period is in nanoseconds
            $this->execute('SET @master_heartbeat_period = ' . Config::getHeartbeatPeriod() * 1000000000);
        }

        $this->registerSlave();

        if ('' !== Config::getMariaDbGtid()) {
            $this->setBinLogDumpMariaGtid();
        }
        if ('' !== Config::getGtid()) {
            $this->setBinLogDumpGtid();
        } else {
            $this->setBinLogDump();
        }
    }

    /**
     * Sends a query command (0x03) and checks the response.
     *
     * @throws BinLogException
     * @throws SocketException
     */
    private function execute(string $sql): void
    {
        // Payload is the command byte plus the SQL text.
        $this->socket->writeToSocket(self::packetHeader(strlen($sql) + 1) . chr(0x03) . $sql);
        $this->getResponse();
    }

    /**
     * Registers this client as a slave, reporting host name, user, password and port.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
     * @throws BinLogException
     * @throws SocketException
     */
    private function registerSlave(): void
    {
        // gethostname() returns false on failure; report an empty host name then
        // rather than passing a bool to strlen().
        $host = gethostname();
        if (false === $host) {
            $host = '';
        }
        $user = Config::getUser();
        $password = Config::getPassword();

        $hostLength = strlen($host);
        $userLength = strlen($user);
        $passLength = strlen($password);

        // 18 = command (1) + slave id (4) + three length bytes (3) + port (2)
        //      + two trailing 32-bit zero fields (8).
        $data = self::packetHeader(18 + $hostLength + $userLength + $passLength);
        $data .= chr(self::COM_REGISTER_SLAVE);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('C', $hostLength) . $host;
        $data .= pack('C', $userLength) . $user;
        $data .= pack('C', $passLength) . $password;
        $data .= pack('v', Config::getPort());
        $data .= pack('V', 0);
        $data .= pack('V', 0);

        $this->socket->writeToSocket($data);
        $this->getResponse();
    }

    /**
     * Sets the session variables for a MariaDB GTID start position and records
     * that position in the current binlog state.
     *
     * @throws SocketException
     * @throws BinLogException
     */
    private function setBinLogDumpMariaGtid(): void
    {
        $mariaDbGtid = Config::getMariaDbGtid();

        $this->execute('SET @mariadb_slave_capability = 4');
        $this->execute('SET @slave_connect_state = \'' . $mariaDbGtid . '\'');
        $this->execute('SET @slave_gtid_strict_mode = 0');
        $this->execute('SET @slave_gtid_ignore_duplicates = 0');

        $this->binLogCurrent->setMariaDbGtid($mariaDbGtid);
    }

    /**
     * Requests the binlog stream starting from the configured GTID set.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
     * @throws BinLogException
     * @throws GtidException
     * @throws SocketException
     */
    private function setBinLogDumpGtid(): void
    {
        $collection = GtidCollection::makeCollectionFromString(Config::getGtid());
        $encodedLength = $collection->getEncodedLength();

        // 26 = command (1) + flags (2) + slave id (4) + name length (4) + name (3)
        //      + position (8) + GTID data length (4).
        $data = self::packetHeader(26 + $encodedLength) . chr(self::COM_BINLOG_DUMP_GTID);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= pack('V', 3);
        $data .= str_repeat(chr(0), 3);
        $data .= BinaryDataReader::pack64bit(4);
        $data .= pack('V', $encodedLength);
        $data .= $collection->getEncoded();

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setGtid(Config::getGtid());
    }

    /**
     * Requests the binlog stream starting from a file name and position.
     *
     * When neither is configured, the repository's master status supplies both.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
     * @throws BinLogException
     * @throws SocketException
     */
    private function setBinLogDump(): void
    {
        $binFilePos = Config::getBinLogPosition();
        $binFileName = Config::getBinLogFileName();
        if (0 === $binFilePos && '' === $binFileName) {
            $masterStatusDTO = $this->repository->getMasterStatus();
            $binFilePos = $masterStatusDTO->getPosition();
            $binFileName = $masterStatusDTO->getFile();
        }

        // 11 = command (1) + position (4) + flags (2) + slave id (4).
        $data = self::packetHeader(strlen($binFileName) + 11) . chr(self::COM_BINLOG_DUMP);
        $data .= pack('V', $binFilePos);
        $data .= pack('v', 0);
        $data .= pack('V', Config::getSlaveId());
        $data .= $binFileName;

        $this->socket->writeToSocket($data);
        $this->getResponse();

        $this->binLogCurrent->setBinLogPosition($binFilePos);
        $this->binLogCurrent->setBinFileName($binFileName);
    }

    /**
     * Returns the position state recorded when the stream was requested.
     */
    public function getBinLogCurrent(): BinLogCurrent
    {
        return $this->binLogCurrent;
    }

    /**
     * Tells whether binlog checksums were reported as enabled.
     */
    public function getCheckSum(): bool
    {
        return $this->checkSum;
    }
}
||
290 |