Completed
Push — master ( 9d1314...5291f6 )
by kacper
02:05
created

BinLogSocketConnect::setBinLogDumpGtid()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 19
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 19
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 14
nc 1
nop 0
1
<?php
2
namespace MySQLReplication\BinLog;
3
4
use MySQLReplication\BinaryDataReader\BinaryDataReader;
5
use MySQLReplication\BinLog\Exception\BinLogException;
6
use MySQLReplication\Config\Config;
7
use MySQLReplication\Definitions\ConstCapabilityFlags;
8
use MySQLReplication\Definitions\ConstCommand;
9
use MySQLReplication\Gtid\GtidException;
10
use MySQLReplication\Gtid\GtidService;
11
use MySQLReplication\Repository\RepositoryInterface;
12
13
/**
14
 * Class BinLogSocketConnect
15
 * @package MySQLReplication\BinLog
16
 */
17
class BinLogSocketConnect implements BinLogSocketConnectInterface
18
{
19
    /**
     * Socket handle assigned by socket_create() in connectToStream().
     *
     * @var resource
     */
    private $socket;

    /**
     * Checksum flag; set from the repository's isCheckSum() in getBinlogStream().
     *
     * @var bool
     */
    private $checkSum = false;

    /**
     * Source of server version, checksum setting and master status.
     *
     * @var RepositoryInterface
     */
    private $repository;

    /**
     * Connection and replication settings (host, port, credentials, slave id, positions).
     *
     * @var Config
     */
    private $config;

    /**
     * Builds the binary authentication packet sent in auth().
     *
     * @var BinLogAuth
     */
    private $packAuth;

    /**
     * Turns the configured GTID string into an encodable collection.
     *
     * @var GtidService
     */
    private $gtidService;

    /**
     * First-byte values that isWriteSuccessful() accepts as success (0x00 and 0xFE).
     *
     * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
     *
     * @var array
     */
    private $packageOkHeader = [0, 254];
48
49
    /**
     * Stores the collaborators used later to open and drive the replication stream.
     *
     * Nothing is connected here; the socket is opened by connectToStream().
     *
     * @param Config $config
     * @param RepositoryInterface $repository
     * @param BinLogAuth $packAuth
     * @param GtidService $gtidService
     */
    public function __construct(
        Config $config,
        RepositoryInterface $repository,
        BinLogAuth $packAuth,
        GtidService $gtidService
    ) {
        $this->config = $config;
        $this->repository = $repository;
        $this->packAuth = $packAuth;
        $this->gtidService = $gtidService;
    }
66
67
    /**
     * Shuts down and closes the socket on destruction, but only when
     * isConnected() reports that a socket handle is held.
     */
    public function __destruct()
    {
        if (true !== $this->isConnected()) {
            return;
        }

        socket_shutdown($this->socket);
        socket_close($this->socket);
    }
75
76
    /**
     * Tells whether this object currently holds a socket handle.
     *
     * ext-sockets hands out a resource on PHP < 8.0 and a \Socket object on
     * PHP >= 8.0, so both representations are accepted. `instanceof` against a
     * class that does not exist simply evaluates to false, which keeps this
     * check safe on older PHP versions.
     *
     * Note that only the presence of the handle is checked; the state of the
     * underlying TCP connection is not probed.
     *
     * @return bool
     */
    public function isConnected()
    {
        return is_resource($this->socket) || $this->socket instanceof \Socket;
    }
83
84
    /**
     * Returns the checksum flag.
     *
     * The flag starts as false and is overwritten with the repository's
     * isCheckSum() result when the binlog stream is requested.
     *
     * @return bool
     */
    public function getCheckSum()
    {
        $checkSum = $this->checkSum;

        return $checkSum;
    }
91
92
    /**
     * Opens the TCP connection to the configured host/port and starts the binlog stream.
     *
     * Sequence: create a blocking TCP socket with keep-alive enabled, connect,
     * read the server greeting (serverInfo), authenticate (auth) and finally
     * request the binlog dump (getBinlogStream).
     *
     * If connecting fails, the freshly created socket is closed and the handle
     * is cleared before the exception is thrown, so isConnected() does not
     * report a connection that was never established and the destructor does
     * not try to shut down an unconnected socket.
     *
     * @throws BinLogException
     * @throws GtidException
     */
    public function connectToStream()
    {
        if (false === ($this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP))) {
            throw new BinLogException(
                BinLogException::UNABLE_TO_CREATE_SOCKET . socket_strerror(socket_last_error()),
                socket_last_error()
            );
        }
        socket_set_block($this->socket);
        socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, 1);

        if (false === socket_connect($this->socket, $this->config->getHost(), $this->config->getPort())) {
            // Read the error before closing: closing the socket may change what is reported.
            $errorCode = socket_last_error();

            socket_close($this->socket);
            $this->socket = null;

            throw new BinLogException(socket_strerror($errorCode), $errorCode);
        }

        $this->serverInfo();
        $this->auth();
        $this->getBinlogStream();
    }
