GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( d74eb3...fdd290 )
by Virantha
01:35
created

bricknil.peripheral.Peripheral._convert_bytes()   A

Complexity

Conditions 4

Size

Total Lines 22
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 10
nop 3
dl 0
loc 22
rs 9.9
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Base class for all sensors and motors
16
17
"""
18
import struct
19
from enum import Enum
20
21
from .process import Process
22
from curio import sleep, spawn, current_task
23
from .const import DEVICES
24
25
26
class Peripheral(Process):
27
    """Abstract base class for any Lego Boost/PoweredUp/WeDo peripherals
28
29
       A LEGO sensor can provide either a single_ sensing capability, or a combined_  mode where it returns multiple
30
       sensing values.  All the details can be found in the official protocol description.
31
32
       * **Single capability** - This is the easiest to handle:
33
            * Send a 0x41 Port Input Format Setup command to put the sensor port into the respective mode and activate updates
34
            * Read back the 0x45 Port Value(Single) messages with updates from the sensor on the respective mode
35
       * **Multiple capabilities** - This is more complicated because we need to put the sensor port into CombinedMode
36
            * Send a [0x42, port, 0x02] message to lock the port
37
            * Send multiple 0x41 messages to activate each capability/mode we want updates from
38
            * Send a [0x42, port, 0x01, ..] message with the following bytes:
39
                * 0x00 = Row entry 0 in the supported combination mode table
40
                    (hard-coded for simplicity here because LEGO seems to only use this entry most of the time)
41
                * For each mode/capability, send a byte like the following:
42
                    * Upper 4-bits is mode number
43
                    * Lower 4-bits is the dataset number
44
                    * For example, for getting RGB values, it's mode 6, and we want all three datasets 
45
                        (for each color), so we'd add three bytes [0x60, 0x61, 0x62].  
46
                        If you just wanted the Red value, you just append [0x60]
47
            * Send a [0x42, port, 0x03] message to unlock the port
48
            * Now, when the sensor sends back values, it uses 0x46 messages with the following byte sequence:
49
                * Port id
50
                * 16-bit entry where the true bits mark which mode has values included in this message
51
                    (So 0x00 0x05 means values from Modes 2 and 0)
52
                * Then the set of values from the sensor, which are ordered by Mode number 
53
                    (so the sensor reading from mode 0 would come before the reading from mode 2)
54
                * Each set of values includes however many bytes are needed to represent each dataset
55
                    (for example, up to 3 for RGB colors), and the byte-width of each value (4 bytes for a 32-bit int)
56
57
58
       .. _single: https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-single
59
       .. _combined: https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-combinedmode
60
61
       Args:
62
          capabilities : can be input in the following formats (where the
63
            number in the tuple can be a threshold to trigger updates)
64
65
               * ['sense_color', 'sense_distance']
66
               * [capability.sense_color, capability.sense_distance]
67
               * [('sense_color', 1), ('sense_distance', 2)]
68
69
          name (str) : Human readable name
70
          port (int) : Port to connect to (otherwise will connect to first matching peripheral with defined sensor_id)
71
           
72
73
       Attributes:
74
            port (int) : Physical port on the hub this Peripheral attaches to
75
            sensor_name (str) : Name coming out of `const.DEVICES`
76
            value (dict) : Sensor readings get dumped into this dict
77
            message_handler (func) : Outgoing message queue to `BLEventQ` that's set by the Hub when an attach message is seen
78
            capabilities (list [ `capability` ]) : Supported capabilities
79
            thresholds (list [ int ]) : Integer list of thresholds for updates for each of the sensing capabilities
80
81
    """
82
    _DEFAULT_THRESHOLD = 1
83
    def __init__(self, name, port=None, capabilities=[]):
        """Record the peripheral's identity and validate the requested capabilities.

        Args:
            name (str): Human readable name, forwarded to the base-class initializer.
            port (int): Port to connect to, or None when not specified.
            capabilities (list): Requested capabilities, in any of the formats
                accepted by `_get_validated_capabilities`.
        """
        super().__init__(name)
        self.port = port
        # `_sensor_id` is not defined in this class; presumably supplied by the
        # concrete subclass -- verify against the subclass definitions.
        self.sensor_name = DEVICES[self._sensor_id]
        # No sensor readings and no outgoing message handler yet
        self.value, self.message_handler = None, None
        validated = self._get_validated_capabilities(capabilities)
        self.capabilities, self.thresholds = validated
90
91
    def _get_validated_capabilities(self, caps):
92
        """Convert capabilities in different formats (string, tuple, etc)
93
94
           Returns:
95
                
