GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

test_messages   A
last analyzed

Complexity

Total Complexity 31

Size/Duplication

Total Lines 201
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 160
dl 0
loc 201
rs 9.92
c 0
b 0
f 0
wmc 31

9 Methods

Rating   Name   Duplication   Size   Complexity  
A TestMessages.test_port_value_message() 0 10 1
C TestMessages.test_port_information_message() 0 42 9
B TestMessages.test_hub_properties_message() 0 27 5
B TestMessages.test_port_mode_info_message() 0 33 8
A TestMessages.test_port_combo_value_message() 0 11 1
A TestMessages.setup() 0 4 1
A TestMessages.test_attach_message() 0 32 4
A TestMessages._with_header() 0 4 1
A TestMessages.test_port_output_feedback_message() 0 7 1
1
import pytest
2
import os, struct, copy
3
import logging
4
5
from mock import Mock
6
from mock import patch, call
7
from mock import MagicMock
8
from mock import PropertyMock
9
10
from hypothesis import given, example, settings
11
from hypothesis import strategies as st
12
13
from bricknil.message_dispatch import MessageDispatch
14
from bricknil.messages import UnknownMessageError, HubPropertiesMessage
15
from bricknil.const import DEVICES
16
17
class TestMessages:
18
19
    def setup(self):
20
        # Create the main dispatch
21
        self.hub = MagicMock()
22
        self.m = MessageDispatch(self.hub)
23
    
24
    def _with_header(self, msg:bytearray):
25
        l = len(msg)+2
26
        assert l<127
27
        return bytearray([l, 0]+list(msg))
28
29
    @given(st.data())
30
    def test_port_value_message(self, data):
31
        port = data.draw(st.integers(0,255))
32
        width = data.draw(st.integers(1,3))
33
        nbytes = 1<<(width-1)
34
        values = data.draw(st.lists(st.integers(0,255),min_size=nbytes,max_size=nbytes ))
35
        msg_type = 0x45
36
        msg = bytearray([msg_type, port]+values)
37
        l = self.m.parse(self._with_header(msg))
38
        self.hub.peripheral_queue.put.assert_called_with(('value_change', (port,values)))
39
40
    @given(port=st.integers(0,255),
41
           mode_ptr=st.integers(0, 0xffff),
42
           mode_data=st.lists(st.integers(0,255), min_size=1, max_size=100),
43
    ) 
44
    def test_port_combo_value_message(self, port, mode_ptr, mode_data):
45
        msg_type = 0x46
46
        mptr = struct.pack('H', mode_ptr)
47
        msg = bytearray([msg_type, int(port)])+mptr+bytearray(mode_data)
48
        l = self.m.parse(self._with_header(msg))
49
        assert l==f'Port {port} changed combo value to {list(msg[2:])}'
50
        self.hub.peripheral_queue.put.assert_called_with(('value_change', (port,list(msg[2:]))))
51
52
    @given(prop=st.integers(0,255),
53
           op = st.integers(0,255),
54
           msg_data=st.lists(st.integers(0,255), min_size=1, max_size=100),
55
    )
56
    @example(prop=0, op=1, msg_data=[0])
57
    @example(prop=0, op=0, msg_data=[0])
58
    @example(prop=2, op=6, msg_data=[0])
59
    def test_hub_properties_message(self, prop, op, msg_data):
60
        msg_type = 0x01
61
        msg = bytearray([msg_type, prop, op]+msg_data)
62
        msg = self._with_header(msg)
63
        msg_original = self.m._parse_msg_bytes(list(msg))
64
65
        if prop not in list(range(1,16)):
66
            l = self.m.parse(msg)
67
            assert l == f'Hub property:  {msg_original}'
68
        else:
69
            if op not in list(range(1,7)):
70
                l = self.m.parse(msg)
71
                assert l == f'Hub property:  {HubPropertiesMessage.prop_names[prop]} {msg_original}'
72
            else:
73
                l = self.m.parse(msg)
74
                remaining = self.m._parse_msg_bytes(list(msg[5:]))
75
                if prop==0x02 and op==0x06:
76
                    self.hub.peripheral_queue.put.assert_called_with(('value_change', (255,list(msg[5:]))))
77
                else:
78
                    assert l == f'Hub property:  {HubPropertiesMessage.prop_names[prop]} {HubPropertiesMessage.operation_names[op]} {remaining}'
79
80
81
    @given( event=st.integers(0,2),
82
            port=st.integers(0,255),
83
            data=st.data()
84
    )
85
    def test_attach_message(self, data, port, event):
86
        msg_type = 0x04
87
        msg = bytearray([msg_type, port, event])
88
        if event == 0: #detach
89
            l = self.m.parse(self._with_header(msg))
90
            assert l == f'Detached IO Port:{port}'
91
        elif event == 1: #attach
92
            # Need 10 bytes
93
            #dev_id = data.draw(st.integers(0,255))
94
            dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
95
            fw_version = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
96
            msg = msg + bytearray([dev_id, 0])+ bytearray(fw_version)
97
            l = self.m.parse(self._with_header(msg))
98
            self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
99
            self.hub.peripheral_queue.put.assert_any_call(('port_detected', port))
100
            # ALso need to make sure the port info is added to dispatch
101
            assert self.m.port_info[port]['name'] == DEVICES[dev_id]
102
        elif event == 2: # virtual attach
