GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — master ( 0610d8...b4e475 )
by Virantha
02:20
created

test_hub.MockDevice.connect()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import pytest
2
import os, struct, copy
3
import logging
4
from asyncio import coroutine
5
from curio import kernel, sleep, spawn, Event
6
7
from mock import Mock
8
from mock import patch, call, create_autospec
9
from mock import MagicMock
10
from mock import PropertyMock
11
12
from hypothesis import given, example, settings
13
from hypothesis import strategies as st
14
15
from bricknil.message_dispatch import MessageDispatch
16
from bricknil.messages import UnknownMessageError, HubPropertiesMessage
17
from bricknil.sensor import *
18
from bricknil.const import DEVICES
19
from bricknil import attach, start
20
from bricknil.hub import PoweredUpHub, Hub, BoostHub, DuploTrainHub
21
import bricknil
22
import bricknil.const
23
24
25
class TestSensors:
26
27
    def setup(self):
28
        # Create the main dispatch
29
        self.hub = MagicMock()
30
        self.m = MessageDispatch(self.hub)
31
        self.sensor_list = [ CurrentSensor,
32
                             DuploSpeedSensor,
33
                             VisionSensor,
34
                             InternalTiltSensor,
35
                             ExternalMotionSensor,
36
                             ExternalTiltSensor,
37
                             RemoteButtons,
38
                             Button,
39
                             DuploVisionSensor,
40
                             VoltageSensor,
41
        ]
42
        self.hub_list = [ PoweredUpHub, BoostHub, DuploTrainHub]
43
    
44
    def _with_header(self, msg:bytearray):
45
        l = len(msg)+2
46
        assert l<127
47
        return bytearray([l, 0]+list(msg))
48
49
    def _draw_capabilities(self, data, sensor):
50
        if len(sensor.allowed_combo) > 0:
51
            # test capabilities 1 by 1, 
52
            # or some combination of those in the allowed_combo list
53
            capabilities = data.draw(
54
                    st.one_of(
55
                        st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1),
56
                        st.lists(st.sampled_from(sensor.allowed_combo), min_size=1, unique=True)
57
                    )
58
                )
59
        else:
60
            # if no combos allowed, then just test 1 by 1
61
            capabilities = data.draw(st.lists(st.sampled_from(sensor.capability), min_size=1, max_size=1))
62
        return capabilities
63
64
65
    def _get_hub_class(self, hub_type, sensor, sensor_name, capabilities):
66
        stop_evt = Event()
67
        @attach(sensor, name=sensor_name, capabilities=capabilities)
68
        class TestHub(hub_type):
69
            async def sensor_change(self):
70
                pass
71
            async def run(self):
72
                pass
73
                await stop_evt.wait()
74
75
        return TestHub, stop_evt
76
77
    #@patch('bricknil.hub.PoweredUpHub', autospec=True, create=True)
78
    @given(data = st.data())
79
    def test_attach_sensor(self, data):
80
        
81
        sensor_name = 'sensor'
82
        sensor = data.draw(st.sampled_from(self.sensor_list))
83
        capabilities = self._draw_capabilities(data, sensor)
84
85
        hub_type = data.draw(st.sampled_from(self.hub_list))
86
        TestHub, stop_evt = self._get_hub_class(hub_type, sensor, sensor_name, capabilities)
87
        hub = TestHub('testhub')
88
        # Check to make sure we have the peripheral attached
89
        # and the sensor inserted as an attribute
90
        assert sensor_name in hub.peripherals
91
        assert hasattr(hub, sensor_name)
92
93
    @given(data = st.data())
94
    def test_run_hub(self, data):
95
96
        Hub.hubs = []
97
        sensor_name = 'sensor'
98
        sensor = data.draw(st.sampled_from(self.sensor_list))
99
        capabilities = self._draw_capabilities(data, sensor)
100
101
        hub_type = data.draw(st.sampled_from(self.hub_list))
102
        TestHub, stop_evt = self._get_hub_class(hub_type, sensor, sensor_name, capabilities)
103
        hub = TestHub('test_hub')
104
105
        # Start the hub
106
        #kernel.run(self._emit_control(TestHub))
107
        with patch('Adafruit_BluefruitLE.get_provider') as ble,\
108
             patch('bricknil.ble_queue.USE_BLEAK', False) as use_bleak:
109
            ble.return_value = MockBLE(hub)
110
            sensor_obj = getattr(hub, sensor_name)
