Passed
Push — master ( 2b0f26...e41b5c )
by kacper
02:51
created

Event   B

Complexity

Total Complexity 19

Size/Duplication

Total Lines 147
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 16

Test Coverage

Coverage 79.76%

Importance

Changes 4
Bugs 2 Features 0
Metric Value
wmc 19
c 4
b 2
f 0
lcom 1
cbo 16
dl 0
loc 147
ccs 67
cts 84
cp 0.7976
rs 8.4614

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 13 1
D consume() 0 91 18
1
<?php
2
3
namespace MySQLReplication\Event;
4
5
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
6
use MySQLReplication\BinaryDataReader\BinaryDataReaderFactory;
7
use MySQLReplication\BinLog\BinLogException;
8
use MySQLReplication\BinLog\BinLogSocketConnect;
9
use MySQLReplication\Config\Config;
10
use MySQLReplication\Config\ConfigException;
11
use MySQLReplication\Definitions\ConstEventsNames;
12
use MySQLReplication\Definitions\ConstEventType;
13
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
14
use MySQLReplication\Event\DTO\HeartbeatDTO;
15
use MySQLReplication\Event\RowEvent\RowEventFactory;
16
use MySQLReplication\Exception\MySQLReplicationException;
17
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
18
use Psr\SimpleCache\CacheInterface;
19
use Psr\SimpleCache\InvalidArgumentException;
20
use Symfony\Component\EventDispatcher\EventDispatcher;
21
22
/**
23
 * Class Event
24
 * @package MySQLReplication\Event
25
 */
26
class Event
27
{
28
    /**
29
     * @var BinLogSocketConnect
30
     */
31
    private $socketConnect;
32
    /**
33
     * @var BinaryDataReaderFactory
34
     */
35
    private $packageService;
36
    /**
37
     * @var RowEventFactory
38
     */
39
    private $rowEventService;
40
    /**
41
     * @var EventDispatcher
42
     */
43
    private $eventDispatcher;
44
    /**
45
     * @var CacheInterface
46
     */
47
    private $cache;
48
49
    /**
50
     * BinLogPack constructor.
51
     * @param BinLogSocketConnect $socketConnect
52
     * @param BinaryDataReaderFactory $packageService
53
     * @param RowEventFactory $rowEventService
54
     * @param EventDispatcher $eventDispatcher
55
     * @param CacheInterface $cache
56
     */
57 54
    public function __construct(
58
        BinLogSocketConnect $socketConnect,
59
        BinaryDataReaderFactory $packageService,
60
        RowEventFactory $rowEventService,
61
        EventDispatcher $eventDispatcher,
62
        CacheInterface $cache
63
    ) {
64 54
        $this->socketConnect = $socketConnect;
65 54
        $this->packageService = $packageService;
66 54
        $this->rowEventService = $rowEventService;
67 54
        $this->eventDispatcher = $eventDispatcher;
68 54
        $this->cache = $cache;
69 54
    }
70
71
    /**
72
     * @throws BinaryDataReaderException
73
     * @throws BinLogException
74
     * @throws ConfigException
75
     * @throws EventException
76
     * @throws MySQLReplicationException
77
     * @throws JsonBinaryDecoderException
78
     * @throws InvalidArgumentException
79
     * @throws \MySQLReplication\Socket\SocketException
80
     */
81 54
    public function consume()
82
    {
83 54
        $binaryDataReader = $this->packageService->makePackageFromBinaryData($this->socketConnect->getResponse());
84
85
        // "ok" value on first byte continue
86 54
        $binaryDataReader->advance(1);
87
88
        // decode all events data
89 54
        $eventInfo = new EventInfo(
90 54
            $binaryDataReader->readInt32(),
91 54
            $binaryDataReader->readUInt8(),
92 54
            $binaryDataReader->readInt32(),
93 54
            $binaryDataReader->readInt32(),
94 54
            $binaryDataReader->readInt32(),
95 54
            $binaryDataReader->readUInt16(),
96 54
            $this->socketConnect->getCheckSum()
97 54
        );
98
99 54
        if (ConstEventType::TABLE_MAP_EVENT === $eventInfo->getType()) {
100 52
            $event = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo)->makeTableMapDTO();
101 52
            if (null !== $event && Config::checkEvent($eventInfo->getType())) {
102 50
                $this->eventDispatcher->dispatch(ConstEventsNames::TABLE_MAP, $event);
103 50
            }
104
105 52
            return;
106
        }
107
108 54
        if (!Config::checkEvent($eventInfo->getType())) {
109 3
            return;
110
        }
111
112 54
        if (in_array(
113 54
            $eventInfo->getType(), [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2],
114
            true
115 54
        )) {
116 1
            $event = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo)->makeUpdateRowsDTO();
117 1
            if ($event !== null) {
118 1
                $this->eventDispatcher->dispatch(ConstEventsNames::UPDATE, $event);
119 1
            }
120 54
        } elseif (in_array(
121 54
            $eventInfo->getType(), [ConstEventType::WRITE_ROWS_EVENT_V1, ConstEventType::WRITE_ROWS_EVENT_V2], true
122 54
        )) {
123 52
            $event = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo)->makeWriteRowsDTO();
