GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( bd33de...cf9674 )
by Virantha
01:33
created

bricknil.hub.Hub.peripheral_message_loop()   C

Complexity

Conditions 9

Size

Total Lines 43
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 32
nop 1
dl 0
loc 43
rs 6.6666
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Hub processes for the Boost Move and PoweredUp hubs
16
17
"""
18
import uuid
19
from curio import sleep, UniversalQueue, CancelledError
20
from .process import Process
21
from .peripheral import Peripheral  # for type check
22
from .const import USE_BLEAK
23
24
class UnknownPeripheralMessage(Exception):
    """Raised by the hub's peripheral message loop for an unrecognized message type."""
25
class DifferentPeripheralOnPortError(Exception):
    """Raised when the device attached on a port is not the peripheral configured for that port."""
26
27
# noinspection SpellCheckingInspection
28
class Hub(Process):
    """Base class for all Lego hubs

       Arguments:
            name (str) : Human-readable name for this hub (for logging)
            query_port_info (bool) : Set to True if you want to query all the port information on a Hub (very communication intensive)
            ble_id (str) : BluetoothLE network(MAC) address to connect to (None if you want to connect to the first matching hub)

       Attributes:

            hubs (list [`Hub`]) : Class attr to keep track of all Hub (and subclasses) instances
            message_queue (`curio.Queue`) : Outgoing message queue to :class:`bricknil.ble_queue.BLEventQ`
            peripheral_queue (`curio.UniversalQueue`) : Incoming messages from :class:`bricknil.ble_queue.BLEventQ`
            uart_uuid (`uuid.UUID`) : UUID broadcast by LEGO UARTs
            char_uuid (`uuid.UUID`) : Lego uses only one service characteristic for communicating with the UART services
            tx : Service characteristic for tx/rx messages that's set by :func:`bricknil.ble_queue.BLEventQ.connect`
            peripherals (dict) : Peripheral name => `bricknil.Peripheral`
            port_to_peripheral (dict): Port number(int) -> `bricknil.Peripheral`
            port_info (dict):  Keeps track of all the meta-data for each port.  Usually not populated unless `query_port_info` is true

    """
    hubs = []

    # noinspection SpellCheckingInspection
    def __init__(self, name, query_port_info=False, ble_id=None):
        """Initialise the hub's bookkeeping and register it in :attr:`Hub.hubs`."""
        super().__init__(name)
        self.ble_id = ble_id
        self.query_port_info = query_port_info
        self.message_queue = None  # Assigned from outside this class before messages can be sent
        self.uart_uuid = uuid.UUID('00001623-1212-efde-1623-785feabcd123')
        self.char_uuid = uuid.UUID('00001624-1212-efde-1623-785feabcd123')
        self.tx = None  # send_message() blocks until this becomes truthy
        self.peripherals = {}  # attach_sensor method will add sensors to this
        # Quick mapping from a port number to a peripheral object.
        # Only gets populated once the peripheral attaches itself physically
        self.port_to_peripheral = {}
        self.peripheral_queue = UniversalQueue()  # Incoming messages from peripherals

        # Keep track of port info as we get messages from the hub ('update_port' messages)
        self.port_info = {}

        # Register this hub
        Hub.hubs.append(self)

    async def send_message(self, msg_name, msg_bytes):
        """Insert a message to the hub into the queue(:func:`bricknil.hub.Hub.message_queue`) connected to our BLE
           interface

           Arguments:
                msg_name : Label for the message, queued alongside the payload
                msg_bytes : Payload to send; queued as ``(msg_name, self, msg_bytes)``

        """
        while not self.tx:  # Need to make sure we have a handle to the uart
            await sleep(1)
        await self.message_queue.put((msg_name, self, msg_bytes))

    async def peripheral_message_loop(self):
        """The main loop that receives messages from the :class:`bricknil.messages.Message` parser.

           Waits for ``(msg, data)`` tuples on :attr:`peripheral_queue` and dispatches each one
           to the appropriate handler.  Runs until the task is cancelled; the cancellation is
           logged and then swallowed so the loop simply ends.

           Raises:
                UnknownPeripheralMessage : If a message of an unrecognized type is received
                DifferentPeripheralOnPortError : Propagated from :func:`connect_peripheral_to_port`
        """
        try:
            self.message_debug('starting peripheral message loop')
            while True:
                msg, data = await self.peripheral_queue.get()
                # Mark the item as done as soon as it is unpacked, before it is handled
                await self.peripheral_queue.task_done()
                await self._dispatch_peripheral_message(msg, data)
        except CancelledError:
            self.message('Terminating peripheral')

    async def _dispatch_peripheral_message(self, msg, data):
        """Handle a single ``(msg, data)`` item taken off :attr:`peripheral_queue`."""
        if msg == 'value_change':
            port, msg_bytes = data
            peripheral = self.port_to_peripheral[port]
            await peripheral.update_value(msg_bytes)
            self.message_debug(f'peripheral msg: {peripheral} {msg}')
            # The hub is expected to define a coroutine named '<peripheral name>_change'
            handler = getattr(self, f'{peripheral.name}_change')
            await handler()
        elif msg == 'attach':
            port, device_name = data
            peripheral = await self.connect_peripheral_to_port(device_name, port)
            if peripheral:
                self.message_debug(f'peripheral msg: {peripheral} {msg}')
                peripheral.message_handler = self.send_message
                await peripheral.activate_updates()
        elif msg == 'update_port':
            port, info = data
            self.port_info[port] = info
        elif msg.startswith('port'):
            # For these messages the payload is just the port number
            if self.query_port_info:
                await self._get_port_info(data, msg)
        else:
            raise UnknownPeripheralMessage(f'Unknown peripheral message: {msg!r}')

    async def connect_peripheral_to_port(self, device_name, port):
        """Set the port number of the newly attached peripheral

        When the hub gets an Attached I/O message on a new port with the device_name,
        this method is called to find the peripheral it should set this port to.  If
        the user has manually specified a port, then this function just validates that
        the peripheral name the user has specified on that port is the same as the one
        that just attached itself to the hub on that port.

        In both cases the peripheral is recorded in :attr:`port_to_peripheral`, so that
        later 'value_change' messages for this port can be routed to it.

        Returns:
            The matching peripheral, or None if no registered peripheral matches.

        Raises:
            DifferentPeripheralOnPortError : A peripheral is configured for this port but
                its ``sensor_name`` differs from ``device_name``

        """
        # register the handler for this IO
        #  - Check the hub to see if there's a matching device or port
        for peripheral in self.peripherals.values():
            if peripheral.port == port:
                if device_name != peripheral.sensor_name:
                    raise DifferentPeripheralOnPortError
                # The port was reserved up front, so the port -> peripheral mapping
                # has not been recorded yet; without it value_change lookups fail.
                self.port_to_peripheral[port] = peripheral
                return peripheral

        # This port has not been reserved for a specific peripheral, so let's just
        # search for the first peripheral with a matching name and attach this port to it
        for peripheral in self.peripherals.values():
            if peripheral.sensor_name == device_name and peripheral.port is None:
                peripheral.message(f"ASSIGNING PORT {port} on {peripheral.name}")
                peripheral.port = port
                self.port_to_peripheral[port] = peripheral
                return peripheral

        # User hasn't specified a matching peripheral, so just ignore this attachment
        return None

    def attach_sensor(self, sensor: Peripheral):
        """Add instance variable for this decorated sensor

           Called by the class decorator :class:`bricknil.bricknil.attach` when decorating the sensor

           Raises:
                AssertionError : If a sensor with the same name is already attached
        """
        # Check that we don't already have a sensor with the same name attached.
        # Raised explicitly (rather than via `assert`) so the check survives `python -O`.
        if sensor.name in self.peripherals:
            raise AssertionError(f'Duplicate {sensor.name} found!')
        self.peripherals[sensor.name] = sensor
        # Put this sensor as an attribute
        setattr(self, sensor.name, sensor)

    async def _get_port_info(self, port, msg):
        """Utility function to query information on available ports and modes from a hub.

           Arguments:
                port : Port number the message refers to
                msg (str) : Name of the port message that was received; selects which
                            follow-up requests (if any) are sent

        """
        if msg == 'port_detected':
            # Request mode info
            b = [0x00, 0x21, port, 0x01]
            await self.send_message(f'req mode info on {port}', b)
        elif msg == 'port_combination_info_received':
            pass
        elif msg == 'port_mode_info_received':
            pass
        elif msg == 'port_info_received':
            # At this point we know all the available modes for this port
            # let's get the name and value format
            modes = self.port_info[port]['modes']
            if self.port_info[port].get('combinable', False):
                # Get combination info on port
                b = [0x00, 0x21, port, 0x02]
                await self.send_message(f'req mode combination info on {port}', b)
            # Info type label -> code sent as the last byte of the request
            info_types = {
                'NAME': 0, 'VALUE FORMAT': 0x80, 'RAW Range': 0x01,
                'PCT Range': 0x02, 'SI Range': 0x03, 'Symbol': 0x04,
                'MAPPING': 0x05,
            }
            for mode in modes:
                # Send a message to request each type of info
                for k, v in info_types.items():
                    b = [0x00, 0x22, port, mode, v]
                    await self.send_message(f'req info({k}) on mode {mode} {port}', b)