113
114
    /**
     * Reads the first packet sent by the server and feeds it, together with the
     * version string from the repository, into BinLogServerInfo.
     *
     * The packet is read without the OK-byte check.
     *
     * @throws BinLogException
     */
    private function serverInfo()
    {
        $greeting = $this->getPacket(false);
        BinLogServerInfo::parsePackage($greeting);

        $version = $this->repository->getVersion();
        BinLogServerInfo::parseVersion($version);
    }
122
123
    /**
     * Reads one packet from the socket and returns its payload.
     *
     * A packet starts with a 4-byte header. The first three bytes are the
     * payload length as a little-endian integer; the fourth byte (the sequence
     * id in the MySQL protocol) is not used here.
     *
     * @param bool $checkForOkByte when true, the payload is passed to
     *                             isWriteSuccessful(), which throws if the
     *                             first byte is not an accepted OK header
     * @return string the packet payload (header stripped)
     * @throws BinLogException
     */
    public function getPacket($checkForOkByte = true)
    {
        // readFromSocket() either returns exactly the requested bytes or throws.
        $header = $this->readFromSocket(4);

        // 'V' is always 32-bit little-endian, unlike 'L' which follows the host
        // byte order. The 3-byte length is padded with a zero high byte.
        $dataLength = unpack('V', substr($header, 0, 3) . "\x00")[1];

        $result = $this->readFromSocket($dataLength);
        if (true === $checkForOkByte) {
            $this->isWriteSuccessful($result);
        }

        return $result;
    }
145
146
    /**
     * Reads exactly $length bytes from the socket.
     *
     * socket_recv() is asked for the full amount with MSG_WAITALL, but it can
     * still hand back fewer bytes (for example when the call is interrupted or
     * the peer closes mid-packet). Instead of treating a short read as a socket
     * error, the remaining bytes are requested again until the buffer is
     * complete; a disconnect then shows up as a 0-byte read.
     *
     * At least one socket_recv() call is always made, so the handling of a
     * non-positive $length stays with socket_recv() itself.
     *
     * @param int $length number of bytes to read
     * @return string exactly $length bytes
     * @throws BinLogException when the peer disconnected or the socket reported an error
     */
    private function readFromSocket($length)
    {
        $data = '';
        $remaining = $length;

        do {
            $received = socket_recv($this->socket, $chunk, $remaining, MSG_WAITALL);

            if (false === $received) {
                $errorCode = socket_last_error($this->socket);
                throw new BinLogException(socket_strerror($errorCode), $errorCode);
            }

            // http://php.net/manual/pl/function.socket-recv.php#47182
            // A 0-byte read means the other side closed the connection.
            if (0 === $received) {
                throw new BinLogException(BinLogException::DISCONNECTED_MESSAGE);
            }

            $data .= $chunk;
            $remaining -= $received;
        } while ($remaining > 0);

        return $data;
    }
167
168
    /**
     * Checks the first byte of a packet payload against the accepted OK headers.
     *
     * When the byte is not accepted, the payload is read as an error packet:
     * bytes 1-2 are the little-endian error code and everything from offset 9
     * onwards is used as the message (bytes 3-8 are skipped; in the MySQL ERR
     * packet layout they carry the SQL state marker and SQL state).
     *
     * @param string $packet packet payload
     * @return array ['status' => true, 'code' => 0, 'msg' => ''] on success
     * @throws BinLogException carrying the server's error message and code
     */
    public function isWriteSuccessful($packet)
    {
        $firstByte = ord($packet[0]);

        if (!in_array($firstByte, $this->packageOkHeader, true)) {
            $code = unpack('v', $packet[1] . $packet[2])[1];
            // substr() may yield false on older PHP when the packet is shorter than 9 bytes.
            $message = (string) substr($packet, 9);

            throw new BinLogException($message, $code);
        }

        return ['status' => true, 'code' => 0, 'msg' => ''];
    }
