GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

bricknil.messages.Message._parse_msg_bytes()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""Message parsers for each message type
15
16
"""
17
import struct, logging
18
from .const import DEVICES
19
20
logger = logging.getLogger(__name__)


class UnknownMessageError(Exception):
    """Raised by a message parser when it meets a field value it does not know."""
22
23
class Message:
    """Common base for all message parsers.

       Each subclass is instantiated once, at class-definition time, and that
       instance is stored in the shared `parsers` table under the subclass's
       `msg_type`.  The parser instances are shared across multiple hubs, so
       subclasses should never keep any instance data.

       Attributes:
            parsers (dict) : msg_type (int) -> Message parser instance
            msg_type(int)  : msg_type handled by each subclassed message
    """

    parsers = {}

    def __init_subclass__(cls):
        """Instantiate the new subclass and register it in `Message.parsers`."""
        logger.debug(f"registering {cls}")
        registry = Message.parsers
        assert cls.msg_type not in registry, f'Duplicate Message parser type {cls.msg_type} found in code!'
        registry[cls.msg_type] = cls()

    def _parse_msg_bytes(self, msg_bytes):
        """Return `msg_bytes` rendered as colon-separated hex literals, e.g. ``0x1:0xff``."""
        return ':'.join(map(hex, msg_bytes))

    def parse(self, msg_bytes, l, dispatcher):
        """Parse one message body; subclasses override this (the base version does nothing).

           Args:
               msg_bytes (bytearray): Message body
               l (list):  text description of what's being parsed for logging (just append details as you go along)
               dispatcher (:class:`bricknil.message_dispatch.MessageDispatch`):  The dispatch object that is sending messages.
                   Call back into its methods to send messages back to the hub.
        """
58
59
class PortValueMessage(Message):
    """Parser for a single value update coming from a sensor
    """
    msg_type = 0x45

    def parse(self, msg_bytes, l, dispatcher):
        """Hand a port's new value to the dispatcher and log the change.

           Args:
               msg_bytes (bytearray): Message body; the first byte is the port, the remainder is the value
               l (list): log fragments, appended to in place
               dispatcher: object whose `message_update_value_to_peripheral` receives the update
        """
        port_id = msg_bytes.pop(0)
        payload = msg_bytes  # whatever is left after the port byte
        dispatcher.message_update_value_to_peripheral(port_id, payload)
        description = f'Port {port_id} changed value to {payload}'
        l.append(description)
68
69
class PortComboValueMessage(Message):
    """Parser for multiple (combination) value updates from different modes of a sensor
    """
    msg_type = 0x46

    def parse(self, msg_bytes, l, dispatcher):
        """Hand a port's combined-mode value to the dispatcher and log the change.

           Args:
               msg_bytes (bytearray): Message body; the first byte is the port, the remainder is the value
               l (list): log fragments, appended to in place
               dispatcher: object whose `message_update_value_to_peripheral` receives the update
        """
        port_id = msg_bytes.pop(0)
        payload = msg_bytes  # whatever is left after the port byte
        dispatcher.message_update_value_to_peripheral(port_id, payload)
        description = f'Port {port_id} changed combo value to {payload}'
        l.append(description)
78
79
class HubPropertiesMessage(Message):
    """Used to get data on the hub as well as button press information on the hub
    """
    msg_type = 0x01

    # Property id -> label used in the log text
    prop_names = {
        0x01: 'Advertising Name',
        0x02: 'Button',
        0x03: 'FW Version',
        0x04: 'HW Version',
        0x05: 'RSSI',
        0x06: 'Battery Voltage',
        0x07: 'Battery Type',
        0x08: 'Manufacturer Name',
        0x09: 'Radio FW Version',
        0x0A: 'LEGO Wireles Protocol Version',
        0x0B: 'System Type ID',
        0x0C: 'HW Network ID',
        0x0D: 'Primary MAC address',
        0x0E: 'Seconary MAC address',
        0x0F: 'HW Network Family',
    }

    # Operation id -> label used in the log text
    operation_names = {
        0x01: 'Set (downstream)',
        0x02: 'Enable Updates (Downstream)',
        0x03: 'Disable Updates (Downstream)',
        0x04: 'Reset (Downstream)',
        0x05: 'Request Update (Downstream)',
        0x06: 'Update (Upstream)',
    }

    def parse(self, msg_bytes, l, dispatcher):
        """Log a hub property message and forward button updates as port values.

           Args:
               msg_bytes (bytearray): Message body: property id, operation id, then the payload
               l (list): log fragments, appended to in place
               dispatcher: passed through to the port-value parser for button updates

           Raises:
               UnknownMessageError: the property id or the operation id is not in the tables above
        """
        l.append('Hub property: ')

        prop = msg_bytes.pop(0)
        prop_label = self.prop_names.get(prop)
        if prop_label is None:
            raise UnknownMessageError
        l.append(prop_label)

        op = msg_bytes.pop(0)
        op_label = self.operation_names.get(op)
        if op_label is None:
            raise UnknownMessageError
        l.append(op_label)

        # The rest of the body is logged as a hex dump
        l.append(self._parse_msg_bytes(msg_bytes))

        # A button (0x02) update (0x06) is re-dispatched as if it were a
        # "port value" change on a dummy port 255
        is_button_update = (prop, op) == (0x02, 0x06)
        if is_button_update:
            msg_bytes.insert(0, 0xFF)
            value_parser = Message.parsers[PortValueMessage.msg_type]
            value_parser.parse(msg_bytes, l, dispatcher)
126
127
class PortInformationMessage(Message):
    """Information on what modes are supported on a port and whether a port
       is input/output.
    """
    msg_type = 0x43

    @staticmethod
    def _pop_uint16(msg_bytes):
        """Pop two bytes off the front of `msg_bytes` and combine them, low byte first."""
        low = msg_bytes.pop(0)
        high = msg_bytes.pop(0)
        return low + (high << 8)

    def _parse_mode_info(self, msg_bytes, l, port_info):
        """Pop the capability byte and store each capability flag in `port_info`.

           The stored value is the masked bit (non-zero when set, 0 otherwise);
           the first three letters of every set capability are appended to `l`.
        """
        l.append(' INFO:')
        capabilities = msg_bytes.pop(0)
        for bit, capability in enumerate(('output', 'input', 'combinable', 'synchronizable')):
            flag = capabilities & (1 << bit)
            port_info[capability] = flag
            if flag:
                l.append(capability[:3])

    def _parse_mode_info_input_output(self, msg_bytes, l, modes_info):
        """Pop the 16-bit input and output mode bitmaps and mark each listed mode.

           For every set bit `i`, `modes_info[i]['input']` (or `['output']`) is set
           to True and `i` is appended to `l`.
        """
        input_modes = self._pop_uint16(msg_bytes)
        output_modes = self._pop_uint16(msg_bytes)
        for direction, bitmap in (('input', input_modes), ('output', output_modes)):
            if direction == 'output':
                # Separator between the input and output mode lists in the log
                l.append(', output: ')
            for mode in range(16):
                if (bitmap >> mode) & 1:
                    l.append(mode)
                    modes_info.setdefault(mode, {})[direction] = True

    def _parse_combination_info(self, msg_bytes, l, port_info):
        """Pop 16-bit mode-combination bitmaps and store them as lists of mode numbers.

           Reading stops at the first zero bitmap or when `msg_bytes` runs out.
           The result replaces `port_info['mode_combinations']`.
        """
        combinations = []
        port_info['mode_combinations'] = combinations

        bitmap = self._pop_uint16(msg_bytes)
        l.append('Combinations:')
        while bitmap:
            modes = [bit for bit in range(16) if bitmap & (1 << bit)]
            l.append('+'.join(f'Mode {m}' for m in modes))
            combinations.append(modes)
            if not msg_bytes:
                break
            bitmap = self._pop_uint16(msg_bytes)
            l.append(', ')

    def parse(self, msg_bytes, l, dispatcher):
        """Parse a port information message and notify the peripheral on that port.

           Args:
               msg_bytes (bytearray): Message body: port, information type, then the payload
               l (list): log fragments, appended to in place
               dispatcher: supplies the `port_info` dict and `message_port_info_to_peripheral`

           Raises:
               UnknownMessageError: the information type is neither 0x01 nor 0x02
        """
        port = msg_bytes.pop(0)
        mode = msg_bytes.pop(0)
        l.append(f'Port {port} Mode {mode}:')

        port_info = dispatcher.port_info.setdefault(port, {})
        modes_info = port_info.setdefault('modes', {})

        if mode == 0x01:  # MODE INFO
            self._parse_mode_info(msg_bytes, l, port_info)
            mode_count = msg_bytes.pop(0)
            l.append(f'nModes:{mode_count}, input:')
            self._parse_mode_info_input_output(msg_bytes, l, modes_info)
            notification = 'port_info_received'
        elif mode == 0x02:  # Combination info
            self._parse_combination_info(msg_bytes, l, port_info)
            notification = 'port_combination_info_received'
        else:
            raise UnknownMessageError
        dispatcher.message_port_info_to_peripheral(port, notification)
192
193
class PortOutputFeedbackMessage(Message):
    """Ack messages/error messages sent in response to a command being issued to the hub
    """
    msg_type = 0x82

    def parse(self, msg_bytes, l, dispatcher):
        """Log the port and every feedback flag set in the feedback byte.

           Args:
               msg_bytes (bytearray): Message body: port, then the feedback bit-mask
               l (list): log fragments, appended to in place
               dispatcher: not used by this parser
        """
        port = msg_bytes.pop(0)
        l.append(f'Command feedback: Port {port}')
        feedback = msg_bytes.pop(0)

        # (bit mask, log text) in the order the texts are emitted
        flag_texts = (
            (1, 'Buffer empty, Command in progess'),
            (2, 'Buffer empty, Command completed'),
            (8, ': Idle '),
            (4, ': Command discarded'),
            (16, ': Busy/Full'),
        )
        for mask, text in flag_texts:
            if feedback & mask:
                l.append(text)
212
213
class PortModeInformationMessage(Message):
    """Information on a specific mode

       This tells us a mode's name, what numeric format it uses, and its range.
    """
    msg_type = 0x44

    def parse(self, msg_bytes, l, dispatcher):
        """Parse one piece of mode information and store it in the dispatcher's port info.

           Args:
               msg_bytes (bytearray): Message body: port, mode, information type, then the payload
               l (list): log fragments, appended to in place
               dispatcher: supplies the `port_info` dict and `message_port_info_to_peripheral`

           Raises:
               UnknownMessageError: the information type has no handler
        """
        port = msg_bytes.pop(0)
        mode = msg_bytes.pop(0)
        mode_type = msg_bytes.pop(0)

        port_info = dispatcher.port_info.setdefault(port, {})
        modes_info = port_info.setdefault('modes', {})
        mode_info = modes_info.setdefault(mode, {})

        l.append(f'MODE INFO Port:{port} Mode:{mode}')
        # Information type -> handler filling in `mode_info`
        mode_types = {
            0x00: self._parse_name,
            0x01: self._parse_raw_range,
            0x02: self._parse_pct_range,
            0x03: self._parse_si_range,
            0x04: self._parse_symbol,
            0x05: self._parse_mapping,
            0x80: self._parse_format,
        }
        handler = mode_types.get(mode_type)
        if handler is None:
            raise UnknownMessageError
        handler(msg_bytes, l, mode_info)
        dispatcher.message_port_info_to_peripheral(port, 'port_mode_info_received')

    def _parse_format(self, msg_bytes, l, mode_info):
        """Pop the four value-format bytes into `mode_info`."""
        # 4 bytes
        # [0] = Number of datasets (e.g. RGB has 3 for each color)
        # [1] = Dataset type.  00-byte, 01=16b, 10=32b, 11=float
        # [2] = Total figures
        # [3] = Decimals if any
        mode_info['datasets'] = msg_bytes.pop(0)
        dataset_types = ['8b', '16b', '32b', 'float']
        mode_info['dataset_type'] = dataset_types[msg_bytes.pop(0)]
        mode_info['dataset_total_figures'] = msg_bytes.pop(0)
        mode_info['dataset_decimals'] = msg_bytes.pop(0)

    def _parse_mapping(self, msg_bytes, l, mode_info):
        """Decode the input (byte 0) and output (byte 1) mapping bit-masks into label lists."""
        # NOTE(review): the trailing '}' in the Functional Mapping label looks like a
        # typo; it is kept as-is because the label is stored in mode_info.
        bits = ['NA', 'NA', 'Discrete', 'Relative', 'Absolute', 'NA', 'Supports Functional Mapping 2.0}', 'Supports NULL']
        for index, (title, key) in enumerate((('Input Mapping:', 'input_mapping'),
                                              ('Output Mapping:', 'output_mapping'))):
            l.append(title)
            mask = msg_bytes[index]
            maps = [bits[i] for i in range(8) if (mask >> i) & 1]
            l.append(','.join(maps))
            mode_info[key] = maps

    @staticmethod
    def _decode_text(msg_bytes):
        """Turn the payload into a string, one character per byte, skipping zero bytes."""
        return ''.join(chr(b) for b in msg_bytes if b != 0)

    def _parse_symbol(self, msg_bytes, l, mode_info):
        """Store the payload text as the mode's symbol."""
        l.append('Symbol:')
        symbol = self._decode_text(msg_bytes)
        l.append(symbol)
        mode_info['symbol'] = symbol

    def _unpack_float(self, b):
        """Unpack the first four bytes of `b` as a little-endian float.

           Returns the 1-tuple produced by `struct.unpack`; bytes after the
           fourth are ignored.
        """
        return struct.unpack('<f', bytearray(b[0:4]))

    def _parse_float_range(self, msg_bytes, l, mode_info, label, key):
        """Read a (min, max) pair of floats from the first eight bytes of `msg_bytes`.

           All three range kinds go through `_unpack_float`, so any bytes after
           the eighth are ignored for every kind (previously only the SI range
           tolerated them; the pct and raw ranges raised `struct.error`).
        """
        l.append(label)
        low = self._unpack_float(msg_bytes[0:4])[0]
        high = self._unpack_float(msg_bytes[4:8])[0]
        l.append(f'{low} to {high}')
        mode_info[key] = (low, high)

    def _parse_si_range(self, msg_bytes, l, mode_info):
        """Store the SI (min, max) range of the mode."""
        self._parse_float_range(msg_bytes, l, mode_info, 'SI range:', 'si_range')

    def _parse_pct_range(self, msg_bytes, l, mode_info):
        """Store the percent (min, max) range of the mode."""
        self._parse_float_range(msg_bytes, l, mode_info, 'Pct range:', 'pct_range')

    def _parse_raw_range(self, msg_bytes, l, mode_info):
        """Store the raw (min, max) range of the mode."""
        self._parse_float_range(msg_bytes, l, mode_info, 'Raw range:', 'raw_range')

    def _parse_name(self, msg_bytes, l, mode_info):
        """Store the payload text as the mode's name."""
        l.append('Name:')
        name = self._decode_text(msg_bytes)
        l.append(name)
        mode_info['name'] = name