124 52
            if ($event !== null) {
125 52
                $this->eventDispatcher->dispatch(ConstEventsNames::WRITE, $event);
126 52
            }
127 54
        } elseif (in_array(
128 54
            $eventInfo->getType(), [ConstEventType::DELETE_ROWS_EVENT_V1, ConstEventType::DELETE_ROWS_EVENT_V2],
129
            true
130 54
        )) {
131 1
            $event = $this->rowEventService->makeRowEvent($binaryDataReader, $eventInfo)->makeDeleteRowsDTO();
132 1
            if ($event !== null) {
133 1
                $this->eventDispatcher->dispatch(ConstEventsNames::DELETE, $event);
134 1
            }
135 54
        } elseif (ConstEventType::XID_EVENT === $eventInfo->getType()) {
136 3
            $this->eventDispatcher->dispatch(
137 3
                ConstEventsNames::XID,
138 3
                (new XidEvent($eventInfo, $binaryDataReader))->makeXidDTO()
139 3
            );
140 54
        } elseif (ConstEventType::ROTATE_EVENT === $eventInfo->getType()) {
141
            $this->cache->clear();
142
143
            $this->eventDispatcher->dispatch(
144
                ConstEventsNames::ROTATE,
145
                (new RotateEvent($eventInfo, $binaryDataReader))->makeRotateEventDTO()
146
            );
147 54
        } elseif (ConstEventType::GTID_LOG_EVENT === $eventInfo->getType()) {
148
            $this->eventDispatcher->dispatch(
149
                ConstEventsNames::GTID,
150
                (new GtidEvent($eventInfo, $binaryDataReader))->makeGTIDLogDTO()
151
            );
152 54
        } elseif (ConstEventType::QUERY_EVENT === $eventInfo->getType()) {
153 54
            $this->eventDispatcher->dispatch(
154 54
                ConstEventsNames::QUERY,
155 54
                (new QueryEvent($eventInfo, $binaryDataReader))->makeQueryDTO()
156 54
            );
157 54
        } elseif (ConstEventType::MARIA_GTID_EVENT === $eventInfo->getType()) {
158
            $this->eventDispatcher->dispatch(
159
                ConstEventsNames::MARIADB_GTID,
160
                (new MariaDbGtidEvent($eventInfo, $binaryDataReader))->makeMariaDbGTIDLogDTO()
161
            );
162 54
        } elseif (ConstEventType::FORMAT_DESCRIPTION_EVENT === $eventInfo->getType()) {
163 54
            $this->eventDispatcher->dispatch(
164 54
                ConstEventsNames::FORMAT_DESCRIPTION, new FormatDescriptionEventDTO($eventInfo)
165 54
            );
166 54
        } elseif (ConstEventType::HEARTBEAT_LOG_EVENT === $eventInfo->getType()) {
167
            $this->eventDispatcher->dispatch(
168
                ConstEventsNames::HEARTBEAT, new HeartbeatDTO($eventInfo)
169
            );
170
        }
171 54
    }
172
}
173