Test Failed
Pull Request — master (#12)
by wujunze
03:09
created

Protocol   D

Complexity

Total Complexity 59

Size/Duplication

Total Lines 468
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 161
c 1
b 0
f 1
dl 0
loc 468
rs 4.08
wmc 59

16 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
A isSystemLittleEndian() 0 10 2
B decodePrimitiveArray() 0 29 7
A unpack() 0 23 5
A encodeArray() 0 10 3
A decodeString() 0 17 4
A convertSignedShortFromLittleEndianToBigEndian() 0 15 2
A decompress() 0 15 4
B decodeArray() 0 28 7
A pack() 0 21 4
A getApiVersion() 0 19 5
A getApiText() 0 8 1
B checkLen() 0 24 7
A requestHeader() 0 11 1
A compress() 0 15 4
A encodeString() 0 6 2

How to fix   Complexity   

Complex Class

Complex classes like Protocol often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Protocol, and based on these observations, apply Extract Interface, too.

1
<?php
2
declare(strict_types=1);
3
4
namespace Seasx\SeasLogger\Kafka;
5
6
use BadMethodCallException;
7
use Exception;
8
use function array_map;
9
use function array_shift;
10
use function array_values;
11
use function count;
12
use function gzdecode;
13
use function gzencode;
14
use function hex2bin;
15
use function in_array;
16
use function is_array;
17
use function pack;
18
use function strlen;
19
use function substr;
20
use function unpack;
21
use function version_compare;
22
23
abstract class Protocol
24
{
25
    public const NO_ERROR = 0;
26
    /**
27
     *  Default kafka broker verion
28
     */
29
    public const DEFAULT_BROKER_VERION = '0.9.0.0';
30
31
    /**
32
     *  Kafka server protocol version0
33
     */
34
    public const API_VERSION0 = 0;
35
36
    /**
37
     *  Kafka server protocol version 1
38
     */
39
    public const API_VERSION1 = 1;
40
41
    /**
42
     *  Kafka server protocol version 2
43
     */
44
    public const API_VERSION2 = 2;
45
46
    /**
47
     * use encode message, This is a version id used to allow backwards
48
     * compatible evolution of the message binary format.
49
     */
50
    public const MESSAGE_MAGIC_VERSION0 = 0;
51
52
    /**
53
     * use encode message, This is a version id used to allow backwards
54
     * compatible evolution of the message binary format.
55
     */
56
    public const MESSAGE_MAGIC_VERSION1 = 1;
57
58
    /**
59
     * message no compression
60
     */
61
    public const COMPRESSION_NONE = 0;
62
63
    /**
64
     * Message using gzip compression
65
     */
66
    public const COMPRESSION_GZIP = 1;
67
68
    /**
69
     * Message using Snappy compression
70
     */
71
    public const COMPRESSION_SNAPPY = 2;
72
73
    /**
74
     *  pack int32 type
75
     */
76
    public const PACK_INT32 = 0;
77
78
    /**
79
     * pack int16 type
80
     */
81
    public const PACK_INT16 = 1;
82
83
    /**
84
     * protocol request code
85
     */
86
    public const PRODUCE_REQUEST = 0;
87
88
    public const METADATA_REQUEST = 3;
89
90
    // unpack/pack bit
91
    public const BIT_B64 = 'N2';
92
93
    public const BIT_B32 = 'N';
94
95
    public const BIT_B16 = 'n';
96
97
    public const BIT_B16_SIGNED = 's';
98
99
    public const BIT_B8 = 'C';
100
    /**
101
     * gets set to true if the computer this code is running is little endian,
102
     * gets set to false if the computer this code is running on is big endian.
103
     *
104
     * @var null|bool
105
     */
106
    private static $isLittleEndianSystem;
107
    /**
108
     * @var string
109
     */
110
    protected $version = self::DEFAULT_BROKER_VERION;
111
112
    public function __construct(string $version = self::DEFAULT_BROKER_VERION)
113
    {
114
        $this->version = $version;
115
    }
116
117
    /**
118
     * @param array $array
119
     * @param callable $func
120
     * @param int|null $options
121
     * @return string
122
     */
123
    public static function encodeArray(array $array, callable $func, ?int $options = null): string
124
    {
125
        $arrayCount = count($array);
126
127
        $body = '';
128
        foreach ($array as $value) {
129
            $body .= $options !== null ? $func($value, $options) : $func($value);
130
        }
131
132
        return self::pack(self::BIT_B32, (string)$arrayCount) . $body;
133
    }
134
135
    public static function pack(string $type, string $data): string
136
    {
137
        if ($type !== self::BIT_B64) {
138
            return pack($type, $data);
139
        }
140
141
        if ((int)$data === -1) { // -1L
142
            return hex2bin('ffffffffffffffff');
143
        }
144
145
        if ((int)$data === -2) { // -2L
146
            return hex2bin('fffffffffffffffe');
147
        }
148
149
        $left = 0xffffffff00000000;
150
        $right = 0x00000000ffffffff;
151
152
        $l = ($data & $left) >> 32;
153
        $r = $data & $right;
154
155
        return pack($type, $l, $r);
156
    }
157
158
    /**
159
     * Get kafka api text
160
     * @param int $apikey
161
     * @return string
162
     */
163
    public static function getApiText(int $apikey): string
164
    {
165
        $apis = [
166
            self::PRODUCE_REQUEST => 'ProduceRequest',
167
            self::METADATA_REQUEST => 'MetadataRequest'
168
        ];
169
170
        return $apis[$apikey] ?? 'Unknown message';
171
    }
172
173
    /**
174
     * @param string $clientId
175
     * @param int $correlationId
176
     * @param int $apiKey
177
     * @return string
178
     */
179
    public function requestHeader(string $clientId, int $correlationId, int $apiKey): string
180
    {
181
        // int16 -- apiKey int16 -- apiVersion int32 correlationId
182
        $binData = self::pack(self::BIT_B16, (string)$apiKey);
183
        $binData .= self::pack(self::BIT_B16, (string)$this->getApiVersion($apiKey));
184
        $binData .= self::pack(self::BIT_B32, (string)$correlationId);
185
186
        // concat client id
187
        $binData .= self::encodeString($clientId, self::PACK_INT16);
188
189
        return $binData;
190
    }
191
192
    /**
193
     * Get kafka api version according to specify kafka broker version
194
     * @param int $apikey
195
     * @return int
196
     */
197
    public function getApiVersion(int $apikey): int
198
    {
199
        switch ($apikey) {
200
            case self::METADATA_REQUEST:
201
                return self::API_VERSION0;
202
            case self::PRODUCE_REQUEST:
203
                if (version_compare($this->version, '0.10.0') >= 0) {
204
                    return self::API_VERSION2;
205
                }
206
207
                if (version_compare($this->version, '0.9.0') >= 0) {
208
                    return self::API_VERSION1;
209
                }
210
211
                return self::API_VERSION0;
212
        }
213
214
        // default
215
        return self::API_VERSION0;
216
    }
217
218
    /**
219
     * @param string $string
220
     * @param int $bytes
221
     * @param int $compression
222
     * @return string
223
     */
224
    public static function encodeString(string $string, int $bytes, int $compression = self::COMPRESSION_NONE): string
225
    {
226
        $packLen = $bytes === self::PACK_INT32 ? self::BIT_B32 : self::BIT_B16;
227
        $string = self::compress($string, $compression);
228
229
        return self::pack($packLen, (string)strlen($string)) . $string;
230
    }
231
232
    /**
233
     * @param string $string
234
     * @param int $compression
235
     * @return string
236
     */
237
    private static function compress(string $string, int $compression): string
238
    {
239
        if ($compression === self::COMPRESSION_NONE) {
240
            return $string;
241
        }
242
243
        if ($compression === self::COMPRESSION_SNAPPY) {
244
            throw new BadMethodCallException('SNAPPY compression not yet implemented');
245
        }
246
247
        if ($compression !== self::COMPRESSION_GZIP) {
248
            throw new BadMethodCallException('Unknown compression flag: ' . $compression);
249
        }
250
251
        return gzencode($string);
252
    }
253
254
    /**
255
     * @param string $data
256
     * @param string $bytes
257
     * @param int $compression
258
     * @return mixed[]
259
     *
260
     * @throws Exception
261
     */
262
    public function decodeString(string $data, string $bytes, int $compression = self::COMPRESSION_NONE): array
263
    {
264
        $offset = $bytes === self::BIT_B32 ? 4 : 2;
265
        $packLen = self::unpack($bytes, substr($data, 0, $offset)); // int16 topic name length
266
267
        if ($packLen === 4294967295) { // uint32(4294967295) is int32 (-1)
268
            $packLen = 0;
269
        }
270
271
        if ($packLen === 0) {
272
            return ['length' => $offset, 'data' => ''];
273
        }
274
275
        $data = (string)substr($data, $offset, $packLen);
0 ignored issues
show
Bug introduced by
It seems like $packLen can also be of type false; however, parameter $length of substr() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

275
        $data = (string)substr($data, $offset, /** @scrutinizer ignore-type */ $packLen);
Loading history...
276
        $offset += $packLen;
277
278
        return ['length' => $offset, 'data' => self::decompress($data, $compression)];
279
    }
280
281
    /**
282
     * Unpack a bit integer as big endian long
283
     *
284
     * @param string $type
285
     * @param string $bytes
286
     * @return mixed
287
     * @throws Exception
288
     */
289
    public static function unpack(string $type, string $bytes)
290
    {
291
        self::checkLen($type, $bytes);
292
293
        if ($type === self::BIT_B64) {
294
            $set = unpack($type, $bytes);
295
            $result = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF);
296
        } elseif ($type === self::BIT_B16_SIGNED) {
297
            // According to PHP docs: 's' = signed short (always 16 bit, machine byte order)
298
            // So lets unpack it..
299
            $set = unpack($type, $bytes);
300
301
            // But if our system is little endian
302
            if (self::isSystemLittleEndian()) {
303
                // We need to flip the endianess because coming from kafka it is big endian
304
                $set = self::convertSignedShortFromLittleEndianToBigEndian($set);
0 ignored issues
show
Bug introduced by
It seems like $set can also be of type false; however, parameter $bits of Seasx\SeasLogger\Kafka\P...ttleEndianToBigEndian() does only seem to accept array, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

304
                $set = self::convertSignedShortFromLittleEndianToBigEndian(/** @scrutinizer ignore-type */ $set);
Loading history...
305
            }
306
            $result = $set;
307
        } else {
308
            $result = unpack($type, $bytes);
309
        }
310
311
        return is_array($result) ? array_shift($result) : $result;
312
    }
313
314
    /**
315
     * check unpack bit is valid
316
     *
317
     * @param string $type
318
     * @param string $bytes
319
     * @throws Exception
320
     */
321
    protected static function checkLen(string $type, string $bytes): void
322
    {
323
        $expectedLength = 0;
324
325
        switch ($type) {
326
            case self::BIT_B64:
327
                $expectedLength = 8;
328
                break;
329
            case self::BIT_B32:
330
                $expectedLength = 4;
331
                break;
332
            case self::BIT_B16_SIGNED:
333
            case self::BIT_B16:
334
                $expectedLength = 2;
335
                break;
336
            case self::BIT_B8:
337
                $expectedLength = 1;
338
                break;
339
        }
340
341
        $length = strlen($bytes);
342
343
        if ($length !== $expectedLength) {
344
            throw new Exception('unpack failed. string(raw) length is ' . $length . ' , TO ' . $type);
345
        }
346
    }
347
348
    /**
349
     * Determines if the computer currently running this code is big endian or little endian.
350
     */
351
    public static function isSystemLittleEndian(): bool
352
    {
353
        // If we don't know if our system is big endian or not yet...
354
        if (self::$isLittleEndianSystem === null) {
355
            [$endianTest] = array_values(unpack('L1L', pack('V', 1)));
0 ignored issues
show
Bug introduced by
It seems like unpack('L1L', pack('V', 1)) can also be of type false; however, parameter $input of array_values() does only seem to accept array, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

355
            [$endianTest] = array_values(/** @scrutinizer ignore-type */ unpack('L1L', pack('V', 1)));
Loading history...
356
357
            self::$isLittleEndianSystem = (int)$endianTest === 1;
358
        }
359
360
        return self::$isLittleEndianSystem;
361
    }
362
363
    /**
364
     * Converts a signed short (16 bits) from little endian to big endian.
365
     *
366
     * @param int[] $bits
367
     *
368
     * @return int[]
369
     */
370
    public static function convertSignedShortFromLittleEndianToBigEndian(array $bits): array
371
    {
372
        $convert = function (int $bit): int {
373
            $lsb = $bit & 0xff;
374
            $msb = $bit >> 8 & 0xff;
375
            $bit = $lsb << 8 | $msb;
376
377
            if ($bit >= 32768) {
378
                $bit -= 65536;
379
            }
380
381
            return $bit;
382
        };
383
384
        return array_map($convert, $bits);
385
    }
386
387
    private static function decompress(string $string, int $compression): string
388
    {
389
        if ($compression === self::COMPRESSION_NONE) {
390
            return $string;
391
        }
392
393
        if ($compression === self::COMPRESSION_SNAPPY) {
394
            throw new BadMethodCallException('SNAPPY compression not yet implemented');
395
        }
396
397
        if ($compression !== self::COMPRESSION_GZIP) {
398
            throw new BadMethodCallException('Unknown compression flag: ' . $compression);
399
        }
400
401
        return gzdecode($string);
402
    }
403
404
    /**
405
     * @param string $data
406
     * @param callable $func
407
     * @param mixed|null $options
408
     *
409
     * @return mixed[]
410
     *
411
     * @throws Exception
412
     */
413
    public function decodeArray(string $data, callable $func, $options = null): array
414
    {
415
        $offset = 0;
416
        $arrayCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
417
        $offset += 4;
418
419
        $result = [];
420
421
        for ($i = 0; $i < $arrayCount; $i++) {
422
            $value = substr($data, $offset);
423
            $ret = $options !== null ? $func($value, $options) : $func($value);
424
425
            if (!is_array($ret) && $ret === false) {
426
                break;
427
            }
428
429
            if (!isset($ret['length'], $ret['data'])) {
430
                throw new Exception('Decode array failed, given function return format is invalid');
431
            }
432
            if ((int)$ret['length'] === 0) {
433
                continue;
434
            }
435
436
            $offset += $ret['length'];
437
            $result[] = $ret['data'];
438
        }
439
440
        return ['length' => $offset, 'data' => $result];
441
    }
442
443
    /**
444
     * @param string $data
445
     * @param string $bit
446
     * @return mixed[]
447
     *
448
     */
449
    public function decodePrimitiveArray(string $data, string $bit): array
450
    {
451
        $offset = 0;
452
        $arrayCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
453
        $offset += 4;
454
455
        if ($arrayCount === 4294967295) {
456
            $arrayCount = 0;
457
        }
458
459
        $result = [];
460
461
        for ($i = 0; $i < $arrayCount; $i++) {
462
            if ($bit === self::BIT_B64) {
463
                $result[] = self::unpack(self::BIT_B64, substr($data, $offset, 8));
464
                $offset += 8;
465
            } elseif ($bit === self::BIT_B32) {
466
                $result[] = self::unpack(self::BIT_B32, substr($data, $offset, 4));
467
                $offset += 4;
468
            } elseif (in_array($bit, [self::BIT_B16, self::BIT_B16_SIGNED], true)) {
469
                $result[] = self::unpack($bit, substr($data, $offset, 2));
470
                $offset += 2;
471
            } elseif ($bit === self::BIT_B8) {
472
                $result[] = self::unpack($bit, substr($data, $offset, 1));
473
                ++$offset;
474
            }
475
        }
476
477
        return ['length' => $offset, 'data' => $result];
478
    }
479
480
    /**
481
     * @param array $payloads
482
     * @return string
483
     */
484
    abstract public function encode(array $payloads = []): string;
485
486
    /**
487
     * @param string $data
488
     * @return array
489
     */
490
    abstract public function decode(string $data): array;
491
}
492