GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

bricknil.hub   A
last analyzed

Complexity

Total Complexity 38

Size/Duplication

Total Lines 261
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 134
dl 0
loc 261
rs 9.36
c 0
b 0
f 0
wmc 38

11 Methods

Rating   Name   Duplication   Size   Complexity  
A BoostHub.__init__() 0 4 1
B Hub._get_port_info() 0 29 8
A Hub.send_message() 0 12 4
A Hub.__init__() 0 24 1
B Hub.connect_peripheral_to_port() 0 31 7
A PoweredUpRemote.__init__() 0 4 1
A Hub.attach_sensor() 0 10 1
A DuploTrainHub.__init__() 0 4 1
D Hub.peripheral_message_loop() 0 49 12
A PoweredUpHub.__init__() 0 4 1
A CPlusHub.__init__() 0 4 1
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Hub processes for the Boost Move and PoweredUp hubs
16
17
"""
18
import uuid
19
from curio import sleep, UniversalQueue, CancelledError
20
from .process import Process
21
from .sensor.peripheral import Peripheral  # for type check
22
from .const import USE_BLEAK
23
from .sockets import WebMessage
24
25
class UnknownPeripheralMessage(Exception): pass
26
class DifferentPeripheralOnPortError(Exception): pass
27
28
# noinspection SpellCheckingInspection
29
class Hub(Process):
30
    """Base class for all Lego hubs
31
32
       Arguments:
33
            name (str) : Human-readable name for this hub (for logging)
34
            query_port_info (bool) : Set to True if you want to query all the port information on a Hub (very communication intensive)
35
            ble_id (str) : BluetoothLE network(MAC) adddress to connect to (None if you want to connect to the first matching hub)
36
37
       Attributes:
38
      
39
            hubs (list [`Hub`]) : Class attr to keep track of all Hub (and subclasses) instances
40
            message_queue (`curio.Queue`) : Outgoing message queue to :class:`bricknil.ble_queue.BLEventQ`
41
            peripheral_queue (`curio.UniversalQueue`) : Incoming messages from :class:`bricknil.ble_queue.BLEventQ`
42
            uart_uuid (`uuid.UUID`) : UUID broadcast by LEGO UARTs
43
            char_uuid (`uuid.UUID`) : Lego uses only one service characteristic for communicating with the UART services
44
            tx : Service characteristic for tx/rx messages that's set by :func:`bricknil.ble_queue.BLEventQ.connect`
45
            peripherals (dict) : Peripheral name => `bricknil.Peripheral`
46
            port_to_peripheral (dict): Port number(int) -> `bricknil.Peripheral`
47
            port_info (dict):  Keeps track of all the meta-data for each port.  Usually not populated unless `query_port_info` is true
48
49
    """
50
    hubs = []
51
52
    # noinspection SpellCheckingInspection,SpellCheckingInspection,SpellCheckingInspection,SpellCheckingInspection
53
    def __init__(self, name, query_port_info=False, ble_id=None):
54
        super().__init__(name)
55
        self.ble_id = ble_id
56
        self.query_port_info = query_port_info
57
        self.message_queue = None
58
        self.uart_uuid = uuid.UUID('00001623-1212-efde-1623-785feabcd123')
59
        self.char_uuid = uuid.UUID('00001624-1212-efde-1623-785feabcd123')
60
        self.tx = None
61
        self.peripherals = {}  # attach_sensor method will add sensors to this
62
        self.port_to_peripheral = {}   # Quick mapping from a port number to a peripheral object
63
                                        # Only gets populated once the peripheral attaches itself physically
64
        self.peripheral_queue = UniversalQueue()  # Incoming messages from peripherals
65
66
        # Keep track of port info as we get messages from the hub ('update_port' messages)
67
        self.port_info = {}
68
69
        # Register this hub
70
        Hub.hubs.append(self)
71
72
        # Outgoing messages to web client
73
        # Assigned during system instantiaion before ble connect
74
        self.web_queue_out = None
75
76
        self.web_message = WebMessage(self)
77
78
79
    async def send_message(self, msg_name, msg_bytes, peripheral=None):
80
        """Insert a message to the hub into the queue(:func:`bricknil.hub.Hub.message_queue`) connected to our BLE
