GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

bricknil.sensor.peripheral   A
last analyzed

Complexity

Total Complexity 35

Size/Duplication

Total Lines 308
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 111
dl 0
loc 308
rs 9.6
c 0
b 0
f 0
wmc 35

9 Methods

Rating   Name   Duplication   Size   Complexity  
A Peripheral._convert_bytes() 0 22 4
B Peripheral.update_value() 0 30 6
A Peripheral.__init__() 0 8 1
A Peripheral._parse_combined_sensor_values() 0 38 5
B Peripheral.activate_updates() 0 52 7
A Peripheral.set_output() 0 14 1
A Peripheral._convert_speed_to_val() 0 16 4
A Peripheral.send_message() 0 5 2
A Peripheral._get_validated_capabilities() 0 25 5
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Base class for all sensors and motors
16
17
"""
18
import struct
19
from enum import Enum
20
from collections import namedtuple
21
22
from ..process import Process
23
from curio import sleep, spawn, current_task
24
from ..const import DEVICES
25
26
27
class Peripheral(Process):
28
    """Abstract base class for any Lego Boost/PoweredUp/WeDo peripherals
29
30
       A LEGO sensor can provide either a single_ sensing capability, or a combined_  mode where it returns multiple
31
       sensing values.  All the details can be found in the official protocol description.
32
33
       * **Single capability** - This is the easiest to handle:
34
            * Send a 0x41 Port Input Format Setup command to put the sensor port into the respective mode and activate updates
35
            * Read back the 0x45 Port Value(Single) messages with updates from the sensor on the respective mode
36
       * **Multiple capabilities** - This is more complicated because we need to put the sensor port into CombinedMode
37
            * Send a [0x42, port, 0x02] message to lock the port
38
            * Send multiple 0x41 messages to activate each capability/mode we want updates from
39
            * Send a [0x42, port, 0x01, ..] message with the following bytes:
40
                * 0x00 = Row entry 0 in the supported combination mode table
41
                    (hard-coded for simplicity here because LEGO seems to only use this entry most of the time)
42
                * For each mode/capability, send a byte like the following:
43
                    * Upper 4-bits is mode number
44
                    * Lower 4-bits is the dataset number
45
                    * For example, for getting RGB values, it's mode 6, and we want all three datasets 
46
                        (for each color), so we'd add three bytes [0x60, 0x61, 0x62].  
47
                        If you just wanted the Red value, you just append [0x60]
48
            * Send a [0x42, port, 0x03] message to unlock the port
49
            * Now, when the sensor sends back values, it uses 0x46 messages with the following byte sequence:
50
                * Port id
51
                * 16-bit entry where the true bits mark which mode has values included in this message
52
                    (So 0x00 0x05 means values from Modes 2 and 0)
53
                * Then the set of values from the sensor, which are ordered by Mode number 
54
                    (so the sensor reading from mode 0 would come before the reading from mode 2)
55
                * Each set of values includes however many bytes are needed to represent each dataset
56
                    (for example, up to 3 for RGB colors), and the byte-width of each value (4 bytes for a 32-bit int)
57
58
59
       .. _single: https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-single
60
       .. _combined: https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-combinedmode
61
62
       Args:
63
          capabilities : can be input in the following formats (where the
64
            number in the tuple can be a threshold to trigger updates)
65
66
               * ['sense_color', 'sense_distance']
67
               * [capability.sense_color, capability.sense_distance]
68
               * [('sense_color', 1), ('sense_distance', 2)]
69
70
          name (str) : Human readable name
71
          port (int) : Port to connect to (otherwise will connect to first matching peripheral with defined sensor_id)
72
           
73
74
       Attributes:
75
            port (int) : Physical port on the hub this Peripheral attaches to
76
            sensor_name (str) : Name coming out of `const.DEVICES`
77
            value (dict) : Sensor readings get dumped into this dict
78
            message_handler (func) : Outgoing message queue to `BLEventQ` that's set by the Hub when an attach message is seen
79
            capabilities (list [ `capability` ]) : Supported capabilities
80
            thresholds (list [ int ]) : Integer list of thresholds for updates for each of the sensing capabilities
