Test Failed
Pull Request — master (#4)
by Moln
03:04
created

MySQLReplicationFactory::__construct()   A

Complexity

Conditions 5
Paths 8

Size

Total Lines 55
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 5
eloc 32
c 2
b 0
f 0
nc 8
nop 6
dl 0
loc 55
rs 9.0968

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

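Commonly applied refactorings include Extract Method: move a cohesive group of statements out of the long method into a new, well-named method.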

1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication;
5
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\Exception;
8
use Doctrine\DBAL\DriverManager;
9
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
10
use MySQLReplication\BinLog\BinLogException;
11
use MySQLReplication\BinLog\BinLogServerInfo;
12
use MySQLReplication\BinLog\BinLogSocketConnect;
13
use MySQLReplication\Cache\ArrayCache;
14
use MySQLReplication\Config\Config;
15
use MySQLReplication\Config\ConfigException;
16
use MySQLReplication\Event\Event;
17
use MySQLReplication\Event\RowEvent\RowEventFactory;
18
use MySQLReplication\Exception\MySQLReplicationException;
19
use MySQLReplication\Gtid\GtidException;
20
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
21
use MySQLReplication\Repository\MySQLRepository;
22
use MySQLReplication\Repository\RepositoryInterface;
23
use MySQLReplication\Socket\Socket;
24
use MySQLReplication\Socket\SocketException;
25
use MySQLReplication\Socket\SocketInterface;
26
use Psr\Log\LoggerInterface;
27
use Psr\Log\NullLogger;
28
use Psr\SimpleCache\CacheInterface;
29
use Psr\SimpleCache\InvalidArgumentException;
30
use Symfony\Component\EventDispatcher\EventDispatcher;
31
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
32
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
33
34
/**
 * Wires a Config, repository, cache, event dispatcher and socket into an
 * Event consumer, connects it, and exposes a reconnecting consume loop.
 */
class MySQLReplicationFactory
{
    /**
     * DBAL connection created by the constructor. It stays null when a
     * repository is injected, because no connection is built in that case.
     *
     * @var Connection|null
     */
    private $connection;

    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @var Event
     */
    private $event;

    /**
     * @var BinLogSocketConnect
     */
    private $binLogServerConnect;

    /**
     * @var Config
     */
    private $config;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var SocketInterface
     */
    private $socket;

    /**
     * Builds every collaborator that was not injected, then connects.
     *
     * Defaults used for omitted arguments: a MySQLRepository on a new
     * pdo_mysql connection, an ArrayCache sized by the config, a Symfony
     * EventDispatcher, a Socket and a NullLogger.
     *
     * @throws BinLogException
     * @throws ConfigException
     * @throws Exception
     * @throws SocketException
     * @throws GtidException
     */
    public function __construct(
        Config $config,
        ?RepositoryInterface $repository = null,
        ?CacheInterface $cache = null,
        ?EventDispatcherInterface $eventDispatcher = null,
        ?SocketInterface $socket = null,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->logger = $logger ?? new NullLogger();
        $config->validate();

        if (null === $repository) {
            // Only open a DBAL connection when we have to build the repository ourselves.
            $this->connection = $this->createDbConnection($config);
            $repository = new MySQLRepository($this->connection);
        }

        $cache = $cache ?? new ArrayCache($config->getTableCacheSize());
        $socket = $socket ?? new Socket();

        $this->eventDispatcher = $eventDispatcher ?? new EventDispatcher();
        $this->binLogServerConnect = new BinLogSocketConnect($config, $repository, $socket);
        $this->event = new Event(
            $config,
            $this->binLogServerConnect,
            new RowEventFactory($config, $repository, $cache),
            $this->eventDispatcher,
            $cache
        );

        $this->socket = $socket;
        $this->connect();
    }

    /**
     * Connects the event consumer and logs the successful connection.
     *
     * @return void
     */
    public function connect()
    {
        $this->event->connect();
        $this->logger->info("[MysqlReplication] Connected.");
    }

    /**
     * Returns the server info held by the binlog socket connection.
     */
    public function getBinLogServerInfo(): BinLogServerInfo
    {
        return $this->binLogServerConnect->getBinLogServerInfo();
    }

    /**
     * Adds a subscriber to the event dispatcher used by this factory.
     */
    public function registerSubscriber(EventSubscriberInterface $eventSubscribers): void
    {
        $this->eventDispatcher->addSubscriber($eventSubscribers);
    }

    /**
     * Removes a subscriber from the event dispatcher used by this factory.
     */
    public function unregisterSubscriber(EventSubscriberInterface $eventSubscribers): void
    {
        $this->eventDispatcher->removeSubscriber($eventSubscribers);
    }

    /**
     * Returns the DBAL connection created by the constructor.
     *
     * NOTE(review): when a repository was injected no connection is created,
     * the property is null, and the declared return type makes this call
     * raise a TypeError. Confirm whether callers rely on that.
     */
    public function getDbConnection(): Connection
    {
        return $this->connection;
    }

    /**
     * Consumes events forever, reconnecting when the socket is lost.
     *
     * While the socket is disconnected, up to Config::getRetry() consecutive
     * connect attempts are made, one second apart. The budget is refilled
     * after every successful connect, and the loop ends once it is exhausted.
     * Only SocketException from consuming triggers a reconnect; any other
     * exception propagates to the caller.
     *
     * @throws SocketException
     * @throws JsonBinaryDecoderException
     * @throws BinaryDataReaderException
     * @throws BinLogException
     * @throws InvalidArgumentException
     * @throws MySQLReplicationException
     */
    public function run(): void
    {
        $retriesLeft = $this->config->getRetry();

        while (true) {
            if (! $this->socket->isConnected()) {
                if (! $retriesLeft) {
                    break;
                }

                $retriesLeft--;
                try {
                    $this->connect();
                    // A successful connect restores the full retry budget.
                    $retriesLeft = $this->config->getRetry();
                } catch (\Throwable $e) {
                    $this->logger->warning("[MysqlReplication] Connect error:" . $e->getMessage(), ['exception' => $e]);
                    $this->closeSocket();
                    sleep(1);
                    continue;
                }
            }

            try {
                while (true) {
                    $this->consume();
                }
            } catch (SocketException $e) {
                $this->logger->warning("[MysqlReplication] Connection lost: " . $e->getMessage(), ['exception' => $e]);
                $this->closeSocket();
            }
        }
    }

    /**
     * Consumes a single step from the underlying Event consumer.
     *
     * @throws MySQLReplicationException
     * @throws InvalidArgumentException
     * @throws BinLogException
     * @throws BinaryDataReaderException
     * @throws JsonBinaryDecoderException
     * @throws SocketException
     */
    public function consume(): void
    {
        $this->event->consume();
    }

    /**
     * Opens the pdo_mysql DBAL connection described by the given config.
     *
     * @throws Exception
     */
    private function createDbConnection(Config $config): Connection
    {
        return DriverManager::getConnection(
            [
                'user' => $config->getUser(),
                'password' => $config->getPassword(),
                'host' => $config->getHost(),
                'port' => $config->getPort(),
                'driver' => 'pdo_mysql',
                'charset' => $config->getCharset(),
            ]
        );
    }

    /**
     * Closes the socket; the single place where close() is invoked.
     *
     * Static analysis reports that close() is not declared on
     * SocketInterface, only on its implementations, hence the ignore-call
     * annotation below.
     */
    private function closeSocket(): void
    {
        $this->socket->/** @scrutinizer ignore-call */ close();
    }
}
196