GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Pull Request — master (#11)
by
unknown
01:53
created

test_hub.MockBleak.BleakClient()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 3
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
import pytest
2
import os, struct, copy, sys
3
from functools import partial
4
import logging, threading
5
from asyncio import coroutine
6
from asyncio import kernel, sleep, spawn, Event
7
import time
8
9
from mock import Mock
10
from mock import patch, call, create_autospec
11
from mock import MagicMock
12
from mock import PropertyMock
13
14
from hypothesis import given, example, settings
15
from hypothesis import strategies as st
16
17
from bricknil.message_dispatch import MessageDispatch
18
from bricknil.messages import UnknownMessageError, HubPropertiesMessage
19
from bricknil.sensor import *
20
from bricknil.const import DEVICES
21
from bricknil import attach, start
22
from bricknil.hub import PoweredUpHub, Hub, BoostHub, DuploTrainHub, PoweredUpRemote
23
import bricknil
24
import bricknil.const
25
26
27
class TestSensors:
    """Hypothesis-driven tests that attach sensors to hubs and run them on a mocked BLE stack."""

    def setup(self):
        """Create the message dispatcher and the sensor/hub classes the tests sample from."""
        # Create the main dispatch
        self.hub = MagicMock()
        self.m = MessageDispatch(self.hub)
        self.sensor_list = [
            CurrentSensor,
            DuploSpeedSensor,
            VisionSensor,
            InternalTiltSensor,
            ExternalMotionSensor,
            ExternalTiltSensor,
            RemoteButtons,
            Button,
            DuploVisionSensor,
            VoltageSensor,
        ]
        self.hub_list = [PoweredUpHub, BoostHub, DuploTrainHub, PoweredUpRemote]

    # pytest 8 no longer calls the nose-style ``setup`` hook; expose the same
    # function under the xunit-style name so the attributes above still get built.
    setup_method = setup

    def _with_header(self, msg: bytearray):
        """Return ``msg`` prefixed with a two-byte header: the total length, then 0."""
        length = len(msg) + 2
        assert length < 127
        return bytearray([length, 0] + list(msg))

    def _draw_capabilities(self, data, sensor):
        """Draw a list of capabilities to attach ``sensor`` with.

        When the sensor declares ``allowed_combo`` entries, the draw is one of:
        a single capability given by name, a single capability object, or a
        unique, non-empty selection from ``allowed_combo``.  Otherwise exactly
        one capability object is drawn.
        """
        if len(sensor.allowed_combo) > 0:
            # test capabilities 1 by 1,
            # or some combination of those in the allowed_combo list
            capability_names = [cap.name for cap in list(sensor.capability)]
            capabilities = data.draw(
                st.one_of(
                    st.lists(st.sampled_from(capability_names), min_size=1, max_size=1),
                    st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1),
                    st.lists(st.sampled_from(sensor.allowed_combo), min_size=1, unique=True),
                )
            )
        else:
            # if no combos allowed, then just test 1 by 1
            capabilities = data.draw(
                st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1)
            )
        return capabilities

    def _get_hub_class(self, hub_type, sensor, sensor_name, capabilities):
        """Return a ``hub_type`` subclass with ``sensor`` attached, plus its stop event.

        The generated hub's ``run`` coroutine blocks until the returned event is set.
        """
        stop_evt = Event()

        @attach(sensor, name=sensor_name, capabilities=capabilities)
        class TestHub(hub_type):
            async def sensor_change(self):
                pass

            async def run(self):
                await stop_evt.wait()

        return TestHub, stop_evt

    def _build_hub(self, data, sensor_name, hub_name):
        """Draw a sensor, its capabilities and a hub type, and instantiate the hub.

        Returns the hub instance and the event that lets its ``run`` finish.
        """
        sensor = data.draw(st.sampled_from(self.sensor_list))
        capabilities = self._draw_capabilities(data, sensor)

        hub_type = data.draw(st.sampled_from(self.hub_list))
        TestHub, stop_evt = self._get_hub_class(hub_type, sensor, sensor_name, capabilities)
        return TestHub(hub_name), stop_evt

    @given(data=st.data())
    def test_attach_sensor(self, data):
        """Attaching a sensor registers it as a peripheral and as a hub attribute."""
        sensor_name = 'sensor'
        hub, stop_evt = self._build_hub(data, sensor_name, 'testhub')
        # Check to make sure we have the peripheral attached
        # and the sensor inserted as an attribute
        assert sensor_name in hub.peripherals
        assert hasattr(hub, sensor_name)

    @given(data=st.data())
    def test_run_hub(self, data):
        """Run a hub against the mocked ``bleak`` module until notifications start, then quit."""
        Hub.hubs = []
        sensor_name = 'sensor'
        hub, stop_evt = self._build_hub(data, sensor_name, 'test_hub')

        # Start the hub
        sys.modules['bleak'] = MockBleak(hub)
        sensor_obj = getattr(hub, sensor_name)

        # Native coroutine stub taking two positional arguments; replaces the
        # ``asyncio.coroutine``-wrapped lambda, which no longer exists on Python 3.11+.
        async def fake_send_message(x, y):
            return "the awaitable should return this"

        sensor_obj.send_message = Mock(side_effect=fake_send_message)
        from bricknil.bleak_interface import Bleak
        ble = Bleak()

        async def dummy():
            pass

        # Run curio in a thread
        # NOTE(review): ``kernel``, ``spawn`` and an awaitable ``Event.set()`` match
        # curio's API; asyncio exports neither ``kernel`` nor ``spawn``, so the
        # file-level ``from asyncio import kernel, sleep, spawn, Event`` looks
        # wrong -- confirm which library is intended.
        async def start_curio():
            # The spawned task is not joined here.
            await spawn(bricknil.bricknil._run_all(ble, dummy))
            # Wait until a device exists and has enabled notifications.
            while len(ble.devices) < 1 or not ble.devices[0].notify:
                await sleep(0.01)
            await stop_evt.set()
            print("sending quit")
            await ble.in_queue.put(('quit', ''))
            print('system joined')

        def start_thread():
            kernel.run(start_curio)

        t = threading.Thread(target=start_thread)
        t.start()
        print('started thread for curio')
        ble.run()
        t.join()

    async def _wait_send_message(self, mock_call, msg):
        """Poll ``mock_call`` until its latest call's first positional argument contains ``msg``."""
        print("in mock")
        while not mock_call.call_args:
            await sleep(0.01)
        while msg not in mock_call.call_args[0][0]:
            print(mock_call.call_args)
            await sleep(0.01)

    def _draw_dataset_bytes(self, data, sensor, cap):
        """Draw ``n_datasets * byte_count`` random byte values for capability ``cap``.

        Both counts are read from the first two entries of ``sensor.datasets[cap]``.
        """
        n_datasets, byte_count = sensor.datasets[cap][0:2]
        return [
            data.draw(st.integers(0, 255))
            for _ in range(n_datasets)
            for _ in range(byte_count)
        ]

    async def _emit_control(self, data, hub, hub_stop_evt, ble, sensor):
        """Attach ``sensor`` on a random port, feed it one value update, then stop the hub."""
        async def dummy():
            pass

        system = await spawn(bricknil.bricknil._run_all(ble, dummy))
        while not hub.peripheral_queue:
            await sleep(0.1)
        port = data.draw(st.integers(0, 254))
        await hub.peripheral_queue.put(('attach', (port, sensor.sensor_name)))

        # Now, make sure the sensor sent an activate updates message
        if sensor.sensor_name == "Button":
            await self._wait_send_message(sensor.send_message, 'Activate button')
        else:
            await self._wait_send_message(sensor.send_message, 'Activate SENSOR')

        # Need to generate a value on the port
        if len(sensor.capabilities) == 1:
            # Handle single capability: the payload is just the dataset bytes
            payload = []
            for cap in sensor.capabilities:
                payload.extend(self._draw_dataset_bytes(data, sensor, cap))
            await hub.peripheral_queue.put(('value_change', (port, bytearray(payload))))
        elif len(sensor.capabilities) > 1:
            # Combined capabilities: a leading mode bitmask selects which
            # capabilities contribute bytes (only bit 0 is set here).
            modes = 1
            payload = [modes]
            for cap_i, cap in enumerate(sensor.capabilities):
                if modes & (1 << cap_i):
                    payload.extend(self._draw_dataset_bytes(data, sensor, cap))
            await hub.peripheral_queue.put(('value_change', (port, bytearray(payload))))

        await hub_stop_evt.set()
        await system.join()
