Test Failed
Pull Request — master (#12)
by
unknown
02:46
created

Metadata   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 120
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 61
c 1
b 0
f 1
dl 0
loc 120
rs 10
wmc 7

5 Methods

Rating   Name   Duplication   Size   Complexity  
A decode() 0 14 1
A metaPartitionMetaData() 0 22 1
A metaTopicMetaData() 0 16 1
A encode() 0 13 3
A metaBroker() 0 16 1
1
<?php
2
declare(strict_types=1);
3
4
namespace Seasx\SeasLogger\Kafka;
5
6
use Exception;
7
use function is_string;
8
use function substr;
9
10
/**
 * Encoder/decoder for the metadata request and its response.
 *
 * All byte-level work (headers, strings, arrays, integer unpacking) is
 * delegated to helpers inherited from Protocol; this class only fixes the
 * order in which the fields are written and read.
 */
class Metadata extends Protocol
{
    /**
     * Build the binary metadata request for a list of topic names.
     *
     * @param array $payloads topic names; every element must be a string
     * @return string header and topic array, wrapped by encodeString() with PACK_INT32
     * @throws Exception when any element of $payloads is not a string
     */
    public function encode(array $payloads = []): string
    {
        // Reject the whole request as soon as one non-string entry is found.
        foreach ($payloads as $candidate) {
            if (is_string($candidate)) {
                continue;
            }

            throw new Exception('request metadata topic array have invalid value. ');
        }

        // The header is produced first, then the topic list, exactly in that order.
        $head = $this->requestHeader('seaslog-kafka', self::METADATA_REQUEST, self::METADATA_REQUEST);
        $topics = self::encodeArray($payloads, [$this, 'encodeString'], self::PACK_INT16);

        return self::encodeString($head . $topics, self::PACK_INT32);
    }

    /**
     * Parse a metadata response body into its broker and topic sections.
     *
     * @param string $data raw response bytes, starting at the broker array
     * @return array map with the keys 'brokers' and 'topics'
     * @throws Exception
     */
    public function decode(string $data): array
    {
        $cursor = 0;

        // Section 1: brokers, each element parsed by metaBroker().
        $brokers = $this->decodeArray(substr($data, $cursor), [$this, 'metaBroker']);
        $cursor += $brokers['length'];

        // Section 2: topics, each element parsed by metaTopicMetaData().
        $topics = $this->decodeArray(substr($data, $cursor), [$this, 'metaTopicMetaData']);
        // The cursor is advanced past the topics too, although it is not read again.
        $cursor += $topics['length'];

        return [
            'brokers' => $brokers['data'],
            'topics' => $topics['data'],
        ];
    }

    /**
     * Parse one broker entry: a 4-byte node id, a host string, a 4-byte port.
     *
     * @param string $data bytes beginning at the broker entry
     * @return array 'length' => bytes consumed, 'data' => host, port and nodeId
     * @throws Exception
     */
    protected function metaBroker(string $data): array
    {
        $cursor = 0;

        $id = self::unpack(self::BIT_B32, substr($data, $cursor, 4));
        $cursor += 4;

        // decodeString() reports how many bytes the host field occupied.
        $host = $this->decodeString(substr($data, $cursor), self::BIT_B16);
        $cursor += $host['length'];

        $portNumber = self::unpack(self::BIT_B32, substr($data, $cursor, 4));
        $cursor += 4;

        $broker = [
            'host' => $host['data'],
            'port' => $portNumber,
            'nodeId' => $id,
        ];

        return [
            'length' => $cursor,
            'data' => $broker,
        ];
    }

    /**
     * Parse one topic entry: a 2-byte error code, the topic name and the
     * array of partition entries.
     *
     * @param string $data bytes beginning at the topic entry
     * @return array 'length' => bytes consumed, 'data' => topicName, errorCode and partitions
     * @throws Exception
     */
    protected function metaTopicMetaData(string $data): array
    {
        $cursor = 0;

        $code = self::unpack(self::BIT_B16_SIGNED, substr($data, $cursor, 2));
        $cursor += 2;

        $name = $this->decodeString(substr($data, $cursor), self::BIT_B16);
        $cursor += $name['length'];

        // Every partition element is parsed by metaPartitionMetaData().
        $partitions = $this->decodeArray(substr($data, $cursor), [$this, 'metaPartitionMetaData']);
        $cursor += $partitions['length'];

        $topic = [
            'topicName' => $name['data'],
            'errorCode' => $code,
            'partitions' => $partitions['data'],
        ];

        return [
            'length' => $cursor,
            'data' => $topic,
        ];
    }

    /**
     * Parse one partition entry: a 2-byte error code, a 4-byte partition id,
     * a 4-byte leader, then the replicas array and the isr array.
     *
     * @param string $data bytes beginning at the partition entry
     * @return array 'length' => bytes consumed, 'data' => partitionId, errorCode, replicas, leader and isr
     * @throws Exception
     */
    protected function metaPartitionMetaData(string $data): array
    {
        $cursor = 0;

        $code = self::unpack(self::BIT_B16_SIGNED, substr($data, $cursor, 2));
        $cursor += 2;

        $partition = self::unpack(self::BIT_B32, substr($data, $cursor, 4));
        $cursor += 4;

        $leaderNode = self::unpack(self::BIT_B32, substr($data, $cursor, 4));
        $cursor += 4;

        // Both trailing arrays are decoded with BIT_B32 elements.
        $replicaSet = $this->decodePrimitiveArray(substr($data, $cursor), self::BIT_B32);
        $cursor += $replicaSet['length'];

        $inSync = $this->decodePrimitiveArray(substr($data, $cursor), self::BIT_B32);
        $cursor += $inSync['length'];

        $meta = [
            'partitionId' => $partition,
            'errorCode' => $code,
            'replicas' => $replicaSet['data'],
            'leader' => $leaderNode,
            'isr' => $inSync['data'],
        ];

        return [
            'length' => $cursor,
            'data' => $meta,
        ];
    }
}
134