96
                validated_caps, thresholds  (list[`capability`], list[int]): list of capabilities and list of associated thresholds
97
        """
98
        validated_caps = []
99
        thresholds = [1]*len(validated_caps)
100
        for cap in caps:
101
            # Capability can be a tuple of (cap, threshold)
102
            if isinstance(cap, tuple):
103
                cap, threshold = cap
104
                thresholds.append(threshold)
105
            else:
106
                thresholds.append(self._DEFAULT_THRESHOLD)
107
108
            if isinstance(cap, self.capability):
109
                # Make sure it's the write type of enumerated capability
110
                validated_caps.append(cap)
111
            elif type(cap) is str:
112
                # Make sure we can convert this string capability into a defined enum
113
                enum_cap = self.capability[cap]
114
                validated_caps.append(enum_cap)
115
        return validated_caps, thresholds
116
117
    def _convert_bytes(self, msg_bytes:bytearray, byte_count):
118
        """Convert bytearry into a set of values based on byte_count per value
119
120
           Args:
121
                msg_bytes (bytearray): Bytes to convert
122
                byte_count (int): How many bytes per value to use when computer (can be 1, 2, or 4)
123
124
           Returns:
125
                If a single value, then just that value
126
                If multiple values, then a list of those values
127
                Value can be either uint8, uint16, or uint32 depending on value of `byte_count`
128
        """
129
        if byte_count == 1:   # just a uint8
130
            val = msg_bytes[0]
131
        elif byte_count == 2: # uint16 little-endian
132
            val = struct.unpack('<H', msg_bytes)[0]
133
        elif byte_count == 4: # uint32 little-endian
134
            val = struct.unpack('<I', msg_bytes)[0]
135
        else:
136
            self.message_error(f'Cannot convert array of {msg_bytes} length {len(msg_bytes)} to python datatype')
137
            val = None
138
        return val
139
140
    async def _parse_combined_sensor_values(self, msg: bytearray):
141
        """
142
            Byte sequence is as follows:
143
                # uint16 where each set bit indicates data value from that mode is present 