193
194
    /**
     * Sends the authentication packet and waits for the server's answer.
     *
     * The packet is built by BinLogAuth from the capability flags, the
     * configured user and password, and the salt taken from BinLogServerInfo.
     * The reply is read with the OK-byte check enabled, so a rejected login
     * surfaces as a BinLogException.
     *
     * @throws BinLogException
     */
    private function auth()
    {
        $capabilities = ConstCapabilityFlags::getCapabilities();
        $user = $this->config->getUser();
        $password = $this->config->getPassword();
        $salt = BinLogServerInfo::getSalt();

        $authPacket = $this->packAuth->createAuthenticationBinary($capabilities, $user, $password, $salt);

        $this->writeToSocket($authPacket);
        $this->getPacket();
    }
209
210
    /**
     * Writes the whole buffer to the socket.
     *
     * socket_write() is allowed to write only part of the buffer and returns
     * the number of bytes it actually sent, so the unsent tail is written again
     * until nothing is left. One write is always attempted, even for an empty
     * buffer.
     *
     * @param string $data bytes to send
     * @throws BinLogException when socket_write() reports a failure
     */
    private function writeToSocket($data)
    {
        $pending = $data;

        do {
            $written = socket_write($this->socket, $pending, strlen($pending));
            if (false === $written) {
                throw new BinLogException(
                    BinLogException::UNABLE_TO_WRITE_SOCKET . socket_strerror(socket_last_error()),
                    socket_last_error()
                );
            }

            // Drop what was sent; the cast covers older PHP returning false for an empty tail.
            $pending = (string) substr($pending, $written);
        } while ('' !== $pending);
    }
221
222
    /**
     * Prepares the session and asks the server to start sending binlog events.
     *
     * Steps: remember whether checksums are enabled (and, if so, announce
     * checksum awareness with a SET statement), register as a slave, then
     * request the dump either by GTID (when a GTID is configured) or by
     * file/position.
     *
     * @throws BinLogException
     * @throws GtidException
     */
    private function getBinlogStream()
    {
        $this->checkSum = $this->repository->isCheckSum();
        if (true === $this->checkSum) {
            $this->execute('SET @master_binlog_checksum=@@global.binlog_checksum');
        }

        $this->registerSlave();

        if ('' === $this->config->getGtid()) {
            $this->setBinLogDump();

            return;
        }

        $this->setBinLogDumpGtid();
    }
245
246
    /**
     * Sends a SQL statement as a query command and checks the server's reply.
     *
     * The packet is: 4-byte header, command byte 0x03 (COM_QUERY in the MySQL
     * protocol), then the statement text. The header is written as one 32-bit
     * little-endian integer, so its low three bytes hold the payload length and
     * the fourth byte (sequence id) is 0 for any payload shorter than 2^24 bytes.
     *
     * @param string $sql statement to run
     * @throws BinLogException when writing fails or the server answers with an error
     */
    private function execute($sql)
    {
        // Payload = command byte + statement text.
        $payloadLength = strlen($sql) + 1;

        // 'V' is little-endian on every platform; 'L' would follow the host byte order.
        $prelude = pack('VC', $payloadLength, 0x03);

        $this->writeToSocket($prelude . $sql);
        $this->getPacket();
    }
258
259
    /**
     * Registers this client as a replication slave on the server.
     *
     * Payload layout as written below: command byte, 4-byte slave id,
     * length-prefixed host name, length-prefixed user, length-prefixed
     * password, 2-byte port, and two 4-byte zero fields. The fixed part adds up
     * to 18 bytes (1 + 4 + 1 + 1 + 1 + 2 + 4 + 4).
     *
     * All multi-byte integers use explicit little-endian pack codes so the
     * packet is identical regardless of the host's byte order.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-register-slave.html
     * @throws BinLogException
     */
    private function registerSlave()
    {
        // gethostname() returns false on failure; treat that as an empty name.
        $host = (string) gethostname();
        $user = $this->config->getUser();
        $password = $this->config->getPassword();

        $hostLength = strlen($host);
        $userLength = strlen($user);
        $passLength = strlen($password);

        // Header: payload length in the low three bytes, sequence id 0 in the fourth.
        $prelude = pack('V', 18 + $hostLength + $userLength + $passLength);
        $prelude .= chr(ConstCommand::COM_REGISTER_SLAVE);
        $prelude .= pack('V', $this->config->getSlaveId());
        $prelude .= pack('C', $hostLength);
        $prelude .= $host;
        $prelude .= pack('C', $userLength);
        $prelude .= $user;
        $prelude .= pack('C', $passLength);
        $prelude .= $password;
        $prelude .= pack('v', $this->config->getPort());
        $prelude .= pack('V', 0);
        $prelude .= pack('V', 0);

        $this->writeToSocket($prelude);
        $this->getPacket();
    }
