GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — master ( 6835a9...405151 )
by Virantha
01:32
created

bricknil.sensor.DuploTrainMotor.__init__()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 4
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Actual sensor and motor peripheral definitions from Boost and PoweredUp
16
"""
17
from curio import sleep, current_task, spawn  # Needed for motor speed ramp
18
19
from enum import Enum, IntEnum
20
from .const import Color
21
22
from .peripheral import Peripheral
23
24
25
class InternalMotor(Peripheral):
26
    """ Access the internal motor(s) in the Boost Move Hub.
27
28
        Unlike the train motors, these motors have a built-in sensor for sending back
29
        the motor's current speed and position.  You don't need to use the sensors, and
30
        can treat this as strictly an output.
31
32
        .. graphviz::
33
34
            digraph {
35
               "from" -> "to"
36
            }
37
38
39
        Examples::
40
41
            # Basic connection to the motor on Port A
42
            @attach(InternalMotor, 'left_motor', port=InternalMotor.Port.A)
43
44
            # Basic connection to both motors at the same time (virtual I/O port).
45
            # Any speed command will cause both motors to rotate at the same speed
46
            @attach(InternalMotor, 'motors', port=InternalMotor.Port.AB)
47
48
            # Report back when motor speed changes. You must have a motors_change method defined 
49
            @attach(InternalMotor, 'motors', port=InternalMotor.Port.A, capabilities=['sense_speed'])
50
            # Only report back when speed change exceeds 5 units
51
            @attach(InternalMotor, 'motors', port=InternalMotor.Port.A, capabilities=[('sense_speed', 5)])
52
53
        See Also:
54
            :class:`TrainMotor` for connecting to a train motor
55
56
    """
57
    _sensor_id = 0x0027
58
    _DEFAULT_THRESHOLD=2
59
    """Set to 2 to avoid a lot of updates since the speed seems to oscillate a lot"""
60
61
    capability = Enum("capability", {"sense_speed":1, "sense_pos":2})
62
63
    Port = Enum('Port', 'A B AB', start=0)
64
    """Address either motor A or Motor B, or both AB at the same time"""
65
66
    # Dict of cap: (num_datasets, bytes_per_dataset)
67
    datasets = { capability.sense_speed: (1, 1),
68
                 capability.sense_pos: (1, 4),
69
                }
70
    """ Dict of (num_datasets, bytes_per_dataset).
71
       `sense_speed` (1-byte), and `sense_pos` (uint32)"""
72
73
    allowed_combo = [ capability.sense_speed,
74
                      capability.sense_pos,
75
                    ]
76
77
    def __init__(self, name, port=None, capabilities=[]):
78
        """Maps the port names `A`, `B`, `AB` to hard-coded port numbers"""
79
        if port:
80
            port_map = [55, 56, 57]
81
            port = port_map[port.value]
82
        self.speed = 0
83
        super().__init__(name, port, capabilities)
84
    
85
    async def set_speed(self, speed):
86
        """Sets the speed of the motor, and calls the :func:`bricknil.peripheral.Peripheral._convert_speed_to_val` method
87
           to do some sanity checking and bounding.
88
89
           Args:
90
               speed (int) : -100 to 100 (I believe this is a percentage)
91
        """
92
        self.speed = speed
93
        speed = self._convert_speed_to_val(speed)
94
        mode = 0
95
        await self.set_output(mode, speed)
96
97
    async def ramp_speed2(self, target_speed, ramp_time_ms):
98
        
99
        # Set acceleration profile
100
        delta_speed = target_speed - self.speed
101
        zero_100_ramp_time_ms = int(ramp_time_ms/delta_speed * 100.0) 
102
        zero_100_ramp_time_ms = zero_100_ramp_time_ms % 10000 # limit time to 10s
103
104
        hi = (zero_100_ramp_time_ms >> 8) & 255
105
        lo = zero_100_ramp_time_ms & 255
106
107
        profile = 1
108
        b = [0x00, 0x81, self.port, 0x01, 0x05, 10, 10, profile]
109
        await self.send_message(f'set accel profile {zero_100_ramp_time_ms} {hi} {lo} ', b)
110
        b = [0x00, 0x81, self.port, 0x01, 0x07, self._convert_speed_to_val(target_speed), 80, 1]
111
        await self.send_message('set speed', b)
112
        
113
class VisionSensor(Peripheral):
114
    """ Access the Boost Vision/Distance Sensor
