GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — master ( a3394c...b99246 )
by Virantha
01:31
created

bricknil.hub.Hub.send_message()   A

Complexity

Conditions 4

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 7
nop 4
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Hub processes for the Boost Move and PoweredUp hubs
16
17
"""
18
import uuid
19
from curio import sleep, UniversalQueue, CancelledError
20
from .process import Process
21
from .sensor.peripheral import Peripheral  # for type check
22
from .const import USE_BLEAK
23
24
class UnknownPeripheralMessage(Exception): pass
25
class DifferentPeripheralOnPortError(Exception): pass
26
27
# noinspection SpellCheckingInspection
28
class Hub(Process):
29
    """Base class for all Lego hubs
30
31
       Arguments:
32
            name (str) : Human-readable name for this hub (for logging)
33
            query_port_info (bool) : Set to True if you want to query all the port information on a Hub (very communication intensive)
34
            ble_id (str) : BluetoothLE network(MAC) adddress to connect to (None if you want to connect to the first matching hub)
35
36
       Attributes:
37
      
38
            hubs (list [`Hub`]) : Class attr to keep track of all Hub (and subclasses) instances
39
            message_queue (`curio.Queue`) : Outgoing message queue to :class:`bricknil.ble_queue.BLEventQ`
40
            peripheral_queue (`curio.UniversalQueue`) : Incoming messages from :class:`bricknil.ble_queue.BLEventQ`
41
            uart_uuid (`uuid.UUID`) : UUID broadcast by LEGO UARTs
42
            char_uuid (`uuid.UUID`) : Lego uses only one service characteristic for communicating with the UART services
43
            tx : Service characteristic for tx/rx messages that's set by :func:`bricknil.ble_queue.BLEventQ.connect`
44
            peripherals (dict) : Peripheral name => `bricknil.Peripheral`
45
            port_to_peripheral (dict): Port number(int) -> `bricknil.Peripheral`
46
            port_info (dict):  Keeps track of all the meta-data for each port.  Usually not populated unless `query_port_info` is true
47
48
    """
49
    hubs = []
50
51
    # noinspection SpellCheckingInspection,SpellCheckingInspection,SpellCheckingInspection,SpellCheckingInspection
52
    def __init__(self, name, query_port_info=False, ble_id=None):
53
        super().__init__(name)
54
        self.ble_id = ble_id
55
        self.query_port_info = query_port_info
56
        self.message_queue = None
57
        self.uart_uuid = uuid.UUID('00001623-1212-efde-1623-785feabcd123')
58
        self.char_uuid = uuid.UUID('00001624-1212-efde-1623-785feabcd123')
59
        self.tx = None
60
        self.peripherals = {}  # attach_sensor method will add sensors to this
61
        self.port_to_peripheral = {}   # Quick mapping from a port number to a peripheral object
62
                                        # Only gets populated once the peripheral attaches itself physically
63
        self.peripheral_queue = UniversalQueue()  # Incoming messages from peripherals
64
65
        # Keep track of port info as we get messages from the hub ('update_port' messages)
66
        self.port_info = {}
67
68
        # Register this hub
69
        Hub.hubs.append(self)
70
71
        # Outgoing messages to web client
72
        # Assigned during system instantiaion before ble connect
73
        self.web_queue_out = None
74
75
76
    async def send_message(self, msg_name, msg_bytes, peripheral=None):
77
        """Insert a message to the hub into the queue(:func:`bricknil.hub.Hub.message_queue`) connected to our BLE
78
           interface
79
80
        """
81
82
        while not self.tx:  # Need to make sure we have a handle to the uart
83
            await sleep(1)
84
        await self.message_queue.put((msg_name, self, msg_bytes))
85
        if self.web_queue_out and peripheral:
86
            cls_name = peripheral.__class__.__name__
87
            await self.web_queue_out.put( f'{self.name}|{cls_name}|{peripheral.name}|{peripheral.port}|{msg_name}\r\n'.encode('utf-8') )
88
89
    async def peripheral_message_loop(self):
90
        """The main loop that receives messages from the :class:`bricknil.messages.Message` parser.
91
92
           Waits for messages on a UniversalQueue and dispatches to the appropriate peripheral handler.
93
        """
94
        try:
95
            self.message_debug(f'starting peripheral message loop')
96
97
            # Check if we have any hub button peripherals attached
98
            # - If so, we need to manually call peripheral.activate_updates()
99
            # - and then register the proper handler inside the message parser
100
            while True:
101
                msg = await self.peripheral_queue.get()
102
                msg, data = msg
103
                await self.peripheral_queue.task_done()
104
                if msg == 'value_change':
105
                    port, msg_bytes = data
106
                    peripheral = self.port_to_peripheral[port]
107
                    await peripheral.update_value(msg_bytes)
108
                    self.message_debug(f'peripheral msg: {peripheral} {msg}')
109
                    if self.web_queue_out:
110
                        cls_name = peripheral.__class__.__name__
111
                        if len(peripheral.capabilities) > 0:
112
                            for cap in peripheral.value:
113
                                await self.web_queue_out.put( f'{self.name}|{cls_name}|{peripheral.name}|{peripheral.port}|value change mode: {cap.value} = {peripheral.value[cap]}\r\n'.encode('utf-8') )
114
                    handler_name = f'{peripheral.name}_change'
115
                    handler = getattr(self, handler_name)
116
                    await handler()
117
                elif msg == 'attach':
118
                    port, device_name = data
119
                    peripheral = await self.connect_peripheral_to_port(device_name, port)
120
                    if peripheral:
121
                        self.message_debug(f'peripheral msg: {peripheral} {msg}')
122
                        peripheral.message_handler = self.send_message
123
                        await peripheral.activate_updates()
124
                elif msg == 'update_port':
125
                    port, info = data
126
                    self.port_info[port] = info
127
                elif msg.startswith('port'):
128
                    port = data
129
                    if self.query_port_info:
130
                        await self._get_port_info(port, msg)
131
                else:
132
                    raise UnknownPeripheralMessage
133
                    
134
135
        except CancelledError:
136
            self.message(f'Terminating peripheral')
137
138
    async def connect_peripheral_to_port(self, device_name, port):
139
        """Set the port number of the newly attached peripheral
