GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Pull Request — master (#11)
by
unknown
01:53
created

bricknil.ble_queue.BLEventQ._check_devices_for()   B

Complexity

Conditions 6

Size

Total Lines 30
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 12
nop 5
dl 0
loc 30
rs 8.6666
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
# http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
from asyncio import Queue, sleep, CancelledError
16
import sys, functools, uuid, bleak
17
18
from .sensor import Button # Hack! only to get the button sensor_id for the fake attach message
19
from .process import Process
20
from .message_dispatch import MessageDispatch
21
22
# Need a class to represent the bluetooth adapter provided
23
class BLEventQ(Process):
24
    """All bluetooth comms go through this object
25
26
       Provides interfaces to connect to a device/hub, send_messages to,
27
       and receive_messages from.
28
29
       All requests to send messages to the BLE device must be inserted into
30
       the :class:`bricknil.BLEventQ.q` Queue object.
31
32
    """
33
34
    def __init__(self):
35
        super().__init__('BLE Event Q')
36
        self.adapter = None
37
        # User needs to make sure adapter is powered up and on
38
        #    sudo hciconfig hci0 up
39
        self.hubs = {}
40
        self.devices = []
41
42
    async def disconnect(self):
43
        if len(self.devices) > 0:
44
            self.message(f'Terminating and disconnecting')
45
            for device in self.devices:
46
                await device.disconnect()
47
            self.devices = []
48
49
    async def send_message(self, characteristic, msg):
50
        """Prepends a byte with the length of the msg and writes it to
51
           the characteristic
52
53
           Arguments:
54
              characteristic : A tuple (device, uuid : str)
55
              msg (bytearray) : Message with header
56
        """
57
        # Message needs to have length prepended
58
        length = len(msg)+1
59
        values = bytearray([length]+msg)
60
        device, char_uuid = characteristic
61
        await device.write_gatt_char(char_uuid, values)
62
63
    async def get_messages(self, hub):
64
        """Instance a Message object to parse incoming messages and setup
65
           the callback from the characteristic to call Message.parse on the
66
           incoming data bytes
67
        """
68
        # Message instance to parse and handle messages from this hub
69
        msg_parser = MessageDispatch(hub)
70
71
        # Create a fake attach message on port 255, so that we can attach any instantiated Button listeners if present
72
        msg_parser.parse(bytearray([15, 0x00, 0x04,255, 1, Button._sensor_id, 0x00, 0,0,0,0, 0,0,0,0]))
73
74
        def bleak_received(sender, data):
75
            self.message_debug(f'Bleak Raw data received: {data}')
76
            msg = msg_parser.parse(data)
77
            self.message_debug('{0} Received: {1}'.format(hub.name, msg))
78
79
        device, char_uuid = hub.tx
80
        await device.start_notify(char_uuid, bleak_received)
81
82
83
    def _check_devices_for(self, devices, name, manufacturer_id, address):
84
        """Check if any of the devices match what we're looking for
85
86
           First, check to make sure the manufacturer_id matches.  If the
87
           manufacturer_id is not present in the BLE advertised data from the
88
           device, then fall back to the name (although this is unreliable because
89
           the name on the device can be changed by the user through the LEGO apps).
90
91
           Then, if address is supplied, only return a device with a matching id/name
92
           if it's BLE MAC address also agrees
93
94
           Returns:
95
              device : Matching device (None if no matches)
96
        """
97
        for device in devices:
98
            self.message(f'checking manufacturer ID for device named {device.name} for {name}')
99
            # Get the device manufacturer id from the advertised data if present
100
            if device.manufacturer_id == manufacturer_id or device.name == name:
101
                if not address:
102
                    return device
103
                else:
104
                    ble_address = device.address
105
                    if address == ble_address:
106
                        return device
107
                    else:
108
                        self.message(f'Address {ble_address} is not a match')
109
            else:
110
                self.message(f'No match for device with advertised data {device.manufacturer_id}')
111
112
        return None
113
114
    async def _ble_connect(self, uart_uuid, ble_name, ble_manufacturer_id, ble_id=None, timeout=60):
115
        """Connect to the underlying BLE device with the needed UART UUID
116
        """
117
        # Set hub.ble_id to a specific hub id if you want it to connect to a
118
        # particular hardware hub instance
119
        if ble_id:
120
            self.message_info(f'Looking for specific hub id {ble_id}')
121
        else:
122
            self.message_info(f'Looking for first matching hub')
123
124
        # Start discovery
125
126
        found = False
127
        while not found and timeout > 0:
128
            print('Awaiting on bleak discover')
129
            devices = await bleak.discover(timeout=1)
130
            print('Done Awaiting on bleak discover')
131
            # Filter out no-matching uuid
132
            devices = [d for d in devices if str(uart_uuid) in d.metadata['uuids']]
133
            # Now, extract the manufacturer_id
134
            for device in devices:
135
                assert len(device.metadata['manufacturer_data']) == 1
136
                data = next(iter(device.metadata['manufacturer_data'].values())) # Get the one and only key
137
                device.manufacturer_id = data[1]
138
139
            device = self._check_devices_for(devices, ble_name, ble_manufacturer_id,  ble_id)
140
            if device:
141
                self.device = device
142
                found = True
143
            else:
144
                self.message(f'Rescanning for {uart_uuid} ({timeout} tries left)')
145
                timeout -= 1
146
                self.device = None
147
                await sleep(1)
148
        if self.device is None:
149
            raise RuntimeError('Failed to find UART device!')
150
151
152
    async def connect(self, hub):
153
        # Connect the messaging queue for communication between self and the hub
154
        hub.ble_handler = self
155
        self.message(f'Starting scan for UART {hub.uart_uuid}')
156
157
        # HACK
158
        try:
159
            ble_id = uuid.UUID(hub.ble_id) if hub.ble_id else None
160
        except ValueError:
161
            # In case the user passed in a
162
            self.message_info(f"ble_id {hub.ble_id} is not a parseable UUID, so assuming it's a BLE network addresss")
163
            ble_id = hub.ble_id
164
165
        await self._ble_connect(hub.uart_uuid, hub.ble_name, hub.manufacturer_id, ble_id)
166
167
        self.message(f"found device {self.device.name}")
168
169
170
        device = bleak.BleakClient(address=self.device.address)
171
        self.devices.append(device)
172
        await device.connect()
173
174
        hub.ble_id = self.device.address
175
        self.message_info(f'Device advertised: {device.services.characteristics}')
176
        hub.tx = (device, hub.char_uuid)
177
        # Hack to fix device name on Windows
178
        if self.device.name == "Unknown" and hasattr(device._requester, 'Name'):
179
            self.device.name = device._requester.Name
180
181
        self.message_info(f"Connected to device {self.device.name}:{hub.ble_id}")
182
        self.hubs[hub.ble_id] = hub
183
184
        await self.get_messages(hub)
185
186
187
188