GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Push — master ( 64f1b1...0610d8 )
by Virantha
02:04
created

test_hub.MockBLE.run_mainloop_with()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
import pytest
2
import os, struct, copy
3
import logging
4
from asyncio import coroutine
5
from curio import kernel, sleep, spawn
6
7
from mock import Mock
8
from mock import patch, call, create_autospec
9
from mock import MagicMock
10
from mock import PropertyMock
11
12
from hypothesis import given, example, settings
13
from hypothesis import strategies as st
14
15
from bricknil.message_dispatch import MessageDispatch
16
from bricknil.messages import UnknownMessageError, HubPropertiesMessage
17
from bricknil.sensor import *
18
from bricknil.const import DEVICES
19
from bricknil import attach, start
20
from bricknil.hub import PoweredUpHub, Hub
21
import bricknil
22
23
class TestSensors:
    """Property-based tests that attach sensors to a hub and run the hub.

    Each test draws a sensor class from ``self.sensor_list`` and a set of
    capabilities for it, builds a throw-away ``PoweredUpHub`` subclass with
    that sensor attached, and then checks the hub.
    """

    def setup_method(self):
        """Build the mocked hub, the dispatcher and the sensor classes to test.

        Uses pytest's ``setup_method`` hook rather than the nose-style
        ``setup`` name, which pytest has deprecated.
        """
        # Create the main dispatch
        self.hub = MagicMock()
        self.m = MessageDispatch(self.hub)
        self.sensor_list = [
            CurrentSensor,
            DuploSpeedSensor,
            VisionSensor,
            InternalTiltSensor,
            ExternalMotionSensor,
            ExternalTiltSensor,
            RemoteButtons,
            Button,
            DuploVisionSensor,
            VoltageSensor,
        ]

    def _with_header(self, msg: bytearray):
        """Return *msg* prefixed with two bytes: the total length and a zero."""
        length = len(msg) + 2  # the two header bytes count towards the length
        assert length < 127
        return bytearray([length, 0] + list(msg))

    def _draw_capabilities(self, data, sensor):
        """Draw a list of capabilities to attach *sensor* with.

        Args:
            data: The Hypothesis data object used to draw values.
            sensor: A sensor class exposing ``capability`` and
                ``allowed_combo``.

        Returns:
            A list holding either exactly one entry sampled from
            ``sensor.capability`` or, when ``sensor.allowed_combo`` is
            non-empty, possibly a list of unique entries sampled from
            ``sensor.allowed_combo``.
        """
        one_at_a_time = st.lists(
            st.sampled_from(sensor.capability), min_size=1, max_size=1
        )
        if len(sensor.allowed_combo) > 0:
            # Test capabilities 1 by 1, or some combination of those in the
            # allowed_combo list.
            combos = st.lists(
                st.sampled_from(sensor.allowed_combo), min_size=1, unique=True
            )
            return data.draw(st.one_of(one_at_a_time, combos))
        # If no combos are allowed, then just test 1 by 1.
        return data.draw(one_at_a_time)

    def _get_hub_class(self, sensor, sensor_name, capabilities):
        """Return a ``PoweredUpHub`` subclass with *sensor* attached.

        The sensor is attached under *sensor_name* with the given
        *capabilities*; the hub's ``sensor_change`` and ``run`` coroutines
        do nothing.
        """
        @attach(sensor, name=sensor_name, capabilities=capabilities)
        class TestHub(PoweredUpHub):
            async def sensor_change(self):
                pass

            async def run(self):
                pass

        return TestHub

    @given(data=st.data())
    def test_attach_sensor(self, data):
        """An attached sensor appears in ``hub.peripherals`` and as an attribute."""
        sensor_name = 'sensor'
        sensor = data.draw(st.sampled_from(self.sensor_list))
        capabilities = self._draw_capabilities(data, sensor)

        TestHub = self._get_hub_class(sensor, sensor_name, capabilities)
        hub = TestHub('testhub')
        # Check to make sure we have the peripheral attached
        # and the sensor inserted as an attribute
        assert sensor_name in hub.peripherals
        assert hasattr(hub, sensor_name)

    @given(data=st.data())
    def test_run_hub(self, data):
        """Run a hub against the mocked BLE provider and send it an attach event."""
        # Reset the class-level hub list before creating a new hub
        # (presumably so hubs from earlier examples do not accumulate -
        # TODO confirm against bricknil.hub.Hub).
        Hub.hubs = []
        sensor_name = 'sensor'
        sensor = data.draw(st.sampled_from(self.sensor_list))
        capabilities = self._draw_capabilities(data, sensor)

        # Note: this is a hub *instance*, not the class.
        hub = self._get_hub_class(sensor, sensor_name, capabilities)('testhub')

        # Start the hub with the Bluetooth provider replaced by MockBLE
        with patch('Adafruit_BluefruitLE.get_provider') as ble:
            ble.return_value = MockBLE()
            sensor_obj = getattr(hub, sensor_name)
            kernel.run(self._emit_control, hub, ble(), sensor_obj)

    async def _emit_control(self, hub, ble, sensor):
        """Spawn the bricknil run loop and queue an 'attach' message for *sensor*.

        Args:
            hub: The hub instance whose ``peripheral_queue`` receives the
                message.
            ble: The BLE provider object handed to ``_run_all``.
            sensor: The sensor object; its ``sensor_name`` is sent in the
                attach message.
        """
        async def dummy():
            pass

        system = await spawn(bricknil.bricknil._run_all(ble, dummy))
        # Poll until the hub's peripheral_queue is truthy before using it.
        while not hub.peripheral_queue:
            await sleep(0.1)
        await hub.peripheral_queue.put(('attach', (1, sensor.sensor_name)))
        await system.join()
