Completed
Pull Request — master (#53)
by kacper
04:34
created

Event::consume()   C

Complexity

Conditions 14
Paths 49

Size

Total Lines 49
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 26
CRAP Score 15.2919

Importance

Changes 5
Bugs 2 Features 0
Metric Value
cc 14
eloc 31
c 5
b 2
f 0
nc 49
nop 0
dl 0
loc 49
ccs 26
cts 32
cp 0.8125
crap 15.2919
rs 6.2666

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
declare(strict_types=1);
3
4
namespace MySQLReplication\Event;
5
6
use MySQLReplication\BinaryDataReader\BinaryDataReader;
7
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
8
use MySQLReplication\BinLog\BinLogException;
9
use MySQLReplication\BinLog\BinLogServerInfo;
10
use MySQLReplication\BinLog\BinLogSocketConnect;
11
use MySQLReplication\Config\Config;
12
use MySQLReplication\Definitions\ConstEventType;
13
use MySQLReplication\Event\DTO\EventDTO;
14
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
15
use MySQLReplication\Event\DTO\HeartbeatDTO;
16
use MySQLReplication\Event\DTO\QueryDTO;
17
use MySQLReplication\Event\RowEvent\RowEventFactory;
18
use MySQLReplication\Exception\MySQLReplicationException;
19
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
20
use MySQLReplication\Socket\SocketException;
21
use Psr\SimpleCache\CacheInterface;
22
use Psr\SimpleCache\InvalidArgumentException;
23
use Symfony\Component\EventDispatcher\EventDispatcher;
24
25
class Event
26
{
27
    private const MARIADB_DUMMY_QUERY = '# Dum';
28
    private const EOF_HEADER_VALUE = 254;
29
30
    private $binLogSocketConnect;
31
    private $rowEventFactory;
32
    private $eventDispatcher;
33
    private $cache;
34
35 56
    public function __construct(
36
        BinLogSocketConnect $binLogSocketConnect,
37
        RowEventFactory $rowEventFactory,
38
        EventDispatcher $eventDispatcher,
39
        CacheInterface $cache
40
    ) {
41 56
        $this->binLogSocketConnect = $binLogSocketConnect;
42 56
        $this->rowEventFactory = $rowEventFactory;
43 56
        $this->eventDispatcher = $eventDispatcher;
44 56
        $this->cache = $cache;
45 56
    }
46
47
    /**
48
     * @throws BinaryDataReaderException
49
     * @throws BinLogException
50
     * @throws MySQLReplicationException
51
     * @throws JsonBinaryDecoderException
52
     * @throws InvalidArgumentException
53
     * @throws SocketException
54
     */
55 56
    public function consume(): void
56
    {
57 56
        $binaryDataReader = new BinaryDataReader($this->binLogSocketConnect->getResponse());
58
59
        // check EOF_Packet -> https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
60 56
        if (self::EOF_HEADER_VALUE === $binaryDataReader->readUInt8()) {
61
            return;
62
        }
63
64
        // decode all events data
65 56
        $eventInfo = $this->createEventInfo($binaryDataReader);
66
67 56
        $eventDTO = null;
68
69
        // we always need this events to clean table maps and for BinLogCurrent class to keep track of binlog position
70
        // always parse table map event but propagate when needed (we need this for creating table cache)
71 56
        if (ConstEventType::TABLE_MAP_EVENT === $eventInfo->getType()) {
72 54
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeTableMapDTO();
73 56
        } else if (ConstEventType::ROTATE_EVENT === $eventInfo->getType()) {
74
            $this->cache->clear();
75
            $eventDTO = (new RotateEvent($eventInfo, $binaryDataReader))->makeRotateEventDTO();
76 56
        } else if (ConstEventType::GTID_LOG_EVENT === $eventInfo->getType()) {
77
            $eventDTO = (new GtidEvent($eventInfo, $binaryDataReader))->makeGTIDLogDTO();
78 56
        } else if (ConstEventType::HEARTBEAT_LOG_EVENT === $eventInfo->getType()) {
79
            $eventDTO = new HeartbeatDTO($eventInfo);
80 56
        } else if (ConstEventType::MARIA_GTID_EVENT === $eventInfo->getType()) {
81
            $eventDTO = (new MariaDbGtidEvent($eventInfo, $binaryDataReader))->makeMariaDbGTIDLogDTO();
82
        }
83
84
        // check for ignore and permitted events
85 56
        if (!Config::checkEvent($eventInfo->getType())) {
86 4
            return;
87
        }
88
89 56
        if (in_array($eventInfo->getType(), [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2], true)) {
90 1
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeUpdateRowsDTO();
91 56
        } else if (in_array($eventInfo->getType(), [ConstEventType::WRITE_ROWS_EVENT_V1, ConstEventType::WRITE_ROWS_EVENT_V2], true)) {
92 53
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeWriteRowsDTO();
93 56
        } else if (in_array($eventInfo->getType(), [ConstEventType::DELETE_ROWS_EVENT_V1, ConstEventType::DELETE_ROWS_EVENT_V2], true)) {
94 1
            $eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeDeleteRowsDTO();
95 56
        } else if (ConstEventType::XID_EVENT === $eventInfo->getType()) {
96 3
            $eventDTO = (new XidEvent($eventInfo, $binaryDataReader))->makeXidDTO();
97 56
        } else if (ConstEventType::QUERY_EVENT === $eventInfo->getType()) {
98 56
            $eventDTO = $this->filterDummyMariaDbEvents((new QueryEvent($eventInfo, $binaryDataReader))->makeQueryDTO());
99 56
        } else if (ConstEventType::FORMAT_DESCRIPTION_EVENT === $eventInfo->getType()) {
100 56
            $eventDTO = new FormatDescriptionEventDTO($eventInfo);
101
        }
102
103 56
        $this->dispatch($eventDTO);
104 56
    }
105
106 56
    private function createEventInfo(BinaryDataReader $binaryDataReader): EventInfo
107
    {
108 56
        return new EventInfo(
109 56
            $binaryDataReader->readInt32(),
110 56
            $binaryDataReader->readUInt8(),
111 56
            $binaryDataReader->readInt32(),
112 56
            $binaryDataReader->readInt32(),
113 56
            $binaryDataReader->readInt32(),
114 56
            $binaryDataReader->readUInt16(),
115 56
            $this->binLogSocketConnect->getCheckSum(),
116 56
            $this->binLogSocketConnect->getBinLogCurrent()
117
        );
118
    }
119
120 56
    private function filterDummyMariaDbEvents(QueryDTO $queryDTO): ?QueryDTO
121
    {
122 56
        if (BinLogServerInfo::isMariaDb() && false !== strpos($queryDTO->getQuery(), self::MARIADB_DUMMY_QUERY)) {
123
            return null;
124
        }
125
126 56
        return $queryDTO;
127
    }
128
129 56
    private function dispatch(EventDTO $eventDTO = null): void
130
    {
131 56
        if (null !== $eventDTO) {
132 56
            $this->eventDispatcher->dispatch($eventDTO->getType(), $eventDTO);
133
        }
134 56
    }
135
}
136