GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

test_hub   C
last analyzed

Complexity

Total Complexity 56

Size/Duplication

Total Lines 324
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 243
dl 0
loc 324
rs 5.5199
c 0
b 0
f 0
wmc 56

36 Methods

Rating   Name   Duplication   Size   Complexity  
A MockBLE.clear_cached_data() 0 2 1
A MockBleakDevice.disconnect() 0 2 1
A MockDevice.__init__() 0 4 1
A MockBLE.find_devices() 0 3 1
A TestSensors.setup() 0 16 1
A MockBleak.BleakClient() 0 6 1
A TestSensors._wait_send_message() 0 7 3
A MockBleakDevice.write_gatt_char() 0 2 1
A MockBLE.initialize() 0 2 1
A MockBLE.run_mainloop_with() 0 3 1
A TestSensors._with_header() 0 4 1
A MockBleakDevice.start_notify() 0 3 1
A MockDevice.discover() 0 4 1
A MockDevice.disconnect() 0 2 1
A MockUart.__init__() 0 2 1
A TestSensors._get_hub_class() 0 11 1
B TestSensors.test_run_hub_with_bleak() 0 44 5
A MockUart.find_characteristic() 0 3 1
A MockBleak.discover() 0 6 1
A TestSensors.test_run_hub() 0 20 3
A MockBleak.__init__() 0 3 1
A MockAdapter.stop_scan() 0 2 1
A MockBLE.get_default_adapter() 0 3 1
A MockBleakDevice.__init__() 0 6 1
A MockDevice.find_service() 0 3 1
A MockAdapter.power_on() 0 2 1
A MockBLE.__init__() 0 2 1
A MockBleakDevice.connect() 0 3 1
A MockUart.start_notify() 0 3 1
A MockAdapter.__init__() 0 2 1
A TestSensors._draw_capabilities() 0 15 2
A MockAdapter.start_scan() 0 2 1
A MockDevice.connect() 0 2 1
A TestSensors.test_attach_sensor() 0 14 1
D TestSensors._emit_control() 0 41 12
A MockUart.write_value() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like test_hub often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import pytest
2
import os, struct, copy, sys
3
from functools import partial
4
import logging, threading
5
from asyncio import coroutine
6
from curio import kernel, sleep, spawn, Event
7
import time
8
9
from mock import Mock
10
from mock import patch, call, create_autospec
11
from mock import MagicMock
12
from mock import PropertyMock
13
14
from hypothesis import given, example, settings
15
from hypothesis import strategies as st
16
17
from bricknil.message_dispatch import MessageDispatch
18
from bricknil.messages import UnknownMessageError, HubPropertiesMessage
19
from bricknil.sensor import *
20
from bricknil.const import DEVICES
21
from bricknil import attach, start
22
from bricknil.hub import PoweredUpHub, Hub, BoostHub, DuploTrainHub, PoweredUpRemote
23
import bricknil
24
import bricknil.const
25
26
27
class TestSensors:
28
29
    def setup(self):
30
        # Create the main dispatch
31
        self.hub = MagicMock()
32
        self.m = MessageDispatch(self.hub)
33
        self.sensor_list = [ CurrentSensor,
34
                             DuploSpeedSensor,
35
                             VisionSensor,
36
                             InternalTiltSensor,
37
                             ExternalMotionSensor,
38
                             ExternalTiltSensor,
39
                             RemoteButtons,
40
                             Button,
41
                             DuploVisionSensor,
42
                             VoltageSensor,
43
        ]
44
        self.hub_list = [ PoweredUpHub, BoostHub, DuploTrainHub, PoweredUpRemote]
45
    
46
    def _with_header(self, msg:bytearray):
47
        l = len(msg)+2
48
        assert l<127
49
        return bytearray([l, 0]+list(msg))
50
51
    def _draw_capabilities(self, data, sensor):
52
        if len(sensor.allowed_combo) > 0:
53
            # test capabilities 1 by 1, 
54
            # or some combination of those in the allowed_combo list
55
            capabilities = data.draw(
56
                    st.one_of(
57
                        st.lists(st.sampled_from([cap.name for cap in list(sensor.capability)]), min_size=1, max_size=1),
58
                        st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1),
59
                        st.lists(st.sampled_from(sensor.allowed_combo), min_size=1, unique=True)
60
                    )
61
                )
62
        else:
63
            # if no combos allowed, then just test 1 by 1
64
            capabilities = data.draw(st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1))
65
        return capabilities
66
67
68
    def _get_hub_class(self, hub_type, sensor, sensor_name, capabilities):
69
        stop_evt = Event()
70
        @attach(sensor, name=sensor_name, capabilities=capabilities)
71
        class TestHub(hub_type):
