Test Failed
Pull Request — master (#12)
by
unknown
02:46
created

Produce   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 288
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 110
c 1
b 0
f 1
dl 0
loc 288
rs 9.92
wmc 31

12 Methods

Rating   Name   Duplication   Size   Complexity  
A encodeMessage() 0 31 3
A computeAttributes() 0 17 4
A computeMagicByte() 0 7 2
A encodeProduceTopic() 0 14 4
A __construct() 0 5 2
A computeTimestampType() 0 7 2
A encode() 0 16 2
A encodeMessageSet() 0 20 3
A decode() 0 13 2
A produceTopicPair() 0 13 1
A producePartitionPair() 0 23 2
A encodeProducePartition() 0 17 4
1
<?php
2
declare(strict_types=1);
3
4
namespace Seasx\SeasLogger\Kafka;
5
6
use InvalidArgumentException;
7
use Lcobucci\Clock\Clock;
8
use Lcobucci\Clock\SystemClock;
9
use function crc32;
10
use function is_array;
11
use function substr;
12
13
class Produce extends Protocol
14
{
15
    /**
     * Mask selecting the compression codec inside the message attributes byte.
     * The codec occupies the three lowest bits; 0 is reserved to indicate
     * that no compression is used.
     */
    public const COMPRESSION_CODEC_MASK = 0x07;

    /**
     * Mask of the timestamp-type bit inside the message attributes byte:
     * cleared for CreateTime, set for LogAppendTime.
     */
    private const TIMESTAMP_TYPE_MASK = 0x08;

    /** Timestamp type returned for magic-version-0 messages. */
    private const TIMESTAMP_NONE = -1;

    /** Timestamp type returned for every other magic version. */
    private const TIMESTAMP_CREATE_TIME = 0;

    /** Timestamp type that causes TIMESTAMP_TYPE_MASK to be set in the attributes. */
    private const TIMESTAMP_LOG_APPEND_TIME = 1;

    /**
     * Time source used to stamp messages whose magic byte is version 1 or above.
     *
     * @var Clock
     */
    private $clock;
34
35
    /**
     * @param string     $version broker version handed to the parent protocol
     * @param Clock|null $clock   time source; a SystemClock is created when none is supplied
     */
    public function __construct(string $version = self::DEFAULT_BROKER_VERION, ?Clock $clock = null)
    {
        parent::__construct($version);

        $this->clock = $clock ?? new SystemClock();
    }
41
42
    /**
     * Build a produce request from the given payloads.
     *
     * Keys read from $payloads:
     *  - `data` (required): the topic entries, each encoded by encodeProduceTopic();
     *  - `required_ack` (optional): defaults to 0;
     *  - `timeout` (optional): defaults to 100;
     *  - `compression` (optional): defaults to self::COMPRESSION_NONE.
     *
     * @param array $payloads
     * @return string request header followed by the body, passed through
     *                encodeString() with PACK_INT32
     *
     * @throws InvalidArgumentException when `data` is not set
     */
    public function encode(array $payloads = []): string
    {
        if (!isset($payloads['data'])) {
            throw new InvalidArgumentException('given produce data invalid. `data` is undefined.');
        }

        $header = $this->requestHeader('seaslog-kafka', 0, self::PRODUCE_REQUEST);
        $data = self::pack(self::BIT_B16, (string)($payloads['required_ack'] ?? 0));
        $data .= self::pack(self::BIT_B32, (string)($payloads['timeout'] ?? 100));
        $data .= self::encodeArray(
            $payloads['data'],
            [$this, 'encodeProduceTopic'],
            $payloads['compression'] ?? self::COMPRESSION_NONE
        );

        return self::encodeString($header . $data, self::PACK_INT32);
    }
63
64
    /**
     * Decode a produce response.
     *
     * The topic array is decoded first; the throttle time is read from the
     * four bytes that follow it only when the produce API version is
     * API_VERSION2, and is reported as 0 otherwise.
     *
     * @param string $data
     * @return array `throttleTime` plus the decoded topics under `data`
     * @throws \Exception
     */
    public function decode(string $data): array
    {
        $apiVersion = $this->getApiVersion(self::PRODUCE_REQUEST);
        $topics = $this->decodeArray(substr($data, 0), [$this, 'produceTopicPair'], $apiVersion);

        $throttleTime = $apiVersion === self::API_VERSION2
            ? self::unpack(self::BIT_B32, substr($data, $topics['length'], 4))
            : 0;

        return ['throttleTime' => $throttleTime, 'data' => $topics['data']];
    }
83
84
    /**
     * Encode a single partition entry: the partition id followed by its
     * message set.
     *
     * @param mixed[] $values      must provide `partition_id` and a non-empty `messages`
     * @param int     $compression compression codec passed on to encodeMessageSet()
     * @return string
     *
     * @throws InvalidArgumentException when `partition_id` is not set or `messages` is empty
     */
    protected function encodeProducePartition(array $values, int $compression): string
    {
        if (!isset($values['partition_id'])) {
            throw new InvalidArgumentException('given produce data invalid. `partition_id` is undefined.');
        }

        // empty() is also true for a missing key, so it covers the "not set" case.
        if (empty($values['messages'])) {
            throw new InvalidArgumentException('given produce data invalid. `messages` is undefined.');
        }

        $partition = self::pack(self::BIT_B32, (string)$values['partition_id']);
        $messageSet = $this->encodeMessageSet((array)$values['messages'], $compression);

        return $partition . self::encodeString($messageSet, self::PACK_INT32);
    }
110
111
    /**
     * Encode a message set.
     *
     * N.B., MessageSets are not preceded by an int32 like other array
     * elements in the protocol.
     *
     * Every message is written as a 64-bit counter (0, 1, 2, ...) followed by
     * the encoded message. When a compression codec is requested, the whole
     * uncompressed set becomes the value of one wrapper message with counter 0.
     *
     * @param string[]|string[][] $messages
     * @param int                 $compression
     * @return string
     */
    protected function encodeMessageSet(array $messages, int $compression = self::COMPRESSION_NONE): string
    {
        $set = '';
        $index = 0;

        foreach ($messages as $message) {
            $encoded = $this->encodeMessage($message);

            $set .= self::pack(self::BIT_B64, (string)$index);
            $set .= self::encodeString($encoded, self::PACK_INT32);

            $index++;
        }

        if ($compression !== self::COMPRESSION_NONE) {
            return self::pack(self::BIT_B64, '0')
                . self::encodeString($this->encodeMessage($set, $compression), self::PACK_INT32);
        }

        return $set;
    }
142
143
    /**
     * Encode one message: CRC, magic byte, attributes, optional timestamp,
     * key and value.
     *
     * @param string[]|string $message     either the raw value, or an array with a
     *                                     `value` entry and an optional `key` entry
     *                                     (a missing or null key is encoded as '')
     * @param int             $compression codec stored in the attributes and passed to
     *                                     encodeString() for the value
     * @return string
     */
    protected function encodeMessage($message, int $compression = self::COMPRESSION_NONE): string
    {
        $magic = $this->computeMagicByte();
        $attributes = $this->computeAttributes($magic, $compression, $this->computeTimestampType($magic));

        $data = self::pack(self::BIT_B8, (string)$magic);
        $data .= self::pack(self::BIT_B8, (string)$attributes);

        if ($magic >= self::MESSAGE_MAGIC_VERSION1) {
            // 'U' is the Unix timestamp in seconds and 'v' the milliseconds part,
            // so 'Uv' yields the current time in milliseconds.
            $data .= self::pack(self::BIT_B64, $this->clock->now()->format('Uv'));
        }

        $key = '';

        if (is_array($message)) {
            // The key is optional: fall back to '' instead of reading an undefined index.
            $key = $message['key'] ?? '';
            $message = $message['value'];
        }

        // message key
        $data .= self::encodeString($key, self::PACK_INT32);

        // message value
        $data .= self::encodeString($message, self::PACK_INT32, $compression);

        // The CRC is computed over everything from the magic byte onward
        // and is written in front of that data.
        $crc = (string)crc32($data);

        return self::pack(self::BIT_B32, $crc) . $data;
    }
181
182
    /**
     * Pick the message magic byte: version 1 when the produce API version is
     * API_VERSION2, version 0 otherwise.
     */
    private function computeMagicByte(): int
    {
        return $this->getApiVersion(self::PRODUCE_REQUEST) === self::API_VERSION2
            ? self::MESSAGE_MAGIC_VERSION1
            : self::MESSAGE_MAGIC_VERSION0;
    }
190
191
    /**
     * Compute the message attributes byte.
     *
     * The masked compression codec is always included. The timestamp-type bit
     * is added only for messages newer than magic version 0 whose timestamp
     * type is LogAppendTime.
     */
    private function computeAttributes(int $magic, int $compression, int $timestampType): int
    {
        $flags = $compression !== self::COMPRESSION_NONE
            ? self::COMPRESSION_CODEC_MASK & $compression
            : 0;

        $usesLogAppendTime = $magic !== self::MESSAGE_MAGIC_VERSION0
            && $timestampType === self::TIMESTAMP_LOG_APPEND_TIME;

        if ($usesLogAppendTime) {
            $flags |= self::TIMESTAMP_TYPE_MASK;
        }

        return $flags;
    }
209
210
    /**
     * Timestamp type for the given magic byte: TIMESTAMP_NONE for magic
     * version 0, TIMESTAMP_CREATE_TIME for anything else.
     */
    public function computeTimestampType(int $magic): int
    {
        return $magic === self::MESSAGE_MAGIC_VERSION0
            ? self::TIMESTAMP_NONE
            : self::TIMESTAMP_CREATE_TIME;
    }
218
219
    /**
     * Encode a single topic entry: the topic name followed by its partitions.
     *
     * @param mixed[] $values      must provide `topic_name` and a non-empty `partitions`
     * @param int     $compression compression codec passed on to each partition
     * @return string
     *
     * @throws InvalidArgumentException when `topic_name` is not set or `partitions` is empty
     */
    protected function encodeProduceTopic(array $values, int $compression): string
    {
        if (!isset($values['topic_name'])) {
            throw new InvalidArgumentException('given produce data invalid. `topic_name` is undefined.');
        }

        // empty() is also true for a missing key, so it covers the "not set" case.
        if (empty($values['partitions'])) {
            throw new InvalidArgumentException('given produce data invalid. `partitions` is undefined.');
        }

        $encodedName = self::encodeString($values['topic_name'], self::PACK_INT16);
        $encodedPartitions = self::encodeArray(
            $values['partitions'],
            [$this, 'encodeProducePartition'],
            $compression
        );

        return $encodedName . $encodedPartitions;
    }
242
243
    /**
     * Decode one topic entry of a produce response: the topic name followed
     * by its partition entries.
     *
     * @param string $data
     * @param int    $version
     * @return mixed[] `length` is the sum of the lengths reported for the name
     *                 and the partition array; `data` holds `topicName` and
     *                 `partitions`
     *
     * @throws \Exception
     */
    protected function produceTopicPair(string $data, int $version): array
    {
        $name = $this->decodeString($data, self::BIT_B16);
        $partitions = $this->decodeArray(
            substr($data, $name['length']),
            [$this, 'producePartitionPair'],
            $version
        );

        return [
            'length' => $name['length'] + $partitions['length'],
            'data' => [
                'topicName' => $name['data'],
                'partitions' => $partitions['data'],
            ],
        ];
    }
268
269
    /**
     * Decode one partition entry of a produce response.
     *
     * Fields are read in order: partition id (4 bytes), error code (2 bytes,
     * signed), partition offset (8 bytes) and, only when $version is
     * API_VERSION2, a timestamp (8 bytes). The timestamp is reported as 0 for
     * other versions.
     *
     * @param string $data
     * @param int    $version
     * @return mixed[] `length` is the number of bytes read; `data` holds
     *                 `partition`, `errorCode`, `offset` and `timestamp`
     *
     * @throws \Exception
     */
    protected function producePartitionPair(string $data, int $version): array
    {
        $cursor = 0;
        $partitionId = self::unpack(self::BIT_B32, substr($data, $cursor, 4));
        $cursor += 4;
        $errorCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $cursor, 2));
        $cursor += 2;
        // Keep the decoded offset: it is the value reported to the caller,
        // not the byte cursor used while parsing.
        $partitionOffset = self::unpack(self::BIT_B64, substr($data, $cursor, 8));
        $cursor += 8;
        $timestamp = 0;

        if ($version === self::API_VERSION2) {
            $timestamp = self::unpack(self::BIT_B64, substr($data, $cursor, 8));
            $cursor += 8;
        }

        return [
            'length' => $cursor,
            'data' => [
                'partition' => $partitionId,
                'errorCode' => $errorCode,
                'offset' => $partitionOffset,
                'timestamp' => $timestamp,
            ],
        ];
    }
304
}
305