111
            sensor_obj.send_message = Mock(side_effect=coroutine(lambda x,y: "the awaitable should return this"))
112
            kernel.run(self._emit_control, data, hub, stop_evt, ble(), sensor_obj)
113
            #start(system)
114
115
    async def _wait_send_message(self, mock_call, msg):
116
        print("in mock")
117
        while not mock_call.call_args:
118
            await sleep(0.01)
119
        while not msg in mock_call.call_args[0][0]:
120
            print(mock_call.call_args)
121
            await sleep(0.01)
122
123
    async def _emit_control(self, data, hub, hub_stop_evt, ble, sensor):
124
        async def dummy():
125
            pass
126
        system = await spawn(bricknil.bricknil._run_all(ble, dummy))
127
        while not hub.peripheral_queue:
128
            await sleep(0.1)
129
        #await sleep(3)
130
        port = data.draw(st.integers(0,254))
131
        await hub.peripheral_queue.put( ('attach', (port, sensor.sensor_name)) )
132
133
        # Now, make sure the sensor sent an activate updates message
134
        if sensor.sensor_name == "Button":
135
            await self._wait_send_message(sensor.send_message, 'Activate button')
136
        else:
137
            await self._wait_send_message(sensor.send_message, 'Activate SENSOR')
138
        # Need to generate a value on the port
139
        # if False:
140
        msg = []
141
        if len(sensor.capabilities) == 1:
142
            # Handle single capability
143
            for cap in sensor.capabilities:
144
                n_datasets, byte_count = sensor.datasets[cap][0:2]
145
                for i in range(n_datasets):
146
                    for b in range(byte_count):
147
                        msg.append(data.draw(st.integers(0,255)))
148
            msg = bytearray(msg)
149
            await hub.peripheral_queue.put( ('value_change', (port, msg)))
150
        elif len(sensor.capabilities) > 1:
151
            modes = 1
152
            msg.append(modes)
153
            for cap_i, cap in enumerate(sensor.capabilities):
154
                if modes & (1<<cap_i): 
155
                    n_datasets, byte_count = sensor.datasets[cap][0:2]
156
                    for i in range(n_datasets):
157
                        for b in range(byte_count):
158
                            msg.append(data.draw(st.integers(0,255)))
159
            msg = bytearray(msg)
160
            await hub.peripheral_queue.put( ('value_change', (port, msg)))
161
        
162
        await hub_stop_evt.set()
163
        await system.join()
164
165
class MockBLE:
166
    def __init__(self, hub):
167
        self.hub = hub
168
169
    def initialize(self):
170
        print("initialized")
171
    
172
    def clear_cached_data(self):
173
        pass
174
175
    def get_default_adapter(self):
176
        self.mock_adapter = MockAdapter()
177
        return self.mock_adapter
178
179
    def find_devices(self, service_uuids):
180
        self.device = MockDevice(hub_name = self.hub.ble_name, hub_id = self.hub.manufacturer_id)
181
        return [self.device]
182
183
    def run_mainloop_with(self, func):
184
        print("run mainloop")
185
        func()
186
187
class MockAdapter:
188
    def __init__(self):
189
        self.name = 'Mock adapter'
190
    def power_on(self):
191
        pass
192
193
    def start_scan(self):
194
        print("start scan called")
195
196
    def stop_scan(self):
197
        print("stop scan called")
198
199
class MockDevice:
200
    def __init__(self, hub_name, hub_id):
201
        self.advertised = [-1, -1, -1, -1, hub_id]
202
        self.id = 'XX:XX:XX:XX:XX:XX'
203
        self.name = hub_name
204
205
    def connect(self):
206
        print("device connect called")
207
208
    def discover(self, uart_uuid, char_uuid):
209
        print(f'discover called on uart {uart_uuid}, char {char_uuid}')
210
        self.uart_uuid = uart_uuid
211
        self.char = char_uuid
212
    
213
    def find_service(self, uart_uuid):
214
        self.uart = MockUart()
215
        return self.uart
216
217
    def disconnect(self):
218
        print('device disconnect called')
219
220
221
class MockUart:
222
    def __init__(self):
223
        pass
224
    def find_characteristic(self, char_uuid):
225
        self.char_uuid = char_uuid
226
        return self
227
228
    def start_notify(self, callback):
229
        # Spawn a task to do the attachments, etc
230
        self.notify = callback
231
232
    def write_value(self, values):
233
        print(f'received values: {values}')