Passed
Branch master (c44613)
by Moln
05:00
created

MySQLReplicationFactory::getBinLogServerInfo()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication;
5
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\Exception;
8
use Doctrine\DBAL\DriverManager;
9
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
10
use MySQLReplication\BinLog\BinLogException;
11
use MySQLReplication\BinLog\BinLogServerInfo;
12
use MySQLReplication\BinLog\BinLogSocketConnect;
13
use MySQLReplication\Cache\ArrayCache;
14
use MySQLReplication\Config\Config;
15
use MySQLReplication\Config\ConfigException;
16
use MySQLReplication\Event\Event;
17
use MySQLReplication\Event\RowEvent\RowEventFactory;
18
use MySQLReplication\Exception\MySQLReplicationException;
19
use MySQLReplication\Gtid\GtidException;
20
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
21
use MySQLReplication\Repository\MySQLRepository;
22
use MySQLReplication\Repository\RepositoryInterface;
23
use MySQLReplication\Socket\Socket;
24
use MySQLReplication\Socket\SocketException;
25
use MySQLReplication\Socket\SocketInterface;
26
use Psr\Log\LoggerInterface;
27
use Psr\Log\NullLogger;
28
use Psr\SimpleCache\CacheInterface;
29
use Psr\SimpleCache\InvalidArgumentException;
30
use Symfony\Component\EventDispatcher\EventDispatcher;
31
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
32
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
33
34
/**
 * Builds the collaborators needed to read a binlog stream (repository, cache,
 * event dispatcher, socket, binlog connection, event reader), connects on
 * construction, and exposes the consume loop.
 */
class MySQLReplicationFactory
{
    /**
     * DBAL connection created by the constructor. It is only set when no
     * repository is injected; otherwise it stays null.
     *
     * @var Connection|null
     */
    private $connection;
    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;
    /**
     * @var Event
     */
    private $event;
    /**
     * @var BinLogSocketConnect
     */
    private $binLogServerConnect;
    /**
     * @var Config
     */
    private $config;
    /**
     * @var LoggerInterface
     */
    private $logger;
    /**
     * @var SocketInterface
     */
    private $socket;

    /**
     * Validates the configuration, fills in a default for every collaborator
     * that was not injected, and connects immediately.
     *
     * @param Config                        $config          Validated before anything else is built.
     * @param RepositoryInterface|null      $repository      Defaults to a MySQLRepository on a new pdo_mysql connection.
     * @param CacheInterface|null           $cache           Defaults to an ArrayCache sized by Config::getTableCacheSize().
     * @param EventDispatcherInterface|null $eventDispatcher Defaults to a new EventDispatcher.
     * @param SocketInterface|null          $socket          Defaults to a new Socket.
     * @param LoggerInterface|null          $logger          Defaults to a NullLogger.
     *
     * @throws BinLogException
     * @throws ConfigException
     * @throws Exception
     * @throws SocketException
     * @throws GtidException
     */
    public function __construct(
        Config $config,
        ?RepositoryInterface $repository = null,
        ?CacheInterface $cache = null,
        ?EventDispatcherInterface $eventDispatcher = null,
        ?SocketInterface $socket = null,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->logger = $logger ?? new NullLogger();
        $config->validate();

        if (null === $repository) {
            // The DBAL connection is only created (and remembered) when the
            // factory has to build its own repository.
            $this->connection = DriverManager::getConnection(
                [
                    'user' => $config->getUser(),
                    'password' => $config->getPassword(),
                    'host' => $config->getHost(),
                    'port' => $config->getPort(),
                    'driver' => 'pdo_mysql',
                    'charset' => $config->getCharset(),
                ]
            );
            $repository = new MySQLRepository($this->connection);
        }

        $cache = $cache ?? new ArrayCache($config->getTableCacheSize());
        $this->eventDispatcher = $eventDispatcher ?? new EventDispatcher();
        $socket = $socket ?? new Socket();

        $this->binLogServerConnect = new BinLogSocketConnect($config, $repository, $socket);

        $this->event = new Event(
            $config,
            $this->binLogServerConnect,
            new RowEventFactory($config, $repository, $cache),
            $this->eventDispatcher,
            $cache
        );

        $this->socket = $socket;
        $this->connect();
    }