81
82
    """
83
    _DEFAULT_THRESHOLD = 1
84
    Dataset = namedtuple('Dataset', ['n', 'w', 'min', 'max'])
85
86
    def __init__(self, name, port=None, capabilities=None):
        """Set up the peripheral's bookkeeping attributes.

           Args:
                name (str) : Human readable name, forwarded to the base class
                port (int) : Port this peripheral is attached to, or None if
                    not known yet
                capabilities (list) : Capabilities to enable, in any format
                    accepted by `_get_validated_capabilities`.  Omitting it
                    (or passing None) means no capabilities.
        """
        super().__init__(name)
        self.port = port
        # `_sensor_id` is not defined in this class; presumably each concrete
        # subclass provides it -- confirm against the subclasses.
        self.sensor_name = DEVICES[self._sensor_id]
        self.value = None
        self.message_handler = None
        self.web_queue_output = None
        # A None sentinel is used instead of a shared mutable `[]` default.
        if capabilities is None:
            capabilities = []
        self.capabilities, self.thresholds = self._get_validated_capabilities(capabilities)
94
95
    def _get_validated_capabilities(self, caps):
96
        """Convert capabilities in different formats (string, tuple, etc)
97
98
           Returns:
99
                
100
                validated_caps, thresholds  (list[`capability`], list[int]): list of capabilities and list of associated thresholds
101
        """
102
        validated_caps = []
103
        thresholds = [1]*len(validated_caps)
104
        for cap in caps:
105
            # Capability can be a tuple of (cap, threshold)
106
            if isinstance(cap, tuple):
107
                cap, threshold = cap
108
                thresholds.append(threshold)
109
            else:
110
                thresholds.append(self._DEFAULT_THRESHOLD)
111
112
            if isinstance(cap, self.capability):
113
                # Make sure it's the write type of enumerated capability
114
                validated_caps.append(cap)
115
            elif type(cap) is str:
116
                # Make sure we can convert this string capability into a defined enum
117
                enum_cap = self.capability[cap]
118
                validated_caps.append(enum_cap)
119
        return validated_caps, thresholds
120
121
    def _convert_bytes(self, msg_bytes:bytearray, byte_count):
122
        """Convert bytearry into a set of values based on byte_count per value
123
124
           Args:
125
                msg_bytes (bytearray): Bytes to convert
126
                byte_count (int): How many bytes per value to use when computer (can be 1, 2, or 4)
127
128
           Returns:
129
                If a single value, then just that value
130
                If multiple values, then a list of those values
131
                Value can be either uint8, uint16, or uint32 depending on value of `byte_count`
132
        """
133
        if byte_count == 1:   # just a uint8
134
            val = msg_bytes[0]
135
        elif byte_count == 2: # uint16 little-endian
136
            val = struct.unpack('<H', msg_bytes)[0]
137
        elif byte_count == 4: # uint32 little-endian
138
            val = struct.unpack('<I', msg_bytes)[0]
139
        else:
140
            self.message_error(f'Cannot convert array of {msg_bytes} length {len(msg_bytes)} to python datatype')
141
            val = None
142
        return val
143
144
    async def _parse_combined_sensor_values(self, msg: bytearray):
145
        """
146
            Byte sequence is as follows:
147
                # uint16 where each set bit indicates data value from that mode is present 
148
                  (e.g. 0x00 0x05 means Mode 2 and Mode 0 data is present
149
                # The data from the lowest Mode number comes first in the subsequent bytes
150
                # Each Mode has a number of datasets associated with it (RGB for example is 3 datasets), and
151
                  a byte-width per dataset (RGB dataset is each a uint8)
152
153
            Args:
154
                msg (bytearray) : the sensor message
155
156
            Returns:
157
                None
158
159
            Side-effects:
160
                self.value
161
          
162
        """
163
        msg.pop(0)  # Remove the leading 0 (since we never have more than 7 datasets even with all the combo modes activated
164
        # The next byte is a bit mask of the mode/dataset entries present in this value
165
        modes = msg.pop(0)
166
        dataset_i = 0
167
        for cap in self.capabilities:  # This is the order we prgogramed the sensor
168
            n_datasets, byte_count = self.datasets[cap][0:2]
169
            for dataset in range(n_datasets):
170
                if modes & (1<<dataset_i):  # Check if i'th bit of mode is set
171
                    # Data corresponding to this dataset is present!
172
                    # Now, pop off however many bytes are associated with this
173
                    # dataset
174
                    data = msg[0:byte_count]
175
                    msg = msg[byte_count:]
176
                    val = self._convert_bytes(data, byte_count)
177
                    if n_datasets == 1:
178
                        self.value[cap] = val
179
                    else:
180
                        self.value[cap][dataset] = val
181
                dataset_i += 1
182
183
184
185
    async def send_message(self, msg, msg_bytes):
186
        """ Send outgoing message to BLEventQ """
187
        while not self.message_handler:
188
            await sleep(1)
189
        await self.message_handler(msg, msg_bytes, peripheral=self)
190
191
    def _convert_speed_to_val(self, speed):
192
        """Map speed of -100 to 100 to a byte range
193
194
            * -100 to 100 (negative means reverse)
195
            * 0 is floating
196
            * 127 is brake
197
198
            Returns:
199
                byte
200
        """
201
        if speed == 127: return 127
202
        if speed > 100: speed = 100
203
        if speed < 0: 
204
            # Now, truncate to 8-bits
205
            speed = speed & 255 # Or I guess I could do 256-abs(s)
206
        return speed
207
208
209
    async def set_output(self, mode, value):
210
        """Don't change this unless you're changing the way you do a Port Output command
211
        
212
           Outputs the following sequence to the sensor
213
            * 0x00 = hub id from common header
214
            * 0x81 = Port Output Command
215
            * port
216
            * 0x11 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback)
