GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — master ( a3394c...b99246 )
by Virantha
01:31
created

bricknil.sensor.peripheral.Peripheral.__init__()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 4
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Base class for all sensors and motors
16
17
"""
18
import struct
19
from enum import Enum
20
21
from ..process import Process
22
from curio import sleep, spawn, current_task
23
from ..const import DEVICES
24
25
26
class Peripheral(Process):
27
    """Abstract base class for any Lego Boost/PoweredUp/WeDo peripherals
28
29
       A LEGO sensor can provide either a single_ sensing capability, or a combined_  mode where it returns multiple
30
       sensing values.  All the details can be found in the official protocol description.
31
32
       * **Single capability** - This is the easiest to handle:
33
            * Send a 0x41 Port Input Format Setup command to put the sensor port into the respective mode and activate updates
34
            * Read back the 0x45 Port Value(Single) messages with updates from the sensor on the respective mode
35
       * **Multiple capabilities** - This is more complicated because we need to put the sensor port into CombinedMode
36
            * Send a [0x42, port, 0x02] message to lock the port
37
            * Send multiple 0x41 messages to activate each capability/mode we want updates from
38
            * Send a [0x42, port, 0x01, ..] message with the following bytes:
39
                * 0x00 = Row entry 0 in the supported combination mode table
40
                    (hard-coded for simplicity here because LEGO seems to only use this entry most of the time)
41
                * For each mode/capability, send a byte like the following:
42
                    * Upper 4-bits is mode number
43
                    * Lower 4-bits is the dataset number
44
                    * For example, for getting RGB values, it's mode 6, and we want all three datasets 
45
                        (for each color), so we'd add three bytes [0x60, 0x61, 0x62].  
46
                        If you just wanted the Red value, you just append [0x60]
47
            * Send a [0x42, port, 0x03] message to unlock the port
48
            * Now, when the sensor sends back values, it uses 0x46 messages with the following byte sequence:
49
                * Port id
50
                * 16-bit entry where the true bits mark which mode has values included in this message
51
                    (So 0x00 0x05 means values from Modes 2 and 0)
52
                * Then the set of values from the sensor, which are ordered by Mode number 
53
                    (so the sensor reading from mode 0 would come before the reading from mode 2)
54
                * Each set of values includes however many bytes are needed to represent each dataset
55
                    (for example, up to 3 for RGB colors), and the byte-width of each value (4 bytes for a 32-bit int)
56
57
58
       .. _single: https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-single
59
       .. _combined: https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-combinedmode
60
61
       Args:
62
          capabilities : can be input in the following formats (where the
63
            number in the tuple can be a threshold to trigger updates)
64
65
               * ['sense_color', 'sense_distannce'] 
66
               * [capability.sense_color, capability.sense_distance]
67
               * [('sense_color', 1), ('sense_distance', 2)]
68
69
          name (str) : Human readable name
70
          port (int) : Port to connect to (otherwise will connect to first matching peripheral with defined sensor_id)
71
           
72
73
       Attributes:
74
            port (int) : Physical port on the hub this Peripheral attaches to
75
            sensor_name (str) : Name coming out of `const.DEVICES`
76
            value (dict) : Sensor readings get dumped into this dict
77
            message_handler (func) : Outgoing message queue to `BLEventQ` that's set by the Hub when an attach message is seen
78
            capabilites (list [ `capability` ]) : Support capabilities 
79
            thresholds (list [ int ]) : Integer list of thresholds for updates for each of the sensing capabilities
80
81
    """
82
    _DEFAULT_THRESHOLD = 1
83
    def __init__(self, name, port=None, capabilities=[]):
84
        super().__init__(name)
85
        self.port = port
86
        self.sensor_name = DEVICES[self._sensor_id]
87
        self.value = None
88
        self.message_handler = None
89
        self.web_queue_output = None
90
        self.capabilities, self.thresholds = self._get_validated_capabilities(capabilities)
91
92
    def _get_validated_capabilities(self, caps):
93
        """Convert capabilities in different formats (string, tuple, etc)
94
95
           Returns:
96
                
97
                validated_caps, thresholds  (list[`capability`], list[int]): list of capabilities and list of associated thresholds