    /**
     * Connects the event reader and logs the successful connection.
     *
     * No return type is declared on purpose, so subclasses that override this
     * method without one stay compatible.
     */
    public function connect()
    {
        $this->event->connect();
        $this->logger->info("[MysqlReplication] Connected.");
    }

    /**
     * Returns the server info reported by the binlog connection.
     */
    public function getBinLogServerInfo(): BinLogServerInfo
    {
        return $this->binLogServerConnect->getBinLogServerInfo();
    }

    /**
     * Adds a subscriber to the event dispatcher used by this factory.
     */
    public function registerSubscriber(EventSubscriberInterface $eventSubscribers): void
    {
        $this->eventDispatcher->addSubscriber($eventSubscribers);
    }

    /**
     * Removes a previously registered subscriber from the event dispatcher.
     */
    public function unregisterSubscriber(EventSubscriberInterface $eventSubscribers): void
    {
        $this->eventDispatcher->removeSubscriber($eventSubscribers);
    }

    /**
     * Returns the DBAL connection the factory created for its default repository.
     *
     * @throws \LogicException When a repository was injected, so the factory
     *                         never created a connection of its own.
     */
    public function getDbConnection(): Connection
    {
        if (null === $this->connection) {
            // Without this guard the declared return type would fail with an
            // opaque TypeError that does not explain the cause.
            throw new \LogicException(
                'No DBAL connection is available: a repository was injected, so the factory did not create one.'
            );
        }

        return $this->connection;
    }

    /**
     * Consumes events until the connection is lost, then reconnects.
     *
     * The retry budget comes from Config::getRetry() and is reset after every
     * successful connect. Each failed connect attempt is logged and followed
     * by a one-second pause. The method returns once the socket is
     * disconnected and the budget is exhausted (or zero).
     *
     * @throws SocketException
     * @throws JsonBinaryDecoderException
     * @throws BinaryDataReaderException
     * @throws BinLogException
     * @throws InvalidArgumentException
     * @throws MySQLReplicationException
     */
    public function run(): void
    {
        $retryNum = $this->config->getRetry();
        while (true) {
            if (! $this->socket->isConnected()) {
                if (! $retryNum) {
                    // Retry budget exhausted: stop instead of spinning forever.
                    break;
                }

                try {
                    $retryNum--;
                    $this->connect();
                    // A successful connect restores the full retry budget.
                    $retryNum = $this->config->getRetry();
                } catch (\Throwable $e) {
                    $this->logger->warning("[MysqlReplication] Connect error:" . $e->getMessage(), ['exception' => $e]);
                    $this->closeSocket();
                    sleep(1);
                    continue;
                }
            }

            try {
                while (true) {
                    $this->consume();
                }
            } catch (SocketException $e) {
                $this->logger->warning("[MysqlReplication] Connection lost: " . $e->getMessage(), ['exception' => $e]);
                $this->closeSocket();
            }
        }
    }

    /**
     * Reads and processes the next event by delegating to Event::consume().
     *
     * @throws MySQLReplicationException
     * @throws InvalidArgumentException
     * @throws BinLogException
     * @throws BinaryDataReaderException
     * @throws JsonBinaryDecoderException
     * @throws SocketException
     */
    public function consume(): void
    {
        $this->event->consume();
    }

    /**
     * Closes the socket if its implementation provides a close() method.
     * The method's existence is probed at runtime before it is called.
     */
    private function closeSocket(): void
    {
        if (method_exists($this->socket, 'close')) {
            $this->socket->close();
        }
    }
}
201