Passed
Push — master ( 41866b...658441 )
by kacper
05:20
created

Event   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 108
Duplicated Lines 0 %

Test Coverage

Coverage 91.07%

Importance

Changes 6
Bugs 2 Features 0
Metric Value
eloc 56
dl 0
loc 108
ccs 51
cts 56
cp 0.9107
rs 10
c 6
b 2
f 0
wmc 21

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 10 1
A dispatch() 0 4 2
A createEventInfo() 0 11 1
A filterDummyMariaDbEvents() 0 7 3
C consume() 0 49 14
1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication\Event;
5
6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
8
use MySQLReplication\BinLog\BinLogException;
9
use MySQLReplication\BinLog\BinLogServerInfo;
10
use MySQLReplication\BinLog\BinLogSocketConnect;
11
use MySQLReplication\Config\Config;
12
use MySQLReplication\Definitions\ConstEventType;
13
use MySQLReplication\Event\DTO\EventDTO;
14
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
15
use MySQLReplication\Event\DTO\HeartbeatDTO;
16
use MySQLReplication\Event\DTO\QueryDTO;
17
use MySQLReplication\Event\RowEvent\RowEventFactory;
18
use MySQLReplication\Exception\MySQLReplicationException;
19
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
20
use MySQLReplication\Socket\SocketException;
21
use Psr\SimpleCache\CacheInterface;
22
use Psr\SimpleCache\InvalidArgumentException;
23
use Symfony\Component\EventDispatcher\EventDispatcher;
24
25
/**
 * Reads one packet from the binlog socket connection, decodes the event it
 * carries and dispatches the resulting DTO through the event dispatcher.
 */
class Event
{
    /** Substring searched for in a query to recognise MariaDB dummy events. */
    private const MARIADB_DUMMY_QUERY = '# Dum';
    /** Value of the first packet byte that identifies an EOF packet. */
    private const EOF_HEADER_VALUE = 254;

    /** @var BinLogSocketConnect */
    private $binLogSocketConnect;
    /** @var RowEventFactory */
    private $rowEventFactory;
    /** @var EventDispatcher */
    private $eventDispatcher;
    /** @var CacheInterface */
    private $cache;

    public function __construct(
        BinLogSocketConnect $binLogSocketConnect,
        RowEventFactory $rowEventFactory,
        EventDispatcher $eventDispatcher,
        CacheInterface $cache
    ) {
        $this->binLogSocketConnect = $binLogSocketConnect;
        $this->rowEventFactory = $rowEventFactory;
        $this->eventDispatcher = $eventDispatcher;
        $this->cache = $cache;
    }

    /**
     * Consumes a single response from the binlog connection.
     *
     * Returns without dispatching when the packet is an EOF packet or when
     * Config::checkEvent() rejects the event type.
     *
     * @throws BinaryDataReaderException
     * @throws BinLogException
     * @throws MySQLReplicationException
     * @throws JsonBinaryDecoderException
     * @throws InvalidArgumentException
     * @throws SocketException
     */
    public function consume(): void
    {
        $binaryDataReader = new BinaryDataReader($this->binLogSocketConnect->getResponse());

        // check EOF_Packet -> https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
        if (self::EOF_HEADER_VALUE === $binaryDataReader->readUInt8()) {
            return;
        }

        // decode all events data
        $eventInfo = $this->createEventInfo($binaryDataReader);

        // These events are decoded before the config check on purpose: they are
        // always needed to clean table maps and for the BinLogCurrent class to
        // keep track of the binlog position. They are only propagated when the
        // config check below permits them.
        $eventDTO = $this->makeAlwaysParsedEventDTO($eventInfo, $binaryDataReader);

        // check for ignore and permitted events
        if (!Config::checkEvent($eventInfo->getType())) {
            return;
        }

        $this->dispatch($this->makePermittedEventDTO($eventInfo, $binaryDataReader, $eventDTO));
    }

