Passed
Branch master (c44613)
by Moln
05:00
created

Event::createEventInfo()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 9
nc 1
nop 1
dl 0
loc 11
ccs 10
cts 10
cp 1
crap 1
rs 9.9666
c 0
b 0
f 0
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication\Event;
5
6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
8
use MySQLReplication\BinLog\BinLogException;
9
use MySQLReplication\BinLog\BinLogSocketConnect;
10
use MySQLReplication\Config\Config;
11
use MySQLReplication\Definitions\ConstEventType;
12
use MySQLReplication\Event\DTO\EventDTO;
13
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
14
use MySQLReplication\Event\DTO\HeartbeatDTO;
15
use MySQLReplication\Event\DTO\QueryDTO;
16
use MySQLReplication\Event\RowEvent\RowEventFactory;
17
use MySQLReplication\Exception\MySQLReplicationException;
18
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
19
use MySQLReplication\Socket\SocketException;
20
use Psr\SimpleCache\CacheInterface;
21
use Psr\SimpleCache\InvalidArgumentException;
22
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
23
24
/**
 * Reads one binlog packet at a time from the replication socket, decodes it
 * into an event DTO and hands that DTO to the event dispatcher.
 */
class Event
{
    /**
     * Marker substring: on MariaDB servers, query events whose SQL contains
     * this text are dropped instead of being dispatched.
     */
    private const MARIADB_DUMMY_QUERY = '# Dum';

    /**
     * First-byte value of a packet that ends the stream (EOF packet).
     */
    private const EOF_HEADER_VALUE = 254;

    /**
     * @var BinLogSocketConnect
     */
    private $binLogSocketConnect;

    /**
     * @var RowEventFactory
     */
    private $rowEventFactory;

    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @var CacheInterface
     */
    private $cache;

    /**
     * @var Config
     */
    private $config;

    public function __construct(
        Config $config,
        BinLogSocketConnect $binLogSocketConnect,
        RowEventFactory $rowEventFactory,
        EventDispatcherInterface $eventDispatcher,
        CacheInterface $cache
    ) {
        $this->config = $config;
        $this->binLogSocketConnect = $binLogSocketConnect;
        $this->rowEventFactory = $rowEventFactory;
        $this->eventDispatcher = $eventDispatcher;
        $this->cache = $cache;
    }

    /**
     * Opens the underlying binlog socket connection.
     */
    public function connect(): void
    {
        $this->binLogSocketConnect->connect();
    }