115
116
        Only the sensing capabilities of this sensor is supported right now.
117
118
        - *sense_color*: Returns one of the 10 predefined colors
119
        - *sense_distance*: Distance from 0-7 in roughly inches
120
        - *sense_count*: Running count of waving your hand/item over the sensor (32-bit)
121
        - *sense_reflectivity*: Under distances of one inch, the inverse of the distance
122
        - *sense_ambient*: Distance under one inch (so inverse of the preceeding)
123
        - *sense_rgb*: R, G, B values (3 sets of uint16)
124
125
        Any combination of sense_color, sense_distance, sense_count, sense_reflectivity, 
126
        and sense_rgb is supported.
127
128
        Examples::
129
130
            # Basic distance sensor
131
            @attach(VisionSensor, 'vision', capabilities=['sense_color'])
132
            # Or use the capability Enum
133
            @attach(VisionSensor, 'vision', capabilities=[VisionSensor.capability.sense_color])
134
135
            # Distance and color sensor
136
            @attach(VisionSensor, 'vision', capabilities=['sense_color', 'sense_distance'])
137
138
            # Distance and rgb sensor with different thresholds to trigger updates
139
            @attach(VisionSensor, 'vision', capabilities=[('sense_color', 1), ('sense_rgb', 5)])
140
141
        The values returned by the sensor will always be available in the instance variable
142
        `self.value`.  For example, when the `sense_color` and `sense_rgb` capabilities are 
143
        enabled, the following values will be stored and updated::
144
145
            self.value = { VisionSensor.capability.sense_color:  uint8,
146
                           VisionSensor.capability.sense_rgb: 
147
                                            [ uint16, uint16, uint16 ]
148
                         }
149
150
        Notes:
151
            The actual modes supported by the sensor are as follows:
152
153
            -  0 = color (0-10)
154
            -  1 = IR proximity (0-7)
155
            -  2 = count (32-bit int)
