1 | import pytest |
||
2 | import os, struct, copy |
||
3 | import logging |
||
4 | from asyncio import coroutine |
||
5 | from curio import kernel, sleep |
||
6 | |||
7 | from mock import Mock |
||
8 | from mock import patch, call |
||
9 | from mock import MagicMock |
||
10 | from mock import PropertyMock |
||
11 | |||
12 | from hypothesis import given, example, settings |
||
13 | from hypothesis import strategies as st |
||
14 | |||
15 | from bricknil.sensor.light import * |
||
16 | from bricknil.sensor.motor import * |
||
17 | from bricknil.sensor.sound import * |
||
18 | from bricknil.const import Color |
||
19 | |||
20 | class AsyncMock(MagicMock): |
||
21 | async def __call__(self, *args, **kwargs): |
||
22 | return super(AsyncMock, self).__call__(*args, **kwargs) |
||
23 | |||
24 | class DirectWrite: |
||
25 | def get_bytes(self, port, mode, value): |
||
26 | return [0x00, 0x81, port, 0x01, 0x51, mode, value ] |
||
27 | |||
28 | def get_bytes_for_set_pos(self, port, pos, speed, max_power): |
||
29 | abs_pos = list(struct.pack('i', pos)) |
||
30 | return [0x00, 0x81, port, 0x01, 0x0d] + abs_pos + [speed, max_power, 126, 3] |
||
31 | |||
32 | def get_bytes_for_rotate(self, port, angle, speed, max_power): |
||
33 | angle = list(struct.pack('i',angle)) |
||
34 | return [0x00, 0x81, port, 0x01, 0x0b] + angle + [speed, max_power, 126, 3] |
||
35 | |||
36 | class TestLED: |
||
37 | |||
38 | def setup(self): |
||
39 | self.l = LED(name='led') |
||
40 | self.l.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this")) |
||
41 | self.write = DirectWrite() |
||
42 | |||
43 | @pytest.mark.curio |
||
44 | async def test_set_color(self): |
||
45 | port = 10 |
||
46 | self.l.port = port |
||
47 | await self.l.set_color(Color.blue) |
||
48 | self.l.send_message.ask_called_once() |
||
49 | args, kwargs = self.l.send_message.call_args |
||
50 | assert args[1] == self.write.get_bytes(port, 0, Color.blue.value) |
||
51 | |||
52 | class TestLight: |
||
53 | |||
54 | def setup(self): |
||
55 | self.l = Light(name='light') |
||
56 | self.l.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this")) |
||
57 | self.write = DirectWrite() |
||
58 | |||
59 | @given( brightness = st.integers(0,100), |
||
60 | port = st.integers(0,255) |
||
61 | ) |
||
62 | def test_set_brightness(self, port, brightness): |
||
63 | self.l.port = port |
||
64 | |||
65 | async def child(): |
||
66 | await self.l.set_brightness(brightness) |
||
67 | kernel.run(child) |
||
68 | |||
69 | self.l.send_message.ask_called_once() |
||
70 | args, kwargs = self.l.send_message.call_args |
||
71 | assert args[1] == self.write.get_bytes(port, 0, brightness) |
||
72 | |||
73 | class TestSpeaker: |
||
74 | |||
75 | def setup(self): |
||
76 | self.l = DuploSpeaker(name='light') |
||
77 | self.l.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this")) |
||
78 | self.write = DirectWrite() |
||
79 | |||
80 | @given( sound = st.sampled_from(DuploSpeaker.sounds), |
||
81 | port = st.integers(0,255) |
||
82 | ) |
||
83 | def test_play_sound(self, port, sound): |
||
84 | self.l.port = port |
||
85 | |||
86 | async def child(): |
||
87 | await self.l.play_sound(sound) |
||
88 | kernel.run(child) |
||
89 | |||
90 | self.l.send_message.ask_called_once() |
||
91 | args, kwargs = self.l.send_message.call_args |
||
92 | assert args[1] == self.write.get_bytes(port, 1, sound.value) |
||
93 | |||
94 | @given( port = st.integers(0,255) |
||
95 | ) |
||
96 | def test_activate_updates(self, port): |
||
97 | self.l.port = port |
||
98 | async def child(): |
||
99 | await self.l.activate_updates() |
||
100 | kernel.run(child) |
||
101 | |||
102 | class TestMotor: |
||
103 | |||
104 | def setup(self): |
||
105 | #self.m = TrainMotor(name='motor') |
||
106 | #self.m.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this")) |
||
107 | self.write = DirectWrite() |
||
108 | |||
109 | def _create_motor(self, cls): |
||
110 | self.m = cls(name='motor') |
||
111 | self.m.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this")) |
||
112 | |||
113 | @given( speed = st.integers(-100,100), |
||
114 | port = st.integers(0,255), |
||
115 | cls = st.sampled_from([TrainMotor, DuploTrainMotor, WedoMotor, |
||
116 | ExternalMotor, InternalMotor]) |
||
117 | ) |
||
118 | def test_set_speed(self, cls, port, speed): |
||
119 | self._create_motor(cls) |
||
120 | self.m.port = port |
||
121 | |||
122 | async def child(): |
||
123 | await self.m.set_speed(speed) |
||
124 | kernel.run(child) |
||
125 | |||
126 | self.m.send_message.ask_called_once() |
||
127 | args, kwargs = self.m.send_message.call_args |
||
128 | assert args[1] == self.write.get_bytes(port, 0, self.m._convert_speed_to_val(speed)) |
||
129 | |||
130 | View Code Duplication | @given( speed = st.integers(-100,100), |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
131 | port = st.integers(0,255), |
||
132 | cls = st.sampled_from([TrainMotor, DuploTrainMotor, WedoMotor, |
||
133 | ExternalMotor, InternalMotor]) |
||
134 | ) |
||
135 | def test_ramp_speed(self, cls, port, speed): |
||
136 | self._create_motor(cls) |
||
137 | self.m.port = port |
||
138 | |||
139 | async def child(): |
||
140 | await self.m.ramp_speed(speed, 200) |
||
141 | await self.m.ramp_in_progress_task.join() |
||
142 | async def main(): |
||
143 | t = await spawn(child()) |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
144 | await t.join() |
||
145 | assert self.m.speed == speed |
||
146 | kernel.run(main) |
||
147 | |||
148 | View Code Duplication | @given( speed = st.sampled_from([-50,0,100]), |
|
0 ignored issues
–
show
|
|||
149 | port = st.integers(0,255), |
||
150 | cls = st.sampled_from([TrainMotor, DuploTrainMotor, WedoMotor, |
||
151 | ExternalMotor, InternalMotor]) |
||
152 | ) |
||
153 | def test_ramp_cancel_speed(self, cls, port, speed): |
||
154 | self._create_motor(cls) |
||
155 | self.m.port = port |
||
156 | |||
157 | async def child(): |
||
158 | await self.m.ramp_speed(speed, 2000) |
||
159 | await sleep(0.1) |
||
160 | await self.m.set_speed(speed+10) |
||
161 | |||
162 | async def main(): |
||
163 | t = await spawn(child()) |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
164 | await t.join() |
||
165 | assert self.m.speed == speed+10 |
||
166 | kernel.run(main) |
||
167 | |||
168 | View Code Duplication | @given( pos = st.integers(-2147483648, 2147483647), |
|
0 ignored issues
–
show
|
|||
169 | port = st.integers(0,255), |
||
170 | cls = st.sampled_from([ExternalMotor, InternalMotor]) |
||
171 | ) |
||
172 | def test_set_pos(self, cls, port, pos): |
||
173 | self._create_motor(cls) |
||
174 | self.m.port = port |
||
175 | speed = 50 |
||
176 | max_power = 50 |
||
177 | |||
178 | async def child(): |
||
179 | await self.m.set_pos(pos, speed, max_power) |
||
180 | |||
181 | async def main(): |
||
182 | t = await spawn(child()) |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
183 | await t.join() |
||
184 | kernel.run(main) |
||
185 | |||
186 | args, kwargs = self.m.send_message.call_args |
||
187 | assert args[1] == self.write.get_bytes_for_set_pos(port, pos, self.m._convert_speed_to_val(speed), max_power) |
||
188 | |||
189 | View Code Duplication | @given( angle = st.integers(0, 2147483647), |
|
0 ignored issues
–
show
|
|||
190 | speed = st.integers(-100,100), |
||
191 | port = st.integers(0,255), |
||
192 | cls = st.sampled_from([ExternalMotor, InternalMotor]) |
||
193 | ) |
||
194 | def test_rotate(self, cls, port, angle, speed): |
||
195 | self._create_motor(cls) |
||
196 | self.m.port = port |
||
197 | max_power = 50 |
||
198 | |||
199 | async def child(): |
||
200 | await self.m.rotate(angle, speed, max_power) |
||
201 | |||
202 | async def main(): |
||
203 | t = await spawn(child()) |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
204 | await t.join() |
||
205 | kernel.run(main) |
||
206 | |||
207 | args, kwargs = self.m.send_message.call_args |
||
208 | assert args[1] == self.write.get_bytes_for_rotate(port, angle, self.m._convert_speed_to_val(speed), max_power) |
||
209 | |||
210 | def test_port(self): |
||
211 | t = InternalMotor('motor', port=InternalMotor.Port.A) |
||
212 |