144
                  (e.g. 0x00 0x05 means Mode 2 and Mode 0 data is present
145
                # The data from the lowest Mode number comes first in the subsequent bytes
146
                # Each Mode has a number of datasets associated with it (RGB for example is 3 datasets), and
147
                  a byte-width per dataset (RGB dataset is each a uint8)
148
149
            Args:
150
                msg (bytearray) : the sensor message
151
152
            Returns:
153
                None
154
155
            Side-effects:
156
                self.value
157
          
158
        """
159
        msg.pop(0)  # Remove the leading 0 (since we never have more than 7 datasets even with all the combo modes activated
160
        # The next byte is a bit mask of the mode/dataset entries present in this value
161
        modes = msg.pop(0)
162
        dataset_i = 0
163
        for cap in self.capabilities:  # This is the order we prgogramed the sensor
164
            n_datasets, byte_count = self.datasets[cap]
165
            for dataset in range(n_datasets):
166
                if modes & (1<<dataset_i):  # Check if i'th bit of mode is set
167
                    # Data corresponding to this dataset is present!
168
                    # Now, pop off however many bytes are associated with this
169
                    # dataset
170
                    data = msg[0:byte_count]
171
                    msg = msg[byte_count:]
172
                    val = self._convert_bytes(data, byte_count)
173
                    if n_datasets == 1:
174
                        self.value[cap] = val
175
                    else:
176
                        self.value[cap][dataset] = val
177
                dataset_i += 1
178
179
180
181
    async def send_message(self, msg, msg_bytes):
        """ Send outgoing message through `self.message_handler`

            Waits, checking once per second, until a message handler has been
            assigned, then awaits it with `msg` and `msg_bytes`.
        """
        while True:
            if self.message_handler:
                break
            await sleep(1)
        await self.message_handler(msg, msg_bytes)
186
187
    def _convert_speed_to_val(self, speed):
188
        """Map speed of -100 to 100 to a byte range
189
190
            * -100 to 100 (negative means reverse)
191
            * 0 is floating
192
            * 127 is brake
193
194
            Returns:
195
                byte
196
        """
197
        if speed == 127: return 127
198
        if speed > 100: speed = 100
199
        if speed < 0: 
200
            # Now, truncate to 8-bits
201
            speed = speed & 255 # Or I guess I could do 256-abs(s)
202
        return speed
203
204
205
    async def set_output(self, mode, value):
        """Don't change this unless you're changing the way you do a Port Output command

           Sends the following byte sequence via `send_message`:
            * 0x00 = hub id from common header
            * 0x81 = Port Output Command
            * port
            * 0x01 = startup/completion byte as sent by this code
              (Upper nibble: 0=buffer, 1=immediate execution; Lower nibble: 0=No ack, 1=command feedback)
            * 0x51 = WriteDirectModeData
            * mode
            * value
        """
        header = [0x00, 0x81, self.port, 0x01, 0x51]
        payload = header + [mode, value]
        await self.send_message('set output', payload)
219
220
221
    # Use these for sensor readings
222
    async def update_value(self, msg_bytes):
        """ Callback from message parser to update a value from a sensor incoming message

            What happens depends on how many capabilities are enabled:

            * None: `self.value` is set to the raw message (as a bytearray).
            * Exactly one: the message is cut into consecutive fixed-width datasets,
              each converted with `_convert_bytes`.  With a single dataset the
              `self.value` entry for the capability becomes that value; otherwise
              each value is stored at its dataset index in the entry's list.
            * More than one: the message is handed to `_parse_combined_sensor_values`.
        """
        msg = bytearray(msg_bytes)
        n_caps = len(self.capabilities)
        if n_caps == 0:
            self.value = msg
        elif n_caps == 1:
            capability = self.capabilities[0]
            datasets, width = self.datasets[capability]
            for i in range(datasets):
                start = i * width
                val = self._convert_bytes(msg[start:start + width], width)
                if datasets == 1:
                    self.value[capability] = val
                else:
                    self.value[capability][i] = val
        else:
            await self._parse_combined_sensor_values(msg)
252
253
    async def activate_updates(self):
        """ Send the message(s) that make the sensor start reporting updates

            With no capabilities this does nothing.  Otherwise `self.value` is
            reset to a dict holding, per capability, a list of `None` with one
            slot per dataset.  A single capability is enabled with one 0x41
            message; several capabilities use the combined-mode sequence:
            lock the port (0x42/0x02), enable each mode (0x41), set the
            mode/dataset combination (0x42/0x01), then unlock (0x42/0x03).

            Returns:
                None

            Raises:
                AssertionError: if `self.port` is None, or if (with several
                    capabilities) one of them is not in `self.allowed_combo`
        """
        assert self.port is not None, f"Cannot activate updates on sensor before it's been attached to {self.name}!"

        n_caps = len(self.capabilities)
        if n_caps == 0:
            # No capabilities defined, so there is nothing to activate
            return

        # Fresh placeholder slots for every dataset of every capability
        self.value = {}
        for cap in self.capabilities:
            self.value[cap] = [None] * self.datasets[cap][0]

        if n_caps == 1:
            # Plain single-mode sensor: one Port Input Format Setup message
            mode = self.capabilities[0].value
            single = [0x00, 0x41, self.port, mode, self.thresholds[0], 0, 0, 0, 1]
            await self.send_message(f'Activate SENSOR: port {self.port}', single)
            return

        # Combined mode: lock the port before configuring it
        await self.send_message(f'Lock port {self.port}', [0x00, 0x42, self.port, 0x02])

        # Enable every requested mode, checking each is allowed in a combination
        for cap, threshold in zip(self.capabilities, self.thresholds):
            assert cap in self.allowed_combo, f'{cap} is not allowed to be sensed in combination with others'
            enable = [0x00, 0x41, self.port, cap.value, threshold, 0, 0, 0, 1]
            await self.send_message(f'enable mode {cap.value} on {self.port}', enable)

        # Report order: one byte per dataset, mode in the high nibble and
        # dataset index in the low nibble
        combo = [0x00, 0x42, self.port, 0x01, 0x00]
        for cap in self.capabilities:
            n_datasets, _byte_width = self.datasets[cap]
            combo.extend(16*cap.value + i for i in range(n_datasets))
        await self.send_message(f'Set combo port {self.port}', combo)

        # Unlock the port, which starts the updates
        await self.send_message(f'Activate SENSOR multi-update {self.port}', [0x00, 0x42, self.port, 0x03])
305
306
307
class Motor(Peripheral):
308
    """Utility class for common functions shared between Train Motors, Internal Motors, and External Motors
309
310
    """
311
    def __init__(self, name, port=None, capabilities=[]):
        """Initialize motor state, then delegate to the parent initializer.

        Args:
            name (str): Human readable name
            port (int): Port to connect to, or None when not specified
            capabilities (list): Requested capabilities, passed through unchanged
        """
        # Motor bookkeeping is set up before delegating to the parent class
        self.ramp_in_progress_task = None  # no speed ramp running yet
        self.speed = 0                     # current speed starts at 0
        super().__init__(name, port, capabilities)
315
316
    async def set_speed(self, speed):
        """ Record and send a new motor speed

            A ramp that is in progress and was started by a different task is
            cancelled first (see `_cancel_existing_differet_ramp`).  The
            requested speed is stored in `self.speed` as given, then converted
            with `_convert_speed_to_val` and written to mode 0 via `set_output`.

            Args:
                speed (int) : Range -100 to 100 where negative numbers are reverse.
                    Use 0 to put the motor into neutral.
                    NOTE(review): the original docstring said 255 does a hard brake,
                    while `_convert_speed_to_val` documents 127 as brake -- confirm.
        """
        await self._cancel_existing_differet_ramp()
        self.speed = speed
        self.message_info(f'Setting speed to {speed}')
        raw_value = self._convert_speed_to_val(speed)
        await self.set_output(0, raw_value)
331
        
332
    async def _cancel_existing_differet_ramp(self):
333
        """Cancel the existing speed ramp if it was from a different task
334
335
            Remember that speed ramps must be a task with daemon=True, so there is no 
336
            one awaiting its future.
337
        """
338
        # Check if there's a ramp task in progress
339
        if self.ramp_in_progress_task:
340
            # Check if it's this current task or not
341
            current = await current_task()
342
            if current != self.ramp_in_progress_task:
343
                # We're trying to set the speed 
344
                # outside a previously in-progress ramp, so cancel the previous ramp
345
                await self.ramp_in_progress_task.cancel()
346
                self.ramp_in_progress_task = None
347
                self.message_debug(f'Canceling previous speed ramp in progress')
348
349
350
    async def ramp_speed(self, target_speed, ramp_time_ms):
        """Ramp the speed linearly from the current speed to `target_speed`

            Any ramp started by a different task is cancelled first.  The ramp
            itself runs in a spawned daemon task that calls `set_speed` once per
            100ms step; this coroutine returns once that task has been spawned.

            Args:
                target_speed (int) : Speed set by the last step of the ramp
                ramp_time_ms (int) : Requested ramp duration in milliseconds.  It is
                    rounded to a whole number of 100ms steps, with a minimum of one
                    step, so very short (or zero) times set the target speed directly.
        """
        TIME_STEP_MS = 100
        await self._cancel_existing_differet_ramp()

        # 500ms ramp time, 100ms per step
        # Therefore, number of steps = 500/100 = 5
        # Therefore speed_step = speed_diff/5
        # The step count must be a whole number >= 1: a fractional count would
        # never match the "last step" check below (target never reached) and a
        # zero count would divide by zero.
        number_of_steps = max(1, round(ramp_time_ms/TIME_STEP_MS))
        speed_diff = target_speed - self.speed
        speed_step = speed_diff/number_of_steps
        start_speed = self.speed
        self.message_debug(f'ramp_speed steps: {number_of_steps}, speed_diff: {speed_diff}, speed_step: {speed_step}')

        async def _ramp_speed():
            for step in range(number_of_steps):
                if step == number_of_steps - 1:
                    # Final step: land exactly on the target, free of rounding error
                    next_speed = target_speed
                else:
                    next_speed = int(start_speed + step*speed_step)
                self.message_debug(f'Setting next_speed: {next_speed}')
                await self.set_speed(next_speed)
                await sleep(TIME_STEP_MS/1000)
            self.ramp_in_progress_task = None

        self.message_debug(f'Starting ramp of speed: {start_speed} -> {target_speed} ({ramp_time_ms/1000}s)')
        self.ramp_in_progress_task = await spawn(_ramp_speed, daemon = True)
380
381
class TachoMotor(Motor):
382
383
    # Sensing modes offered by this motor; each member's value is its mode number
    capability = Enum("capability", [("sense_speed", 1), ("sense_pos", 2)])

    datasets = {
        capability.sense_speed: (1, 1),
        capability.sense_pos: (1, 4),
    }
    """ Dict of (num_datasets, bytes_per_dataset).
       `sense_speed` (1-byte), and `sense_pos` (uint32)"""

    # Capabilities that may be enabled together in combined mode
    allowed_combo = [
        capability.sense_speed,
        capability.sense_pos,
    ]
395
396
    async def set_pos(self, pos, speed=50, max_power=50):
        """Set the absolute position of the motor

           Every time the hub is powered up, the zero-angle reference will be reset to the
           motor's current position. When you issue this command, the motor will rotate to
           the position given in degrees.  The sign of the pos tells you which direction to rotate:
           (1) a positive number will rotate clockwise as looking from end of shaft towards the motor,
           (2) a negative number will rotate counter-clockwise


           Examples::

              await self.motor.set_pos(90)   # Rotate 90 degrees clockwise (looking from end of shaft towards motor)
              await self.motor.set_pos(-90)  # Rotate counter-clockwise 90 degrees
              await self.motor.set_pos(720)  # Rotate two full circles clockwise

           Args:
              pos (int) : Absolute position in degrees.
              speed (int) : Absolute value from 0-100
              max_power (int):  Max percentage power that will be applied (0-100%)

           Raises:
              struct.error: if `pos` does not fit in a signed 32-bit integer

           Notes:

               Use command GotoAbsolutePosition
                * 0x00 = hub id
                * 0x81 = Port Output command
                * port
                * 0x01 = startup/completion byte as sent by this code
                  (Upper nibble: 0=buffer, 1=immediate execution; Lower nibble: 0=No ack, 1=command feedback)
                * 0x0d = Subcommand
                * abs_pos (int32, little-endian)
                * speed -100 - 100
                * max_power abs(0-100%)
                * endstate = 0 (float), 126 (hold), 127 (brake); 126 is sent
                * Use Accel profile = (bit 0 = acc profile, bit 1 = decc profile); 3 is sent
        """
        # Pack explicitly little-endian: the bare 'i' format follows the host's
        # native byte order and would emit the bytes reversed on a big-endian host
        abs_pos = list(struct.pack('<i', pos))
        speed = self._convert_speed_to_val(speed)

        b = [0x00, 0x81, self.port, 0x01, 0x0d] + abs_pos + [speed, max_power, 126, 3]
        await self.send_message(f'set pos {pos} with speed {speed}', b)
437
438
439
    async def rotate(self, degrees, speed, max_power=50):
        """Rotate the given number of degrees from current position, with direction given by sign of speed

           Examples::

              await self.motor.rotate(90, speed=50)   # Rotate 90 degrees clockwise (looking from end of shaft towards motor)
              await self.motor.rotate(90, speed=-50)  # Rotate counter-clockwise 90 degrees
              await self.motor.rotate(720, speed=50)  # Rotate two full circles clockwise

           Args:
              degrees (uint) : Relative number of degrees to rotate
              speed (int) : -100 to 100
              max_power (int):  Max percentage power that will be applied (0-100%)

           Raises:
              struct.error: if `degrees` does not fit in a signed 32-bit integer

           Notes:

               Use command StartSpeedForDegrees
                * 0x00 = hub id
                * 0x81 = Port Output command
                * port
                * 0x01 = startup/completion byte as sent by this code
                  (Upper nibble: 0=buffer, 1=immediate execution; Lower nibble: 0=No ack, 1=command feedback)
                * 0x0b = Subcommand
                * degrees (int32, little-endian) 0..1000000
                * speed -100 - 100%
                * max_power abs(0-100%)
                * endstate = 0 (float), 126 (hold), 127 (brake); 126 is sent
                * Use Accel profile = (bit 0 = acc profile, bit 1 = decc profile); 3 is sent
        """
        # Keep the packed bytes in their own variable so the log message below
        # reports the requested degrees instead of the byte list.  Pack explicitly
        # little-endian: the bare 'i' format follows the host's native byte order.
        degree_bytes = list(struct.pack('<i', degrees))
        speed = self._convert_speed_to_val(speed)

        b = [0x00, 0x81, self.port, 0x01, 0x0b] + degree_bytes + [speed, max_power, 126, 3]
        await self.send_message(f'rotate {degrees} deg with speed {speed}', b)
473
474
475
    async def ramp_speed2(self, target_speed, ramp_time_ms):
        """Experimental function, not implemented yet DO NOT USE

           Sends two Port Output messages: subcommand 0x05 (acceleration
           profile) with fixed bytes, then subcommand 0x07 with the converted
           target speed.  The computed ramp time is only reported in the log
           message text; it is not part of either payload.  Raises
           ZeroDivisionError when `target_speed` equals the current speed.
        """
        # Scale the requested time to what a full 0 -> 100 change would take,
        # wrapped (modulo, not clamped) to stay below 10s
        speed_change = target_speed - self.speed
        full_scale_ms = int(ramp_time_ms/speed_change * 100.0) % 10000

        # High and low byte of the ramp time (value is always in 0..9999)
        hi, lo = divmod(full_scale_ms, 256)

        accel_cmd = [0x00, 0x81, self.port, 0x01, 0x05, 10, 10, 1]
        await self.send_message(f'set accel profile {full_scale_ms} {hi} {lo} ', accel_cmd)
        speed_cmd = [0x00, 0x81, self.port, 0x01, 0x07, self._convert_speed_to_val(target_speed), 80, 1]
        await self.send_message('set speed', speed_cmd)
491