GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

bricknil.sensor.motor   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 435
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 96
dl 0
loc 435
rs 10
c 0
b 0
f 0
wmc 13

8 Methods

Rating   Name   Duplication   Size   Complexity  
A Motor.set_speed() 0 15 1
A Motor.__init__() 0 4 1
A Motor._cancel_existing_differet_ramp() 0 16 3
A Motor.ramp_speed() 0 32 3
A TachoMotor.ramp_speed2() 0 16 1
A TachoMotor.set_pos() 0 41 1
A InternalMotor.__init__() 0 7 2
A TachoMotor.rotate() 0 34 1
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""All motor related peripherals including base motor classes"""
15
16
from curio import sleep, current_task, spawn  # Needed for motor speed ramp
17
18
from enum import Enum
19
from struct import pack
20
21
from .peripheral import Peripheral
22
23
class Motor(Peripheral):
    """Utility class for common functions shared between Train Motors, Internal Motors, and External Motors

    Attributes:
        speed (int) : Last speed requested through :meth:`set_speed` (starts at 0).
        ramp_in_progress_task : Task currently running a speed ramp, or `None`
            when no ramp is active.
    """
    def __init__(self, name, port=None, capabilities=None):
        """Initialise the speed-tracking state, then defer to the base class

            Args:
                name : Forwarded unchanged to the base class initialiser.
                port : Forwarded unchanged to the base class initialiser.
                capabilities (list) : Forwarded to the base class initialiser.
                    `None` (the default) is replaced by a fresh empty list.
        """
        # A literal `[]` default would be a single list shared by every instance
        if capabilities is None:
            capabilities = []
        self.speed = 0  # Initialize current speed to 0
        self.ramp_in_progress_task = None
        super().__init__(name, port, capabilities)

    async def set_speed(self, speed):
        """ Set the motor speed

            If there is an in-progress ramp, and this command is not part of that ramp,
            then cancel that in-progress ramp first, before issuing this set_speed command.

            Args:
                speed (int) : Range -100 to 100 where negative numbers are reverse.
                    Use 0 to put the motor into neutral.
                    255 will do a hard brake
        """
        await self._cancel_existing_differet_ramp()
        self.speed = speed
        self.message_info(f'Setting speed to {speed}')
        await self.set_output(0, self._convert_speed_to_val(speed))

    async def _cancel_existing_differet_ramp(self):
        """Cancel the existing speed ramp if it was from a different task

            Remember that speed ramps must be a task with daemon=True, so there is no
            one awaiting its future.
        """
        # Nothing to do unless a ramp task is registered
        if self.ramp_in_progress_task:
            # A call made from inside the ramp task itself must not cancel it
            current = await current_task()
            if current != self.ramp_in_progress_task:
                # We're trying to set the speed outside a previously
                # in-progress ramp, so cancel the previous ramp
                await self.ramp_in_progress_task.cancel()
                self.ramp_in_progress_task = None
                self.message_debug('Canceling previous speed ramp in progress')

    async def ramp_speed(self, target_speed, ramp_time_ms):
        """Ramp from the current speed to `target_speed` in 100 ms steps

            Any ramp started by a different task is cancelled first.  The ramp
            itself runs in a spawned daemon task, so this coroutine returns as
            soon as that task has been created.

            Args:
                target_speed (int) : Speed to reach at the end of the ramp.
                ramp_time_ms (int) : Duration of the ramp in milliseconds; must
                    be greater than 100.

            Raises:
                AssertionError: If `ramp_time_ms` is not greater than 100.
        """
        TIME_STEP_MS = 100
        await self._cancel_existing_differet_ramp()
        # Raised explicitly (instead of `assert`) so the check survives `python -O`
        if not ramp_time_ms > 100:
            raise AssertionError(f'Ramp speed time must be greater than 100ms ({ramp_time_ms}ms used)')

        # 500ms ramp time, 100ms per step
        # Therefore, number of steps = 500/100 = 5
        # Therefore speed_step = speed_diff/5
        number_of_steps = ramp_time_ms/TIME_STEP_MS
        speed_diff = target_speed - self.speed
        speed_step = speed_diff/number_of_steps
        start_speed = self.speed
        self.message_debug(f'ramp_speed steps: {number_of_steps}, speed_diff: {speed_diff}, speed_step: {speed_step}')

        async def _ramp_speed():
            current_step = 0
            while current_step < number_of_steps:
                # Advance first: step 0 would only re-send the speed we already
                # have and force the final step to cover two increments at once
                current_step += 1
                if current_step >= number_of_steps:
                    # Last step: land exactly on the target (no rounding error,
                    # no overshoot when the step count is fractional)
                    next_speed = target_speed
                else:
                    next_speed = int(start_speed + current_step*speed_step)
                self.message_debug(f'Setting next_speed: {next_speed}')
                await self.set_speed(next_speed)
                await sleep(TIME_STEP_MS/1000)
            # The loop always finishes on target_speed, so just mark the ramp done
            self.ramp_in_progress_task = None

        self.message_debug(f'Starting ramp of speed: {start_speed} -> {target_speed} ({ramp_time_ms/1000}s)')
        self.ramp_in_progress_task = await spawn(_ramp_speed, daemon = True)
98
99
class TachoMotor(Motor):
    """Base class for motors that can report speed and position and accept
    position/rotation commands in addition to plain speed commands.
    """

    capability = Enum("capability", {"sense_speed":1, "sense_pos":2})

    datasets = {
                 capability.sense_speed: (1, 1),
                 capability.sense_pos: (1, 4),
                }
    """ Dict of (num_datasets, bytes_per_dataset).
       `sense_speed` (1-byte), and `sense_pos` (uint32)"""

    allowed_combo = [ capability.sense_speed,
                      capability.sense_pos,
                    ]

    async def set_pos(self, pos, speed=50, max_power=50):
        """Set the absolute position of the motor

           Everytime the hub is powered up, the zero-angle reference will be reset to the
           motor's current position. When you issue this command, the motor will rotate to
           the position given in degrees.  The sign of the pos tells you which direction to rotate:
           (1) a positive number will rotate clockwise as looking from end of shaft towards the motor,
           (2) a negative number will rotate counter-clockwise


           Examples::

              await self.motor.set_pos(90)   # Rotate 90 degrees clockwise (looking from end of shaft towards motor)
              await self.motor.set_pos(-90)  # Rotate counter-clockwise 90 degrees
              await self.motor.set_pos(720)  # Rotate two full circles clockwise

           Args:
              pos (int) : Absolute position in degrees.
              speed (int) : Absolute value from 0-100
              max_power (int):  Max percentage power that will be applied (0-100%)

           Notes:

               Use command GotoAbsolutePosition
                * 0x00 = hub id
                * 0x81 = Port Output command
                * port
                * 0x01 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback)
                * 0x0d = Subcommand
                * abs_pos (int32, little-endian)
                * speed -100 - 100
                * max_power abs(0-100%)
                * endstate = 0 (float), 126 (hold), 127 (brake); this method sends 126
                * Use Accel profile = (bit 0 = acc profile, bit 1 = decc profile); this method sends 3
        """
        # '<i' pins the byte order: a bare 'i' uses the host's native order, which
        # only yields the same (little-endian) bytes on little-endian machines
        abs_pos = list(pack('<i', pos))
        speed = self._convert_speed_to_val(speed)

        b = [0x00, 0x81, self.port, 0x01, 0x0d] + abs_pos + [speed, max_power, 126, 3]
        await self.send_message(f'set pos {pos} with speed {speed}', b)

    async def rotate(self, degrees, speed, max_power=50):
        """Rotate the given number of degrees from current position, with direction given by sign of speed

           Examples::

              await self.motor.rotate(90, speed=50)   # Rotate 90 degrees clockwise (looking from end of shaft towards motor)
              await self.motor.rotate(90, speed=-50)  # Rotate counter-clockwise 90 degrees
              await self.motor.rotate(720, speed=50)  # Rotate two full circles clockwise

           Args:
              degrees (uint) : Relative number of degrees to rotate
              speed (int) : -100 to 100
              max_power (int):  Max percentage power that will be applied (0-100%)

           Notes:

               Use command StartSpeedForDegrees
                * 0x00 = hub id
                * 0x81 = Port Output command
                * port
                * 0x01 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback)
                * 0x0b = Subcommand
                * degrees (int32, little-endian) 0..1000000
                * speed -100 - 100%
                * max_power abs(0-100%)
                * endstate = 0 (float), 126 (hold), 127 (brake); this method sends 126
                * Use Accel profile = (bit 0 = acc profile, bit 1 = decc profile); this method sends 3
        """
        # Keep `degrees` intact so the log message shows the angle, not its bytes;
        # '<i' pins the byte order instead of relying on the host's native order
        degree_bytes = list(pack('<i', degrees))
        speed = self._convert_speed_to_val(speed)

        b = [0x00, 0x81, self.port, 0x01, 0x0b] + degree_bytes + [speed, max_power, 126, 3]
        await self.send_message(f'rotate {degrees} deg with speed {speed}', b)

    async def ramp_speed2(self, target_speed, ramp_time_ms): # pragma: no cover
        """Experimental function, not implemented yet DO NOT USE

           Sends an acceleration-profile message followed by a speed message.
           The computed ramp time (and its hi/lo bytes) currently only appears
           in the log text; the profile message carries fixed values.
        """
        # Set acceleration profile
        delta_speed = target_speed - self.speed
        if delta_speed == 0:
            # No speed change requested: avoid dividing by zero
            zero_100_ramp_time_ms = 0
        else:
            # Scale the requested ramp to the equivalent 0-100 ramp time and
            # clamp it to 10s (a modulo would wrap large values instead)
            zero_100_ramp_time_ms = min(abs(int(ramp_time_ms/delta_speed * 100.0)), 10000)

        hi = (zero_100_ramp_time_ms >> 8) & 255
        lo = zero_100_ramp_time_ms & 255

        profile = 1
        b = [0x00, 0x81, self.port, 0x01, 0x05, 10, 10, profile]
        await self.send_message(f'set accel profile {zero_100_ramp_time_ms} {hi} {lo} ', b)
        b = [0x00, 0x81, self.port, 0x01, 0x07, self._convert_speed_to_val(target_speed), 80, 1]
        await self.send_message('set speed', b)
209
210
211
class InternalMotor(TachoMotor):
    """ Access the internal motor(s) in the Boost Move Hub.

        Unlike the train motors, these motors (as well as the stand-alone Boost
        motors :class:`ExternalMotor`) have a built-in sensor/tachometer for sending back
        the motor's current speed and position.  However, you don't need to use the
        sensors, and can treat this motor strictly as an output device.

        Examples::

            # Basic connection to the motor on Port A
            @attach(InternalMotor, name='left_motor', port=InternalMotor.Port.A)

            # Basic connection to both motors at the same time (virtual I/O port).
            # Any speed command will cause both motors to rotate at the same speed
            @attach(InternalMotor, name='motors', port=InternalMotor.Port.AB)

            # Report back when motor speed changes. You must have a motor_change method defined
            @attach(InternalMotor, name='motor', port=InternalMotor.Port.A, capabilities=['sense_speed'])

            # Only report back when speed change exceeds 5 units
            @attach(InternalMotor, name='motor', port=InternalMotor.Port.A, capabilities=[('sense_speed', 5)])

        And within the run body you can control the motor output::

            await self.motor.set_speed(50)   # Setting the speed
            await self.motor.ramp_speed(80, 2000)  # Ramp speed to 80 over 2 seconds
            await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position
            await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position

        See Also:
            * :class:`TrainMotor` for connecting to a train motor
            * :class:`ExternalMotor` for connecting to a Boost tacho motor

    """
    _sensor_id = 0x0027
    _DEFAULT_THRESHOLD=2
    """Set to 2 to avoid a lot of updates since the speed seems to oscillate a lot"""

    Port = Enum('Port', 'A B AB', start=0)
    """Address either motor A or Motor B, or both AB at the same time"""

    def __init__(self, name, port=None, capabilities=None):
        """Maps the port names `A`, `B`, `AB` to hard-coded port numbers

            Args:
                name : Forwarded unchanged to the parent initialiser.
                port (InternalMotor.Port) : Which motor(s) to address, or `None`
                    to pass no port on to the parent initialiser.
                capabilities (list) : Forwarded to the parent initialiser.
                    `None` (the default) is replaced by a fresh empty list.
        """
        # A literal `[]` default would be a single list shared by every instance
        if capabilities is None:
            capabilities = []
        # Plain Enum members are always truthy, so Port.A (value 0) is mapped too
        if port:
            port_map = [55, 56, 57]  # indexed by Port.A / Port.B / Port.AB values
            port = port_map[port.value]
        self.speed = 0
        super().__init__(name, port, capabilities)
259
    
260
        
261
class ExternalMotor(TachoMotor):
    """ Access the stand-alone Boost motors

        These are similar to the :class:`InternalMotor` with built-in tachometer and
        sensor for sending back the motor's current speed and position.  You
        don't need to use the sensors, and can treat this as strictly an
        output.

        Examples::

            # Basic connection to the motor
            @attach(ExternalMotor, name='motor')

            # Report back when motor speed changes. You must have a motor_change method defined
            @attach(ExternalMotor, name='motor', capabilities=['sense_speed'])

            # Only report back when speed change exceeds 5 units, and position changes (degrees)
            @attach(ExternalMotor, name='motor', capabilities=[('sense_speed', 5), 'sense_pos'])

        And then within the run body::

            await self.motor.set_speed(50)   # Setting the speed
            await self.motor.ramp_speed(80, 2000)  # Ramp speed to 80 over 2 seconds
            await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position
            await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position

        See Also:
            * :class:`TrainMotor` for connecting to a train motor
            * :class:`InternalMotor` for connecting to the Boost hub built-in motors

    """

    _sensor_id = 0x26
294
295
296
class CPlusLargeMotor(TachoMotor):
    """ Access the Technic Control Plus Large motors

        These are similar to the :class:`InternalMotor` with built-in tachometer and
        sensor for sending back the motor's current speed and position.  You
        don't need to use the sensors, and can treat this as strictly an
        output.

        Examples::

            # Basic connection to the motor
            @attach(CPlusLargeMotor, name='motor')

            # Report back when motor speed changes. You must have a motor_change method defined
            @attach(CPlusLargeMotor, name='motor', capabilities=['sense_speed'])

            # Only report back when speed change exceeds 5 units, and position changes (degrees)
            @attach(CPlusLargeMotor, name='motor', capabilities=[('sense_speed', 5), 'sense_pos'])

        And then within the run body::

            await self.motor.set_speed(50)   # Setting the speed
            await self.motor.ramp_speed(80, 2000)  # Ramp speed to 80 over 2 seconds
            await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position
            await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position

        See Also:
            * :class:`TrainMotor` for connecting to a train motor
            * :class:`InternalMotor` for connecting to the Boost hub built-in motors

    """

    _sensor_id = 0x2E
329
330
331
class CPlusXLMotor(TachoMotor):
    """ Access the Technic Control Plus XL motors

        These are similar to the :class:`InternalMotor` with built-in tachometer and
        sensor for sending back the motor's current speed and position.  You
        don't need to use the sensors, and can treat this as strictly an
        output.

        Examples::

            # Basic connection to the motor
            @attach(CPlusXLMotor, name='motor')

            # Report back when motor speed changes. You must have a motor_change method defined
            @attach(CPlusXLMotor, name='motor', capabilities=['sense_speed'])

            # Only report back when speed change exceeds 5 units, and position changes (degrees)
            @attach(CPlusXLMotor, name='motor', capabilities=[('sense_speed', 5), 'sense_pos'])

        And then within the run body::

            await self.motor.set_speed(50)   # Setting the speed
            await self.motor.ramp_speed(80, 2000)  # Ramp speed to 80 over 2 seconds
            await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position
            await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position

        See Also:
            * :class:`TrainMotor` for connecting to a train motor
            * :class:`InternalMotor` for connecting to the Boost hub built-in motors

    """

    _sensor_id = 0x2F
364
365
366
class TrainMotor(Motor):
    """
        Connects to the train motors.

        TrainMotor has no sensing capabilities and only supports a single output mode that
        sets the speed.

        Examples::

             @attach(TrainMotor, name='train')

        And then within the run body, use::

            await self.train.set_speed(speed)

        Attributes:
            speed (int) : Keep track of the current speed in order to ramp it

        See Also:
            * :class:`InternalMotor`
    """
    _sensor_id = 0x0002
388
389
class WedoMotor(Motor):
    """
        Connects to the Wedo motors.

        WedoMotor has no sensing capabilities and only supports a single output mode that
        sets the speed.

        Examples::

             @attach(WedoMotor, name='motor')

        And then within the run body, use::

            await self.motor.set_speed(speed)

        Attributes:
            speed (int) : Keep track of the current speed in order to ramp it

        See Also:
            * :class:`InternalMotor`
            * :class:`TrainMotor`
    """
    _sensor_id = 0x0001
412
413
class DuploTrainMotor(Motor):
    """Train Motor on Duplo Trains

       Make sure that the train is sitting on the ground (the front wheels need to keep rotating) in
       order to keep the train motor powered.  If you pick up the train, the motor will stop operating
       within a few seconds.

       Examples::

            @attach(DuploTrainMotor, name='motor')

       And then within the run body, use::

            await self.motor.set_speed(speed)

       Attributes:
            speed (int): Keep track of the current speed in order to ramp it

       See Also:
            :class:`TrainMotor` for connecting to a PoweredUp train motor
    """
    _sensor_id = 0x0029
435
436