156
            -  3 = Reflt   (inverse of distance when closer than 1")
157
            -  4 = Amb  (distance when closer than 1")
158
            -  5 = COL (output) ?
159
            -  6 = RGB I
160
            -  7 = IR tx (output) ?
161
            -  8 = combined:  Color byte, Distance byte, 0xFF, Reflected light
162
163
    """
164
165
    _sensor_id = 0x0025
166
    capability = Enum("capability", 
167
                      [('sense_color', 0),
168
                       ('sense_distance', 1),
169
                       ('sense_count', 2),
170
                       ('sense_reflectivity', 3),
171
                       ('sense_ambient', 4),
172
                       ('sense_rgb', 6),
173
                       ])
174
175
    datasets = { capability.sense_color: (1, 1),
176
                 capability.sense_distance: (1, 1),
177
                 capability.sense_count: (1, 4),  # 4-bytes (32-bit)
178
                 capability.sense_reflectivity: (1, 1),
179
                 capability.sense_ambient: (1, 1),
180
                 capability.sense_rgb: (3, 2)   # 3 16-bit values
181
                }
182
183
    allowed_combo = [ capability.sense_color,
184
                      capability.sense_distance,
185
                      capability.sense_count,
186
                      capability.sense_reflectivity,
187
                      capability.sense_rgb,
188
                    ]
189
190
class InternalTiltSensor(Peripheral):
191
    """
192
        Access the internal tilt sensor in the Boost Move Hub.
193
        
194
        The various modes are:
195
196
        - **sense_angle** - X, Y angles.  Both are 0 if hub is lying flat with button up
197
        - **sense_tilt** - value from 0-9 if hub is tilted around any of its axis. Seems to be
198
          a rough mesaure of how much the hub is tilted away from lying flat.
199
          There is no update for just a translation along an axis
200
        - **sense_orientation** - returns one of the nine orientations below (0-9)
201
            - `InternalTiltSensor.orientation`.up = flat with button on top
202
            - `InternalTiltSensor.orientation`.right - standing up on side closest to button
203
            - `InternalTiltSensor.orientation`.left - standing up on side furthest from button
204
            - `InternalTiltSensor.orientation`.far_side - on long face facing away
205
            - `InternalTiltSensor.orientation`.near_side -  on long face facing you
206
            - `InternalTiltSensor.orientation`.down - upside down
207
        - **sense_impact** - 32-bit count of impacts to sensor
208
        - **sense_acceleration_3_axis** - 3 bytes of raw accelerometer data.
209
210
        Any combination of the above modes are allowed.
211
212
        Examples::
213
214
            # Basic tilt sensor
215
            @attach(InternalTiltSensor, 'tilt', capabilities=['sense_tilt'])
216
            # Or use the capability Enum
217
            @attach(InternalTiltSensor, 'tilt', capabilities=[InternalTiltSensor.sense_tilt])
218
219
            # Tilt and orientation sensor
220
            @attach(InternalTiltSensor, 'tilt', capabilities=['sense_tilt, sense_orientation'])
221
222
        The values returned by the sensor will always be available in the
223
        instance variable `self.value`.  For example, when the `sense_angle`
224
        and `sense_orientation` capabilities are enabled, the following values
225
        will be stored and updated::
226
227
            self.value = { InternalTiltSensor.capability.sense_angle:  [uint8, uint8],
228
                           InternalTiltSensor.capability.sense_orientation: 
229
                                            Enum(InternalTiltSensor.orientation)
230
                         }
231
    """
232
    _sensor_id = 0x0028
233
    capability = Enum("capability", 
234
                      [('sense_angle', 0),
235
                       ('sense_tilt', 1),
236
                       ('sense_orientation', 2),
237
                       ('sense_impact', 3),
238
                       ('sense_acceleration_3_axis', 4),
239
                       ])
240
241
    datasets = { capability.sense_angle: (2, 1),
242
                 capability.sense_tilt: (1, 1),
243
                 capability.sense_orientation: (1, 1),  
244
                 capability.sense_impact: (1, 4),
245
                 capability.sense_acceleration_3_axis: (3, 1),
246
                }
247
248
    allowed_combo = [ capability.sense_angle,
249
                      capability.sense_tilt,
250
                      capability.sense_orientation,
251
                      capability.sense_impact,
252
                      capability.sense_acceleration_3_axis,
253
                    ]
254
255
    orientation = Enum('orientation', 
256
                        {   'up': 0,
257
                            'right': 1, 
258
                            'left': 2, 
259
                            'far_side':3,
260
                            'near_side':4,
261
                            'down':5,
262
                        })
263
264
265
    def update_value(self, msg_bytes):
266
        """If sense_orientation, then substitute the `IntenalTiltSensor.orientation`
267
           enumeration value into the self.value dict.  Otherwise, don't do anything
268
           special to the self.value dict.
269
        """
270
        super().update_value(msg_bytes)
271
        so = self.capability.sense_orientation
272
        if so in self.value:
273
            self.value[so] = self.orientation(self.value[so])
274
275
276
class LED(Peripheral):
277
    """ Changes the LED color on the Hubs::
278
279
            @attach(LED, 'hub_led')
280
281
            self.hub_led.set_output(Color.red)
282
283
        Warnings:
284
            No support yet for the standalone LEDs that connect to the Hub ports.
285
286
    """
287
    _sensor_id = 0x0017
288
289
    async def set_color(self, color: Color):
290
        """ Converts a Color enumeration to a color value"""
291
292
        # For now, only support preset colors
293
        assert isinstance(color, Color)
294
        col = color.value
295
        assert col < 11
296
        mode = 0
297
        await self.set_output(mode, col)
298
        #b = [0x00, 0x81, self.port, 0x11, 0x51, mode, col ]
299
        #await self.message_info(f'set color to {color}')
300
301
class TrainMotor(Peripheral):
302
    """
303
        Connects to the train motors.
304
305
        TrainMotor has no sensing capabilities and only supports a single output mode that
306
        sets the speed.
307
308
        Examples::
309
310
            @attach(TrainMotor, 'train')
311
312
        And then within the run body, use::
313
314
            self.train.set_speed(speed)
315
316
        Attributes:
317
            speed (int) : Keep track of the current speed in order to ramp it
318
319
        See Also:
320
            `InternalMotor`
321
    """
322
    _sensor_id = 0x0002
323
324
    def __init__(self, name, port=None, capabilities=[]):
325
        """Initialize current speed to 0"""
326
        self.speed = 0
327
        self.ramp_in_progress_task = None
328
        super().__init__(name, port, capabilities)
329
330
    async def set_speed(self, speed):
331
        """ Validate and set the train speed
332
333
            If there is an in-progress ramp, and this command is not part of that ramp, 
334
            then cancel that in-progress ramp first, before issuing this set_speed command.
335
336
            Args:
337
                speed (int) : Range -100 to 100 where negative numbers are reverse.
338
                    Use 0 to put the motor into neutral.
339
                    255 will do a hard brake
340
        """
341
342
        await self._cancel_existing_differet_ramp()
343
        self.speed = speed
344
        self.message_info(f'Setting speed to {speed}')
345
        await self.set_output(0, self._convert_speed_to_val(speed))
346
        
347
    async def _cancel_existing_differet_ramp(self):
348
        """Cancel the existing speed ramp if it was from a different task
349
350
            Remember that speed ramps must be a task with daemon=True, so there is no 
351
            one awaiting its future.
352
        """
353
        # Check if there's a ramp task in progress
354
        if self.ramp_in_progress_task:
355
            # Check if it's this current task or not
356
            current = await current_task()
357
            if current != self.ramp_in_progress_task:
358
                # We're trying to set the speed 
359
                # outside a previously in-progress ramp, so cancel the previous ramp
360
                await self.ramp_in_progress_task.cancel()
361
                self.ramp_in_progress_task = None
362
                self.message_info(f'Canceling previous speed ramp in progress')
363
364
365
366
367
    async def ramp_speed(self, target_speed, ramp_time_ms):
368
        """Ramp the speed by 10 units in the time given
369
370
        """
371
        TIME_STEP_MS = 100 
372
        await self._cancel_existing_differet_ramp()
373
374
        # 500ms ramp time, 100ms per step
375
        # Therefore, number of steps = 500/100 = 5
376
        # Therefore speed_step = speed_diff/5
377
        number_of_steps = ramp_time_ms/TIME_STEP_MS
378
        speed_diff = target_speed - self.speed
379
        speed_step = speed_diff/number_of_steps
380
        start_speed = self.speed
381
        self.message(f'ramp_speed steps: {number_of_steps}, speed_diff: {speed_diff}, speed_step: {speed_step}')
382
        current_step = 0
383
        async def _ramp_speed():
384
            nonlocal current_step  # Since this is being assigned to, we need to mark it as coming from the enclosed scope
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable current_step does not seem to be defined.
Loading history...
385
            while current_step < number_of_steps:
386
                next_speed = int(start_speed + current_step*speed_step)
387
                self.message(f'Setting next_speed: {next_speed}')
388
                current_step +=1 
389
                if current_step == number_of_steps: 
390
                    next_speed = target_speed
391
                await self.set_speed(next_speed)
392
                await sleep(TIME_STEP_MS/1000)
393
            self.ramp_in_progress_task = None
394
395
        self.message_info(f'Starting ramp of speed: {start_speed} -> {target_speed} ({ramp_time_ms/1000}s)')
396
        self.ramp_in_progress_task = await spawn(_ramp_speed, daemon = True)
397
398
399
class RemoteButtons(Peripheral):
400
    """Represents one set of '+', '-', 'red' buttons on the PoweredHub Remote
401
402
       Each remote has two sets of buttons, on the left and right side.  Pick the one
403
       your want to attach to by using the port argument with either Port.L or Port.R.
404
405
       There are actually a few different modes that the hardware supports, but we are
406
       only going to use one of them called 'KEYSD' (see the notes in the documentation on the
407
       raw values reported by the hub).  This mode makes the remote send three values back
408
       in a list.  To access each button state, there are three helper methods provided 
409
       (see below)
410
411
       Examples::
412
413
            # Basic connection to the left buttons
414
            @attach(RemoteButtons, 'left_buttons', port=RemoteButtons.Port.L)
415
416
            # Getting values back in the handler
417
            async def left_buttons_change(self):
418
419
                is_plus_pressed = self.left_buttons.plus_pressed()
420
                is_minus_pressed = self.left_buttons.minus_pressed()
421
                is_red_pressed = self.left_buttons.red_pressed()
422
423
    """
424
425
    _sensor_id = 0x0037
426
    Port = Enum('Port', 'L R', start=0)
427
    Button = IntEnum('Button', 'PLUS RED MINUS', start=0)
428
    """The button index in the value list returned by the sensor"""
429
430
    capability = Enum('capability', {'sense_press':4},)
431
432
    datasets = { capability.sense_press: (3,1) }
433
    allowed_combo = []
434
435
    def __init__(self, name, port=None, capabilities=[]):
436
        """Maps the port names `L`, `R`"""
437
        if port:
438
            port = port.value
439
        super().__init__(name, port, capabilities)
440
441
    def plus_pressed(self):
442
        """Return whether `value` reflects that the PLUS button is pressed"""
443
        button_list = self.value[self.capability.sense_press]
444
        return button_list[self.Button.PLUS] == 1
445
    def minus_pressed(self):
446
        """Return whether `value` reflects that the MINUS button is pressed"""
447
        button_list = self.value[self.capability.sense_press]
448
        return button_list[self.Button.MINUS] == 1
449
    def red_pressed(self):
450
        """Return whether `value` reflects that the RED button is pressed"""
451
        button_list = self.value[self.capability.sense_press]
452
        return button_list[self.Button.RED] == 1
453
454
class Button(Peripheral):
455
    """ Register to be notified of button presses on the Hub (Boost or PoweredUp)
456
457
        This is actually a slight hack, since the Hub button is not a peripheral that is 
458
        attached like other sensors in the Lego protocol.  Instead, the buttons are accessed
459
        through Hub property messages.  We abstract away these special messages to make the
460
        button appear to be like any other peripheral sensor.
461
462
        Examples::
463
464
            @attach(Button, 'hub_btn')
465
466
        Notes:
467
            Since there is no attach I/O message from the hub to trigger the
468
            :func:`activate_updates` method, we instead insert a fake
469
            "attaach" message from this fake sensor on port 255 in the
470
            `BLEventQ.get_messages` method that is used to register for updates
471
            from a given sensor.
472
473
    """
474
    _sensor_id = 0x0005
475
    """Piggy back the hub button off the normal peripheral button id 0x0005.
476
       Might need to change this in the future"""
477
478
    capability = Enum('capability', {'sense_press':0})
479
480
    datasets = { capability.sense_press: (1,1)
481
               }
482
    allowed_combo = [capability.sense_press]
483
484
    def __init__(self, name, port=None, capabilities=[]):
485
        """Call super-class with port set to 255 """
486
        super().__init__(name, 255, capabilities)
487
488
    async def activate_updates(self):
489
        """Use a special Hub Properties button message updates activation message"""
490
        self.value = {}
491
        for cap in self.capabilities:
492
            self.value[cap] = [None]*self.datasets[cap][0]
493
494
        b = [0x00, 0x01, 0x02, 0x02]  # Button reports from "Hub Properties Message Type"
495
        await self.send_message(f'Activate button reports: port {self.port}', b) 
496
497
class DuploTrainMotor(Peripheral):
498
    _sensor_id = 0x0029
499
500
    def __init__(self, name, port=None, capabilities=[]):
501
        """Initialize current speed to 0"""
502
        self.speed = 0
503
        self.ramp_in_progress_task = None
504
        super().__init__(name, port, capabilities)
505
506
507