81
           interface
82
83
        """
84
85
        while not self.tx:  # Need to make sure we have a handle to the uart
86
            await sleep(1)
87
        await self.message_queue.put((msg_name, self, msg_bytes))
88
        if self.web_queue_out and peripheral:
89
            cls_name = peripheral.__class__.__name__
90
            await self.web_message.send(peripheral, msg_name)
91
            #await self.web_queue_out.put( f'{self.name}|{cls_name}|{peripheral.name}|{peripheral.port}|{msg_name}\r\n'.encode('utf-8') )
92
93
    async def peripheral_message_loop(self):
94
        """The main loop that receives messages from the :class:`bricknil.messages.Message` parser.
95
96
           Waits for messages on a UniversalQueue and dispatches to the appropriate peripheral handler.
97
        """
98
        try:
99
            self.message_debug(f'starting peripheral message loop')
100
101
            # Check if we have any hub button peripherals attached
102
            # - If so, we need to manually call peripheral.activate_updates()
103
            # - and then register the proper handler inside the message parser
104
            while True:
105
                msg = await self.peripheral_queue.get()
106
                msg, data = msg
107
                await self.peripheral_queue.task_done()
108
                if msg == 'value_change':
109
                    port, msg_bytes = data
110
                    peripheral = self.port_to_peripheral[port]
111
                    await peripheral.update_value(msg_bytes)
112
                    self.message_debug(f'peripheral msg: {peripheral} {msg}')
113
                    if self.web_queue_out:
114
                        cls_name = peripheral.__class__.__name__
115
                        if len(peripheral.capabilities) > 0:
116
                            for cap in peripheral.value:
117
                                await self.web_message.send(peripheral, f'value change mode: {cap.value} = {peripheral.value[cap]}')
118
                                #await self.web_queue_out.put( f'{self.name}|{cls_name}|{peripheral.name}|{peripheral.port}|value change mode: {cap.value} = {peripheral.value[cap]}\r\n'.encode('utf-8') )
119
                    handler_name = f'{peripheral.name}_change'
120
                    handler = getattr(self, handler_name)
121
                    await handler()
122
                elif msg == 'attach':
123
                    port, device_name = data
124
                    peripheral = await self.connect_peripheral_to_port(device_name, port)
125
                    if peripheral:
126
                        self.message_debug(f'peripheral msg: {peripheral} {msg}')
127
                        peripheral.message_handler = self.send_message
128
                        await peripheral.activate_updates()
129
                elif msg == 'update_port':
130
                    port, info = data
131
                    self.port_info[port] = info
132
                elif msg.startswith('port'):
133
                    port = data
134
                    if self.query_port_info:
135
                        await self._get_port_info(port, msg)
136
                else:
137
                    raise UnknownPeripheralMessage
138
                    
139
140
        except CancelledError:
141
            self.message(f'Terminating peripheral')
142
143
    async def connect_peripheral_to_port(self, device_name, port):
144
        """Set the port number of the newly attached peripheral
145
        
146
        When the hub gets an Attached I/O message on a new port with the device_name,
147
        this method is called to find the peripheral it should set this port to.  If
148
        the user has manually specified a port, then this function just validates that
149
        the peripheral name the user has specified on that port is the same as the one
150
        that just attached itself to the hub on that port.
