GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( a3394c...b99246 )
by Virantha
01:31
created

bricknil.sensor.motor.TachoMotor.set_pos()   A

Complexity

Conditions 1

Size

Total Lines 41
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 4
dl 0
loc 41
rs 10
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""All motor related peripherals including base motor classes"""
15
16
from curio import sleep, current_task, spawn  # Needed for motor speed ramp
17
18
from enum import Enum
19
from struct import pack
20
21
from .peripheral import Peripheral
22
23
class Motor(Peripheral):
    """Utility class for common functions shared between Train Motors, Internal Motors, and External Motors

    Attributes:
        speed (int) : Last speed passed to :meth:`set_speed` (starts at 0)
        ramp_in_progress_task : Task running the current speed ramp, or `None`
            when no ramp is active
    """

    def __init__(self, name, port=None, capabilities=None):
        """Initialize speed tracking and hand the remaining setup to the base class

        Args:
            name : Passed through unchanged to the base class
            port : Passed through unchanged to the base class
            capabilities (list) : Passed through to the base class; `None`
                (the default) is replaced with a new empty list
        """
        self.speed = 0  # Initialize current speed to 0
        self.ramp_in_progress_task = None
        # A fresh list per instance: a `[]` default in the signature would be
        # one shared object across every motor created without capabilities.
        super().__init__(name, port, [] if capabilities is None else capabilities)

    async def set_speed(self, speed):
        """ Validate and set the train speed

            If there is an in-progress ramp, and this command is not part of that ramp,
            then cancel that in-progress ramp first, before issuing this set_speed command.

            Args:
                speed (int) : Range -100 to 100 where negative numbers are reverse.
                    Use 0 to put the motor into neutral.
                    255 will do a hard brake
        """
        await self._cancel_existing_differet_ramp()
        self.speed = speed
        self.message_info(f'Setting speed to {speed}')
        await self.set_output(0, self._convert_speed_to_val(speed))

    async def _cancel_existing_differet_ramp(self):
        """Cancel the existing speed ramp if it was from a different task

            Remember that speed ramps must be a task with daemon=True, so there is no
            one awaiting its future.
        """
        # Nothing to do unless a ramp task is in progress
        if not self.ramp_in_progress_task:
            return

        # The ramp task itself calls set_speed(); it must not cancel itself
        current = await current_task()
        if current != self.ramp_in_progress_task:
            # We're trying to set the speed outside a previously in-progress
            # ramp, so cancel the previous ramp
            await self.ramp_in_progress_task.cancel()
            self.ramp_in_progress_task = None
            self.message_debug('Canceling previous speed ramp in progress')

    async def ramp_speed(self, target_speed, ramp_time_ms):
        """Ramp from the current speed to `target_speed` over `ramp_time_ms` milliseconds

        The ramp runs in a spawned daemon task that issues one
        :meth:`set_speed` call every 100ms; this coroutine returns as soon as
        that task has been spawned.  Any ramp already in progress from a
        different task is cancelled first.

        Args:
            target_speed (int) : Speed to reach at the end of the ramp
            ramp_time_ms (int) : Total ramp duration in milliseconds; must be
                greater than 100

        Raises:
            AssertionError : If `ramp_time_ms` is not greater than 100
        """
        TIME_STEP_MS = 100
        await self._cancel_existing_differet_ramp()
        assert ramp_time_ms > 100, f'Ramp speed time must be greater than 100ms ({ramp_time_ms}ms used)'

        # 500ms ramp time, 100ms per step
        # Therefore, number of steps = 500/100 = 5
        # Therefore speed_step = speed_diff/5
        number_of_steps = ramp_time_ms/TIME_STEP_MS
        start_speed = self.speed
        speed_diff = target_speed - start_speed
        speed_step = speed_diff/number_of_steps
        self.message_debug(f'ramp_speed steps: {number_of_steps}, speed_diff: {speed_diff}, speed_step: {speed_step}')

        async def _ramp_speed():
            current_step = 0
            while current_step < number_of_steps:
                next_speed = int(start_speed + current_step*speed_step)
                self.message_debug(f'Setting next_speed: {next_speed}')
                current_step += 1
                # number_of_steps is fractional when ramp_time_ms is not a
                # multiple of TIME_STEP_MS, so test with >= rather than ==;
                # otherwise the last iteration would never land on target_speed.
                if current_step >= number_of_steps:
                    next_speed = target_speed
                await self.set_speed(next_speed)
                await sleep(TIME_STEP_MS/1000)
            self.ramp_in_progress_task = None

        self.message_debug(f'Starting ramp of speed: {start_speed} -> {target_speed} ({ramp_time_ms/1000}s)')
        self.ramp_in_progress_task = await spawn(_ramp_speed, daemon = True)
97
98
class TachoMotor(Motor):
    """Base class for motors that can report speed and position and accept positioning commands"""

    capability = Enum("capability", {"sense_speed":1, "sense_pos":2})

    datasets = {
                 capability.sense_speed: (1, 1),
                 capability.sense_pos: (1, 4),
                }
    """ Dict of (num_datasets, bytes_per_dataset).
       `sense_speed` (1-byte), and `sense_pos` (uint32)"""

    allowed_combo = [ capability.sense_speed,
                      capability.sense_pos,
                    ]

    async def set_pos(self, pos, speed=50, max_power=50):
        """Set the absolute position of the motor

           Everytime the hub is powered up, the zero-angle reference will be reset to the
           motor's current position. When you issue this command, the motor will rotate to
           the position given in degrees.  The sign of the pos tells you which direction to rotate:
           (1) a positive number will rotate clockwise as looking from end of shaft towards the motor,
           (2) a negative number will rotate counter-clockwise


           Examples::

              await self.motor.set_pos(90)   # Rotate 90 degrees clockwise (looking from end of shaft towards motor)
              await self.motor.set_pos(-90)  # Rotate counter-clockwise 90 degrees
              await self.motor.set_pos(720)  # Rotate two full circles clockwise

           Args:
              pos (int) : Absolute position in degrees.
              speed (int) : Absolute value from 0-100
              max_power (int):  Max percentage power that will be applied (0-100%)

           Notes:

               Use command GotoAbsolutePosition
                * 0x00 = hub id
                * 0x81 = Port Output command
                * port
                * 0x11 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback)
                * 0x0d = Subcommand
                * abs_pos (int32)
                * speed -100 - 100
                * max_power abs(0-100%)
                * endstate = 0 (float), 126 (hold), 127 (brake)
                * Use Accel profile = (bit 0 = acc profile, bit 1 = decc profile)
                *
        """
        # Only `pack` is imported from struct, so call it directly.  '<i' pins
        # the int32 to little-endian instead of relying on the host's native
        # byte order.
        abs_pos = list(pack('<i', pos))
        speed = self._convert_speed_to_val(speed)

        # NOTE(review): the docstring describes the 4th byte as 0x11, but 0x01
        # is what is actually sent here -- confirm which one is intended.
        b = [0x00, 0x81, self.port, 0x01, 0x0d] + abs_pos + [speed, max_power, 126, 3]
        await self.send_message(f'set pos {pos} with speed {speed}', b)

    async def rotate(self, degrees, speed, max_power=50):
        """Rotate the given number of degrees from current position, with direction given by sign of speed

           Examples::

              await self.motor.rotate(90, speed=50)   # Rotate 90 degrees clockwise (looking from end of shaft towards motor)
              await self.motor.rotate(90, speed=-50)  # Rotate counter-clockwise 90 degrees
              await self.motor.rotate(720, speed=50)  # Rotate two full circles clockwise

           Args:
              degrees (uint) : Relative number of degrees to rotate
              speed (int) : -100 to 100
              max_power (int):  Max percentage power that will be applied (0-100%)

           Notes:

               Use command StartSpeedForDegrees
                * 0x00 = hub id
                * 0x81 = Port Output command
                * port
                * 0x11 = Upper nibble (0=buffer, 1=immediate execution), Lower nibble (0=No ack, 1=command feedback)
                * 0x0b = Subcommand
                * degrees (int32) 0..1000000
                * speed -100 - 100%
                * max_power abs(0-100%)
                * endstate = 0 (float), 126 (hold), 127 (brake)
                * Use Accel profile = (bit 0 = acc profile, bit 1 = decc profile)
                *
        """
        # Keep `degrees` itself untouched so the log message below shows the
        # requested angle rather than the packed byte list.
        degree_bytes = list(pack('<i', degrees))
        speed = self._convert_speed_to_val(speed)

        # NOTE(review): the docstring describes the 4th byte as 0x11, but 0x01
        # is what is actually sent here -- confirm which one is intended.
        b = [0x00, 0x81, self.port, 0x01, 0x0b] + degree_bytes + [speed, max_power, 126, 3]
        await self.send_message(f'rotate {degrees} deg with speed {speed}', b)

    async def ramp_speed2(self, target_speed, ramp_time_ms):
        """Experimental function, not implemented yet DO NOT USE

           Sends two port output messages: subcommand 0x05 followed by
           subcommand 0x07 carrying the converted `target_speed`.

           Args:
              target_speed (int) : Speed to ramp to
              ramp_time_ms (int) : Requested ramp duration in milliseconds
        """
        # Set acceleration profile
        # NOTE(review): raises ZeroDivisionError when target_speed equals the
        # current speed.
        delta_speed = target_speed - self.speed
        zero_100_ramp_time_ms = int(ramp_time_ms/delta_speed * 100.0)
        # NOTE(review): the modulo wraps values of 10000 and above back around
        # instead of clamping them -- confirm whether a clamp was intended.
        zero_100_ramp_time_ms = zero_100_ramp_time_ms % 10000 # limit time to 10s

        # hi/lo only appear in the log message; the profile message below
        # carries the fixed values 10, 10 instead.
        hi = (zero_100_ramp_time_ms >> 8) & 255
        lo = zero_100_ramp_time_ms & 255

        profile = 1
        b = [0x00, 0x81, self.port, 0x01, 0x05, 10, 10, profile]
        await self.send_message(f'set accel profile {zero_100_ramp_time_ms} {hi} {lo} ', b)
        b = [0x00, 0x81, self.port, 0x01, 0x07, self._convert_speed_to_val(target_speed), 80, 1]
        await self.send_message('set speed', b)
208
209
210
class InternalMotor(TachoMotor):
    """ Access the internal motor(s) in the Boost Move Hub.

        Unlike the train motors, these motors (as well as the stand-alone Boost
        motors :class:`ExternalMotor`) have a built-in sensor/tachometer for sending back
        the motor's current speed and position.  However, you don't need to use the
        sensors, and can treat this motor strictly as an output device.

        Examples::

            # Basic connection to the motor on Port A
            @attach(InternalMotor, name='left_motor', port=InternalMotor.Port.A)

            # Basic connection to both motors at the same time (virtual I/O port).
            # Any speed command will cause both motors to rotate at the same speed
            @attach(InternalMotor, name='motors', port=InternalMotor.Port.AB)

            # Report back when motor speed changes. You must have a motor_change method defined
            @attach(InternalMotor, name='motor', port=InternalMotor.Port.A, capabilities=['sense_speed'])

            # Only report back when speed change exceeds 5 units
            @attach(InternalMotor, name='motors', port=InternalMotor.Port.A, capabilities=[('sense_speed', 5)])

        And within the run body you can control the motor output::

            await self.motor.set_speed(50)   # Setting the speed
            await self.motor.ramp_speed(80, 2000)  # Ramp speed to 80 over 2 seconds
            await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position
            await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position

        See Also:
            * :class:`TrainMotor` for connecting to a train motor
            * :class:`ExternalMotor` for connecting to a Boost tacho motor

    """
    _sensor_id = 0x0027
    _DEFAULT_THRESHOLD=2
    """Set to 2 to avoid a lot of updates since the speed seems to oscillate a lot"""

    Port = Enum('Port', 'A B AB', start=0)
    """Address either motor A or Motor B, or both AB at the same time"""

    def __init__(self, name, port=None, capabilities=None):
        """Maps the port names `A`, `B`, `AB` to hard-coded port numbers

        Args:
            name : Passed through unchanged to the base class
            port (InternalMotor.Port) : Which motor(s) to address; when `None`
                it is forwarded to the base class untranslated
            capabilities (list) : Passed through to the base class; `None`
                (the default) is replaced with a new empty list
        """
        if port is not None:
            # Port.A/B/AB have values 0/1/2, which index this table directly
            port_map = [55, 56, 57]
            port = port_map[port.value]
        self.speed = 0
        # A fresh list per instance: a `[]` default in the signature would be
        # one shared object across every instance created without capabilities.
        super().__init__(name, port, [] if capabilities is None else capabilities)
258
    
259
        
260
class ExternalMotor(TachoMotor):
    """Access the stand-alone Boost motors

    Like :class:`InternalMotor`, these motors carry a built-in tachometer and
    sensor that can send back the motor's current speed and position.  Using
    the sensors is optional; the motor can be treated strictly as an output.

    Examples::

        # Basic connection to the motor
        @attach(ExternalMotor, name='motor')

        # Report back when motor speed changes. You must have a motor_change method defined
        @attach(ExternalMotor, name='motor', capabilities=['sense_speed'])

        # Only report back when speed change exceeds 5 units, and position changes (degrees)
        @attach(ExternalMotor, name='motor', capabilities=[('sense_speed', 5), 'sense_pos'])

    And then within the run body::

        await self.motor.set_speed(50)   # Setting the speed
        await self.motor.ramp_speed(80, 2000)  # Ramp speed to 80 over 2 seconds
        await self.motor.set_pos(90, speed=20) # Turn clockwise to 3 o'clock position
        await self.motor.rotate(60, speed=-50) # Turn 60 degrees counter-clockwise from current position

    See Also:
        * :class:`TrainMotor` for connecting to a train motor
        * :class:`InternalMotor` for connecting to the Boost hub built-in motors

    """

    _sensor_id = 0x0026
293
294
295
class TrainMotor(Motor):
    """Connects to the train motors.

    A TrainMotor cannot sense anything; its single output mode sets the
    speed.

    Examples::

         @attach(TrainMotor, name='train')

    And then within the run body, use::

        await self.train.set_speed(speed)

    Attributes:
        speed (int) : Keep track of the current speed in order to ramp it

    See Also:
        :class:`InternalMotor`
    """

    _sensor_id = 0x0002
317
318
class WedoMotor(Motor):
    """Connects to the Wedo motors.

    A WedoMotor cannot sense anything; its single output mode sets the
    speed.

    Examples::

         @attach(WedoMotor, name='motor')

    And then within the run body, use::

        await self.motor.set_speed(speed)

    Attributes:
        speed (int) : Keep track of the current speed in order to ramp it

    See Also:
        * :class:`InternalMotor`
        * :class:`TrainMotor`
    """

    _sensor_id = 0x0001
341
342
class DuploTrainMotor(Motor):
    """Train Motor on Duplo Trains

    Make sure that the train is sitting on the ground (the front wheels need
    to keep rotating) in order to keep the train motor powered.  If you pick
    up the train, the motor will stop operating within a few seconds.

    Examples::

         @attach(DuploTrainMotor, name='motor')

    And then within the run body, use::

         await self.motor.set_speed(speed)

    Attributes:
         speed (int): Keep track of the current speed in order to ramp it

    See Also:
         :class:`TrainMotor` for connecting to a PoweredUp train motor
    """

    _sensor_id = 0x0029
364
365