Passed
Pull Request — master (#19)
by
unknown
06:08 queued 02:43
created

Event::createEventInfo()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 9
nc 1
nop 1
dl 0
loc 11
ccs 10
cts 10
cp 1
crap 1
rs 9.9666
c 0
b 0
f 0
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication\Event;
5
6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
8
use MySQLReplication\BinLog\BinLogException;
9
use MySQLReplication\BinLog\BinLogSocketConnect;
10
use MySQLReplication\Config\Config;
11
use MySQLReplication\Definitions\ConstEventType;
12
use MySQLReplication\Event\DTO\EventDTO;
13
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
14
use MySQLReplication\Event\DTO\HeartbeatDTO;
15
use MySQLReplication\Event\DTO\QueryDTO;
16
use MySQLReplication\Event\RowEvent\RowEventFactory;
17
use MySQLReplication\Exception\MySQLReplicationException;
18
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
19
use MySQLReplication\Socket\SocketException;
20
use Psr\SimpleCache\CacheInterface;
21
use Psr\SimpleCache\InvalidArgumentException;
22
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
23
24
class Event
25
{
26
    /**
     * Fragment looked for inside a query's text; when the server is MariaDB and the
     * fragment is present, the query event is dropped instead of being dispatched.
     */
    private const MARIADB_DUMMY_QUERY = '# Dum';

    /**
     * Value of the first response byte that marks an EOF packet.
     */
    private const EOF_HEADER_VALUE = 254;

    /**
     * @var BinLogSocketConnect
     */
    private $binLogSocketConnect;

    /**
     * @var RowEventFactory
     */
    private $rowEventFactory;

    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @var CacheInterface
     */
    private $cache;

    /**
     * @var Config
     */
    private $config;
37
38 58
    /**
     * Stores the collaborators this class works with.
     *
     * @param Config                   $config              consulted to decide whether an event type may be dispatched
     * @param BinLogSocketConnect      $binLogSocketConnect connection that supplies responses and server information
     * @param RowEventFactory          $rowEventFactory     builds row events (table map, write, update, delete)
     * @param EventDispatcherInterface $eventDispatcher     receives every DTO that is produced and permitted
     * @param CacheInterface           $cache               cleared whenever a rotate event is consumed
     */
    public function __construct(
        Config $config,
        BinLogSocketConnect $binLogSocketConnect,
        RowEventFactory $rowEventFactory,
        EventDispatcherInterface $eventDispatcher,
        CacheInterface $cache
    ) {
        // Plain assignments; no collaborator is touched during construction.
        $this->binLogSocketConnect = $binLogSocketConnect;
        $this->rowEventFactory = $rowEventFactory;
        $this->eventDispatcher = $eventDispatcher;
        $this->cache = $cache;
        $this->config = $config;
    }
51
52 58
    /**
     * Delegates to the connect() method of the injected binlog socket connection.
     */
    public function connect(): void
    {
        $connection = $this->binLogSocketConnect;
        $connection->connect();
    }
56
57
    /**
     * Takes the next response from the binlog connection, decodes the event it contains
     * and hands the resulting DTO to the dispatcher.
     *
     * Nothing is dispatched when the response starts with the EOF header byte, when the
     * configuration rejects the event type, or when no DTO was produced for the event.
     *
     * @throws BinaryDataReaderException
     * @throws BinLogException
     * @throws MySQLReplicationException
     * @throws JsonBinaryDecoderException
     * @throws InvalidArgumentException
     * @throws SocketException
     */
    public function consume(): void
    {
        $serverInfo = $this->binLogSocketConnect->getBinLogServerInfo();
        $reader = new BinaryDataReader($this->binLogSocketConnect->getResponse());

        // EOF_Packet check -> https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
        if ($reader->readUInt8() === self::EOF_HEADER_VALUE) {
            return;
        }

        // Decode the event header shared by all events.
        $eventInfo = $this->createEventInfo($reader);
        $type = $eventInfo->getType();

        $dto = null;

        // The event types below are decoded before the configuration filter is applied:
        // they are needed to clean table maps and to let BinLogCurrent keep track of the
        // binlog position. Table map events in particular are always parsed (the table
        // cache is built from them) but only propagated when permitted.
        if ($type === ConstEventType::TABLE_MAP_EVENT) {
            $dto = $this->rowEventFactory->makeRowEvent($reader, $eventInfo)->makeTableMapDTO();
        } elseif ($type === ConstEventType::ROTATE_EVENT) {
            $this->cache->clear();
            $dto = (new RotateEvent($serverInfo, $eventInfo, $reader))->makeRotateEventDTO();
        } elseif ($type === ConstEventType::GTID_LOG_EVENT) {
            $dto = (new GtidEvent($eventInfo, $reader))->makeGTIDLogDTO();
        } elseif ($type === ConstEventType::HEARTBEAT_LOG_EVENT) {
            $dto = new HeartbeatDTO($eventInfo);
        } elseif ($type === ConstEventType::MARIA_GTID_EVENT) {
            $dto = (new MariaDbGtidEvent($eventInfo, $reader))->makeMariaDbGTIDLogDTO();
        } elseif ($type === ConstEventType::MARIA_GTID_LIST_EVENT) {
            $dto = (new MariaDbGtidListEvent($eventInfo, $reader))->makeMariaDbGTIDListDTO();
        }

        // Ignored / not permitted event types stop here, after the mandatory parsing above.
        if (! $this->config->checkEvent($type)) {
            return;
        }

        // Remaining event types are only decoded once the filter has let them through.
        if ($type === ConstEventType::UPDATE_ROWS_EVENT_V1 || $type === ConstEventType::UPDATE_ROWS_EVENT_V2) {
            $dto = $this->rowEventFactory->makeRowEvent($reader, $eventInfo)->makeUpdateRowsDTO();
        } elseif ($type === ConstEventType::WRITE_ROWS_EVENT_V1 || $type === ConstEventType::WRITE_ROWS_EVENT_V2) {
            $dto = $this->rowEventFactory->makeRowEvent($reader, $eventInfo)->makeWriteRowsDTO();
        } elseif ($type === ConstEventType::DELETE_ROWS_EVENT_V1 || $type === ConstEventType::DELETE_ROWS_EVENT_V2) {
            $dto = $this->rowEventFactory->makeRowEvent($reader, $eventInfo)->makeDeleteRowsDTO();
        } elseif ($type === ConstEventType::XID_EVENT) {
            $dto = (new XidEvent($eventInfo, $reader))->makeXidDTO();
        } elseif ($type === ConstEventType::QUERY_EVENT) {
            $queryDTO = (new QueryEvent($eventInfo, $reader))->makeQueryDTO();
            $dto = $this->filterDummyMariaDbEvents($queryDTO);
        } elseif ($type === ConstEventType::FORMAT_DESCRIPTION_EVENT) {
            $dto = new FormatDescriptionEventDTO($eventInfo);
        }

        // dispatch() ignores a null DTO, so unmatched types and filtered queries are dropped.
        $this->dispatch($dto);
    }
118
119 58
    /**
     * Builds an EventInfo from the next values available in the reader.
     *
     * Six values are read in a fixed order (int32, uint8, int32, int32, int32, uint16) and
     * passed to the EventInfo constructor, followed by the results of getCheckSum() and
     * getBinLogCurrent() from the binlog connection.
     *
     * @param BinaryDataReader $binaryDataReader reader to take the six values from
     *
     * @return EventInfo
     */
    private function createEventInfo(BinaryDataReader $binaryDataReader): EventInfo
    {
        // NOTE(review): the local names assume the standard MySQL binlog event header layout
        // (timestamp, type, server id, event size, log position, flags) - confirm against
        // the EventInfo constructor. The read order itself must not change.
        $timestamp = $binaryDataReader->readInt32();
        $type = $binaryDataReader->readUInt8();
        $serverId = $binaryDataReader->readInt32();
        $size = $binaryDataReader->readInt32();
        $position = $binaryDataReader->readInt32();
        $flags = $binaryDataReader->readUInt16();

        $checkSum = $this->binLogSocketConnect->getCheckSum();
        $binLogCurrent = $this->binLogSocketConnect->getBinLogCurrent();

        return new EventInfo(
            $timestamp,
            $type,
            $serverId,
            $size,
            $position,
            $flags,
            $checkSum,
            $binLogCurrent
        );
    }
132
133 58
    /**
     * Drops query events that contain the MariaDB dummy-query marker.
     *
     * @param QueryDTO $queryDTO query event to inspect
     *
     * @return QueryDTO|null null when the server is MariaDB and the query text contains
     *                       self::MARIADB_DUMMY_QUERY; otherwise the DTO that was passed in
     */
    private function filterDummyMariaDbEvents(QueryDTO $queryDTO): ?QueryDTO
    {
        // The query text is only inspected when the server reports itself as MariaDB.
        if (! $this->binLogSocketConnect->getBinLogServerInfo()->isMariaDb()) {
            return $queryDTO;
        }

        $markerPosition = strpos($queryDTO->getQuery(), self::MARIADB_DUMMY_QUERY);
        if (false === $markerPosition) {
            return $queryDTO;
        }

        return null;
    }
142
143 58
    /**
     * Passes an event DTO to the event dispatcher; a null DTO is silently ignored.
     *
     * The DTO is dispatched together with the value returned by its getType(), given as
     * the dispatcher's second argument.
     *
     * @param EventDTO|null $eventDTO event to dispatch, or null when there is nothing to send
     */
    private function dispatch(?EventDTO $eventDTO = null): void
    {
        // The nullable type is spelled out: relying on "= null" to make the parameter
        // nullable is deprecated as of PHP 8.4.
        if (null === $eventDTO) {
            return;
        }

        $this->eventDispatcher->dispatch($eventDTO, $eventDTO->getType());
    }
149
}
150