Passed
Push — master ( 49833e...35a135 )
by kacper
03:22
created

MySQLReplicationFactory   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 100
Duplicated Lines 0 %

Test Coverage

Coverage 90%

Importance

Changes 5
Bugs 2 Features 0
Metric Value
eloc 34
c 5
b 2
f 0
dl 0
loc 100
ccs 36
cts 40
cp 0.9
rs 10
wmc 11

6 Methods

Rating   Name   Duplication   Size   Complexity  
A unregisterSubscriber() 0 3 1
A registerSubscriber() 0 3 1
A run() 0 4 2
A getDbConnection() 0 3 1
A consume() 0 3 1
A __construct() 0 43 5
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication;
5
6
use Doctrine\DBAL\Connection;
7
use Doctrine\DBAL\DBALException;
8
use Doctrine\DBAL\DriverManager;
9
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
10
use MySQLReplication\BinLog\BinLogException;
11
use MySQLReplication\BinLog\BinLogSocketConnect;
12
use MySQLReplication\Cache\ArrayCache;
13
use MySQLReplication\Config\Config;
14
use MySQLReplication\Config\ConfigException;
15
use MySQLReplication\Event\Event;
16
use MySQLReplication\Event\RowEvent\RowEventFactory;
17
use MySQLReplication\Exception\MySQLReplicationException;
18
use MySQLReplication\Gtid\GtidException;
19
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
20
use MySQLReplication\Repository\MySQLRepository;
21
use MySQLReplication\Repository\RepositoryInterface;
22
use MySQLReplication\Socket\Socket;
23
use MySQLReplication\Socket\SocketException;
24
use MySQLReplication\Socket\SocketInterface;
25
use Psr\SimpleCache\CacheInterface;
26
use Psr\SimpleCache\InvalidArgumentException;
27
use Symfony\Component\EventDispatcher\EventDispatcher;
28
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
29
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
30
31
class MySQLReplicationFactory
32
{
33
    private $connection;
34
    private $eventDispatcher;
35
    private $event;
36
37
    /**
38
     * @throws BinLogException
39
     * @throws ConfigException
40
     * @throws DBALException
41
     * @throws SocketException
42
     * @throws GtidException
43
     */
44 58
    public function __construct(
45
        Config $config,
46
        RepositoryInterface $repository = null,
47
        CacheInterface $cache = null,
48
        EventDispatcherInterface $eventDispatcher = null,
49
        SocketInterface $socket = null
50
    ) {
51 58
        $config::validate();
52
53 58
        if (null === $repository) {
54 58
            $this->connection = DriverManager::getConnection(
55
                [
56 58
                    'user' => Config::getUser(),
57 58
                    'password' => Config::getPassword(),
58 58
                    'host' => Config::getHost(),
59 58
                    'port' => Config::getPort(),
60 58
                    'driver' => 'pdo_mysql',
61 58
                    'charset' => Config::getCharset()
62
                ]
63
            );
64 58
            $repository = new MySQLRepository($this->connection);
65
        }
66 58
        if (null === $cache) {
67 58
            $cache = new ArrayCache();
68
        }
69
70 58
        $this->eventDispatcher = $eventDispatcher ?: new EventDispatcher();
71
72 58
        if (null === $socket) {
73 58
            $socket = new Socket();
74
        }
75
76 58
        $this->event = new Event(
77 58
            new BinLogSocketConnect(
78 58
                $repository,
79 58
                $socket
80
            ),
81 58
            new RowEventFactory(
82 58
                $repository,
83 58
                $cache
84
            ),
85 58
            $this->eventDispatcher,
86 58
            $cache
87
        );
88 58
    }
89
90 58
    public function registerSubscriber(EventSubscriberInterface $eventSubscribers): void
91
    {
92 58
        $this->eventDispatcher->addSubscriber($eventSubscribers);
93 58
    }
94
95 58
    public function unregisterSubscriber(EventSubscriberInterface $eventSubscribers): void
96
    {
97 58
        $this->eventDispatcher->removeSubscriber($eventSubscribers);
98 58
    }
99
100 58
    public function getDbConnection(): Connection
101
    {
102 58
        return $this->connection;
103
    }
104
105
    /**
106
     * @throws SocketException
107
     * @throws JsonBinaryDecoderException
108
     * @throws BinaryDataReaderException
109
     * @throws BinLogException
110
     * @throws InvalidArgumentException
111
     * @throws MySQLReplicationException
112
     */
113
    public function run(): void
114
    {
115
        while (1) {
116
            $this->consume();
117
        }
118
    }
119
120
    /**
121
     * @throws MySQLReplicationException
122
     * @throws InvalidArgumentException
123
     * @throws BinLogException
124
     * @throws BinaryDataReaderException
125
     * @throws JsonBinaryDecoderException
126
     * @throws SocketException
127
     */
128 58
    public function consume(): void
129
    {
130 58
        $this->event->consume();
131
    }
132
}