GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( e6b36c...e49d52 )
by Benjamin
01:05
created

BroadcastMessage.build()   B

Complexity

Conditions 5

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 23
rs 8.2508
cc 5
1
from libAnt.constants import *
2
3
4
class Message:
    """A message made of a numeric type identifier and a byte payload.

    The wire form produced by :meth:`encode` is::

        MESSAGE_TX_SYNC | payload length | type | payload ... | checksum
    """

    def __init__(self, type: int, content: bytes):
        """Store the message type and its payload.

        :param type: numeric message identifier
        :param content: payload bytes
        """
        self._type = type
        self._content = content

    def __len__(self):
        """Return the number of payload bytes."""
        return len(self._content)

    def __iter__(self):
        """Return an iterator over the payload, yielding each byte as an int."""
        # __iter__ has to hand back an iterator object. Returning the bytes
        # payload itself makes iter(message) fail with
        # "TypeError: iter() returned non-iterator of type 'bytes'".
        return iter(self._content)

    def __str__(self):
        """Format as ``(TT): B0 B1 ...`` using two-digit upper-case hex."""
        payload_hex = ' '.join('{:02X}'.format(byte) for byte in self._content)
        return '({:02X}): '.format(self._type) + payload_hex

    def checksum(self) -> int:
        """Return the XOR of the sync byte, length, type and all payload bytes."""
        chk = MESSAGE_TX_SYNC ^ len(self) ^ self._type
        for byte in self._content:
            chk ^= byte
        return chk

    def encode(self) -> bytes:
        """Serialise the message: sync, length, type, payload, checksum."""
        frame = bytearray([MESSAGE_TX_SYNC, len(self), self._type])
        frame.extend(self._content)
        frame.append(self.checksum())
        return bytes(frame)

    @property
    def type(self) -> int:
        """The numeric message identifier."""
        return self._type

    @property
    def content(self) -> bytes:
        """The payload bytes."""
        return self._content
37
38
39
class BroadcastMessage(Message):
    """A broadcast data message, optionally carrying extended fields.

    Every parsed attribute starts out as ``None`` and is filled in by
    :meth:`build` only when the corresponding data is present.
    """

    def __init__(self, type: int, content: bytes):
        """Create the message with all parsed fields unset.

        :param type: numeric message identifier
        :param content: payload bytes
        """
        # Set by build() for every message.
        self.channel = None
        # Set by build() only for extended messages (more than 9 raw bytes).
        self.flag = None
        self.extendedContent = None
        self.deviceNumber = self.deviceType = self.transType = None
        self.rssiMeasurementType = self.rssi = self.rssiThreshold = None
        self.rxTimestamp = None

        super().__init__(type, content)

    def build(self, raw: bytes):
        """Populate this message from raw bytes and return ``self``.

        Layout read from ``raw``:

        * ``raw[0]``   -- channel
        * ``raw[1:9]`` -- payload
        * ``raw[9]``   -- extension flag byte (only if more than 9 bytes)
        * ``raw[10:]`` -- extended content, parsed according to the flag

        :param raw: message bytes starting at the channel byte
        :return: this instance, to allow chaining
        """
        self._type = MESSAGE_CHANNEL_BROADCAST_DATA
        self.channel = raw[0]
        self._content = raw[1:9]
        if len(raw) <= 9:
            # Standard message: no flag byte, nothing more to parse.
            return self

        # Extended message
        self.flag = raw[9]
        self.extendedContent = raw[10:]
        ext = self.extendedContent
        # Position inside ext of the next optional field; each field that is
        # present advances it by its own size.
        offset = 0
        if self.flag & EXT_FLAG_CHANNEL_ID:
            self.deviceNumber = int.from_bytes(ext[:2], byteorder='little', signed=False)
            self.deviceType = ext[2]
            self.transType = ext[3]
            offset += 4
        if self.flag & EXT_FLAG_RSSI:
            rssi_fields = ext[offset:(offset + 3)]
            self.rssiMeasurementType = rssi_fields[0]
            self.rssi = rssi_fields[1]
            self.rssiThreshold = rssi_fields[2]
            offset += 3
        if self.flag & EXT_FLAG_TIMESTAMP:
            # Everything left after the preceding fields is read as the
            # timestamp.
            self.rxTimestamp = int.from_bytes(ext[offset:],
                                              byteorder='little', signed=False)
        return self

    def checksum(self) -> int:
        """Not implemented for this message type; currently returns ``None``."""
        return None

    def encode(self) -> bytes:
        """Not implemented for this message type; currently returns ``None``."""
        return None