185
186
187
class MockBleak(MagicMock):
    """Stand-in for the ``bleak`` module that hands out devices built from one hub.

    The hub is stored on the class itself because the ``discover`` and
    ``BleakClient`` classmethods read it from there.
    """

    def __init__(self, hub):
        # Initialise the MagicMock machinery first; without it the instance has
        # no internal mock state and e.g. repr() raises AttributeError.
        super().__init__()
        MockBleak.hub = hub

    @classmethod
    async def discover(cls, timeout, loop):
        """Return a one-element list holding a mock device for the stored hub."""
        # Need to return devices here, which is a list of device tuples
        hub = MockBleak.hub
        return [MockBleakDevice(hub.uart_uuid, hub.manufacturer_id)]

    @classmethod
    def BleakClient(cls, address, loop):
        """Return a mock device for the stored hub; ``address`` and ``loop`` are unused."""
        print("starting BleakClient")
        hub = MockBleak.hub
        return MockBleakDevice(hub.uart_uuid, hub.manufacturer_id)
204
205
class MockBleakDevice:
    """Minimal fake of a discovered/connected BLE device used by ``MockBleak``."""

    def __init__(self, uuid, manufacturer_id):
        """Record the advertised service UUID (as a string) and manufacturer id."""
        self.uuids = [str(uuid)]
        self.manufacturer_data = {'values': [0, manufacturer_id]}
        self.name = ""
        self.address = "XX:XX:XX:XX:XX"
        # Set to True once start_notify() has been awaited.
        self.notify = False

    async def connect(self):
        """Copy the characteristic UUID from the hub stored on ``MockBleak``."""
        self.characteristics = MockBleak.hub.char_uuid

    async def write_gatt_char(self, char_uuid, msg_bytes):
        """Print the outgoing message instead of sending it."""
        print(f'Got msg on {char_uuid}: {msg_bytes}')

    async def start_notify(self, char_uuid, handler):
        """Mark notifications as started; ``handler`` is not stored or called."""
        print("started notify")
        self.notify = True

    async def disconnect(self):
        """Print a message; there is nothing to tear down."""
        print("device disconnected")