98
        """
99
        validated_caps = []
100
        thresholds = [1]*len(validated_caps)
101
        for cap in caps:
102
            # Capability can be a tuple of (cap, threshold)
103
            if isinstance(cap, tuple):
104
                cap, threshold = cap
105
                thresholds.append(threshold)
106
            else:
107
                thresholds.append(self._DEFAULT_THRESHOLD)
108
109
            if isinstance(cap, self.capability):
110
                # Make sure it's the write type of enumerated capability
111
                validated_caps.append(cap)
112
            elif type(cap) is str:
113
                # Make sure we can convert this string capability into a defined enum
114
                enum_cap = self.capability[cap]
115
                validated_caps.append(enum_cap)
116
        return validated_caps, thresholds
117
118
    def _convert_bytes(self, msg_bytes:bytearray, byte_count):
119
        """Convert bytearry into a set of values based on byte_count per value
120
121
           Args:
122
                msg_bytes (bytearray): Bytes to convert
123
                byte_count (int): How many bytes per value to use when computer (can be 1, 2, or 4)
124
125
           Returns:
126
                If a single value, then just that value
127
                If multiple values, then a list of those values
128
                Value can be either uint8, uint16, or uint32 depending on value of `byte_count`
129
        """
130
        if byte_count == 1:   # just a uint8
131
            val = msg_bytes[0]
132
        elif byte_count == 2: # uint16 little-endian
133
            val = struct.unpack('<H', msg_bytes)[0]
134
        elif byte_count == 4: # uint32 little-endian
135
            val = struct.unpack('<I', msg_bytes)[0]
136
        else:
137
            self.message_error(f'Cannot convert array of {msg_bytes} length {len(msg_bytes)} to python datatype')
138
            val = None
139
        return val
140
141
    async def _parse_combined_sensor_values(self, msg: bytearray):
142
        """
143
            Byte sequence is as follows:
144
                # uint16 where each set bit indicates data value from that mode is present 
145
                  (e.g. 0x00 0x05 means Mode 2 and Mode 0 data is present
146
                # The data from the lowest Mode number comes first in the subsequent bytes
147
                # Each Mode has a number of datasets associated with it (RGB for example is 3 datasets), and
148
                  a byte-width per dataset (RGB dataset is each a uint8)
149
150
            Args:
151
                msg (bytearray) : the sensor message
152
153
            Returns:
154
                None
155
156
            Side-effects:
157
                self.value
158
          
159
        """
160
        msg.pop(0)  # Remove the leading 0 (since we never have more than 7 datasets even with all the combo modes activated
161
        # The next byte is a bit mask of the mode/dataset entries present in this value
162
        modes = msg.pop(0)
163
        dataset_i = 0
164
        for cap in self.capabilities:  # This is the order we prgogramed the sensor
165
            n_datasets, byte_count = self.datasets[cap]
166
            for dataset in range(n_datasets):
167
                if modes & (1<<dataset_i):  # Check if i'th bit of mode is set
168
                    # Data corresponding to this dataset is present!
169
                    # Now, pop off however many bytes are associated with this
170
                    # dataset
171
                    data = msg[0:byte_count]
172
                    msg = msg[byte_count:]
173
                    val = self._convert_bytes(data, byte_count)
174
                    if n_datasets == 1:
175
                        self.value[cap] = val
176
                    else:
177
                        self.value[cap][dataset] = val
178
                dataset_i += 1
179
180
181
182
    async def send_message(self, msg, msg_bytes):
183
        """ Send outgoing message to BLEventQ """
184
        while not self.message_handler:
185
            await sleep(1)
186
        await self.message_handler(msg, msg_bytes, peripheral=self)
187
188
    def _convert_speed_to_val(self, speed):
189
        """Map speed of -100 to 100 to a byte range
190
191
            * -100 to 100 (negative means reverse)
192
            * 0 is floating
193
            * 127 is brake
194
195
            Returns:
196
                byte
197
        """
198
        if speed == 127: return 127
199
        if speed > 100: speed = 100
200
        if speed < 0: 
201
            # Now, truncate to 8-bits
202
            speed = speed & 255 # Or I guess I could do 256-abs(s)
203
        return speed
204
205
206
    async def set_output(self, mode, value):
207
        """Don't change this unless you're changing the way you do a Port Output command
208
        
209
           Outputs the following sequence to the sensor
210
            * 0x00 = hub id from common header
211
            * 0x81 = Port Output Command
212
            * port
213
            * 0x11 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback)