217
            * 0x51 = WriteDirectModeData
218
            * mode
219
            * value(s)
220
        """
221
        b = [0x00, 0x81, self.port, 0x01, 0x51, mode, value ]
222
        await self.send_message(f'set output port:{self.port} mode: {mode} = {value}', b)
223
224
    # Use these for sensor readings
225
    async def update_value(self, msg_bytes):
226
        """ Message from message_dispatch will trigger Hub to call this to update a value from a sensor incoming message
227
            Depending on the number of capabilities enabled, we end up with different processing:
228
229
            If zero, then just set the `self.value` field to the raw message.
230
231
            If one, then:
232
                * Parse the single sensor message which may have multiple data items (like an RGB color value)
233
                * `self.value` dict entry for this capability becomes a list of these values
234
235
            If multiple, then:
236
                * Parse multiple sensor messages (could be any combination of the enabled modes)
237
                * Set each dict entry to `self.value` to either a list of multiple values or a single value
238
239
        """
240
        msg = bytearray(msg_bytes)
241
        if len(self.capabilities)==0:
242
            self.value = msg
243
        if len(self.capabilities)==1:
244
            capability = self.capabilities[0]
245
            datasets, bytes_per_dataset = self.datasets[capability][0:2]
246
            for i in range(datasets):
247
                msg_ptr = i*bytes_per_dataset
248
                val = self._convert_bytes(msg[msg_ptr: msg_ptr+bytes_per_dataset], bytes_per_dataset)
249
                if datasets==1:
250
                    self.value[capability] = val
251
                else:
252
                    self.value[capability][i] = val
253
        if len(self.capabilities) > 1:
254
            await self._parse_combined_sensor_values(msg)
255
256
    async def activate_updates(self):
257
        """ Send a message to the sensor to activate updates
258
259
            Called via an 'attach' message from
260
            :func:`bricknil.messages.Message.parse_attached_io` that triggers
261
            this call from :func:`bricknil.hub.Hub.peripheral_message_loop`
262
263
            See class description for explanation on how Combined Mode updates are done.
264
            
265
            Returns:
266
                None
267
268
        """
269
        
270
        assert self.port is not None, f"Cannot activate updates on sensor before it's been attached to {self.name}!"
271
        
272
        if len(self.capabilities) == 0: 
273
            # Nothing to do since no capabilities defined
274
            return
275
276
        self.value = {}
277
        for cap in self.capabilities:
278
            self.value[cap] = [None]*self.datasets[cap][0]
279
280
        if len(self.capabilities)==1:  # Just a normal single sensor
281
            mode = self.capabilities[0].value
282
            b = [0x00, 0x41, self.port, mode, self.thresholds[0], 0, 0, 0, 1]
283
            await self.send_message(f'Activate SENSOR: port {self.port}', b) 
284
        else:
285
            # Combo mode.  Need to make sure only allowed combinations are preset
286
            # Lock sensor
287
            b = [0x00, 0x42, self.port, 0x02]
288
            await self.send_message(f'Lock port {self.port}', b)
289
290
            for cap, threshold in zip(self.capabilities, self.thresholds):
291
                assert cap in self.allowed_combo, f'{cap} is not allowed to be sensed in combination with others'
292
                # Enable each capability
293
                b = [0x00, 0x41, self.port, cap.value, threshold, 0, 0, 0, 1]
294
                await self.send_message(f'enable mode {cap.value} on {self.port}', b)
295
296
            # Now, set the combination mode/dataset report order
297
            b = [0x00, 0x42, self.port, 0x01, 0x00]
298
            for cap in self.capabilities:
299
                # RGB requires 3 datasets
300
                datasets, byte_width = self.datasets[cap][0:2]
301
                for i in range(datasets):
302
                    b.append(16*cap.value+i)  # Mode is higher order nibble, dataset is lower order nibble
303
            await self.send_message(f'Set combo port {self.port}', b)
304
305
            # Unlock and start
306
            b = [0x00, 0x42, self.port, 0x03]
307
            await self.send_message(f'Activate SENSOR multi-update {self.port}', b)
308
309
310