Test Failed
Pull Request — master (#12)
by wujunze
03:09
created

Produce   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 284
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 110
c 1
b 0
f 1
dl 0
loc 284
rs 9.92
wmc 31

12 Methods

Rating   Name   Duplication   Size   Complexity  
A encode() 0 16 2
A encodeMessage() 0 31 3
A encodeMessageSet() 0 20 3
A decode() 0 13 2
A produceTopicPair() 0 13 1
A producePartitionPair() 0 23 2
A encodeProducePartition() 0 17 4
A computeAttributes() 0 17 4
A computeMagicByte() 0 7 2
A encodeProduceTopic() 0 14 4
A __construct() 0 5 2
A computeTimestampType() 0 7 2
1
<?php
2
declare(strict_types=1);
3
4
namespace Seasx\SeasLogger\Kafka;
5
6
use InvalidArgumentException;
7
use Lcobucci\Clock\Clock;
8
use Lcobucci\Clock\SystemClock;
9
use function crc32;
10
use function is_array;
11
use function substr;
12
13
/**
 * Encoder/decoder for the Kafka Produce API.
 *
 * `encode()` serialises a produce request (topics -> partitions -> message set)
 * and `decode()` parses the matching response into plain arrays. All low-level
 * packing helpers (`pack`, `unpack`, `encodeString`, `encodeArray`, ...) are
 * inherited from {@see Protocol}.
 */
class Produce extends Protocol
{
    /**
     * Specifies the mask for the compression code. 3 bits to hold the compression codec.
     * 0 is reserved to indicate no compression
     */
    public const COMPRESSION_CODEC_MASK = 0x07;

    /**
     * Specify the mask of timestamp type: 0 for CreateTime, 1 for LogAppendTime.
     */
    private const TIMESTAMP_TYPE_MASK = 0x08;

    private const TIMESTAMP_NONE = -1;
    private const TIMESTAMP_CREATE_TIME = 0;
    private const TIMESTAMP_LOG_APPEND_TIME = 1;

    /**
     * Source of the timestamp written into messages with magic byte >= 1.
     *
     * @var Clock
     */
    private $clock;

    /**
     * @param string     $version Broker version, forwarded to the parent constructor.
     * @param Clock|null $clock   Clock used for message timestamps; a SystemClock is
     *                            created when none is given.
     */
    public function __construct(string $version = self::DEFAULT_BROKER_VERION, ?Clock $clock = null)
    {
        parent::__construct($version);

        $this->clock = $clock ?? new SystemClock();
    }

    /**
     * Build a complete produce request.
     *
     * Recognised payload keys:
     *  - `data` (required): list of topic entries, see encodeProduceTopic()
     *  - `required_ack` (default 0)
     *  - `timeout` (default 100)
     *  - `compression` (default self::COMPRESSION_NONE)
     *
     * @param mixed[] $payloads
     *
     * @throws InvalidArgumentException when `data` is missing
     * @throws NotSupported
     * @throws ProtocolException
     */
    public function encode(array $payloads = []): string
    {
        if (!isset($payloads['data'])) {
            // Message text is kept verbatim (including its spelling) because callers may match on it.
            throw new InvalidArgumentException('given procude data invalid. `data` is undefined.');
        }

        $header = $this->requestHeader('seaslog-kafka', 0, self::PRODUCE_REQUEST);

        $body = self::pack(self::BIT_B16, (string)($payloads['required_ack'] ?? 0));
        $body .= self::pack(self::BIT_B32, (string)($payloads['timeout'] ?? 100));
        $body .= self::encodeArray(
            $payloads['data'],
            [$this, 'encodeProduceTopic'],
            $payloads['compression'] ?? self::COMPRESSION_NONE
        );

        return self::encodeString($header . $body, self::PACK_INT32);
    }

    /**
     * Parse a produce response.
     *
     * @return mixed[] `['throttleTime' => int, 'data' => topic entries]`; `throttleTime`
     *                 stays 0 unless the produce API version is self::API_VERSION2.
     *
     * @throws ProtocolException
     */
    public function decode(string $data): array
    {
        $version = $this->getApiVersion(self::PRODUCE_REQUEST);
        $topics = $this->decodeArray($data, [$this, 'produceTopicPair'], $version);
        $throttleTime = 0;

        if ($version === self::API_VERSION2) {
            // The throttle time follows directly after the topic array.
            $throttleTime = self::unpack(self::BIT_B32, substr($data, $topics['length'], 4));
        }

        return ['throttleTime' => $throttleTime, 'data' => $topics['data']];
    }

    /**
     * Encode a single partition entry: partition id followed by its message set.
     *
     * @param mixed[] $values must contain `partition_id` and a non-empty `messages`
     *
     * @throws NotSupported
     * @throws ProtocolException
     */
    protected function encodeProducePartition(array $values, int $compression): string
    {
        if (!isset($values['partition_id'])) {
            throw new ProtocolException('given produce data invalid. `partition_id` is undefined.');
        }

        // empty() already covers the "not set" case.
        if (empty($values['messages'])) {
            throw new ProtocolException('given produce data invalid. `messages` is undefined.');
        }

        $messageSet = $this->encodeMessageSet((array)$values['messages'], $compression);

        return self::pack(self::BIT_B32, (string)$values['partition_id'])
            . self::encodeString($messageSet, self::PACK_INT32);
    }

    /**
     * encode message set
     * N.B., MessageSets are not preceded by an int32 like other array elements
     * in the protocol.
     *
     * Each message is written with a running index (0, 1, 2, ...) as its 64-bit
     * offset field. When compression is requested, the whole uncompressed set is
     * wrapped as the value of one outer message with offset 0.
     *
     * @param string[]|string[][] $messages
     *
     * @throws NotSupported
     */
    protected function encodeMessageSet(array $messages, int $compression = self::COMPRESSION_NONE): string
    {
        $data = '';
        $next = 0;

        foreach ($messages as $message) {
            $data .= self::pack(self::BIT_B64, (string)$next)
                . self::encodeString($this->encodeMessage($message), self::PACK_INT32);

            ++$next;
        }

        if ($compression === self::COMPRESSION_NONE) {
            return $data;
        }

        return self::pack(self::BIT_B64, '0')
            . self::encodeString($this->encodeMessage($data, $compression), self::PACK_INT32);
    }

    /**
     * Encode one message: crc32, magic byte, attributes, optional timestamp, key, value.
     *
     * @param string[]|string $message either the raw value, or an array with a
     *                                 `value` entry and an optional `key` entry
     *
     * @throws NotSupported
     */
    protected function encodeMessage($message, int $compression = self::COMPRESSION_NONE): string
    {
        $magic = $this->computeMagicByte();
        $attributes = $this->computeAttributes($magic, $compression, $this->computeTimestampType($magic));

        $data = self::pack(self::BIT_B8, (string)$magic);
        $data .= self::pack(self::BIT_B8, (string)$attributes);

        if ($magic >= self::MESSAGE_MAGIC_VERSION1) {
            // 'Uv' = Unix seconds followed by milliseconds, i.e. epoch time in ms.
            $data .= self::pack(self::BIT_B64, $this->clock->now()->format('Uv'));
        }

        $key = '';

        if (is_array($message)) {
            // A missing key falls back to the same empty key used for plain-string messages.
            $key = $message['key'] ?? '';
            $message = $message['value'];
        }

        // message key
        $data .= self::encodeString($key, self::PACK_INT32);

        // message value
        $data .= self::encodeString($message, self::PACK_INT32, $compression);

        // int32 -- crc code  string data
        return self::pack(self::BIT_B32, (string)crc32($data)) . $data;
    }

    /**
     * Pick the message magic byte: version 1 for produce API version 2, otherwise version 0.
     */
    private function computeMagicByte(): int
    {
        if ($this->getApiVersion(self::PRODUCE_REQUEST) === self::API_VERSION2) {
            return self::MESSAGE_MAGIC_VERSION1;
        }

        return self::MESSAGE_MAGIC_VERSION0;
    }

    /**
     * Build the message attributes byte from the compression codec and, for
     * messages newer than magic version 0, the timestamp type bit.
     */
    private function computeAttributes(int $magic, int $compression, int $timestampType): int
    {
        $attributes = 0;

        if ($compression !== self::COMPRESSION_NONE) {
            $attributes |= self::COMPRESSION_CODEC_MASK & $compression;
        }

        // Magic version 0 messages carry no timestamp, so no timestamp bit either.
        if ($magic === self::MESSAGE_MAGIC_VERSION0) {
            return $attributes;
        }

        if ($timestampType === self::TIMESTAMP_LOG_APPEND_TIME) {
            $attributes |= self::TIMESTAMP_TYPE_MASK;
        }

        return $attributes;
    }

    /**
     * Timestamp type for the given magic byte: none for version 0, CreateTime otherwise.
     */
    public function computeTimestampType(int $magic): int
    {
        if ($magic === self::MESSAGE_MAGIC_VERSION0) {
            return self::TIMESTAMP_NONE;
        }

        return self::TIMESTAMP_CREATE_TIME;
    }

    /**
     * Encode a single topic entry: topic name followed by its partition array.
     *
     * @param mixed[] $values must contain `topic_name` and a non-empty `partitions`
     *
     * @throws NotSupported
     * @throws ProtocolException
     */
    protected function encodeProduceTopic(array $values, int $compression): string
    {
        if (!isset($values['topic_name'])) {
            throw new ProtocolException('given produce data invalid. `topic_name` is undefined.');
        }

        // empty() already covers the "not set" case.
        if (empty($values['partitions'])) {
            throw new ProtocolException('given produce data invalid. `partitions` is undefined.');
        }

        $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
        $partitions = self::encodeArray($values['partitions'], [$this, 'encodeProducePartition'], $compression);

        return $topic . $partitions;
    }

    /**
     * decode produce topic pair response
     *
     * @return mixed[] `['length' => bytes consumed, 'data' => ['topicName' => ..., 'partitions' => ...]]`
     *
     * @throws ProtocolException
     */
    protected function produceTopicPair(string $data, int $version): array
    {
        $topicInfo = $this->decodeString($data, self::BIT_B16);
        $offset = $topicInfo['length'];

        $partitions = $this->decodeArray(substr($data, $offset), [$this, 'producePartitionPair'], $version);
        $offset += $partitions['length'];

        return [
            'length' => $offset,
            'data' => [
                'topicName' => $topicInfo['data'],
                'partitions' => $partitions['data'],
            ],
        ];
    }

    /**
     * decode produce partition pair response
     *
     * Reads, in order: partition id (4 bytes), error code (2 bytes), partition
     * offset (8 bytes) and, for self::API_VERSION2 only, a timestamp (8 bytes).
     *
     * @return mixed[] `['length' => bytes consumed, 'data' => ['partition', 'errorCode', 'offset', 'timestamp']]`
     *
     * @throws ProtocolException
     */
    protected function producePartitionPair(string $data, int $version): array
    {
        $offset = 0;
        $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
        $offset += 4;
        $errorCode = self::unpack(self::BIT_B16_SIGNED, substr($data, $offset, 2));
        $offset += 2;
        $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
        $offset += 8;
        $timestamp = 0;

        if ($version === self::API_VERSION2) {
            $timestamp = self::unpack(self::BIT_B64, substr($data, $offset, 8));
            $offset += 8;
        }

        return [
            'length' => $offset,
            'data' => [
                'partition' => $partitionId,
                'errorCode' => $errorCode,
                // The decoded partition offset, not the local byte cursor.
                'offset' => $partitionOffset,
                'timestamp' => $timestamp,
            ],
        ];
    }
}
301