Passed
Pull Request — master (#8)
by Moln
05:05
created

MySQLReplicationFactory::connect()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 0
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication;
5
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\Exception;
8
use Doctrine\DBAL\DriverManager;
9
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
10
use MySQLReplication\BinLog\BinLogException;
11
use MySQLReplication\BinLog\BinLogServerInfo;
12
use MySQLReplication\BinLog\BinLogSocketConnect;
13
use MySQLReplication\Cache\ArrayCache;
14
use MySQLReplication\Config\Config;
15
use MySQLReplication\Config\ConfigException;
16
use MySQLReplication\Event\Event;
17
use MySQLReplication\Event\RowEvent\RowEventFactory;
18
use MySQLReplication\Exception\MySQLReplicationException;
19
use MySQLReplication\Gtid\GtidException;
20
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
21
use MySQLReplication\Repository\MySQLRepository;
22
use MySQLReplication\Repository\RepositoryInterface;
23
use MySQLReplication\Socket\Socket;
24
use MySQLReplication\Socket\SocketException;
25
use MySQLReplication\Socket\SocketInterface;
26
use Psr\Log\LoggerInterface;
27
use Psr\Log\NullLogger;
28
use Psr\SimpleCache\CacheInterface;
29
use Psr\SimpleCache\InvalidArgumentException;
30
use Symfony\Component\EventDispatcher\EventDispatcher;
31
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
32
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
33
34
class MySQLReplicationFactory
35
{
36
    private $connection;
37
    private $eventDispatcher;
38
    private $event;
39
    private $binLogServerConnect;
40
    /**
41
     * @var Config
42
     */
43
    private $config;
44
    /**
45
     * @var LoggerInterface
46
     */
47
    private $logger;
48
    /**
49
     * @var SocketInterface
50
     */
51
    private $socket;
52
53
    /**
54
     * @throws BinLogException
55
     * @throws ConfigException
56
     * @throws Exception
57
     * @throws SocketException
58
     * @throws GtidException
59
     */
60 58
    public function __construct(
61
        Config $config,
62
        ?RepositoryInterface $repository = null,
63
        ?CacheInterface $cache = null,
64
        ?EventDispatcherInterface $eventDispatcher = null,
65
        ?SocketInterface $socket = null,
66
        ?LoggerInterface $logger = null
67
    ) {
68 58
        $this->config = $config;
69 58
        $this->logger = $logger ?? new NullLogger();
70 58
        $config->validate();
71
72 58
        if (null === $repository) {
73 58
            $this->connection = DriverManager::getConnection(
74 58
                [
75 58
                    'user' => $config->getUser(),
76 58
                    'password' => $config->getPassword(),
77 58
                    'host' => $config->getHost(),
78 58
                    'port' => $config->getPort(),
79 58
                    'driver' => 'pdo_mysql',
80 58
                    'charset' => $config->getCharset()
81 58
                ]
82 58
            );
83 58
            $repository = new MySQLRepository($this->connection);
84
        }
85 58
        if (null === $cache) {
86 58
            $cache = new ArrayCache($config->getTableCacheSize());
87
        }
88
89 58
        $this->eventDispatcher = $eventDispatcher ?: new EventDispatcher();
90
91 58
        if (null === $socket) {
92 58
            $socket = new Socket();
93
        }
94
95 58
        $this->binLogServerConnect = new BinLogSocketConnect(
96 58
            $config,
97 58
            $repository,
98 58
            $socket
99 58
        );
100
101 58
        $this->event = new Event(
102 58
            $config,
103 58
            $this->binLogServerConnect,
104 58
            new RowEventFactory(
105 58
                $config,
106 58
                $repository,
107 58
                $cache
108 58
            ),
109 58
            $this->eventDispatcher,
110 58
            $cache
111 58
        );
112
113 58
        $this->socket = $socket;
114 58
        $this->connect();
115
    }
116
117 58
    public function connect()
118
    {
119 58
        $this->event->connect();
120 58
        $this->logger->info("[MysqlReplication] Connected.");
121
    }
122
123 1
    public function getBinLogServerInfo(): BinLogServerInfo
124
    {
125 1
        return $this->binLogServerConnect->getBinLogServerInfo();
126
    }
127
128 58
    public function registerSubscriber(EventSubscriberInterface $eventSubscribers): void
129
    {
130 58
        $this->eventDispatcher->addSubscriber($eventSubscribers);
131
    }
132
133 58
    public function unregisterSubscriber(EventSubscriberInterface $eventSubscribers): void
134
    {
135 58
        $this->eventDispatcher->removeSubscriber($eventSubscribers);
136
    }
137
138 58
    public function getDbConnection(): Connection
139
    {
140 58
        return $this->connection;
141
    }
142
143
    /**
144
     * @throws SocketException
145
     * @throws JsonBinaryDecoderException
146
     * @throws BinaryDataReaderException
147
     * @throws BinLogException
148
     * @throws InvalidArgumentException
149
     * @throws MySQLReplicationException
150
     */
151
    public function run(): void
152
    {
153
        $retryNum = $this->config->getRetry();
154
        while (1) {
155
            if (! $this->socket->isConnected()) {
156
                try {
157
                    if ($retryNum) {
158
                        $retryNum--;
159
                        $this->connect();
160
                        $retryNum = $this->config->getRetry();
161
                    } else {
162
                        break;
163
                    }
164
                } catch (\Throwable $e) {
165
                    $this->logger->warning("[MysqlReplication] Connect error:" . $e->getMessage(), ['exception' => $e]);
166
                    if (method_exists($this->socket, 'close')) {
167
                        $this->socket->close();
168
                    }
169
                    sleep(1);
170
                    continue;
171
                }
172
            }
173
174
            try {
175
                while (1) {
176
                    $this->consume();
177
                }
178
            } catch (SocketException $e) {
179
                $this->logger->warning("[MysqlReplication] Connection lost: " . $e->getMessage(), ['exception' => $e]);
180
181
                if (method_exists($this->socket, 'close')) {
182
                    $this->socket->close();
183
                }
184
            }
185
        }
186
    }
187
188
    /**
189
     * @throws MySQLReplicationException
190
     * @throws InvalidArgumentException
191
     * @throws BinLogException
192
     * @throws BinaryDataReaderException
193
     * @throws JsonBinaryDecoderException
194
     * @throws SocketException
195
     */
196 58
    public function consume(): void
197
    {
198 58
        $this->event->consume();
199
    }
200
}
201