77
78
79
class SystemResetMessage(Message):
    """Message of type ``MESSAGE_SYSTEM_RESET`` with a one-byte filler payload."""

    def __init__(self):
        """Build the reset message; it takes no parameters."""
        # The filler is a single zero byte. The literal b'0' would instead be
        # the ASCII digit "0", i.e. the byte 0x30.
        super().__init__(MESSAGE_SYSTEM_RESET, bytes([0]))
82
83
84
class SetNetworkKeyMessage(Message):
    """Message of type ``MESSAGE_NETWORK_KEY``: one leading byte plus a key."""

    def __init__(self, channel: int, key: bytes):
        """Build the payload as ``channel`` followed by the bytes of ``key``.

        :param channel: value placed in the first payload byte
        :param key: key bytes appended after it
        """
        payload = bytes([channel, *key])
        super().__init__(MESSAGE_NETWORK_KEY, payload)
89
90
91
class AssignChannelMessage(Message):
    """Message of type ``MESSAGE_CHANNEL_ASSIGN``."""

    def __init__(self, channel: int, type: int, network: int = 0, extended: int = None):
        """Build the payload ``channel, type, network[, extended]``.

        :param channel: first payload byte
        :param type: second payload byte
        :param network: third payload byte (defaults to 0)
        :param extended: optional fourth byte; left out entirely when ``None``
        """
        fields = [channel, type, network]
        if extended is not None:
            fields.append(extended)
        super().__init__(MESSAGE_CHANNEL_ASSIGN, bytes(fields))
97
98
99
class SetChannelIdMessage(Message):
    """Message of type ``MESSAGE_CHANNEL_ID``."""

    def __init__(self, channel: int, deviceNumber: int = 0, deviceType: int = 0, transType: int = 0):
        """Build the payload ``channel, deviceNumber (2 bytes), deviceType, transType``.

        :param channel: first payload byte
        :param deviceNumber: 16-bit device number, written low byte first
        :param deviceType: single byte following the device number
        :param transType: final payload byte
        """
        payload = bytearray([channel])
        # Little-endian, matching how BroadcastMessage.build() decodes the
        # device number from received extended data.
        payload.extend(deviceNumber.to_bytes(2, byteorder='little'))
        payload.append(deviceType)
        payload.append(transType)
        super().__init__(MESSAGE_CHANNEL_ID, bytes(payload))
106
107
108
class SetChannelRfFrequencyMessage(Message):
    """Message of type ``MESSAGE_CHANNEL_FREQUENCY``."""

    def __init__(self, channel: int, frequency: int = 2457):
        """Build the payload ``channel, frequency - 2400``.

        :param channel: first payload byte
        :param frequency: encoded as its offset from 2400 in a single byte
            (presumably MHz -- confirm against the protocol documentation)
        """
        offset_from_base = frequency - 2400
        super().__init__(MESSAGE_CHANNEL_FREQUENCY, bytes([channel, offset_from_base]))
112
113
114
class OpenRxScanModeMessage(Message):
    """Message of type ``OPEN_RX_SCAN_MODE`` with a single zero payload byte."""

    def __init__(self):
        """Build the message; it takes no parameters."""
        zero_byte = b'\x00'
        super().__init__(OPEN_RX_SCAN_MODE, zero_byte)
117
118
119
class EnableExtendedMessagesMessage(Message):
    """Message of type ``MESSAGE_ENABLE_EXT_RX_MESSAGES``."""

    def __init__(self, enable: bool = True):
        """Build the payload ``0, int(enable)``.

        :param enable: converted with ``int()`` and sent as the second byte
        """
        enable_byte = int(enable)
        super().__init__(MESSAGE_ENABLE_EXT_RX_MESSAGES, bytes((0, enable_byte)))
123
124
class LibConfigMessage(Message):
    """Message of type ``MESSAGE_LIB_CONFIG`` carrying a bitmask of EXT_FLAG_* values."""

    def __init__(self, rxTimestamp: bool = True, rssi: bool = True, channelId: bool = True):
        """Build the payload ``0, config``.

        ``config`` is the bitwise OR of the flags whose argument is truthy.

        :param rxTimestamp: include ``EXT_FLAG_TIMESTAMP``
        :param rssi: include ``EXT_FLAG_RSSI``
        :param channelId: include ``EXT_FLAG_CHANNEL_ID``
        """
        timestamp_bit = EXT_FLAG_TIMESTAMP if rxTimestamp else 0
        rssi_bit = EXT_FLAG_RSSI if rssi else 0
        channel_id_bit = EXT_FLAG_CHANNEL_ID if channelId else 0
        config = 0 | timestamp_bit | rssi_bit | channel_id_bit
        super().__init__(MESSAGE_LIB_CONFIG, bytes([0, config]))