GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Push — master ( c3e90e...6835a9 )
by Virantha
01:29
created

bricknil.hub.DuploTrainHub.__init__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 4
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Hub processes for the Boost Move and PoweredUp hubs
16
17
"""
18
import uuid
19
from curio import sleep, UniversalQueue, CancelledError
20
from .process import Process
21
from .peripheral import Peripheral  # for type check
22
from .const import USE_BLEAK
23
24
25
# noinspection SpellCheckingInspection
26
class Hub(Process):
    """Base class for all Lego hubs

       Arguments:
            name (str) : Human-readable name for this hub (for logging)
            query_port_info (bool) : Set to True if you want to query all the port information on a Hub (very communication intensive)
            ble_id (str) : BluetoothLE network (MAC) address to connect to (None if you want to connect to the first matching hub)

       Attributes:

            hubs (list [`Hub`]) : Class attr to keep track of all Hub (and subclasses) instances
            message_queue (`curio.Queue`) : Outgoing message queue to :class:`bricknil.ble_queue.BLEventQ`
            peripheral_queue (`curio.UniversalQueue`) : Incoming messages from :class:`bricknil.ble_queue.BLEventQ`
            uart_uuid (`uuid.UUID`) : UUID broadcast by LEGO UARTs
            char_uuid (`uuid.UUID`) : Lego uses only one service characteristic for communicating with the UART services
            tx : Service characteristic for tx/rx messages that's set by :func:`bricknil.ble_queue.BLEventQ.connect`
            peripherals (dict) : Peripheral name => `bricknil.Peripheral`
            port_info (dict) : Port => info, filled in from 'update_port' messages

    """
    # Shared on purpose: every Hub (and subclass) instance registers itself here.
    hubs = []

    # noinspection SpellCheckingInspection
    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name)
        self.ble_id = ble_id
        self.query_port_info = query_port_info
        # Not created here; send_message() expects it to be set before use.
        self.message_queue = None
        self.uart_uuid = uuid.UUID('00001623-1212-efde-1623-785feabcd123')
        self.char_uuid = uuid.UUID('00001624-1212-efde-1623-785feabcd123')
        self.tx = None
        self.peripherals = {}  # attach_sensor method will add sensors to this
        self.peripheral_queue = UniversalQueue()  # Incoming messages from peripherals

        # Keep track of port info as we get messages from the hub ('update_port' messages)
        self.port_info = {}

        # Register this hub
        Hub.hubs.append(self)

    async def send_message(self, msg_name, msg_bytes):
        """Insert a message to the hub into the queue(:func:`bricknil.hub.Hub.message_queue`) connected to our BLE
           interface

           Arguments:
                msg_name (str) : Label queued alongside the payload
                msg_bytes : Payload to send; callers in this class pass a list of ints

           Blocks, polling once per second, until ``self.tx`` is set. The item
           placed on the queue is the tuple ``(msg_name, self, msg_bytes)``.
        """

        while not self.tx:  # Need to make sure we have a handle to the uart
            await sleep(1)
        await self.message_queue.put((msg_name, self, msg_bytes))

    async def peripheral_message_loop(self):
        """The main loop that receives messages from the :class:`bricknil.messages.Message` parser.

           Waits for messages on a UniversalQueue and dispatches to the appropriate peripheral handler.

           Each queue item is unpacked as a ``(peripheral, msg)`` pair and handled by ``msg``:

           * ``'value_change'`` : awaits this hub's ``<peripheral.name>_change`` method
           * ``'attach'`` : wires the peripheral to :func:`send_message`, enables it and
             awaits its ``activate_updates()``
           * ``'update_port'`` : the first element is a ``(port, info)`` pair stored in ``port_info``
           * anything starting with ``'port'`` : forwarded to :func:`_get_port_info`
             when ``query_port_info`` is set (the first element is then the port)

           The loop runs until cancelled; the ``CancelledError`` is logged and not re-raised.
        """
        try:
            self.message_debug('starting peripheral message loop')

            # NOTE(review): an earlier comment here described special handling for
            # hub button peripherals (manually calling activate_updates() and
            # registering a handler in the message parser); no such handling is
            # present in this loop.
            while True:
                peripheral, msg = await self.peripheral_queue.get()
                await self.peripheral_queue.task_done()
                if msg == 'value_change':
                    self.message_debug(f'peripheral msg: {peripheral} {msg}')
                    handler_name = f'{peripheral.name}_change'
                    # NOTE(review): a hub without this method raises AttributeError
                    # here, which is not caught and ends the loop.
                    handler = getattr(self, handler_name)
                    await handler()
                elif msg == 'attach':
                    self.message_debug(f'peripheral msg: {peripheral} {msg}')
                    peripheral.message_handler = self.send_message
                    peripheral.enabled = True
                    await peripheral.activate_updates()
                elif msg == 'update_port':
                    port, info = peripheral
                    self.port_info[port] = info
                elif msg.startswith('port'):
                    if self.query_port_info:
                        await self._get_port_info(peripheral, msg)

        except CancelledError:
            self.message('Terminating peripheral')

    def attach_sensor(self, sensor: Peripheral):
        """Add instance variable for this decorated sensor

           Called by the class decorator :class:`bricknil.bricknil.attach` when decorating the sensor

           Arguments:
                sensor (`Peripheral`) : Stored in ``peripherals`` under ``sensor.name`` and
                    also set as an attribute of that name on this hub

           Raises:
                AssertionError : If a sensor with the same name is already attached
                    (the check is skipped when Python runs with ``-O``)
        """
        # Check that we don't already have a sensor with the same name attached
        assert sensor.name not in self.peripherals, f'Duplicate {sensor.name} found!'
        self.peripherals[sensor.name] = sensor
        # Put this sensor as an attribute
        setattr(self, sensor.name, sensor)

    async def _get_port_info(self, port, msg):
        """Utility function to query information on available ports and modes from a hub.

           Arguments:
                port : Port identifier; placed directly into the outgoing message bytes
                msg (str) : ``'port_detected'`` or ``'port_info_received'``; any other
                    value is ignored

           On ``'port_detected'`` a single mode-info request is sent. On
           ``'port_info_received'`` an optional combination-info request is sent
           (when the port is marked ``'combinable'``), followed by one request per
           info type for every mode recorded in ``port_info[port]['modes']``.
        """
        if msg == 'port_detected':
            # Request mode info
            b = [0x00, 0x21, port, 0x01]
            await self.send_message(f'req mode info on {port}', b)
        elif msg == 'port_info_received':
            # At this point we know all the available modes for this port
            # let's get the name and value format
            modes = self.port_info[port]['modes']
            if self.port_info[port].get('combinable', False):
                # Get combination info on port
                b = [0x00, 0x21, port, 0x02]
                await self.send_message(f'req mode combination info on {port}', b)

            # Info type label => code byte. Built once; it does not depend on
            # the mode, and its insertion order fixes the request order.
            info_types = {
                'NAME': 0,
                'VALUE FORMAT': 0x80,
                'RAW Range': 0x01,
                'PCT Range': 0x02,
                'SI Range': 0x03,
                'Symbol': 0x04,
                'MAPPING': 0x05,
            }
            for mode in modes:
                # Send a message to request each type of info
                for k, v in info_types.items():
                    b = [0x00, 0x22, port, mode, v]
                    await self.send_message(f'req info({k}) on mode {mode} {port}', b)
146
147
148
class PoweredUpHub(Hub):
    """Hub subclass for the PoweredUp hub.

       Takes the same arguments as :class:`Hub` and forwards them unchanged,
       then sets ``ble_name`` to ``'HUB NO.4'`` (presumably the name the
       device advertises over BLE -- confirm against the BLE scanning code).
    """
    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name, query_port_info=query_port_info, ble_id=ble_id)
        self.ble_name = 'HUB NO.4'
154
155
class PoweredUpRemote(Hub):
    """Hub subclass for the PoweredUp Remote.

       Takes the same arguments as :class:`Hub` and forwards them unchanged,
       then sets ``ble_name`` to ``'Handset'`` (presumably the name the
       device advertises over BLE -- confirm against the BLE scanning code).
    """
    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name, query_port_info=query_port_info, ble_id=ble_id)
        self.ble_name = 'Handset'
161
162
class BoostHub(Hub):
    """Hub subclass for the Boost Move Hub.

       Takes the same arguments as :class:`Hub` and forwards them unchanged,
       then sets ``ble_name`` to ``'LEGO Move Hub'`` (presumably the name the
       device advertises over BLE -- confirm against the BLE scanning code).
    """
    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name, query_port_info=query_port_info, ble_id=ble_id)
        self.ble_name = 'LEGO Move Hub'
168
169
class DuploTrainHub(Hub):
    """Hub subclass for the Duplo Steam Train and Cargo Train.

       This hub is found in Lego sets 10874 and 10875.

       Takes the same arguments as :class:`Hub` and forwards them unchanged,
       then sets ``ble_name`` to ``'Train Base'`` (presumably the name the
       device advertises over BLE -- confirm against the BLE scanning code).
    """
    def __init__(self, name, query_port_info=False, ble_id=None):
        super().__init__(name, query_port_info=query_port_info, ble_id=ble_id)
        self.ble_name = 'Train Base'
177