GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Push — master ( 8809a3...7b33e5 )
by Virantha
01:30
created

bricknil.bricknil   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 203
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 76
dl 0
loc 203
rs 10
c 0
b 0
f 0
wmc 16

3 Functions

Rating   Name   Duplication   Size   Complexity  
A start() 0 29 2
A _curio_event_run() 0 11 1
C _run_all() 0 50 9

2 Methods

Rating   Name   Duplication   Size   Complexity  
A attach.__init__() 0 6 2
A attach.__call__() 0 33 2
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Utility functions to attach sensors/motors and start the whole event loop
16
    
17
    #. The decorator :class:`attach` to specify peripherals that
18
       connect to a hub (which enables sensing and motor control functions), 
19
    #. The function :func:`start` that starts running the BLE communication queue, and all the hubs, in the event-loop system
20
21
"""
22
23
import logging
24
import pprint
25
from curio import run, spawn,  sleep
26
import Adafruit_BluefruitLE
27
from functools import partial, wraps
28
import uuid
29
30
# Local imports
31
from .process import Process
32
from .ble_queue import BLEventQ
33
from .hub import PoweredUpHub, BoostHub, Hub
34
from .const import USE_BLEAK
35
if USE_BLEAK:
36
    from .bleak import Bleak
37
38
import threading
39
40
# Actual decorator that sets up the peripheral classes
41
# noinspection PyPep8Naming
42
class attach:
    """Class-decorator to attach peripherals onto a Hub.

    Injects sub-classes of `Peripheral` as instance variables on a Hub
    such as the PoweredUp Hub, akin to "attaching" a physical sensor or
    motor onto the Hub.

    Before you attach a peripheral with sensing capabilities, you need to
    ensure the Hub class being decorated has the matching call-back method
    '<peripheral name>_change' (the check is made against the decorated
    class, not against the peripheral).

    Examples::

        @attach(PeripheralType,
                name="instance name",
                port='port',
                capabilities=[])

    Warnings:
        - No support for checking to make sure user put in correct parameters
        - Identifies capabilities that need a callback update handler based purely on
          checking if the capability name starts with the string "sense"

    """

    def __init__(self, peripheral_type, **kwargs):
        """Remember which peripheral class to build and the kwargs to build it with.

        Args:
            peripheral_type: Class that is instantiated (with ``**kwargs``) each
                time the decorated hub is instantiated.
            **kwargs: Keyword arguments forwarded verbatim to ``peripheral_type``.
        """
        # TODO: check here to make sure parameters were entered
        if Process.level == Process.MSG_LEVEL.DEBUG:
            print(f'decorating with {peripheral_type}')
        self.peripheral_type = peripheral_type
        self.kwargs = kwargs

    def __call__(self, cls):
        """Wrap ``cls`` so that every instantiation also attaches the peripheral.

        Since the actual Hub sub-class being decorated can have __init__ params,
        a wrapper function captures the arguments going into that __init__ call.
        Inside that wrapper, we do the following:

        #. Instance the peripheral that was decorated with the saved **kwargs
        #. Check that any `sense*` capabilities in the peripheral have an
           appropriate handler method in the hub class being decorated.
        #. Instance the Hub
        #. Set the peripheral instance as an instance variable on the hub via
           the `attach_sensor` method of the hub

        Args:
            cls: The class (or an already-wrapped factory) being decorated.

        Returns:
            A factory function that takes the same positional and keyword
            arguments as ``cls`` and returns the new hub instance.

        Raises:
            AssertionError: Raised by the returned factory when the peripheral
                has a sensing capability but ``cls`` lacks the handler method.
        """
        @wraps(cls)
        def wrapper_f(*args, **kwargs):
            peripheral = self.peripheral_type(**self.kwargs)

            # Any capability whose name starts with 'sense' needs a change handler
            if any(cap.name.startswith('sense') for cap in peripheral.capabilities):
                handler_name = f'{peripheral.name}_change'
                # Explicit raise (not `assert`) so the check survives `python -O`;
                # the exception type and message are unchanged for callers.
                if not hasattr(cls, handler_name):
                    raise AssertionError(f'{cls.__name__} needs a handler {handler_name}')

            # Create the hub process and attach this peripheral
            o = cls(*args, **kwargs)
            o.message_debug(f"Decorating class {cls.__name__} with {self.peripheral_type.__name__}")
            o.attach_sensor(peripheral)
            return o
        return wrapper_f
106
107
108
109
async def _run_all(ble, system):
    """Curio run loop: bring up the BLE queue and all hubs, then wait for them.

    Steps, in order:

    #. Build the `BLEventQ` around ``ble`` and await the user's ``system()``
       coroutine (presumably this is what populates ``Hub.hubs`` -- confirm
       against `Hub`).
    #. Spawn the BLE queue task, then connect every hub in ``Hub.hubs`` one at
       a time, before any of them is started.
    #. For every hub: spawn its peripheral message loop, poll once a second
       until each of its peripherals has a port, then spawn ``hub.run()``.
    #. Join all hub run-tasks, cancel the listen tasks and the BLE queue task.
    #. Log the port info of every hub that has ``query_port_info`` set.

    Args:
        ble: Bluetooth interface object handed to `BLEventQ`.
        system: Zero-argument coroutine function provided by the user.
    """
    print('inside curio run loop')
    # Instantiate the Bluetooth LE handler/queue
    ble_q = BLEventQ(ble)

    # Call the user's system routine to instantiate the processes
    await system()

    hub_tasks = []
    hub_peripheral_listen_tasks = []  # Need to cancel these at the end

    # Run the bluetooth listen queue
    task_ble_q = await spawn(ble_q.run())

    # Connect all the hubs first before enabling any of them
    for hub in Hub.hubs:
        task_connect = await spawn(ble_q.connect(hub))
        await task_connect.join()

    # Remember the last hub explicitly: the status messages below are logged
    # through it, and the loop variable would be unbound if there are no hubs.
    last_hub = None
    for hub in Hub.hubs:
        last_hub = hub

        # Start the peripheral listening loop in each hub
        task_listen = await spawn(hub.peripheral_message_loop())
        hub_peripheral_listen_tasks.append(task_listen)

        # Need to wait here until all the ports are set
        for name, peripheral in hub.peripherals.items():
            while peripheral.port is None:
                hub.message_info(f"Waiting for peripheral {name} to attach to a port")
                await sleep(1)

        # Start each hub
        task_run = await spawn(hub.run())
        hub_tasks.append(task_run)

    def _announce(message):
        # Log through the last hub when there is one; otherwise fall back to
        # the module logger instead of raising NameError on an empty system.
        if last_hub is not None:
            last_hub.message_info(message)
        else:
            logging.getLogger(__name__).info(message)

    # Now wait for the tasks to finish
    _announce('Waiting for hubs to end')
    for task in hub_tasks:
        await task.join()
    _announce('Hubs end')

    # The listen loops and the BLE queue are stopped by cancelling them
    for task in hub_peripheral_listen_tasks:
        await task.cancel()
    await task_ble_q.cancel()

    # Print out the port information in debug mode
    for hub in Hub.hubs:
        if hub.query_port_info:
            hub.message_info(pprint.pformat(hub.port_info))
159
        
160
161
162
def _curio_event_run(ble, system):
    """Start the Curio event loop on the `_run_all` coroutine.

    Thin synchronous entry point so the whole hub system can be handed to
    code that expects a plain callable (a thread target or a BLE main loop).
    The Curio kernel is started with its monitor enabled.

    Args:
        ble : The Bluetooth interface object passed through to `_run_all`
        system : Coroutine that the user provided to instantiate their system

    """
    main_coroutine = _run_all(ble, system)
    run(main_coroutine, with_monitor=True)
173
174
def start(user_system_setup_func):
    """Main entry point into running everything.

    Just pass in the async co-routine that instantiates all your hubs, and
    this function will take care of the rest.  Depending on `USE_BLEAK`:

    - Adafruit backend (`USE_BLEAK` false): get and initialize the
      Adafruit bluetooth provider, then let its main loop execute the
      Curio event loop.
    - Bleak backend (`USE_BLEAK` true): create the `Bleak` interface, run
      the Curio event loop in a separate thread, and call the interface's
      own ``run()`` on the calling thread.

    In both cases the user co-routine is started inside the Curio event loop.

    Args:
        user_system_setup_func: Async co-routine function that instantiates
            the user's hubs.
    """
    if not USE_BLEAK:
        provider = Adafruit_BluefruitLE.get_provider()
        provider.initialize()
        # run_mainloop_with does not accept function args, so bind the
        # provider and the user's coroutine up front with partial().
        bound_event_run = partial(_curio_event_run,
                                  ble=provider,
                                  system=user_system_setup_func)
        provider.run_mainloop_with(bound_event_run)
        return

    # Bleak is imported at module level only when USE_BLEAK is true, which is
    # the same flag that guards this path, so the name is defined here.
    bleak_interface = Bleak()

    # Run curio in a thread
    bound_event_run = partial(_curio_event_run,
                              ble=bleak_interface,
                              system=user_system_setup_func)
    curio_thread = threading.Thread(target=bound_event_run)
    curio_thread.start()
    print('started thread for curio')
    bleak_interface.run()
203
204