Test Failed
Pull Request — master (#12)
by wujunze
03:09
created

Producter::convertRecordSet()   B

Complexity

Conditions 7
Paths 34

Size

Total Lines 42
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 7
eloc 25
c 1
b 0
f 1
nc 34
nop 1
dl 0
loc 42
rs 8.5866
1
<?php
2
declare(strict_types=1);
3
4
namespace Seasx\SeasLogger\Kafka;
5
6
use Co;
7
use Exception;
8
use Seasx\SeasLogger\Kafka\Socket\Pool;
9
10
/**
11
 * Class Producter
12
 * @package Seasx\SeasLogger\Kafka
13
 */
14
class Producter
{
    /** @var array Buffered records; drained and appended to the batch on the next send(). */
    private $msgBuffer = [];
    /** @var ProducterConfig */
    private $config;
    /** @var Broker */
    private $broker;
    /** @var Pool */
    private $pool;
    /** @var RecordValidator */
    private $recordValidator;

    /**
     * Producter constructor.
     *
     * Stores the collaborators and initialises the protocol layer for the
     * configured broker version.
     *
     * @param ProducterConfig $config
     * @param Broker $broker
     * @param Pool $pool
     */
    public function __construct(ProducterConfig $config, Broker $broker, Pool $pool)
    {
        $this->config = $config;
        $this->broker = $broker;
        $this->pool = $pool;
        $this->recordValidator = new RecordValidator();
        ProtocolTool::init($config->getBrokerVersion());
    }

    /**
     * Groups the records per broker and sends one produce request per group,
     * each in its own coroutine.
     *
     * The first call also starts the metadata refresh loop (see syncMeta()).
     * When the configured required-ack value is not 0, the response is read
     * and, if a callback was supplied, the decoded response is passed to it.
     *
     * @param array $recordSet records to send; an empty set is a no-op
     * @param callable|null $callback receives the decoded produce response
     * @throws Exception
     */
    public function send(array $recordSet, ?callable $callback = null): void
    {
        // A static local inside a method is shared by every instance of this
        // class, so the refresh loop is started once per process, not once
        // per Producter.
        // NOTE(review): confirm that is intended when several instances with
        // different Broker objects exist.
        static $isInit = false;
        if (!$isInit) {
            // Presumably flagged before the call so a concurrent send() does
            // not start a second loop while this one is still connecting.
            $isInit = true;
            try {
                $this->syncMeta();
            } catch (\Throwable $e) {
                // Allow the next send() to retry; otherwise metadata would
                // never be loaded and convertRecordSet() would wait forever.
                $isInit = false;
                throw $e;
            }
        }

        if (empty($recordSet)) {
            return;
        }

        $requiredAck = $this->config->getRequiredAck();
        $timeout = $this->config->getTimeout();
        $compression = $this->config->getCompression();

        $recordSet = array_merge($recordSet, array_splice($this->msgBuffer, 0));
        $sendData = $this->convertRecordSet($recordSet);

        // NOTE(review): the broker id key of $sendData is not used to choose
        // the connection; verify the pool hands out a suitable one.
        foreach ($sendData as $topicList) {
            $connect = $this->pool->getConnection();
            $params = [
                'required_ack' => $requiredAck,
                'timeout' => $timeout,
                'data' => $topicList,
                'compression' => $compression,
            ];

            $requestData = ProtocolTool::encode(ProtocolTool::PRODUCE_REQUEST, $params);
            rgo(function () use ($connect, $requestData, $requiredAck, $callback) {
                $response = null;
                try {
                    $connect->send($requestData);
                    // With required-ack 0 no response is read.
                    if ($requiredAck !== 0) {
                        $response = $this->readFrame($connect);
                    }
                } finally {
                    // Always hand the connection back, even when the I/O fails,
                    // and before the callback runs so it is not held meanwhile.
                    $this->pool->release($connect);
                }

                if ($requiredAck !== 0 && $callback) {
                    // The first 4 bytes of the frame are the correlation id,
                    // which is not needed here.
                    $callback(ProtocolTool::decode(ProtocolTool::PRODUCE_REQUEST, substr($response, 4)));
                }
            });
        }
    }

    /**
     * Starts a coroutine that requests cluster metadata every 30 seconds and
     * stores the returned topics and brokers on the Broker object.
     *
     * NOTE(review): an exception inside the loop ends the refresh (after the
     * 30 second sleep); whether rgo() reports or restarts it is not visible
     * from this class.
     *
     * @throws Exception
     */
    public function syncMeta(): void
    {
        $socket = $this->pool->getConnection();
        rgo(function () use ($socket) {
            try {
                while (true) {
                    try {
                        $params = [];
                        $requestData = ProtocolTool::encode(ProtocolTool::METADATA_REQUEST, $params);
                        $socket->send($requestData);
                        $data = $this->readFrame($socket);
                        // Skip the leading 4-byte correlation id.
                        $result = ProtocolTool::decode(ProtocolTool::METADATA_REQUEST, substr($data, 4));
                        if (!isset($result['brokers'], $result['topics'])) {
                            throw new Exception('Get metadata is fail, brokers or topics is null.');
                        }
                        $this->broker->setData($result['topics'], $result['brokers']);
                    } finally {
                        Co::sleep(30);
                    }
                }
            } finally {
                // The loop only exits through an exception; return the
                // connection instead of leaking it.
                $this->pool->release($socket);
            }
        });
    }

    /**
     * Groups validated records by broker, topic and partition.
     *
     * Blocks (polling every 0.5 seconds, with no timeout) until the Broker
     * object reports at least one topic. A record's own 'partId' is used when
     * it exists for the topic; otherwise a random partition is chosen.
     *
     * Result shape:
     *   [brokerId => [topic => [
     *       'partitions' => [partId => ['partition_id' => partId, 'messages' => [...]]],
     *       'topic_name' => topic,
     *   ]]]
     * A message is ['value' => ..., 'key' => ...] when the record has a
     * non-blank key, and the bare value otherwise.
     *
     * @param string[][] $recordSet
     *
     * @return mixed[]
     * @throws InvalidRecordInSet
     */
    protected function convertRecordSet(array $recordSet): array
    {
        $sendData = [];
        while (empty($topics = $this->broker->getTopics())) {
            Co::sleep(0.5);
        }

        foreach ($recordSet as $record) {
            $this->recordValidator->validate($record, $topics);

            $topic = $record['topic'];
            // Maps partition id => broker id for this topic.
            $topicMeta = $topics[$topic];

            // Only draw a random partition when the record does not name a
            // valid one.
            $partId = isset($record['partId'], $topicMeta[$record['partId']])
                ? $record['partId']
                : array_rand($topicMeta);

            $brokerId = $topicMeta[$partId];
            $topicData = $sendData[$brokerId][$topic] ?? [];
            $partition = $topicData['partitions'][$partId] ?? [];

            $partition['partition_id'] = $partId;

            if (trim($record['key'] ?? '') !== '') {
                $partition['messages'][] = ['value' => $record['value'], 'key' => $record['key']];
            } else {
                $partition['messages'][] = $record['value'];
            }

            $topicData['partitions'][$partId] = $partition;
            $topicData['topic_name'] = $topic;
            $sendData[$brokerId][$topic] = $topicData;
        }

        return $sendData;
    }

    /**
     * Reads one length-prefixed response frame from a pooled connection.
     *
     * @param mixed $socket connection obtained from the pool; must provide recv()
     * @return mixed whatever recv() returns for the payload; callers treat it
     *               as a string whose first 4 bytes are the correlation id
     * @throws Exception when the 4-byte length prefix cannot be unpacked
     */
    private function readFrame($socket)
    {
        $dataLen = Protocol::unpack(Protocol::BIT_B32, $socket->recv(4));
        if ($dataLen === false) {
            throw new Exception('Failed to read the response length from the Kafka connection.');
        }

        return $socket->recv($dataLen);
    }
}
167