GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — master ( 405151...72bf9f )
by Virantha
01:31
created

bricknil.peripheral.Motor.__init__()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 4
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Base class for all sensors and motors
16
17
"""
18
import struct
19
from .process import Process
20
from curio import sleep, spawn, current_task
21
from .const import DEVICES
22
23
24
class Peripheral(Process):
25
    """Abstract base class for any Lego Boost/PoweredUp/WeDo peripherals
26
27
       A LEGO sensor can provide either a single_ sensing capability, or a combined_  mode where it returns multiple
28
       sensing values.  All the details can be found in the official protocol description.
29
30
       * **Single capability** - This is the easiest to handle:
31
            * Send a 0x41 Port Input Format Setup command to put the sensor port into the respective mode and activate updates
32
            * Read back the 0x45 Port Value(Single) messages with updates from the sensor on the respective mode
33
       * **Multiple capabilities** - This is more complicated because we need to put the sensor port into CombinedMode
34
            * Send a [0x42, port, 0x02] message to lock the port
35
            * Send multiple 0x41 messages to activate each capability/mode we want updates from
36
            * Send a [0x42, port, 0x01, ..] message with the following bytes:
37
                * 0x00 = Row entry 0 in the supported combination mode table
38
                    (hard-coded for simplicity here because LEGO seems to only use this entry most of the time)
39
                * For each mode/capability, send a byte like the following:
40
                    * Upper 4-bits is mode number
41
                    * Lower 4-bits is the dataset number
42
                    * For example, for getting RGB values, it's mode 6, and we want all three datasets 
43
                        (for each color), so we'd add three bytes [0x60, 0x61, 0x62].  
44
                        If you just wanted the Red value, you just append [0x60]
45
            * Send a [0x42, port, 0x03] message to unlock the port
46
            * Now, when the sensor sends back values, it uses 0x46 messages with the following byte sequence:
47
                * Port id
48
                * 16-bit entry where the true bits mark which mode has values included in this message
49
                    (So 0x00 0x05 means values from Modes 2 and 0)
50
                * Then the set of values from the sensor, which are ordered by Mode number 
51
                    (so the sensor reading from mode 0 would come before the reading from mode 2)
52
                * Each set of values includes however many bytes are needed to represent each dataset
53
                    (for example, up to 3 for RGB colors), and the byte-width of each value (4 bytes for a 32-bit int)
54
55
56
       .. _single: https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-single
57
       .. _combined: https://lego.github.io/lego-ble-wireless-protocol-docs/index.html#port-input-format-combinedmode
58
59
       Args:
60
          capabilities : can be input in the following formats (where the
61
            number in the tuple can be a threshold to trigger updates)
62
63
               * ['sense_color', 'sense_distannce'] 
64
               * [capability.sense_color, capability.sense_distance]
65
               * [('sense_color', 1), ('sense_distance', 2)]
66
67
          name (str) : Human readable name
68
          port (int) : Port to connect to (otherwise will connect to first matching peripheral with defined sensor_id)
69
           
70
71
       Attributes:
72
            port (int) : Physical port on the hub this Peripheral attaches to
73
            sensor_name (str) : Name coming out of `const.DEVICES`
74
            enabled (bool) : `True` when the Hub sends an attach message. For future use
75
            value (dict) : Sensor readings get dumped into this dict
76
            message_handler (func) : Outgoing message queue to `BLEventQ` that's set by the Hub when an attach message is seen
77
            capabilites (list [ `capability` ]) : Support capabilities 
78
            thresholds (list [ int ]) : Integer list of thresholds for updates for each of the sensing capabilities
79
80
    """
81
    _DEFAULT_THRESHOLD = 1
82
    def __init__(self, name, port=None, capabilities=[]):
83
        super().__init__(name)
84
        self.port = port
85
        self.sensor_name = DEVICES[self._sensor_id]
86
        self.enabled = False
87
        self.value = None
88
        self.message_handler = None
89
        self.capabilities, self.thresholds = self._get_validated_capabilities(capabilities)
90
91
    def _get_validated_capabilities(self, caps):
92
        """Convert capabilities in different formats (string, tuple, etc)