140
        
141
        When the hub gets an Attached I/O message on a new port with the device_name,
142
        this method is called to find the peripheral it should set this port to.  If
143
        the user has manually specified a port, then this function just validates that
144
        the peripheral name the user has specified on that port is the same as the one
145
        that just attached itself to the hub on that port.
146
147
        """
148
        # register the handler for this IO
149
        #  - Check the hub to see if there's a matching device or port
150
        for peripheral_name, peripheral in self.peripherals.items():
151
            if peripheral.port == port:
152
                if device_name == peripheral.sensor_name:
153
                    self.port_to_peripheral[port] = peripheral
154
                    return peripheral
155
                else:
156
                    raise DifferentPeripheralOnPortError
157
158
        # This port has not been reserved for a specific peripheral, so let's just 
159
        # search for the first peripheral with a matching name and attach this port to it
160
        for peripheral_name, peripheral in self.peripherals.items():
161
            if peripheral.sensor_name == device_name and peripheral.port == None:
162
                peripheral.message(f"ASSIGNING PORT {port} on {peripheral.name}")
163
                peripheral.port = port
164
                self.port_to_peripheral[port] = peripheral
165
                return peripheral
166
167
        # User hasn't specified a matching peripheral, so just ignore this attachment
168
        return None
169
170
171
    def attach_sensor(self, sensor: Peripheral):
172
        """Add instance variable for this decorated sensor
173
174
           Called by the class decorator :class:`bricknil.bricknil.attach` when decorating the sensor
175
        """
176
        # Check that we don't already have a sensor with the same name attached
177
        assert sensor.name not in self.peripherals, f'Duplicate {sensor.name} found!'
178
        self.peripherals[sensor.name] = sensor
179
        # Put this sensor as an attribute
180
        setattr(self, sensor.name, sensor)
181
182
    async def _get_port_info(self, port, msg):
183
        """Utility function to query information on available ports and modes from a hub.
184
           
185
        """
186
        if msg == 'port_detected':
187
            # Request mode info
188
            b = [0x00, 0x21, port, 0x01]
189
            await self.send_message(f'req mode info on {port}', b)
190
        elif msg == 'port_combination_info_received':
191
            pass
192
        elif msg == 'port_mode_info_received':
193
            pass
194
        elif msg == 'port_info_received':
195
            # At this point we know all the available modes for this port
196
            # let's get the name and value format
197
            modes = self.port_info[port]['modes']
198
            if self.port_info[port].get('combinable', False):
199
                # Get combination info on port
200
                b = [0x00, 0x21, port, 0x02]
201
                await self.send_message(f'req mode combination info on {port}', b)
202
            for mode in modes.keys():
203
                info_types = { 'NAME': 0, 'VALUE FORMAT':0x80, 'RAW Range':0x01,
204
                        'PCT Range': 0x02, 'SI Range':0x03, 'Symbol':0x04,
205
                        'MAPPING': 0x05,
206
                        }
207
                # Send a message to requeust each type of info 
208
                for k,v in info_types.items():
209
                    b = [0x00, 0x22, port, mode, v]
210
                    await self.send_message(f'req info({k}) on mode {mode} {port}', b)
211
212
213
class PoweredUpHub(Hub):
214
    """PoweredUp Hub class 
215
    """
216
    def __init__(self, name, query_port_info=False, ble_id=None):
217
        super().__init__(name, query_port_info, ble_id)
218
        self.ble_name = 'HUB NO.4'
219
        self.manufacturer_id = 65
220
221
class PoweredUpRemote(Hub):
222
    """PoweredUp Remote class 
223
    """
224
    def __init__(self, name, query_port_info=False, ble_id=None):
225
        super().__init__(name, query_port_info, ble_id)
226
        self.ble_name = 'Handset'
227
        self.manufacturer_id = 66
228
229
class BoostHub(Hub):
230
    """Boost Move Hub
231
    """
232
    def __init__(self, name, query_port_info=False, ble_id=None):
233
        super().__init__(name, query_port_info, ble_id)
234
        self.ble_name = 'LEGO Move Hub'
235
        self.manufacturer_id = 64
236
237
238
class DuploTrainHub(Hub):
239
    """Duplo Steam train and Cargo Train
240
241
       This is hub is found in Lego sets 10874 and 10875
242
    """
243
    def __init__(self, name, query_port_info=False, ble_id=None):
244
        super().__init__(name, query_port_info, ble_id)
245
        self.ble_name = 'Train Base'
246
        self.manufacturer_id = 32
247