197
198
199
class PoweredUpHub(Hub):
    """PoweredUp Hub class

       Tags the hub with BLE name 'HUB NO.4' and manufacturer id 65.
    """

    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name, query_port_info=query_port_info, ble_id=ble_id)
        # Identification values for this hub type (presumably matched against
        # the BLE advertisement during discovery -- confirm against the BLE code)
        self.manufacturer_id = 65
        self.ble_name = 'HUB NO.4'
206
207
class PoweredUpRemote(Hub):
    """PoweredUp Remote class

       Tags the hub with BLE name 'Handset' and manufacturer id 66.
    """

    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name, query_port_info=query_port_info, ble_id=ble_id)
        # Identification values for this hub type (presumably matched against
        # the BLE advertisement during discovery -- confirm against the BLE code)
        self.manufacturer_id = 66
        self.ble_name = 'Handset'
214
215
class BoostHub(Hub):
    """Boost Move Hub

       Tags the hub with BLE name 'LEGO Move Hub' and manufacturer id 64.
    """

    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name, query_port_info=query_port_info, ble_id=ble_id)
        # Identification values for this hub type (presumably matched against
        # the BLE advertisement during discovery -- confirm against the BLE code)
        self.manufacturer_id = 64
        self.ble_name = 'LEGO Move Hub'
222
223
224
class DuploTrainHub(Hub):
    """Duplo Steam train and Cargo Train

       This hub is found in Lego sets 10874 and 10875.
       Tags the hub with BLE name 'Train Base' and manufacturer id 32.
    """

    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name, query_port_info=query_port_info, ble_id=ble_id)
        # Identification values for this hub type (presumably matched against
        # the BLE advertisement during discovery -- confirm against the BLE code)
        self.manufacturer_id = 32
        self.ble_name = 'Train Base'
233