93
94
           Returns:
95
                
96
                validated_caps, thresholds  (list[`capability`], list[int]): list of capabilities and list of associated thresholds
97
        """
98
        validated_caps = []
99
        thresholds = [1]*len(validated_caps)
100
        for cap in caps:
101
            # Capability can be a tuple of (cap, threshold)
102
            if isinstance(cap, tuple):
103
                cap, threshold = cap
104
                thresholds.append(threshold)
105
            else:
106
                thresholds.append(self._DEFAULT_THRESHOLD)
107
108
            if isinstance(cap, self.capability):
109
                # Make sure it's the write type of enumerated capability
110
                validated_caps.append(cap)
111
            elif type(cap) is str:
112
                # Make sure we can convert this string capability into a defined enum
113
                enum_cap = self.capability[cap]
114
                validated_caps.append(enum_cap)
115
        return validated_caps, thresholds
116
117
    def _convert_bytes(self, msg_bytes:bytearray, byte_count):
118
        """Convert bytearry into a set of values based on byte_count per value
119
120
           Args:
121
                msg_bytes (bytearray): Bytes to convert
122
                byte_count (int): How many bytes per value to use when computer (can be 1, 2, or 4)
123
124
           Returns:
125
                If a single value, then just that value
126
                If multiple values, then a list of those values
127
                Value can be either uint8, uint16, or uint32 depending on value of `byte_count`
128
        """
129
        if byte_count == 1:   # just a uint8
130
            val = msg_bytes[0]
131
        elif byte_count == 2: # uint16 little-endian
132
            val = struct.unpack('<H', msg_bytes)[0]
133
        elif byte_count == 4: # uint32 little-endian
134
            val = struct.unpack('<I', msg_bytes)[0]
135
        else:
136
            self.message_error(f'Cannot convert array of {msg_bytes} length {len(msg_bytes)} to python datatype')
137
            val = None
138
        return val
139
140
    def _parse_combined_sensor_values(self, msg: bytearray):
141
        """
142
            Byte sequence is as follows:
143
                # uint16 where each set bit indicates data value from that mode is present 
144
                  (e.g. 0x00 0x05 means Mode 2 and Mode 0 data is present
145
                # The data from the lowest Mode number comes first in the subsequent bytes
146
                # Each Mode has a number of datasets associated with it (RGB for example is 3 datasets), and
147
                  a byte-width per dataset (RGB dataset is each a uint8)
148
149
            Args:
150
                msg (bytearray) : the sensor message
151
152
            Returns:
153
                None
154
155
            Side-effects:
156
                self.value
157
          
158
        """
159
        msg.pop(0)  # Remove the leading 0 (since we never have more than 7 datasets even with all the combo modes activated
160
        # The next byte is a bit mask of the mode/dataset entries present in this value
161
        modes = msg.pop(0)
162
        dataset_i = 0
163
        for cap in self.capabilities:  # This is the order we prgogramed the sensor
164
            n_datasets, byte_count = self.datasets[cap]
165
            for dataset in range(n_datasets):
166
                if modes & (1<<dataset_i):  # Check if i'th bit of mode is set
167
                    # Data corresponding to this dataset is present!
168
                    # Now, pop off however many bytes are associated with this
169
                    # dataset
170
                    data = msg[0:byte_count]
171
                    msg = msg[byte_count:]
172
                    val = self._convert_bytes(data, byte_count)
173
                    if n_datasets == 1:
174
                        self.value[cap] = val
175
                    else:
176
                        self.value[cap][dataset] = val
177
                dataset_i += 1
178
179
180
181
    async def send_message(self, msg, msg_bytes):
182
        """ Send outgoing message to BLEventQ """
183
        while not self.message_handler:
184
            await sleep(1)
185
        await self.message_handler(msg, msg_bytes)
186
187
    def _convert_speed_to_val(self, speed):
188
        """Map speed of -100 to 100 to a byte range
189
190
            * -100 to 100 (negative means reverse)
191
            * 0 is floating
192
            * 127 is brake
193
194
            Returns:
195
                byte
196
        """
197
        if speed == 127: return 127
198
        if speed > 100: speed = 100
199
        if speed < 0: 
200
            # Now, truncate to 8-bits
201
            speed = speed & 255 # Or I guess I could do 256-abs(s)
202
        return speed
203
204
205
    async def set_output(self, mode, value):
206
        """Don't change this unless you're changing the way you do a Port Output command
207
        
208
           Outputs the following sequence to the sensor
209
            * 0x00 = hub id from common header
210
            * 0x81 = Port Output Command
211
            * port
212
            * 0x11 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback)