225
226
class MockBLE:
    """Fake BLE provider exposing adapter lookup, device discovery and a main loop."""

    def __init__(self, hub):
        """Keep the hub whose name and manufacturer id the fake device will report."""
        self.hub = hub

    def initialize(self):
        """Print a message; no real initialisation is performed."""
        print("initialized")

    def clear_cached_data(self):
        """Do nothing; there is no cache to clear."""
        pass

    def get_default_adapter(self):
        """Create, remember and return a fresh ``MockAdapter``."""
        self.mock_adapter = MockAdapter()
        return self.mock_adapter

    def find_devices(self, service_uuids):
        """Return a single ``MockDevice`` built from the hub; ``service_uuids`` is ignored."""
        self.device = MockDevice(hub_name=self.hub.ble_name, hub_id=self.hub.manufacturer_id)
        return [self.device]

    def run_mainloop_with(self, func):
        """Call ``func`` synchronously in place of running a real main loop."""
        print("run mainloop")
        func()
247
248
class MockAdapter:
    """Fake Bluetooth adapter whose operations only print or do nothing."""

    def __init__(self):
        self.name = 'Mock adapter'

    def power_on(self):
        """Do nothing; the fake adapter has no power state."""
        pass

    def start_scan(self):
        """Print that a scan was requested."""
        print("start scan called")

    def stop_scan(self):
        """Print that the scan was stopped."""
        print("stop scan called")
259
260
class MockDevice:
    """Fake BLE device returned by ``MockBLE.find_devices``."""

    def __init__(self, hub_name, hub_id):
        """Store the device name and an advertisement list whose fifth entry is ``hub_id``."""
        self.advertised = [-1, -1, -1, -1, hub_id]
        self.id = 'XX:XX:XX:XX:XX:XX'
        self.name = hub_name

    def connect(self):
        """Print a message instead of connecting."""
        print("device connect called")

    def discover(self, uart_uuid, char_uuid):
        """Remember the requested service and characteristic UUIDs."""
        print(f'discover called on uart {uart_uuid}, char {char_uuid}')
        self.uart_uuid = uart_uuid
        self.char = char_uuid

    def find_service(self, uart_uuid):
        """Create, remember and return a fresh ``MockUart``; ``uart_uuid`` is ignored."""
        self.uart = MockUart()
        return self.uart

    def disconnect(self):
        """Print a message instead of disconnecting."""
        print('device disconnect called')
280
281
282
class MockUart:
    """Fake UART service that also acts as its own characteristic."""

    def find_characteristic(self, char_uuid):
        """Remember ``char_uuid`` and return this same object as the characteristic."""
        self.char_uuid = char_uuid
        return self

    def start_notify(self, callback):
        """Store ``callback`` on ``self.notify``; it is not invoked here."""
        self.notify = callback

    def write_value(self, values):
        """Print the written values instead of transmitting them."""
        print(f'received values: {values}')