    /**
     * Decodes the event types that must be parsed even when they end up not
     * being dispatched (table map, rotate, GTID, heartbeat, MariaDB GTID).
     *
     * A rotate event additionally clears the cache before it is decoded.
     *
     * @return EventDTO|null the decoded DTO, or null for every other event type
     *
     * @throws BinaryDataReaderException
     * @throws BinLogException
     * @throws MySQLReplicationException
     * @throws JsonBinaryDecoderException
     * @throws InvalidArgumentException
     */
    private function makeAlwaysParsedEventDTO(EventInfo $eventInfo, BinaryDataReader $binaryDataReader): ?EventDTO
    {
        $eventType = $eventInfo->getType();

        // always parse table map event but propagate when needed (we need this for creating table cache)
        if (ConstEventType::TABLE_MAP_EVENT === $eventType) {
            return $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeTableMapDTO();
        }

        if (ConstEventType::ROTATE_EVENT === $eventType) {
            $this->cache->clear();

            return (new RotateEvent($eventInfo, $binaryDataReader))->makeRotateEventDTO();
        }

        if (ConstEventType::GTID_LOG_EVENT === $eventType) {
            return (new GtidEvent($eventInfo, $binaryDataReader))->makeGTIDLogDTO();
        }

        if (ConstEventType::HEARTBEAT_LOG_EVENT === $eventType) {
            return new HeartbeatDTO($eventInfo);
        }

        if (ConstEventType::MARIA_GTID_EVENT === $eventType) {
            return (new MariaDbGtidEvent($eventInfo, $binaryDataReader))->makeMariaDbGTIDLogDTO();
        }

        return null;
    }

    /**
     * Decodes the event types that are only parsed once the config check has
     * passed (row update/write/delete, XID, query, format description).
     *
     * @param EventDTO|null $fallback DTO produced by the earlier decoding
     *                                stage; returned untouched when the event
     *                                type is not handled here
     *
     * @return EventDTO|null the DTO to dispatch; null when there is nothing to
     *                       dispatch (e.g. a filtered MariaDB dummy query)
     *
     * @throws BinaryDataReaderException
     * @throws BinLogException
     * @throws MySQLReplicationException
     * @throws JsonBinaryDecoderException
     * @throws InvalidArgumentException
     */
    private function makePermittedEventDTO(
        EventInfo $eventInfo,
        BinaryDataReader $binaryDataReader,
        ?EventDTO $fallback
    ): ?EventDTO {
        $eventType = $eventInfo->getType();

        if (in_array($eventType, [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2], true)) {
            return $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeUpdateRowsDTO();
        }

        if (in_array($eventType, [ConstEventType::WRITE_ROWS_EVENT_V1, ConstEventType::WRITE_ROWS_EVENT_V2], true)) {
            return $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeWriteRowsDTO();
        }

        if (in_array($eventType, [ConstEventType::DELETE_ROWS_EVENT_V1, ConstEventType::DELETE_ROWS_EVENT_V2], true)) {
            return $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeDeleteRowsDTO();
        }

        if (ConstEventType::XID_EVENT === $eventType) {
            return (new XidEvent($eventInfo, $binaryDataReader))->makeXidDTO();
        }

        if (ConstEventType::QUERY_EVENT === $eventType) {
            return $this->filterDummyMariaDbEvents((new QueryEvent($eventInfo, $binaryDataReader))->makeQueryDTO());
        }

        if (ConstEventType::FORMAT_DESCRIPTION_EVENT === $eventType) {
            return new FormatDescriptionEventDTO($eventInfo);
        }

        return $fallback;
    }

    /**
     * Builds the EventInfo for the current packet.
     *
     * The six header values are read from the reader in this exact order
     * (int32, uint8, int32, int32, int32, uint16); the order is significant
     * because every read advances the reader. The checksum flag and the
     * current binlog state are taken from the socket connection.
     */
    private function createEventInfo(BinaryDataReader $binaryDataReader): EventInfo
    {
        return new EventInfo(
            $binaryDataReader->readInt32(),
            $binaryDataReader->readUInt8(),
            $binaryDataReader->readInt32(),
            $binaryDataReader->readInt32(),
            $binaryDataReader->readInt32(),
            $binaryDataReader->readUInt16(),
            $this->binLogSocketConnect->getCheckSum(),
            $this->binLogSocketConnect->getBinLogCurrent()
        );
    }

    /**
     * Drops query events that MariaDB emits as dummies.
     *
     * @return QueryDTO|null null when the server is MariaDB and the query text
     *                       contains the dummy marker, the given DTO otherwise
     */
    private function filterDummyMariaDbEvents(QueryDTO $queryDTO): ?QueryDTO
    {
        if (BinLogServerInfo::isMariaDb() && false !== strpos($queryDTO->getQuery(), self::MARIADB_DUMMY_QUERY)) {
            return null;
        }

        return $queryDTO;
    }

    /**
     * Dispatches the DTO under its own type as the event name; a null DTO is
     * silently ignored.
     *
     * The parameter is declared explicitly nullable because implicit
     * nullability through a null default is deprecated as of PHP 8.4.
     */
    private function dispatch(?EventDTO $eventDTO = null): void
    {
        if (null === $eventDTO) {
            return;
        }

        $this->eventDispatcher->dispatch($eventDTO, $eventDTO->getType());
    }
}
136