213
            * 0x51 = WriteDirectModeData
214
            * mode
215
            * value(s)
216
        """
217
        b = [0x00, 0x81, self.port, 0x11, 0x51, mode, value ]
218
        await self.send_message('set output', b)
219
220
221
    # Use these for sensor readings
222
    def update_value(self, msg_bytes):
223
        """ Callback from message parser to update a value from a sensor incoming message
224
            Depending on the number of capabilities enabled, we end up with different processing:
225
226
            If zero, then just set the `self.value` field to the raw message.
227
228
            If one, then:
229
                * Parse the single sensor message which may have multiple data items (like an RGB color value)
230
                * `self.value` dict entry for this capability becomes a list of these values
231
232
            If multiple, then:
233
                * Parse multiple sensor messages (could be any combination of the enabled modes)
234
                * Set each dict entry to `self.value` to either a list of multiple values or a single value
235
236
        """
237
        msg = bytearray(msg_bytes)
238
        if len(self.capabilities)==0:
239
            self.value = msg
240
        if len(self.capabilities)==1:
241
            capability = self.capabilities[0]
242
            datasets, bytes_per_dataset = self.datasets[capability]
243
            for i in range(datasets):
244
                msg_ptr = i*bytes_per_dataset
245
                val = self._convert_bytes(msg[msg_ptr: msg_ptr+bytes_per_dataset], bytes_per_dataset)
246
                if datasets==1:
247
                    self.value[capability] = val
248
                else:
249
                    self.value[capability][i] = val
250
        if len(self.capabilities) > 1:
251
            self._parse_combined_sensor_values(msg)
252
253
    async def activate_updates(self):
254
        """ Send a message to the sensor to activate updates
255
256
            Called via an 'attach' message from
257
            :func:`bricknil.messages.Message.parse_attached_io` that triggers
258
            this call from :func:`bricknil.hub.Hub.peripheral_message_loop`
259
260
            See class description for explanation on how Combined Mode updates are done.
261
            
262
            Returns:
263
                None
264
265
        """
266
        
267
        assert self.port is not None, f"Cannot activate updates on sensor before it's been attached to {self.name}!"
268
        
269
        if len(self.capabilities) == 0: 
270
            # Nothing to do since no capabilities defined
271
            return
272
273
        self.value = {}
274
        for cap in self.capabilities:
275
            self.value[cap] = [None]*self.datasets[cap][0]
276
277
        if len(self.capabilities)==1:  # Just a normal single sensor
278
            mode = self.capabilities[0].value
279
            b = [0x00, 0x41, self.port, mode, self.thresholds[0], 0, 0, 0, 1]
280
            await self.send_message(f'Activate SENSOR: port {self.port}', b) 
281
        else:
282
            # Combo mode.  Need to make sure only allowed combinations are preset
283
            # Lock sensor
284
            b = [0x00, 0x42, self.port, 0x02]
285
            await self.send_message(f'Lock port {self.port}', b)
286
287
            for cap, threshold in zip(self.capabilities, self.thresholds):
288
                assert cap in self.allowed_combo, f'{cap} is not allowed to be sensed in combination with others'
289
                # Enable each capability
290
                b = [0x00, 0x41, self.port, cap.value, threshold, 0, 0, 0, 1]
291
                await self.send_message(f'enable mode {cap.value} on {self.port}', b)
292
293
            # Now, set the combination mode/dataset report order
294
            b = [0x00, 0x42, self.port, 0x01, 0x00]
295
            for cap in self.capabilities:
296
                # RGB requires 3 datasets
297
                datasets, byte_width = self.datasets[cap]
298
                for i in range(datasets):
299
                    b.append(16*cap.value+i)  # Mode is higher order nibble, dataset is lower order nibble
300
            await self.send_message(f'Set combo port {self.port}', b)
301
302
            # Unlock and start
303
            b = [0x00, 0x42, self.port, 0x03]
304
            await self.send_message(f'Activate SENSOR multi-update {self.port}', b)
305
306
307
class Motor(Peripheral):
308
    """Utility functions for a train motor.