114
115
class MockBLE:
    """Stand-in for the Bluetooth LE provider object used by the tests.

    It implements only the provider methods this test module relies on and
    performs no real Bluetooth work.
    """

    def initialize(self):
        """Log that the provider was initialized."""
        print("initialized")

    def clear_cached_data(self):
        """Do nothing; there is no cache to clear."""
        pass

    def get_default_adapter(self):
        """Create a new ``MockAdapter``, store it on ``self``, and return it."""
        self.mock_adapter = MockAdapter()
        return self.mock_adapter

    def find_devices(self, service_uuids):
        """Create a new ``MockDevice`` and return it in a one-element list.

        *service_uuids* is accepted but not used.
        """
        self.device = MockDevice()
        return [self.device]

    def run_mainloop_with(self, func):
        """Log the call and invoke *func* synchronously with no arguments."""
        print("run mainloop")
        func()
133
134
class MockAdapter:
    """Stand-in for a Bluetooth adapter; scanning calls are only logged."""

    def __init__(self):
        self.name = 'Mock adapter'

    def power_on(self):
        """Do nothing; the mock adapter has no power state."""
        pass

    def start_scan(self):
        """Log that a scan was started."""
        print("start scan called")

    def stop_scan(self):
        """Log that a scan was stopped."""
        print("stop scan called")
145
146
class MockDevice:
    """Stand-in for a discovered Bluetooth device with fixed identity data."""

    def __init__(self):
        # Fixed advertisement payload; presumably the trailing 65 is what
        # identifies the hub type to bricknil - TODO confirm.
        self.advertised = [-1, -1, -1, -1, 65]
        self.id = 'XX:XX:XX:XX:XX:XX'
        self.name = 'HUB No.4'

    def connect(self):
        """Log that a connection was requested."""
        print("device connect called")

    def discover(self, uart_uuid, char_uuid):
        """Log the discovery request and remember both UUIDs on ``self``."""
        print(f'discover called on uart {uart_uuid}, char {char_uuid}')
        self.uart_uuid = uart_uuid
        self.char = char_uuid

    def find_service(self, uart_uuid):
        """Create a new ``MockUart``, store it on ``self``, and return it.

        *uart_uuid* is accepted but not used.
        """
        self.uart = MockUart()
        return self.uart

    def disconnect(self):
        """Log that a disconnect was requested."""
        print('device disconnect called')
166
167
168
class MockUart:
    """Stand-in for a UART service that also acts as its own characteristic."""

    def find_characteristic(self, char_uuid):
        """Remember *char_uuid* and return ``self`` as the characteristic."""
        self.char_uuid = char_uuid
        return self

    def start_notify(self, callback):
        """Store *callback* as ``self.notify``; it is not invoked here."""
        self.notify = callback

    def write_value(self, values):
        """Log the values that were written."""
        print(f'received values: {values}')