151
152
        """
153
        # register the handler for this IO
154
        #  - Check the hub to see if there's a matching device or port
155
        for peripheral_name, peripheral in self.peripherals.items():
156
            if peripheral.port == port:
157
                if device_name == peripheral.sensor_name:
158
                    self.port_to_peripheral[port] = peripheral
159
                    return peripheral
160
                else:
161
                    raise DifferentPeripheralOnPortError
162
163
        # This port has not been reserved for a specific peripheral, so let's just 
164
        # search for the first peripheral with a matching name and attach this port to it
165
        for peripheral_name, peripheral in self.peripherals.items():
166
            if peripheral.sensor_name == device_name and peripheral.port == None:
167
                peripheral.message(f"ASSIGNING PORT {port} on {peripheral.name}")
168
                peripheral.port = port
169
                self.port_to_peripheral[port] = peripheral
170
                return peripheral
171
172
        # User hasn't specified a matching peripheral, so just ignore this attachment
173
        return None
174
175
176
    def attach_sensor(self, sensor: Peripheral):
177
        """Add instance variable for this decorated sensor
178
179
           Called by the class decorator :class:`bricknil.bricknil.attach` when decorating the sensor
180
        """
181
        # Check that we don't already have a sensor with the same name attached
182
        assert sensor.name not in self.peripherals, f'Duplicate {sensor.name} found!'
183
        self.peripherals[sensor.name] = sensor
184
        # Put this sensor as an attribute
185
        setattr(self, sensor.name, sensor)
186
187
    async def _get_port_info(self, port, msg):
188
        """Utility function to query information on available ports and modes from a hub.
189
           
190
        """
191
        if msg == 'port_detected':
192
            # Request mode info
193
            b = [0x00, 0x21, port, 0x01]
194
            await self.send_message(f'req mode info on {port}', b)
195
        elif msg == 'port_combination_info_received':
196
            pass
197
        elif msg == 'port_mode_info_received':
198
            pass
199
        elif msg == 'port_info_received':
200
            # At this point we know all the available modes for this port
201
            # let's get the name and value format
202
            modes = self.port_info[port]['modes']
203
            if self.port_info[port].get('combinable', False):
204
                # Get combination info on port
205
                b = [0x00, 0x21, port, 0x02]
206
                await self.send_message(f'req mode combination info on {port}', b)
207
            for mode in modes.keys():
208
                info_types = { 'NAME': 0, 'VALUE FORMAT':0x80, 'RAW Range':0x01,
209
                        'PCT Range': 0x02, 'SI Range':0x03, 'Symbol':0x04,
210
                        'MAPPING': 0x05,
211
                        }
212
                # Send a message to requeust each type of info 
213
                for k,v in info_types.items():
214
                    b = [0x00, 0x22, port, mode, v]
215
                    await self.send_message(f'req info({k}) on mode {mode} {port}', b)
216
217
218
class PoweredUpHub(Hub):
219
    """PoweredUp Hub class 
220
    """
221
    def __init__(self, name, query_port_info=False, ble_id=None):
222
        super().__init__(name, query_port_info, ble_id)
223
        self.ble_name = 'HUB NO.4'
224
        self.manufacturer_id = 65
225
226
class PoweredUpRemote(Hub):
227
    """PoweredUp Remote class 
228
    """
229
    def __init__(self, name, query_port_info=False, ble_id=None):
230
        super().__init__(name, query_port_info, ble_id)
231
        self.ble_name = 'Handset'
232
        self.manufacturer_id = 66
233
234
class BoostHub(Hub):
235
    """Boost Move Hub
236
    """
237
    def __init__(self, name, query_port_info=False, ble_id=None):
238
        super().__init__(name, query_port_info, ble_id)
239
        self.ble_name = 'LEGO Move Hub'
240
        self.manufacturer_id = 64
241
242
243
class DuploTrainHub(Hub):
244
    """Duplo Steam train and Cargo Train
245
246
       This is hub is found in Lego sets 10874 and 10875
247
    """
248
    def __init__(self, name, query_port_info=False, ble_id=None):
249
        super().__init__(name, query_port_info, ble_id)
250
        self.ble_name = 'Train Base'
251
        self.manufacturer_id = 32
252
253
254
class CPlusHub(Hub):
255
    """Technic Control+ Hub
256
    """
257
    def __init__(self, name, query_port_info=False, ble_id=None):
258
        super().__init__(name, query_port_info, ble_id)
259
        self.ble_name = "Control+ Hub"
260
        self.manufacturer_id = 128
261