    /**
     * Reads a single packet from the binlog connection, decodes it and
     * dispatches the resulting DTO (if any).
     *
     * Nothing is dispatched when the packet is an EOF packet, when the event
     * type is rejected by the configuration, when the event type is not one
     * handled here, or when the event is a MariaDB dummy query.
     *
     * @throws BinaryDataReaderException
     * @throws BinLogException
     * @throws MySQLReplicationException
     * @throws JsonBinaryDecoderException
     * @throws InvalidArgumentException
     * @throws SocketException
     */
    public function consume(): void
    {
        $binLogServerInfo = $this->binLogSocketConnect->getBinLogServerInfo();
        $binaryDataReader = new BinaryDataReader($this->binLogSocketConnect->getResponse());

        // The first byte is the packet header; this read also consumes it so the
        // event header starts at the next byte.
        // check EOF_Packet -> https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
        if (self::EOF_HEADER_VALUE === $binaryDataReader->readUInt8()) {
            return;
        }

        // decode the common event header
        $eventInfo = $this->createEventInfo($binaryDataReader);
        $eventType = $eventInfo->getType();

        $eventDTO = null;

        // These events are always parsed, even when the configuration filters them
        // out below: they are needed to clean table maps and for the BinLogCurrent
        // class to keep track of the binlog position. The table map event in
        // particular is always parsed (it is needed for creating the table cache)
        // but only propagated when permitted.
        if (ConstEventType::TABLE_MAP_EVENT === $eventType) {
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeTableMapDTO();
        } elseif (ConstEventType::ROTATE_EVENT === $eventType) {
            $this->cache->clear();
            $eventDTO = (new RotateEvent($binLogServerInfo, $eventInfo, $binaryDataReader))->makeRotateEventDTO();
        } elseif (ConstEventType::GTID_LOG_EVENT === $eventType) {
            $eventDTO = (new GtidEvent($eventInfo, $binaryDataReader))->makeGTIDLogDTO();
        } elseif (ConstEventType::HEARTBEAT_LOG_EVENT === $eventType) {
            $eventDTO = new HeartbeatDTO($eventInfo);
        } elseif (ConstEventType::MARIA_GTID_EVENT === $eventType) {
            $eventDTO = (new MariaDbGtidEvent($eventInfo, $binaryDataReader))->makeMariaDbGTIDLogDTO();
        }

        // check for ignored and permitted events; a rejected event is dropped even
        // if it was parsed above
        if (!$this->config->checkEvent($eventType)) {
            return;
        }

        $updateRowsTypes = [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2];
        $writeRowsTypes = [ConstEventType::WRITE_ROWS_EVENT_V1, ConstEventType::WRITE_ROWS_EVENT_V2];
        $deleteRowsTypes = [ConstEventType::DELETE_ROWS_EVENT_V1, ConstEventType::DELETE_ROWS_EVENT_V2];

        // Remaining event types are only decoded once they passed the filter.
        if (in_array($eventType, $updateRowsTypes, true)) {
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeUpdateRowsDTO();
        } elseif (in_array($eventType, $writeRowsTypes, true)) {
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeWriteRowsDTO();
        } elseif (in_array($eventType, $deleteRowsTypes, true)) {
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeDeleteRowsDTO();
        } elseif (ConstEventType::XID_EVENT === $eventType) {
            $eventDTO = (new XidEvent($eventInfo, $binaryDataReader))->makeXidDTO();
        } elseif (ConstEventType::QUERY_EVENT === $eventType) {
            // may yield null for MariaDB dummy queries, in which case nothing is dispatched
            $eventDTO = $this->filterDummyMariaDbEvents((new QueryEvent($eventInfo, $binaryDataReader))->makeQueryDTO());
        } elseif (ConstEventType::FORMAT_DESCRIPTION_EVENT === $eventType) {
            $eventDTO = new FormatDescriptionEventDTO($eventInfo);
        }

        $this->dispatch($eventDTO);
    }

    /**
     * Builds the EventInfo for the current packet by reading six header fields
     * from the reader, in order, and adding the connection's checksum setting
     * and current binlog position object.
     *
     * The read order is significant: each call advances the reader.
     * NOTE(review): the field widths (4, 1, 4, 4, 4, 2 bytes) presumably follow
     * the binlog event header layout (timestamp, type, server id, event size,
     * log position, flags) - confirm against EventInfo's constructor.
     */
    private function createEventInfo(BinaryDataReader $binaryDataReader): EventInfo
    {
        return new EventInfo(
            $binaryDataReader->readInt32(),
            $binaryDataReader->readUInt8(),
            $binaryDataReader->readInt32(),
            $binaryDataReader->readInt32(),
            $binaryDataReader->readInt32(),
            $binaryDataReader->readUInt16(),
            $this->binLogSocketConnect->getCheckSum(),
            $this->binLogSocketConnect->getBinLogCurrent()
        );
    }

    /**
     * Returns null when the server is MariaDB and the query text contains the
     * dummy-query marker; otherwise returns the given DTO unchanged.
     */
    private function filterDummyMariaDbEvents(QueryDTO $queryDTO): ?QueryDTO
    {
        if ($this->binLogSocketConnect->getBinLogServerInfo()->isMariaDb() &&
            false !== strpos($queryDTO->getQuery(), self::MARIADB_DUMMY_QUERY)) {
            return null;
        }

        return $queryDTO;
    }

    /**
     * Dispatches the DTO under its own type as the event name; a null DTO is
     * silently ignored.
     */
    private function dispatch(?EventDTO $eventDTO = null): void
    {
        if (null !== $eventDTO) {
            $this->eventDispatcher->dispatch($eventDTO, $eventDTO->getType());
        }
    }
}
148