Test Failed
Pull Request — master (#4)
by Moln
03:04
created

Event::createEventInfo()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 9
nc 1
nop 1
dl 0
loc 11
rs 9.9666
c 0
b 0
f 0
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication\Event;
5
6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
8
use MySQLReplication\BinLog\BinLogException;
9
use MySQLReplication\BinLog\BinLogSocketConnect;
10
use MySQLReplication\Config\Config;
11
use MySQLReplication\Definitions\ConstEventType;
12
use MySQLReplication\Event\DTO\EventDTO;
13
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
14
use MySQLReplication\Event\DTO\HeartbeatDTO;
15
use MySQLReplication\Event\DTO\QueryDTO;
16
use MySQLReplication\Event\RowEvent\RowEventFactory;
17
use MySQLReplication\Exception\MySQLReplicationException;
18
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
19
use MySQLReplication\Socket\SocketException;
20
use Psr\SimpleCache\CacheInterface;
21
use Psr\SimpleCache\InvalidArgumentException;
22
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
23
24
class Event
25
{
26
    private const MARIADB_DUMMY_QUERY = '# Dum';
27
    private const EOF_HEADER_VALUE = 254;
28
29
    private $binLogSocketConnect;
30
    private $rowEventFactory;
31
    private $eventDispatcher;
32
    private $cache;
33
    /**
34
     * @var Config
35
     */
36
    private $config;
37
38
    public function __construct(
39
        Config $config,
40
        BinLogSocketConnect $binLogSocketConnect,
41
        RowEventFactory $rowEventFactory,
42
        EventDispatcherInterface $eventDispatcher,
43
        CacheInterface $cache
44
    ) {
45
        $this->config = $config;
46
        $this->binLogSocketConnect = $binLogSocketConnect;
47
        $this->rowEventFactory = $rowEventFactory;
48
        $this->eventDispatcher = $eventDispatcher;
49
        $this->cache = $cache;
50
    }
51
52
    public function connect(): void
53
    {
54
        $this->binLogSocketConnect->connect();
55
    }
56
57
    /**
58
     * @throws BinaryDataReaderException
59
     * @throws BinLogException
60
     * @throws MySQLReplicationException
61
     * @throws JsonBinaryDecoderException
62
     * @throws InvalidArgumentException
63
     * @throws SocketException
64
     */
65
    public function consume(): void
66
    {
67
        $binLogServerInfo = $this->binLogSocketConnect->getBinLogServerInfo();
68
        $binaryDataReader = new BinaryDataReader($this->binLogSocketConnect->getResponse());
69
70
        // check EOF_Packet -> https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
71
        if (self::EOF_HEADER_VALUE === $binaryDataReader->readUInt8()) {
72
            return;
73
        }
74
75
        // decode all events data
76
        $eventInfo = $this->createEventInfo($binaryDataReader);
77
78
        $eventDTO = null;
79
80
        // we always need this events to clean table maps and for BinLogCurrent class to keep track of binlog position
81
        // always parse table map event but propagate when needed (we need this for creating table cache)
82
        if (ConstEventType::TABLE_MAP_EVENT === $eventInfo->getType()) {
83
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeTableMapDTO();
84
        } else if (ConstEventType::ROTATE_EVENT === $eventInfo->getType()) {
85
            $this->cache->clear();
86
            $eventDTO = (new RotateEvent($binLogServerInfo, $eventInfo, $binaryDataReader))->makeRotateEventDTO();
87
        } else if (ConstEventType::GTID_LOG_EVENT === $eventInfo->getType()) {
88
            $eventDTO = (new GtidEvent($eventInfo, $binaryDataReader))->makeGTIDLogDTO();
89
        } else if (ConstEventType::HEARTBEAT_LOG_EVENT === $eventInfo->getType()) {
90
            $eventDTO = new HeartbeatDTO($eventInfo);
91
        } else if (ConstEventType::MARIA_GTID_EVENT === $eventInfo->getType()) {
92
            $eventDTO = (new MariaDbGtidEvent($eventInfo, $binaryDataReader))->makeMariaDbGTIDLogDTO();
93
        }
94
95
        // check for ignore and permitted events
96
        if (!$this->config->checkEvent($eventInfo->getType())) {
97
            return;
98
        }
99
100
        if (in_array($eventInfo->getType(), [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2], true)) {
101
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeUpdateRowsDTO();
102
        } else if (in_array($eventInfo->getType(), [ConstEventType::WRITE_ROWS_EVENT_V1, ConstEventType::WRITE_ROWS_EVENT_V2], true)) {
103
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeWriteRowsDTO();
104
        } else if (in_array($eventInfo->getType(), [ConstEventType::DELETE_ROWS_EVENT_V1, ConstEventType::DELETE_ROWS_EVENT_V2], true)) {
105
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeDeleteRowsDTO();
106
        } else if (ConstEventType::XID_EVENT === $eventInfo->getType()) {
107
            $eventDTO = (new XidEvent($eventInfo, $binaryDataReader))->makeXidDTO();
108
        } else if (ConstEventType::QUERY_EVENT === $eventInfo->getType()) {
109
            $eventDTO = $this->filterDummyMariaDbEvents((new QueryEvent($eventInfo, $binaryDataReader))->makeQueryDTO());
110
        } else if (ConstEventType::FORMAT_DESCRIPTION_EVENT === $eventInfo->getType()) {
111
            $eventDTO = new FormatDescriptionEventDTO($eventInfo);
112
        }
113
114
        $this->dispatch($eventDTO);
115
    }
116
117
    private function createEventInfo(BinaryDataReader $binaryDataReader): EventInfo
118
    {
119
        return new EventInfo(
120
            $binaryDataReader->readInt32(),
121
            $binaryDataReader->readUInt8(),
122
            $binaryDataReader->readInt32(),
123
            $binaryDataReader->readInt32(),
124
            $binaryDataReader->readInt32(),
125
            $binaryDataReader->readUInt16(),
126
            $this->binLogSocketConnect->getCheckSum(),
127
            $this->binLogSocketConnect->getBinLogCurrent()
128
        );
129
    }
130
131
    private function filterDummyMariaDbEvents(QueryDTO $queryDTO): ?QueryDTO
132
    {
133
        if ($this->binLogSocketConnect->getBinLogServerInfo()->isMariaDb() &&
134
            false !== strpos($queryDTO->getQuery(), self::MARIADB_DUMMY_QUERY)) {
135
            return null;
136
        }
137
138
        return $queryDTO;
139
    }
140
141
    private function dispatch(EventDTO $eventDTO = null): void
142
    {
143
        if (null !== $eventDTO) {
144
            $this->eventDispatcher->dispatch($eventDTO, $eventDTO->getType());
145
        }
146
    }
147
}
148