286
287
    /**
     * Requests the binlog stream starting from the configured GTID set.
     *
     * Payload layout as written below: command byte, 2-byte flags (0), 4-byte
     * slave id, 4-byte file-name length (3), three zero bytes as the file name,
     * 8-byte position (4), 4-byte length of the encoded GTID data, then the
     * encoded GTID data. The fixed part adds up to 26 bytes.
     *
     * Integers are packed with explicit little-endian codes ('v'/'V'); the
     * machine-dependent codes ('S', 'I', 'l') vary in byte order, and 'I' also
     * in size, between platforms.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
     * @throws BinLogException
     * @throws GtidException
     */
    private function setBinLogDumpGtid()
    {
        $collection = $this->gtidService->makeCollectionFromString($this->config->getGtid());
        $encodedLength = $collection->getEncodedLength();

        // Header: payload length in the low three bytes, sequence id 0 in the fourth.
        $prelude = pack('V', 26 + $encodedLength) . chr(ConstCommand::COM_BINLOG_DUMP_GTID);
        $prelude .= pack('v', 0);
        $prelude .= pack('V', $this->config->getSlaveId());
        $prelude .= pack('V', 3);
        $prelude .= chr(0);
        $prelude .= chr(0);
        $prelude .= chr(0);
        // NOTE(review): pack64bit() is defined elsewhere; presumably it emits 8 little-endian bytes - confirm.
        $prelude .= BinaryDataReader::pack64bit(4);

        $prelude .= pack('V', $encodedLength);
        $prelude .= $collection->getEncoded();

        $this->writeToSocket($prelude);
        $this->getPacket();
    }
311
312
    /**
     * Requests the binlog stream by file name and position.
     *
     * When a MariaDB GTID is configured, the session variables that describe
     * the slave's capabilities and start state are set first. When no position
     * or file name is configured, the current master status from the repository
     * is used instead.
     *
     * Payload layout as written below: command byte, 4-byte position, 2-byte
     * flags (0), 4-byte slave id, then the file name. The fixed part adds up to
     * 11 bytes. Integers use explicit little-endian pack codes so the packet
     * does not depend on the host's byte order or integer size.
     *
     * @see https://dev.mysql.com/doc/internals/en/com-binlog-dump.html
     * @throws BinLogException
     */
    private function setBinLogDump()
    {
        $binFilePos = $this->config->getBinLogPosition();
        $binFileName = $this->config->getBinLogFileName();

        $mariaDbGtid = $this->config->getMariaDbGtid();
        if ('' !== $mariaDbGtid) {
            $this->execute('SET @mariadb_slave_capability = 4');
            $this->execute('SET @slave_connect_state = \'' . $mariaDbGtid . '\'');
            $this->execute('SET @slave_gtid_strict_mode = 0');
            $this->execute('SET @slave_gtid_ignore_duplicates = 0');
        }

        if (0 === $binFilePos || '' === $binFileName) {
            $master = $this->repository->getMasterStatus();
            $binFilePos = $master['Position'];
            $binFileName = $master['File'];
        }

        // Header: payload length in the low three bytes, sequence id 0 in the fourth.
        $prelude = pack('V', strlen($binFileName) + 11) . chr(ConstCommand::COM_BINLOG_DUMP);
        $prelude .= pack('V', $binFilePos);
        $prelude .= pack('v', 0);
        $prelude .= pack('V', $this->config->getSlaveId());
        $prelude .= $binFileName;

        $this->writeToSocket($prelude);
        $this->getPacket();
    }
345
}
346