103
            dev_id = data.draw(st.sampled_from(sorted(DEVICES.keys())))
104
            v_port_a = data.draw(st.integers(0,255))
105
            v_port_b = data.draw(st.integers(0,255))
106
            msg = msg + bytearray([dev_id, 0, v_port_a, v_port_b])
107
            l = self.m.parse(self._with_header(msg))
108
            self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
109
            self.hub.peripheral_queue.put.assert_any_call(('port_detected', port))
110
            assert l == f'Attached VirtualIO Port:{port} {self.m.port_info[port]["name"]} Port A: {v_port_a}, Port B: {v_port_b}'
111
            assert self.m.port_info[port]['virtual'] == (v_port_a, v_port_b)
112
            assert self.m.port_info[port]['name'] == DEVICES[dev_id]
113
114
    @given( mode = st.integers(1,2),
115
            port = st.integers(0,255),
116
            data = st.data()
117
           )
118
    def test_port_information_message(self, data, port, mode):
119
        msg_type = 0x43
120
        if mode == 1:
121
            capabilities = data.draw(st.integers(0,15)) # bit mask of 4 bits
122
            nmodes = data.draw(st.integers(0,255))
123
            input_modes = [data.draw(st.integers(0,255)), data.draw(st.integers(0,255))]
124
            output_modes = [data.draw(st.integers(0,255)), data.draw(st.integers(0,255))]
125
            msg = bytearray([msg_type, port, mode, capabilities, nmodes]+input_modes+output_modes)
126
            l = self.m.parse(self._with_header(msg))
127
128
            self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
129
            self.hub.peripheral_queue.put.assert_any_call(('port_info_received', port))
130
131
            # Make sure the proper capabilities have been set
132
            bitmask = ['output', 'input', 'combinable', 'synchronizable'] # capabilities
133
            for i,cap in enumerate(bitmask):
134
                assert self.m.port_info[port][cap] == capabilities & (1<<i)
135
            
136
            for i in range(8):
137
                if input_modes[0] & (1<<i):
138
                    assert self.m.port_info[port]['modes'][i]['input']
139
                if input_modes[1] & (1<<i):
140
                    assert self.m.port_info[port]['modes'][i+8]['input']
141
                if output_modes[0] & (1<<i):
142
                    assert self.m.port_info[port]['modes'][i]['output']
143
                if output_modes[1] & (1<<i):
144
                    assert self.m.port_info[port]['modes'][i+8]['output']
145
                    
146
        elif mode == 2:
147
            # Combination info
148
            # Up to 8x 16-bit words (bitmasks) of combinations possible
149
            ncombos = data.draw(st.integers(0,6))  # how many combos should we allow
150
            combos = data.draw(st.lists(st.integers(0,255), min_size=ncombos*2, max_size=ncombos*2))
151
            msg = bytearray([msg_type, port, mode]+combos+[0,0])
152
            l = self.m.parse(self._with_header(msg))
153
154
            self.hub.peripheral_queue.put.assert_any_call(('update_port', (port, self.m.port_info[port])))
155
            self.hub.peripheral_queue.put.assert_any_call(('port_combination_info_received', port))
156
157
            # Assert number of combos
158
            #assert len(combos)/2 == len(self.m.port_info[port]['mode_combinations'])
159
160
    @given(feedback=st.integers(0,32),
161
           port=st.integers(0,255)
162
    )
163
    def test_port_output_feedback_message(self, port, feedback):
164
        msg_type = 0x82
165
        msg = bytearray([msg_type, port, feedback])
166
        self.m.parse(self._with_header(msg))
167
        
168
    @given(mode_type=st.sampled_from([0,1,2,3,4,5, 0x80]),#([0,1,2,3,4,5,0x80]),
169
           mode=st.integers(0,255),
170
           port=st.integers(0,255),
171
           data=st.data()
172
    )
173
    @settings(deadline=None)
174
    def test_port_mode_info_message(self, port, mode, mode_type, data):
175
        msg_type = 0x44
176
177
        if mode_type == 0:
178
            name = data.draw(st.text(min_size=1, max_size=11))
179
            payload = bytearray(name.encode('utf-8'))
180
        elif mode_type == 1 or mode_type == 2 or mode_type==3:
181
            payload = data.draw(st.lists(st.integers(0,255), min_size=8, max_size=8))
182
            payload = bytearray(payload)
183
        elif mode_type == 4:
184
            name = data.draw(st.text(min_size=1, max_size=5))
185
            payload = bytearray(name.encode('utf-8'))
186
        elif mode_type == 5:
187
            payload = data.draw(st.lists(st.integers(0,255), min_size=2, max_size=2))
188
            payload = bytearray(payload)
189
        elif mode_type == 0x80:
190
            ndatasets = data.draw(st.integers(0,255))
191
            dataset_type = data.draw(st.integers(0,3))
192
            total_figures = data.draw(st.integers(0,255))
193
            decimals = data.draw(st.integers(0,255))
194
            payload = bytearray([ndatasets, dataset_type, total_figures, decimals])
195
            pass
196
        else:
197
            assert False
198
199
        msg = bytearray([msg_type, port, mode, mode_type]) + payload
0 ignored issues
show
introduced by
The variable payload does not seem to be defined for all execution paths.
Loading history...
200
        self.m.parse(self._with_header(msg))
201
            
202
203
204
205
206
207
208
209
            
210
211
212
213