308
309
class AttachedIOMessage(Message):
    """Peripheral attach and detach message
    """
    msg_type = 0x04

    def parse(self, msg_bytes, l, dispatcher):
        """Handle a peripheral being detached, attached or virtually attached to a port.

           Overall message sizes, of which 3 bytes are already consumed before
           this body is seen: 5 bytes = detached, 15 bytes = attached,
           9 bytes = virtual attached.

           Args:
               msg_bytes (bytearray): Message body: port, event, then event-specific data
               l (list): log fragments, appended to in place
               dispatcher: supplies the `port_info` dict and `message_attach_to_hub`
        """
        DETACHED, ATTACHED, VIRTUAL_ATTACHED = 0, 1, 2

        port = msg_bytes.pop(0)
        event = msg_bytes.pop(0)

        if event == DETACHED:
            l.append(f'Detached IO Port:{port}')
            return
        if event not in (ATTACHED, VIRTUAL_ATTACHED):
            # Any other event code is ignored without logging anything
            return

        is_virtual = event == VIRTUAL_ATTACHED
        if is_virtual:
            l.append(f'Attached VirtualIO Port:{port}')
        else:
            l.append(f'Attached IO Port:{port}')

        # Next two bytes (little-endian) is the device number (MSB is not used)
        device_id = msg_bytes.pop(0)
        assert device_id in DEVICES, f'Unknown device with id {device_id} being attached (port {port}'
        device_name = DEVICES[device_id]
        self._add_port_info(dispatcher, port, 'id', device_id)
        self._add_port_info(dispatcher, port, 'name', device_name)

        msg_bytes.pop(0)  # discard the MSB of the device number, always 0
        l.append(f'{device_name}')

        # register the handler for this IO
        dispatcher.message_attach_to_hub(device_name, port)

        if is_virtual:
            # A virtual port is described by the two ports it combines
            assert len(msg_bytes) == 2
            port0, port1 = msg_bytes
            l.append(f'Port A: {port0}, Port B: {port1}')
            self._add_port_info(dispatcher, port, 'virtual', (port0, port1))
        else:
            # Two version records follow, four bytes each, popped lowest part first
            for ver_type in ('HW', 'SW'):
                build0, build1, bugfix, ver = (hex(msg_bytes.pop(0)) for _ in range(4))
                l.append(f'{ver_type}:{ver}.{bugfix}.{build1}{build0}')

    def _add_port_info(self, dispatcher, port, info_key, info_val):
        """Set `info_key` to `info_val` in the dispatcher's info dict for `port`, creating it if needed."""
        dispatcher.port_info.setdefault(port, {})[info_key] = info_val
363
364
365
if __name__ == '__main__':
    # Ad-hoc smoke test: push one port-value (0x45) and one port-combo-value
    # (0x46) message through a dispatcher backed by mocks and print the result.
    # This module uses relative imports, so run it as a module
    # (python -m bricknil.messages) rather than as a script.
    from unittest.mock import MagicMock

    # MessageDispatch was previously referenced without ever being imported
    from .message_dispatch import MessageDispatch

    hub = MagicMock()
    dis = MessageDispatch(hub)
    dis.port_to_peripheral[1] = MagicMock()

    for message_type in (0x45, 0x46):
        msg = bytearray([0, 0, message_type, 1, 9, 9, 9])
        result = dis.parse(msg)
        print(result)
378