GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( 810014...a4ced4 )
by Benjamin
01:05
created

BroadcastMessage   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 24
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 5
c 1
b 0
f 0
dl 0
loc 24
rs 10

1 Method

Rating   Name   Duplication   Size   Complexity  
B __init__() 0 23 5
1
from libAnt.constants import *
2
3
4
class Message:
5
    def __init__(self, type: int, content: bytes):
6
        self._type = type
7
        self._content = content
8
9
    def __len__(self):
10
        return len(self._content)
11
12
    def __iter__(self):
13
        return self._content
14
15
    def __str__(self):
16
        return '({:02X}): '.format(self._type) + ' '.join('{:02X}'.format(x) for x in self._content)
17
18
    def checksum(self) -> int:
19
        chk = MESSAGE_TX_SYNC ^ len(self) ^ self._type
20
        for b in self._content:
21
            chk ^= b
22
        return chk
23
24
    def encode(self) -> bytes:
25
        b = bytearray([MESSAGE_TX_SYNC, len(self), self._type]).extend(self._content).append(self.checksum())
26
        return bytes(b)
27
28
    @property
29
    def type(self) -> int:
30
        return self._type
31
32
    @property
33
    def content(self) -> bytes:
34
        return self._content
35
36
37
class BroadcastMessage(Message):
38
    def __init__(self, channel: int, content: bytes):
39
        c = bytearray([channel])
40
        c.extend(content[:8])
41
        if len(content) > 8:  # Extended message
42
            self._flag = content[8]
43
            self._extendedContent = content[len(content) - 9:]
44
            offset = 0
45
            if self._flag & EXT_FLAG_CHANNEL_ID:
46
                self._deviceNumber = int.from_bytes(self._extendedContent[:2], byteorder='little', signed=False)
47
                self._deviceType = self._extendedContent[2]
48
                self._transType = self._extendedContent[3]
49
                offset += 4
50
            if self._flag & EXT_FLAG_RSSI:
51
                rssi = self._extendedContent[len(self._extendedContent) - offset:]
52
                self._rssiMeasurementType = rssi[0]
53
                self._rssi = rssi[1]
54
                self._rssiThreshold = rssi[2]
55
                offset += 3
56
            if self._flag & EXT_FLAG_TIMESTAMP:
57
                self._rxTimestamp = int.from_bytes(self._extendedContent[len(self._extendedContent) - offset:],
58
                                                   byteorder='little', signed=False)
59
60
        super().__init__(MESSAGE_CHANNEL_BROADCAST_DATA, bytes(c))
61
62
63
class SystemResetMessage(Message):
64
    def __init__(self):
65
        super().__init__(MESSAGE_SYSTEM_RESET, b'0')
66
67
68
class SetNetworkKeyMessage(Message):
69
    def __init__(self, channel: int, key: bytes):
70
        content = bytearray([channel]).extend(key)
71
        super().__init__(MESSAGE_NETWORK_KEY, bytes(content))
72
73
74
class AssignChannelMessage(Message):
75
    def __init__(self, channel: int, type: int, network: int = 0, extended: int = None):
76
        content = bytearray([channel, type, network])
77
        if extended is not None:
78
            content.append(extended)
79
        super().__init__(MESSAGE_CHANNEL_ASSIGN, bytes(content))
80
81
82
class SetChannelIdMessage(Message):
83
    def __init__(self, channel: int, deviceNumber: int = 0, deviceType: int = 0, transType: int = 0):
84
        content = bytes([channel, deviceNumber.to_bytes(2, byteorder='big'), deviceType, transType])
85
        super().__init__(MESSAGE_CHANNEL_ID, content)
86
87
88
class SetChannelRfFrequencyMessage(Message):
89
    def __init__(self, channel: int, frequency: int = 2457):
90
        content = bytes([channel, frequency - 2400])
91
        super().__init__(MESSAGE_CHANNEL_FREQUENCY, content)
92
93
94
class OpenRxScanModeMessage(Message):
95
    def __init__(self):
96
        super().__init__(OPEN_RX_SCAN_MODE, b'1')
97
98
99
class EnableExtendedMessagesMessage(Message):
100
    def __init__(self, enable: bool = True):
101
        content = bytes([0, int(enable)])
102
        super().__init__(MESSAGE_ENABLE_EXT_RX_MESSAGES, content)
103