GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( a4ced4...c9dc46 )
by Benjamin
01:09
created

BroadcastMessage   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 33
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 8
c 2
b 0
f 0
dl 0
loc 33
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
B __init__() 0 23 5
A encode() 0 2 1
A __str__() 0 2 1
A checksum() 0 2 1
1
from libAnt.constants import *
2
3
4
class Message:
5
    def __init__(self, type: int, content: bytes):
6
        self._type = type
7
        self._content = content
8
9
    def __len__(self):
10
        return len(self._content)
11
12
    def __iter__(self):
13
        return self._content
14
15
    def __str__(self):
16
        return '({:02X}): '.format(self._type) + ' '.join('{:02X}'.format(x) for x in self._content)
17
18
    def checksum(self) -> int:
19
        chk = MESSAGE_TX_SYNC ^ len(self) ^ self._type
20
        for b in self._content:
21
            chk ^= b
22
        return chk
23
24
    def encode(self) -> bytes:
25
        b = bytearray([MESSAGE_TX_SYNC, len(self), self._type]).extend(self._content).append(self.checksum())
26
        return bytes(b)
27
28
    @property
29
    def type(self) -> int:
30
        return self._type
31
32
    @property
33
    def content(self) -> bytes:
34
        return self._content
35
36
37
class BroadcastMessage(Message):
38
    def __init__(self, channel: int, content: bytes):
39
        c = bytearray([channel])
40
        c.extend(content[:8])
41
        if len(content) > 8:  # Extended message
42
            self._flag = content[8]
43
            self._extendedContent = content[len(content) - 9:]
44
            offset = 0
45
            if self._flag & EXT_FLAG_CHANNEL_ID:
46
                self._deviceNumber = int.from_bytes(self._extendedContent[:2], byteorder='little', signed=False)
47
                self._deviceType = self._extendedContent[2]
48
                self._transType = self._extendedContent[3]
49
                offset += 4
50
            if self._flag & EXT_FLAG_RSSI:
51
                rssi = self._extendedContent[len(self._extendedContent) - offset:]
52
                self._rssiMeasurementType = rssi[0]
53
                self._rssi = rssi[1]
54
                self._rssiThreshold = rssi[2]
55
                offset += 3
56
            if self._flag & EXT_FLAG_TIMESTAMP:
57
                self._rxTimestamp = int.from_bytes(self._extendedContent[len(self._extendedContent) - offset:],
58
                                                   byteorder='little', signed=False)
59
60
        super().__init__(MESSAGE_CHANNEL_BROADCAST_DATA, bytes(c))
61
62
    def __str__(self):
63
        pass
64
65
    def checksum(self) -> int:
66
        pass
67
68
    def encode(self) -> bytes:
69
        pass
70
71
72
class SystemResetMessage(Message):
73
    def __init__(self):
74
        super().__init__(MESSAGE_SYSTEM_RESET, b'0')
75
76
77
class SetNetworkKeyMessage(Message):
78
    def __init__(self, channel: int, key: bytes):
79
        content = bytearray([channel]).extend(key)
80
        super().__init__(MESSAGE_NETWORK_KEY, bytes(content))
81
82
83
class AssignChannelMessage(Message):
84
    def __init__(self, channel: int, type: int, network: int = 0, extended: int = None):
85
        content = bytearray([channel, type, network])
86
        if extended is not None:
87
            content.append(extended)
88
        super().__init__(MESSAGE_CHANNEL_ASSIGN, bytes(content))
89
90
91
class SetChannelIdMessage(Message):
92
    def __init__(self, channel: int, deviceNumber: int = 0, deviceType: int = 0, transType: int = 0):
93
        content = bytes([channel, deviceNumber.to_bytes(2, byteorder='big'), deviceType, transType])
94
        super().__init__(MESSAGE_CHANNEL_ID, content)
95
96
97
class SetChannelRfFrequencyMessage(Message):
98
    def __init__(self, channel: int, frequency: int = 2457):
99
        content = bytes([channel, frequency - 2400])
100
        super().__init__(MESSAGE_CHANNEL_FREQUENCY, content)
101
102
103
class OpenRxScanModeMessage(Message):
104
    def __init__(self):
105
        super().__init__(OPEN_RX_SCAN_MODE, b'1')
106
107
108
class EnableExtendedMessagesMessage(Message):
109
    def __init__(self, enable: bool = True):
110
        content = bytes([0, int(enable)])
111
        super().__init__(MESSAGE_ENABLE_EXT_RX_MESSAGES, content)
112