72
            async def sensor_change(self):
73
                pass
74
            async def run(self):
75
                pass
76
                await stop_evt.wait()
77
78
        return TestHub, stop_evt
79
80
    #@patch('bricknil.hub.PoweredUpHub', autospec=True, create=True)
81
    @given(data = st.data())
82
    def test_attach_sensor(self, data):
83
        
84
        sensor_name = 'sensor'
85
        sensor = data.draw(st.sampled_from(self.sensor_list))
86
        capabilities = self._draw_capabilities(data, sensor)
87
88
        hub_type = data.draw(st.sampled_from(self.hub_list))
89
        TestHub, stop_evt = self._get_hub_class(hub_type, sensor, sensor_name, capabilities)
90
        hub = TestHub('testhub')
91
        # Check to make sure we have the peripheral attached
92
        # and the sensor inserted as an attribute
93
        assert sensor_name in hub.peripherals
94
        assert hasattr(hub, sensor_name)
95
96
    @given(data = st.data())
97
    def test_run_hub(self, data):
98
99
        Hub.hubs = []
100
        sensor_name = 'sensor'
101
        sensor = data.draw(st.sampled_from(self.sensor_list))
102
        capabilities = self._draw_capabilities(data, sensor)
103
104
        hub_type = data.draw(st.sampled_from(self.hub_list))
105
        TestHub, stop_evt = self._get_hub_class(hub_type, sensor, sensor_name, capabilities)
106
        hub = TestHub('test_hub')
107
108
        # Start the hub
109
        #kernel.run(self._emit_control(TestHub))
110
        with patch('Adafruit_BluefruitLE.get_provider') as ble,\
111
             patch('bricknil.ble_queue.USE_BLEAK', False) as use_bleak:
112
            ble.return_value = MockBLE(hub)
113
            sensor_obj = getattr(hub, sensor_name)
114
            sensor_obj.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this"))
115
            kernel.run(self._emit_control, data, hub, stop_evt, ble(), sensor_obj)
116
            #start(system)
117
118
    async def _wait_send_message(self, mock_call, msg):
119
        print("in mock")
120
        while not mock_call.call_args:
121
            await sleep(0.01)
122
        while not msg in mock_call.call_args[0][0]:
123
            print(mock_call.call_args)
124
            await sleep(0.01)
125
126
    async def _emit_control(self, data, hub, hub_stop_evt, ble, sensor):
127
        async def dummy():
128
            pass
129
        system = await spawn(bricknil.bricknil._run_all(ble, dummy))
130
        while not hub.peripheral_queue:
131
            await sleep(0.1)
132
        #await sleep(3)
133
        port = data.draw(st.integers(0,254))
134
        await hub.peripheral_queue.put( ('attach', (port, sensor.sensor_name)) )
135
136
        # Now, make sure the sensor sent an activate updates message
137
        if sensor.sensor_name == "Button":
138
            await self._wait_send_message(sensor.send_message, 'Activate button')
139
        else:
140
            await self._wait_send_message(sensor.send_message, 'Activate SENSOR')
141
        # Need to generate a value on the port
142
        # if False:
143
        msg = []
144
        if len(sensor.capabilities) == 1:
145
            # Handle single capability
146
            for cap in sensor.capabilities:
147
                n_datasets, byte_count = sensor.datasets[cap][0:2]
148
                for i in range(n_datasets):
149
                    for b in range(byte_count):
150
                        msg.append(data.draw(st.integers(0,255)))
151
            msg = bytearray(msg)
152
            await hub.peripheral_queue.put( ('value_change', (port, msg)))
153
        elif len(sensor.capabilities) > 1:
154
            modes = 1
155
            msg.append(modes)
156
            for cap_i, cap in enumerate(sensor.capabilities):
157
                if modes & (1<<cap_i): 
158
                    n_datasets, byte_count = sensor.datasets[cap][0:2]
159
                    for i in range(n_datasets):
160
                        for b in range(byte_count):
161
                            msg.append(data.draw(st.integers(0,255)))
162
            msg = bytearray(msg)
163
            await hub.peripheral_queue.put( ('value_change', (port, msg)))
164
        
165
        await hub_stop_evt.set()
166
        await system.join()
167
168
    @given(data = st.data())
169
    def test_run_hub_with_bleak(self, data):
170
171
        Hub.hubs = []
172
        sensor_name = 'sensor'
173
        sensor = data.draw(st.sampled_from(self.sensor_list))
174
        capabilities = self._draw_capabilities(data, sensor)
175
176
        hub_type = data.draw(st.sampled_from(self.hub_list))
177
        TestHub, stop_evt = self._get_hub_class(hub_type, sensor, sensor_name, capabilities)