214
            * 0x51 = WriteDirectModeData
215
            * mode
216
            * value(s)
217
        """
218
        b = [0x00, 0x81, self.port, 0x01, 0x51, mode, value ]
219
        await self.send_message(f'set output port:{self.port} mode: {mode} = {value}', b)
220
221
    # Use these for sensor readings
222
    async def update_value(self, msg_bytes):
223
        """ Message from message_dispatch will trigger Hub to call this to update a value from a sensor incoming message
224
            Depending on the number of capabilities enabled, we end up with different processing:
225
226
            If zero, then just set the `self.value` field to the raw message.
227
228
            If one, then:
229
                * Parse the single sensor message which may have multiple data items (like an RGB color value)
230
                * `self.value` dict entry for this capability becomes a list of these values
231
232
            If multiple, then:
233
                * Parse multiple sensor messages (could be any combination of the enabled modes)
234
                * Set each dict entry to `self.value` to either a list of multiple values or a single value
235
236
        """
237
        msg = bytearray(msg_bytes)
238
        if len(self.capabilities)==0:
239
            self.value = msg
240
        if len(self.capabilities)==1:
241
            capability = self.capabilities[0]
242
            datasets, bytes_per_dataset = self.datasets[capability]
243
            for i in range(datasets):
244
                msg_ptr = i*bytes_per_dataset
245
                val = self._convert_bytes(msg[msg_ptr: msg_ptr+bytes_per_dataset], bytes_per_dataset)
246
                if datasets==1:
247
                    self.value[capability] = val
248
                else:
249
                    self.value[capability][i] = val
250
        if len(self.capabilities) > 1:
251
            await self._parse_combined_sensor_values(msg)
252
253
    async def activate_updates(self):
254
        """ Send a message to the sensor to activate updates
255
256
            Called via an 'attach' message from
257
            :func:`bricknil.messages.Message.parse_attached_io` that triggers
258
            this call from :func:`bricknil.hub.Hub.peripheral_message_loop`
259
260
            See class description for explanation on how Combined Mode updates are done.
261
            
262
            Returns:
263
                None
264
265
        """
266
        
267
        assert self.port is not None, f"Cannot activate updates on sensor before it's been attached to {self.name}!"
268
        
269
        if len(self.capabilities) == 0: 
270
            # Nothing to do since no capabilities defined
271
            return
272
273
        self.value = {}
274
        for cap in self.capabilities:
275
            self.value[cap] = [None]*self.datasets[cap][0]
276
277
        if len(self.capabilities)==1:  # Just a normal single sensor
278
            mode = self.capabilities[0].value
279
            b = [0x00, 0x41, self.port, mode, self.thresholds[0], 0, 0, 0, 1]
280
            await self.send_message(f'Activate SENSOR: port {self.port}', b) 
281
        else:
282
            # Combo mode.  Need to make sure only allowed combinations are preset
283
            # Lock sensor
284
            b = [0x00, 0x42, self.port, 0x02]
285
            await self.send_message(f'Lock port {self.port}', b)
286
287
            for cap, threshold in zip(self.capabilities, self.thresholds):
288
                assert cap in self.allowed_combo, f'{cap} is not allowed to be sensed in combination with others'
289
                # Enable each capability
290
                b = [0x00, 0x41, self.port, cap.value, threshold, 0, 0, 0, 1]
291
                await self.send_message(f'enable mode {cap.value} on {self.port}', b)
292
293
            # Now, set the combination mode/dataset report order
294
            b = [0x00, 0x42, self.port, 0x01, 0x00]
295
            for cap in self.capabilities:
296
                # RGB requires 3 datasets
297
                datasets, byte_width = self.datasets[cap]
298
                for i in range(datasets):
299
                    b.append(16*cap.value+i)  # Mode is higher order nibble, dataset is lower order nibble
300
            await self.send_message(f'Set combo port {self.port}', b)
301
302
            # Unlock and start
303
            b = [0x00, 0x42, self.port, 0x03]
304
            await self.send_message(f'Activate SENSOR multi-update {self.port}', b)
305
306
307