309
310
    """
311
    def __init__(self, name, port=None, capabilities=[]):
312
        """Initialize current speed to 0"""
313
        self.speed = 0
314
        self.ramp_in_progress_task = None
315
        super().__init__(name, port, capabilities)
316
317
    async def set_speed(self, speed):
318
        """ Validate and set the train speed
319
320
            If there is an in-progress ramp, and this command is not part of that ramp, 
321
            then cancel that in-progress ramp first, before issuing this set_speed command.
322
323
            Args:
324
                speed (int) : Range -100 to 100 where negative numbers are reverse.
325
                    Use 0 to put the motor into neutral.
326
                    255 will do a hard brake
327
        """
328
        await self._cancel_existing_differet_ramp()
329
        self.speed = speed
330
        self.message_info(f'Setting speed to {speed}')
331
        await self.set_output(0, self._convert_speed_to_val(speed))
332
        
333
    async def _cancel_existing_differet_ramp(self):
334
        """Cancel the existing speed ramp if it was from a different task
335
336
            Remember that speed ramps must be a task with daemon=True, so there is no 
337
            one awaiting its future.
338
        """
339
        # Check if there's a ramp task in progress
340
        if self.ramp_in_progress_task:
341
            # Check if it's this current task or not
342
            current = await current_task()
343
            if current != self.ramp_in_progress_task:
344
                # We're trying to set the speed 
345
                # outside a previously in-progress ramp, so cancel the previous ramp
346
                await self.ramp_in_progress_task.cancel()
347
                self.ramp_in_progress_task = None
348
                self.message_info(f'Canceling previous speed ramp in progress')
349
350
351
    async def ramp_speed(self, target_speed, ramp_time_ms):
352
        """Ramp the speed by 10 units in the time given
353
354
        """
355
        TIME_STEP_MS = 100 
356
        await self._cancel_existing_differet_ramp()
357
358
        # 500ms ramp time, 100ms per step
359
        # Therefore, number of steps = 500/100 = 5
360
        # Therefore speed_step = speed_diff/5
361
        number_of_steps = ramp_time_ms/TIME_STEP_MS
362
        speed_diff = target_speed - self.speed
363
        speed_step = speed_diff/number_of_steps
364
        start_speed = self.speed
365
        self.message(f'ramp_speed steps: {number_of_steps}, speed_diff: {speed_diff}, speed_step: {speed_step}')
366
        current_step = 0
367
        async def _ramp_speed():
368
            nonlocal current_step  # Since this is being assigned to, we need to mark it as coming from the enclosed scope
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable current_step does not seem to be defined.
Loading history...
369
            while current_step < number_of_steps:
370
                next_speed = int(start_speed + current_step*speed_step)
371
                self.message(f'Setting next_speed: {next_speed}')
372
                current_step +=1 
373
                if current_step == number_of_steps: 
374
                    next_speed = target_speed
375
                await self.set_speed(next_speed)
376
                await sleep(TIME_STEP_MS/1000)
377
            self.ramp_in_progress_task = None
378
379
        self.message_info(f'Starting ramp of speed: {start_speed} -> {target_speed} ({ramp_time_ms/1000}s)')
380
        self.ramp_in_progress_task = await spawn(_ramp_speed, daemon = True)
381
382
383