GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Pull Request — master (#11)
by
unknown
01:53
created

bricknil.hub.Hub.peripheral_message_loop()   A

Complexity

Conditions 3

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 1
dl 0
loc 17
rs 9.95
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
# http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Hub processes for the Boost Move and PoweredUp hubs
16
17
"""
18
import uuid
19
from asyncio import sleep, Queue, CancelledError
20
from .process import Process
21
from .sensor.peripheral import Peripheral  # for type check
22
from .sockets import WebMessage
23
24
class UnknownPeripheralMessage(Exception): pass
25
class DifferentPeripheralOnPortError(Exception): pass
26
27
# noinspection SpellCheckingInspection
28
class Hub(Process):
29
    """Base class for all Lego hubs
30
31
       Arguments:
32
            name (str) : Human-readable name for this hub (for logging)
33
            query_port_info (bool) : Set to True if you want to query all the port information on a Hub (very communication intensive)
34
            ble_id (str) : BluetoothLE network(MAC) adddress to connect to (None if you want to connect to the first matching hub)
35
36
       Attributes:
37
38
            hubs (list [`Hub`]) : Class attr to keep track of all Hub (and subclasses) instances
39
            ble_handler (`BLEventQ`) : Hub's Bluetooth LE handling object
40
            peripheral_queue (`asyncio.Queue`) : Incoming messages from :class:`bricknil.ble_queue.BLEventQ`
41
            uart_uuid (`uuid.UUID`) : UUID broadcast by LEGO UARTs
42
            char_uuid (`uuid.UUID`) : Lego uses only one service characteristic for communicating with the UART services
43
            tx : Service characteristic for tx/rx messages that's set by :func:`bricknil.ble_queue.BLEventQ.connect`
44
            peripherals (dict) : Peripheral name => `bricknil.Peripheral`
45
            port_to_peripheral (dict): Port number(int) -> `bricknil.Peripheral`
46
            port_info (dict):  Keeps track of all the meta-data for each port.  Usually not populated unless `query_port_info` is true
47
48
    """
49
    hubs = []
50
51
    # noinspection SpellCheckingInspection,SpellCheckingInspection,SpellCheckingInspection,SpellCheckingInspection
52
    def __init__(self, name, query_port_info=False, ble_id=None):
53
        super().__init__(name)
54
        self.ble_id = ble_id
55
        self.ble_handler = None
56
        self.query_port_info = query_port_info
57
        self.uart_uuid = uuid.UUID('00001623-1212-efde-1623-785feabcd123')
58
        self.char_uuid = uuid.UUID('00001624-1212-efde-1623-785feabcd123')
59
        self.tx = None
60
        self.peripherals = {}  # attach_sensor method will add sensors to this
61
        self.port_to_peripheral = {}   # Quick mapping from a port number to a peripheral object
62
                                        # Only gets populated once the peripheral attaches itself physically
63
        self.peripheral_queue = Queue()  # Incoming messages from peripherals
64
65
        # Keep track of port info as we get messages from the hub ('update_port' messages)
66
        self.port_info = {}
67
68
        # Register this hub
69
        Hub.hubs.append(self)
70
71
        # Outgoing messages to web client
72
        # Assigned during system instantiaion before ble connect
73
        self.web_queue_out = None
74
75
        self.web_message = WebMessage(self)
76
77
78
    async def send_message(self, msg_name, msg_bytes, peripheral=None):
79
        """Send a message (command) to the hub.
80
81
        """
82
        while not self.tx:  # Need to make sure we have a handle to the uart
83
            await sleep(1)
84
        await self.ble_handler.send_message(self.tx, msg_bytes)
85
        if self.web_queue_out and peripheral:
86
            cls_name = peripheral.__class__.__name__
87
            await self.web_message.send(peripheral, msg_name)
88
            #await self.web_queue_out.put( f'{self.name}|{cls_name}|{peripheral.name}|{peripheral.port}|{msg_name}\r\n'.encode('utf-8') )
89
90
    async def recv_message(self, msg, data):
91
        """Receive and process message (notification) from the hub.
92
93
        """
94
        if msg == 'value_change':
95
            port, msg_bytes = data
96
            peripheral = self.port_to_peripheral[port]
97
            await peripheral.update_value(msg_bytes)
98
            self.message_debug(f'peripheral msg: {peripheral} {msg}')
99
            if self.web_queue_out:
100
                cls_name = peripheral.__class__.__name__
101
                if len(peripheral.capabilities) > 0:
102
                    for cap in peripheral.value:
103
                        await self.web_message.send(peripheral, f'value change mode: {cap.value} = {peripheral.value[cap]}')
104
                        #await self.web_queue_out.put( f'{self.name}|{cls_name}|{peripheral.name}|{peripheral.port}|value change mode: {cap.value} = {peripheral.value[cap]}\r\n'.encode('utf-8') )
105
            handler_name = f'{peripheral.name}_change'
106
            handler = getattr(self, handler_name)
107
            await handler()
108
        elif msg == 'attach':
109
            port, device_name = data
110
            peripheral = await self.connect_peripheral_to_port(device_name, port)
111
            if peripheral:
112
                self.message_debug(f'peripheral msg: {peripheral} {msg}')
113
                peripheral.message_handler = self.send_message
114
                await peripheral.activate_updates()
115
        elif msg == 'update_port':
116
            port, info = data
117
            self.port_info[port] = info
118
        elif msg.startswith('port'):
119
            port = data
120
            if self.query_port_info:
121
                await self._get_port_info(port, msg)
122
        else:
123
            raise UnknownPeripheralMessage
124
125
    async def peripheral_message_loop(self):
126
        """The main loop that receives messages from the :class:`bricknil.messages.Message` parser.
127
128
           Waits for messages on a Queue and dispatches to the appropriate peripheral handler.
129
        """
130
        try:
131
            self.message_debug(f'starting peripheral message loop')
132
133
            # Check if we have any hub button peripherals attached
134
            # - If so, we need to manually call peripheral.activate_updates()
135
            # - and then register the proper handler inside the message parser
136
            while True:
137
                msg = await self.peripheral_queue.get()
138
                msg, data = msg
139
                await self.recv_message(msg, data)
140
        except CancelledError:
141
            self.message(f'Terminating peripheral')
142
143
    async def connect_peripheral_to_port(self, device_name, port):
144
        """Set the port number of the newly attached peripheral
145
146
        When the hub gets an Attached I/O message on a new port with the device_name,
147
        this method is called to find the peripheral it should set this port to.  If
148
        the user has manually specified a port, then this function just validates that
149
        the peripheral name the user has specified on that port is the same as the one
150
        that just attached itself to the hub on that port.
151
152
        """
153
        # register the handler for this IO
154
        #  - Check the hub to see if there's a matching device or port
155
        for peripheral_name, peripheral in self.peripherals.items():
156
            if peripheral.port == port:
157
                if device_name == peripheral.sensor_name:
158
                    self.port_to_peripheral[port] = peripheral
159
                    return peripheral
160
                else:
161
                    raise DifferentPeripheralOnPortError
162
163
        # This port has not been reserved for a specific peripheral, so let's just
164
        # search for the first peripheral with a matching name and attach this port to it
165
        for peripheral_name, peripheral in self.peripherals.items():
166
            if peripheral.sensor_name == device_name and peripheral.port == None:
167
                peripheral.message(f"ASSIGNING PORT {port} on {peripheral.name}")
168
                peripheral.port = port
169
                self.port_to_peripheral[port] = peripheral
170
                return peripheral
171
172
        # User hasn't specified a matching peripheral, so just ignore this attachment
173
        return None
174
175
176
    def attach_sensor(self, sensor: Peripheral):
177
        """Add instance variable for this decorated sensor
178
179
           Called by the class decorator :class:`bricknil.bricknil.attach` when decorating the sensor
180
        """
181
        # Check that we don't already have a sensor with the same name attached
182
        assert sensor.name not in self.peripherals, f'Duplicate {sensor.name} found!'
183
        self.peripherals[sensor.name] = sensor
184
        # Put this sensor as an attribute
185
        setattr(self, sensor.name, sensor)
186
187
    async def _get_port_info(self, port, msg):
188
        """Utility function to query information on available ports and modes from a hub.
189
190
        """
191
        if msg == 'port_detected':
192
            # Request mode info
193
            b = [0x00, 0x21, port, 0x01]
194
            await self.send_message(f'req mode info on {port}', b)
195
        elif msg == 'port_combination_info_received':
196
            pass
197
        elif msg == 'port_mode_info_received':
198
            pass
199
        elif msg == 'port_info_received':
200
            # At this point we know all the available modes for this port
201
            # let's get the name and value format
202
            modes = self.port_info[port]['modes']
203
            if self.port_info[port].get('combinable', False):
204
                # Get combination info on port
205
                b = [0x00, 0x21, port, 0x02]
206
                await self.send_message(f'req mode combination info on {port}', b)
207
            for mode in modes.keys():
208
                info_types = { 'NAME': 0, 'VALUE FORMAT':0x80, 'RAW Range':0x01,
209
                        'PCT Range': 0x02, 'SI Range':0x03, 'Symbol':0x04,
210
                        'MAPPING': 0x05,
211
                        }
212
                # Send a message to requeust each type of info
213
                for k,v in info_types.items():
214
                    b = [0x00, 0x22, port, mode, v]
215
                    await self.send_message(f'req info({k}) on mode {mode} {port}', b)
216
217
218
class PoweredUpHub(Hub):
219
    """PoweredUp Hub class
220
    """
221
    def __init__(self, name, query_port_info=False, ble_id=None):
222
        super().__init__(name, query_port_info, ble_id)
223
        self.ble_name = 'HUB NO.4'
224
        self.manufacturer_id = 65
225
226
class PoweredUpRemote(Hub):
227
    """PoweredUp Remote class
228
    """
229
    def __init__(self, name, query_port_info=False, ble_id=None):
230
        super().__init__(name, query_port_info, ble_id)
231
        self.ble_name = 'Handset'
232
        self.manufacturer_id = 66
233
234
class BoostHub(Hub):
235
    """Boost Move Hub
236
    """
237
    def __init__(self, name, query_port_info=False, ble_id=None):
238
        super().__init__(name, query_port_info, ble_id)
239
        self.ble_name = 'LEGO Move Hub'
240
        self.manufacturer_id = 64
241
242
243
class DuploTrainHub(Hub):
244
    """Duplo Steam train and Cargo Train
245
246
       This is hub is found in Lego sets 10874 and 10875
247
    """
248
    def __init__(self, name, query_port_info=False, ble_id=None):
249
        super().__init__(name, query_port_info, ble_id)
250
        self.ble_name = 'Train Base'
251
        self.manufacturer_id = 32
252
253
254
class CPlusHub(Hub):
255
    """Technic Control+ Hub
256
    """
257
    def __init__(self, name, query_port_info=False, ble_id=None):
258
        super().__init__(name, query_port_info, ble_id)
259
        self.ble_name = "Control+ Hub"
260
        self.manufacturer_id = 128
261