178
        hub = TestHub('test_hub')
179
180
        async def dummy():
181
            pass
182
        # Start the hub
183
        #MockBleak = MagicMock()
184
        sys.modules['bleak'] = MockBleak(hub)
185
        with patch('bricknil.bricknil.USE_BLEAK', True), \
186
             patch('bricknil.ble_queue.USE_BLEAK', True) as use_bleak:
187
            sensor_obj = getattr(hub, sensor_name)
188
            sensor_obj.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this"))
189
            from bricknil.bleak_interface import Bleak
190
            ble = Bleak()
191
            # Run curio in a thread
192
            async def dummy(): pass
193
194
            async def start_curio():
195
                system = await spawn(bricknil.bricknil._run_all(ble, dummy))
196
                while len(ble.devices) < 1 or not ble.devices[0].notify:
197
                    await sleep(0.01)
198
                await stop_evt.set()
199
                print("sending quit")
200
                await ble.in_queue.put( ('quit', ''))
201
                #await system.join()
202
                print('system joined')
203
204
            def start_thread():
205
                kernel.run(start_curio)
206
207
            t = threading.Thread(target=start_thread)
208
            t.start()
209
            print('started thread for curio')
210
            ble.run()
211
            t.join()
212
213
214
215
216
class MockBleak(MagicMock):
217
    def __init__(self, hub):
218
        MockBleak.hub = hub
219
        pass
220
    @classmethod
221
    async def discover(cls, timeout, loop):
222
        # Need to return devices here, which is a list of device tuples
223
        hub = MockBleak.hub
224
        devices = [MockBleakDevice(hub.uart_uuid, hub.manufacturer_id)]
225
        return devices
226
227
    @classmethod
228
    def BleakClient(cls, address, loop):
229
        print("starting BleakClient")
230
        hub = MockBleak.hub
231
        device = MockBleakDevice(hub.uart_uuid, hub.manufacturer_id)
232
        return device
233
234
class MockBleakDevice:
235
    def __init__(self, uuid, manufacturer_id):
236
        self.uuids = [str(uuid)]
237
        self.manufacturer_data = {'values': [0, manufacturer_id]  }
238
        self.name = ""
239
        self.address = "XX:XX:XX:XX:XX" 
240
        self.notify = False
241
242
    async def connect(self):
243
        self.characteristics = MockBleak.hub.char_uuid
244
        pass
245
    async def write_gatt_char(self, char_uuid, msg_bytes):
246
        print(f'Got msg on {char_uuid}: {msg_bytes}')
247
248
    async def start_notify(self, char_uuid, handler):
249
        print("started notify")
250
        self.notify = True
251
252
    async def disconnect(self):
253
        print("device disconnected")
254
255
class MockBLE:
256
    def __init__(self, hub):
257
        self.hub = hub
258
259
    def initialize(self):
260
        print("initialized")
261
    
262
    def clear_cached_data(self):
263
        pass
264
265
    def get_default_adapter(self):
266
        self.mock_adapter = MockAdapter()
267
        return self.mock_adapter
268
269
    def find_devices(self, service_uuids):
270
        self.device = MockDevice(hub_name = self.hub.ble_name, hub_id = self.hub.manufacturer_id)
271
        return [self.device]
272
273
    def run_mainloop_with(self, func):
274
        print("run mainloop")
275
        func()
276
277
class MockAdapter:
278
    def __init__(self):
279
        self.name = 'Mock adapter'
280
    def power_on(self):
281
        pass
282
283
    def start_scan(self):
284
        print("start scan called")
285
286
    def stop_scan(self):
287
        print("stop scan called")
288
289
class MockDevice:
290
    def __init__(self, hub_name, hub_id):
291
        self.advertised = [-1, -1, -1, -1, hub_id]
292
        self.id = 'XX:XX:XX:XX:XX:XX'
293
        self.name = hub_name
294
295
    def connect(self):
296
        print("device connect called")
297
298
    def discover(self, uart_uuid, char_uuid):
299
        print(f'discover called on uart {uart_uuid}, char {char_uuid}')
300
        self.uart_uuid = uart_uuid
301
        self.char = char_uuid
302
    
303
    def find_service(self, uart_uuid):
304
        self.uart = MockUart()
305
        return self.uart
306
307
    def disconnect(self):
308
        print('device disconnect called')
309
310
311
class MockUart:
312
    def __init__(self):
313
        pass
314
    def find_characteristic(self, char_uuid):
315
        self.char_uuid = char_uuid
316
        return self
317
318
    def start_notify(self, callback):
319
        # Spawn a task to do the attachments, etc
320
        self.notify = callback
321
322
    def write_value(self, values):
323
        print(f'received values: {values}')