1 | import pytest |
||
2 | import os, struct, copy |
||
3 | import logging |
||
4 | |||
5 | from mock import Mock |
||
6 | from mock import patch, call |
||
7 | from mock import MagicMock |
||
8 | from mock import PropertyMock |
||
9 | |||
10 | from hypothesis import given, example, settings |
||
11 | from hypothesis import strategies as st |
||
12 | |||
13 | from bricknil.message_dispatch import MessageDispatch |
||
14 | from bricknil.messages import UnknownMessageError, HubPropertiesMessage |
||
15 | from bricknil.const import DEVICES |
||
16 | |||
class TestMessages:
    """Property-based tests for ``MessageDispatch.parse``.

    Every test assembles a raw message as a ``bytearray`` (message type byte
    followed by a type-specific payload), prepends the two-byte header built by
    ``_with_header`` and hands the result to the dispatcher.  The assertions
    check the value returned by ``parse`` and/or the items put on the mocked
    hub's ``peripheral_queue``.

    NOTE(review): Hypothesis is documented to run per-test setup once per test
    function rather than once per generated example -- confirm.  If so, the
    mock and the dispatcher accumulate state across examples, and an
    ``assert_any_call`` may be satisfied by a call from an earlier example.
    """

    def setup_method(self):
        """Create a mocked hub and the dispatcher under test."""
        self.hub = MagicMock()
        self.m = MessageDispatch(self.hub)

    # Kept so that anything still invoking the old nose-style hook name works;
    # pytest itself uses ``setup_method``.
    setup = setup_method

    def _with_header(self, msg: bytearray) -> bytearray:
        """Return ``msg`` prefixed with a two-byte header.

        The header is the total length (payload plus the two header bytes)
        followed by a zero byte.  Only messages whose total length is below
        127 are supported by this helper.
        """
        length = len(msg) + 2
        assert length < 127
        return bytearray([length, 0] + list(msg))

    @given(st.data())
    def test_port_value_message(self, data):
        """A 0x45 message queues a ``value_change`` with the raw value bytes."""
        port = data.draw(st.integers(0, 255))
        width = data.draw(st.integers(1, 3))
        nbytes = 1 << (width - 1)  # 1, 2 or 4 value bytes
        values = data.draw(
            st.lists(st.integers(0, 255), min_size=nbytes, max_size=nbytes))
        msg_type = 0x45
        msg = bytearray([msg_type, port] + values)

        self.m.parse(self._with_header(msg))

        self.hub.peripheral_queue.put.assert_called_with(
            ('value_change', (port, values)))

    @given(port=st.integers(0, 255),
           mode_ptr=st.integers(0, 0xffff),
           mode_data=st.lists(st.integers(0, 255), min_size=1, max_size=100),
           )
    def test_port_combo_value_message(self, port, mode_ptr, mode_data):
        """A 0x46 message reports and queues everything after the port byte."""
        msg_type = 0x46
        # Explicit little-endian so the generated bytes do not depend on the
        # byte order of the machine running the tests.
        mptr = struct.pack('<H', mode_ptr)
        msg = bytearray([msg_type, int(port)]) + mptr + bytearray(mode_data)
        expected_values = list(msg[2:])

        result = self.m.parse(self._with_header(msg))

        assert result == f'Port {port} changed combo value to {expected_values}'
        self.hub.peripheral_queue.put.assert_called_with(
            ('value_change', (port, expected_values)))

    @given(prop=st.integers(0, 255),
           op=st.integers(0, 255),
           msg_data=st.lists(st.integers(0, 255), min_size=1, max_size=100),
           )
    @example(prop=0, op=1, msg_data=[0])
    @example(prop=0, op=0, msg_data=[0])
    @example(prop=2, op=6, msg_data=[0])
    def test_hub_properties_message(self, prop, op, msg_data):
        """A 0x01 message is described according to its property and operation.

        Three cases are distinguished: property outside 1..15, known property
        with an operation outside 1..6, and known property with known
        operation.  Property 0x02 with operation 0x06 is checked through the
        queued ``value_change`` on port 255 instead of the returned text.
        """
        msg_type = 0x01
        msg = self._with_header(bytearray([msg_type, prop, op] + msg_data))
        # Formatted before parsing, exactly as the expectation strings need it.
        msg_original = self.m._parse_msg_bytes(list(msg))

        result = self.m.parse(msg)

        if prop not in range(1, 16):
            assert result == f'Hub property: {msg_original}'
            return

        prop_name = HubPropertiesMessage.prop_names[prop]
        if op not in range(1, 7):
            assert result == f'Hub property: {prop_name} {msg_original}'
            return

        # Bytes after header (2), message type, property and operation.
        remaining = self.m._parse_msg_bytes(list(msg[5:]))
        if prop == 0x02 and op == 0x06:
            self.hub.peripheral_queue.put.assert_called_with(
                ('value_change', (255, list(msg[5:]))))
        else:
            op_name = HubPropertiesMessage.operation_names[op]
            assert result == f'Hub property: {prop_name} {op_name} {remaining}'

    @given(event=st.integers(0, 2),
           port=st.integers(0, 255),
           data=st.data()
           )
    def test_attach_message(self, data, port, event):
        """A 0x04 message handles detach (0), attach (1) and virtual attach (2)."""
        msg_type = 0x04
        msg = bytearray([msg_type, port, event])
        put = self.hub.peripheral_queue.put

        if event == 0:  # detach
            result = self.m.parse(self._with_header(msg))
            assert result == f'Detached IO Port:{port}'

        elif event == 1:  # attach
            # Device ids are sampled from the known DEVICES table (rather than
            # any byte value) so the expected name can be looked up below.
            dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
            # Ten more bytes follow: device id, a zero byte, 8 version bytes.
            fw_version = data.draw(
                st.lists(st.integers(0, 255), min_size=8, max_size=8))
            msg = msg + bytearray([dev_id, 0]) + bytearray(fw_version)

            self.m.parse(self._with_header(msg))

            put.assert_any_call(('update_port', (port, self.m.port_info[port])))
            put.assert_any_call(('port_detected', port))
            # Also need to make sure the port info is added to dispatch
            assert self.m.port_info[port]['name'] == DEVICES[dev_id]

        elif event == 2:  # virtual attach
            dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
            v_port_a = data.draw(st.integers(0, 255))
            v_port_b = data.draw(st.integers(0, 255))
            msg = msg + bytearray([dev_id, 0, v_port_a, v_port_b])

            result = self.m.parse(self._with_header(msg))

            put.assert_any_call(('update_port', (port, self.m.port_info[port])))
            put.assert_any_call(('port_detected', port))
            assert result == f'Attached VirtualIO Port:{port} {self.m.port_info[port]["name"]} Port A: {v_port_a}, Port B: {v_port_b}'
            assert self.m.port_info[port]['virtual'] == (v_port_a, v_port_b)
            assert self.m.port_info[port]['name'] == DEVICES[dev_id]

    @given(mode=st.integers(1, 2),
           port=st.integers(0, 255),
           data=st.data()
           )
    def test_port_information_message(self, data, port, mode):
        """A 0x43 message updates port info for mode 1 and mode 2 payloads."""
        msg_type = 0x43
        put = self.hub.peripheral_queue.put

        if mode == 1:
            capabilities = data.draw(st.integers(0, 15))  # bit mask of 4 bits
            nmodes = data.draw(st.integers(0, 255))
            # Two bytes each: one bit per mode, first byte covers modes 0-7,
            # second byte modes 8-15 (as asserted below).
            input_modes = [data.draw(st.integers(0, 255)),
                           data.draw(st.integers(0, 255))]
            output_modes = [data.draw(st.integers(0, 255)),
                            data.draw(st.integers(0, 255))]
            msg = bytearray([msg_type, port, mode, capabilities, nmodes]
                            + input_modes + output_modes)

            self.m.parse(self._with_header(msg))

            put.assert_any_call(('update_port', (port, self.m.port_info[port])))
            put.assert_any_call(('port_info_received', port))

            info = self.m.port_info[port]

            # Make sure the proper capabilities have been set: each entry must
            # equal the corresponding masked bit of the capabilities byte.
            bitmask = ['output', 'input', 'combinable', 'synchronizable']
            for i, cap in enumerate(bitmask):
                assert info[cap] == capabilities & (1 << i)

            # Every mode whose bit is set must be flagged for that direction.
            for bit in range(8):
                for direction, mask_bytes in (('input', input_modes),
                                              ('output', output_modes)):
                    for byte_index, mask_byte in enumerate(mask_bytes):
                        if mask_byte & (1 << bit):
                            assert info['modes'][bit + 8 * byte_index][direction]

        elif mode == 2:
            # Combination info
            # Up to 8x 16-bit words (bitmasks) of combinations possible
            ncombos = data.draw(st.integers(0, 6))  # how many combos to allow
            combos = data.draw(
                st.lists(st.integers(0, 255),
                         min_size=ncombos * 2, max_size=ncombos * 2))
            # The combination words are followed by two zero bytes.
            msg = bytearray([msg_type, port, mode] + combos + [0, 0])

            self.m.parse(self._with_header(msg))

            put.assert_any_call(('update_port', (port, self.m.port_info[port])))
            put.assert_any_call(('port_combination_info_received', port))

            # NOTE(review): the number of stored combinations
            # (port_info[port]['mode_combinations']) is not asserted; a check
            # against len(combos)/2 was previously disabled here.

    @given(feedback=st.integers(0, 32),
           port=st.integers(0, 255)
           )
    def test_port_output_feedback_message(self, port, feedback):
        """A 0x82 message parses without raising (no further assertions)."""
        msg_type = 0x82
        msg = bytearray([msg_type, port, feedback])
        self.m.parse(self._with_header(msg))

    @given(mode_type=st.sampled_from([0, 1, 2, 3, 4, 5, 0x80]),
           mode=st.integers(0, 255),
           port=st.integers(0, 255),
           data=st.data()
           )
    @settings(deadline=None)
    def test_port_mode_info_message(self, port, mode, mode_type, data):
        """A 0x44 message parses without raising for each sampled mode type.

        The payload shape depends on ``mode_type``: UTF-8 text for 0 and 4,
        eight bytes for 1-3, two bytes for 5 and four bytes for 0x80.
        """
        msg_type = 0x44

        if mode_type == 0:
            name = data.draw(st.text(min_size=1, max_size=11))
            payload = bytearray(name.encode('utf-8'))
        elif mode_type in (1, 2, 3):
            payload = bytearray(data.draw(
                st.lists(st.integers(0, 255), min_size=8, max_size=8)))
        elif mode_type == 4:
            name = data.draw(st.text(min_size=1, max_size=5))
            payload = bytearray(name.encode('utf-8'))
        elif mode_type == 5:
            payload = bytearray(data.draw(
                st.lists(st.integers(0, 255), min_size=2, max_size=2)))
        elif mode_type == 0x80:
            ndatasets = data.draw(st.integers(0, 255))
            dataset_type = data.draw(st.integers(0, 3))
            total_figures = data.draw(st.integers(0, 255))
            decimals = data.draw(st.integers(0, 255))
            payload = bytearray(
                [ndatasets, dataset_type, total_figures, decimals])
        else:
            # Guards against the strategy above gaining a value that has no
            # payload builder; an explicit raise survives ``python -O``.
            raise AssertionError(f'unexpected mode_type {mode_type!r}')

        msg = bytearray([msg_type, port, mode, mode_type]) + payload
        self.m.parse(self._with_header(msg))
||
201 | |||
202 | |||
203 | |||
204 | |||
205 | |||
206 | |||
207 | |||
208 | |||
209 | |||
210 | |||
211 | |||
212 | |||
213 |