Completed
Pull Request — master (#34)
by kacper
04:27
created

Event::dispatch()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 2
nc 2
nop 1
dl 0
loc 4
ccs 4
cts 4
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace MySQLReplication\Event;
4
5
use MySQLReplication\BinaryDataReader\BinaryDataReader;
6
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
7
use MySQLReplication\BinaryDataReader\BinaryDataReaderFactory;
8
use MySQLReplication\BinLog\BinLogException;
9
use MySQLReplication\BinLog\BinLogSocketConnect;
10
use MySQLReplication\Config\Config;
11
use MySQLReplication\Config\ConfigException;
12
use MySQLReplication\Definitions\ConstEventType;
13
use MySQLReplication\Event\DTO\EventDTO;
14
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
15
use MySQLReplication\Event\DTO\HeartbeatDTO;
16
use MySQLReplication\Event\RowEvent\RowEventFactory;
17
use MySQLReplication\Exception\MySQLReplicationException;
18
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
19
use Psr\SimpleCache\CacheInterface;
20
use Psr\SimpleCache\InvalidArgumentException;
21
use Symfony\Component\EventDispatcher\EventDispatcher;
22
23
/**
24
 * Class Event
25
 * @package MySQLReplication\Event
26
 */
27
class Event
{
    /**
     * Source of raw binlog packets.
     *
     * @var BinLogSocketConnect
     */
    private $socketConnect;
    /**
     * Wraps raw packet bytes in a BinaryDataReader.
     *
     * @var BinaryDataReaderFactory
     */
    private $packageService;
    /**
     * Builds row-event parsers (table map / write / update / delete).
     *
     * @var RowEventFactory
     */
    private $rowEventService;
    /**
     * Receives every DTO produced by consume().
     *
     * @var EventDispatcher
     */
    private $eventDispatcher;
    /**
     * Cache cleared whenever a rotate event is consumed.
     *
     * @var CacheInterface
     */
    private $cache;

    /**
     * Event constructor.
     *
     * @param BinLogSocketConnect $socketConnect
     * @param BinaryDataReaderFactory $packageService
     * @param RowEventFactory $rowEventService
     * @param EventDispatcher $eventDispatcher
     * @param CacheInterface $cache
     */
    public function __construct(
        BinLogSocketConnect $socketConnect,
        BinaryDataReaderFactory $packageService,
        RowEventFactory $rowEventService,
        EventDispatcher $eventDispatcher,
        CacheInterface $cache
    ) {
        $this->socketConnect = $socketConnect;
        $this->packageService = $packageService;
        $this->rowEventService = $rowEventService;
        $this->eventDispatcher = $eventDispatcher;
        $this->cache = $cache;
    }

    /**
     * Reads one packet from the socket, decodes it and dispatches the
     * resulting DTO (if any) through the event dispatcher.
     *
     * Nothing is dispatched when Config::checkEvent() rejects the event type
     * or when the type has no matching handler.
     *
     * @throws BinaryDataReaderException
     * @throws BinLogException
     * @throws ConfigException
     * @throws EventException
     * @throws MySQLReplicationException
     * @throws JsonBinaryDecoderException
     * @throws InvalidArgumentException
     * @throws \MySQLReplication\Socket\SocketException
     */
    public function consume()
    {
        $response = $this->socketConnect->getResponse();
        $reader = $this->packageService->makePackageFromBinaryData($response);

        // the first byte only carries the "ok" value, skip it
        $reader->advance(1);

        // decode the event header shared by all event types
        $eventInfo = $this->createEventInfo($reader);

        // NOTE(review): static analysis reports EventInfo::getType() as declared `string`
        // while ConstEventType constants / Config::checkEvent() expect integers, which makes
        // the strict comparisons look "always false" to the analyser. Confirm the declared
        // type on EventInfo; the comparisons here are intentionally kept strict.
        $type = $eventInfo->getType();

        // table map events are always parsed (needed for building the table cache),
        // but they are only propagated when the configuration allows it
        $tableMapDTO = null;
        if (ConstEventType::TABLE_MAP_EVENT === $type) {
            $tableMapDTO = $this->rowEventService->makeRowEvent($reader, $eventInfo)->makeTableMapDTO();
        }

        if (!Config::checkEvent($type)) {
            return;
        }

        $this->dispatch($this->resolveEventDTO($type, $eventInfo, $reader, $tableMapDTO));
    }

    /**
     * Builds the DTO matching the given event type.
     *
     * Exceptions raised by the individual event parsers propagate unchanged
     * to the caller.
     *
     * @param mixed $type event type as returned by EventInfo::getType()
     * @param EventInfo $eventInfo
     * @param BinaryDataReader $reader
     * @param EventDTO|null $fallback returned untouched when no handler matches the type
     * @return EventDTO|null
     */
    private function resolveEventDTO($type, EventInfo $eventInfo, BinaryDataReader $reader, $fallback = null)
    {
        if (ConstEventType::UPDATE_ROWS_EVENT_V1 === $type || ConstEventType::UPDATE_ROWS_EVENT_V2 === $type) {
            return $this->rowEventService->makeRowEvent($reader, $eventInfo)->makeUpdateRowsDTO();
        }

        if (ConstEventType::WRITE_ROWS_EVENT_V1 === $type || ConstEventType::WRITE_ROWS_EVENT_V2 === $type) {
            return $this->rowEventService->makeRowEvent($reader, $eventInfo)->makeWriteRowsDTO();
        }

        if (ConstEventType::DELETE_ROWS_EVENT_V1 === $type || ConstEventType::DELETE_ROWS_EVENT_V2 === $type) {
            return $this->rowEventService->makeRowEvent($reader, $eventInfo)->makeDeleteRowsDTO();
        }

        if (ConstEventType::XID_EVENT === $type) {
            $xidEvent = new XidEvent($eventInfo, $reader);

            return $xidEvent->makeXidDTO();
        }

        if (ConstEventType::ROTATE_EVENT === $type) {
            // the cache is wiped before the rotate event itself is decoded
            $this->cache->clear();
            $rotateEvent = new RotateEvent($eventInfo, $reader);

            return $rotateEvent->makeRotateEventDTO();
        }

        if (ConstEventType::GTID_LOG_EVENT === $type) {
            $gtidEvent = new GtidEvent($eventInfo, $reader);

            return $gtidEvent->makeGTIDLogDTO();
        }

        if (ConstEventType::QUERY_EVENT === $type) {
            $queryEvent = new QueryEvent($eventInfo, $reader);

            return $queryEvent->makeQueryDTO();
        }

        if (ConstEventType::MARIA_GTID_EVENT === $type) {
            $mariaGtidEvent = new MariaDbGtidEvent($eventInfo, $reader);

            return $mariaGtidEvent->makeMariaDbGTIDLogDTO();
        }

        if (ConstEventType::FORMAT_DESCRIPTION_EVENT === $type) {
            return new FormatDescriptionEventDTO($eventInfo);
        }

        if (ConstEventType::HEARTBEAT_LOG_EVENT === $type) {
            return new HeartbeatDTO($eventInfo);
        }

        return $fallback;
    }

    /**
     * Reads the event header fields from the packet, in wire order, and
     * combines them with the connection's checksum flag and current binlog
     * name into an EventInfo.
     *
     * @param BinaryDataReader $binaryDataReader
     * @return EventInfo
     * @throws \MySQLReplication\BinaryDataReader\BinaryDataReaderException
     */
    private function createEventInfo(BinaryDataReader $binaryDataReader)
    {
        // The reads below must stay in this exact order: each call consumes bytes from the reader.
        // NOTE(review): the 4/1/4/4/4/2 byte layout looks like the MySQL binlog event header
        // (timestamp, type, server id, event size, next position, flags) - confirm against
        // EventInfo's constructor signature.
        $field1 = $binaryDataReader->readInt32();
        $field2 = $binaryDataReader->readUInt8();
        $field3 = $binaryDataReader->readInt32();
        $field4 = $binaryDataReader->readInt32();
        $field5 = $binaryDataReader->readInt32();
        $field6 = $binaryDataReader->readUInt16();
        $checkSum = $this->socketConnect->getCheckSum();
        $binLogCurrent = $this->socketConnect->getBinLogCurrent();

        return new EventInfo($field1, $field2, $field3, $field4, $field5, $field6, $checkSum, $binLogCurrent);
    }

    /**
     * Hands the DTO to the event dispatcher under its own type name;
     * a null DTO is silently ignored.
     *
     * @param EventDTO|null $eventDTO
     */
    private function dispatch(EventDTO $eventDTO = null)
    {
        if (null === $eventDTO) {
            return;
        }

        $this->eventDispatcher->dispatch($eventDTO->getType(), $eventDTO);
    }
}
158