GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

bricknil.bricknil   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 223
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 84
dl 0
loc 223
rs 10
c 0
b 0
f 0
wmc 17

2 Methods

Rating   Name   Duplication   Size   Complexity  
A attach.__init__() 0 6 2
A attach.__call__() 0 32 2

3 Functions

Rating   Name   Duplication   Size   Complexity  
A _curio_event_run() 0 11 1
C _run_all() 0 66 10
A start() 0 30 2
1
# Copyright 2019 Virantha N. Ekanayake 
2
# 
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
# 
7
# http://www.apache.org/licenses/LICENSE-2.0
8
# 
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Utility functions to attach sensors/motors and start the whole event loop
16
    
17
    #. The decorator :class:`attach` to specify peripherals that
18
       connect to a hub (which enables sensing and motor control functions), 
19
    #. The function :func:`start` that starts running the BLE communication queue, and all the hubs, in the event-loop system
20
21
"""
22
23
import logging
24
import pprint
25
from curio import run, spawn,  sleep, Queue, tcp_server
26
import Adafruit_BluefruitLE
27
from functools import partial, wraps
28
import uuid
29
30
# Local imports
31
from .process import Process
32
from .ble_queue import BLEventQ
33
from .hub import PoweredUpHub, BoostHub, Hub
34
from .const import USE_BLEAK
35
from .sockets import bricknil_socket_server
36
37
#if USE_BLEAK:
38
    #from .bleak_interface import Bleak
39
40
import threading
41
42
# Actual decorator that sets up the peripheral classes
43
# noinspection PyPep8Naming
44
class attach:
45
    """ Class-decorator to attach peripherals onto a Hub
46
47
        Injects sub-classes of `Peripheral` as instance variables on a Hub 
48
        such as the PoweredUp Hub, akin to "attaching" a physical sensor or
49
        motor onto the Hub.
50
51
        Before you attach a peripheral with sensing capabilities, 
52
        you need to ensure your `Peripheral` sub-class has the matching
53
        call-back method 'peripheralname_change'.  
54
55
        Examples::
56
57
            @attach(PeripheralType, 
58
                    name="instance name", 
59
                    port='port', 
60
                    capabilities=[])
61
62
        Warnings:
63
            - No support for checking to make sure user put in correct parameters
64
            - Identifies capabilities that need a callback update handler based purely on
65
              checking if the capability name starts with the string "sense*"
66
67
    """
68
    def __init__(self, peripheral_type, **kwargs):
69
        # TODO: check here to make sure parameters were entered
70
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
71
            print(f'decorating with {peripheral_type}')
72
        self.peripheral_type = peripheral_type
73
        self.kwargs = kwargs
74
75
    def __call__ (self, cls):
76
        """
77
            Since the actual Hub sub-class being decorated can have __init__ params,
78
            we need to have a wrapper function inside here to capture the arguments
79
            going into that __init__ call.
80
81
            Inside that wrapper, we do the following:
82
            
83
            # Instance the peripheral that was decorated with the saved **kwargs
84
            # Check that any `sense_*` capabiilities in the peripheral have an 
85
              appropriate handler method in the hub class being decorated.
86
            # Instance the Hub
87
            # Set the peripheral instance as an instance variable on the hub via the
88
              `Hub.attach_sensor` method
89
90
        """
91
        # Define a wrapper function to capture the actual instantiation and __init__ params
92
        @wraps(cls)
93
        def wrapper_f(*args, **kwargs):
94
            #print(f'type of cls is {type(cls)}')
95
            peripheral = self.peripheral_type(**self.kwargs)
96
97
            # Ugly, but scan through and check if any of the capabilities are sense_*
98
            if any([cap.name.startswith('sense') for cap in peripheral.capabilities]):
99
                handler_name = f'{peripheral.name}_change'
100
                assert hasattr(cls, handler_name), f'{cls.__name__} needs a handler {handler_name}'
101
            # Create the hub process and attach this peripheral
102
            o = cls(*args, **kwargs)
103
            o.message_debug(f"Decorating class {cls.__name__} with {self.peripheral_type.__name__}")
104
            o.attach_sensor(peripheral)
105
            return o
106
        return wrapper_f
107
108
109
110
111
112
async def _run_all(ble, system):
113
    """Curio run loop 
114
    """
115
    print('inside curio run loop')
116
    # Instantiate the Bluetooth LE handler/queue
117
    ble_q = BLEventQ(ble)
118
    # The web client out_going queue
119
    web_out_queue = Queue()
120
    # Instantiate socket listener
121
    #task_socket = await spawn(socket_server, web_out_queue, ('',25000))
122
    task_tcp = await spawn(bricknil_socket_server, web_out_queue, ('',25000))
123
    await task_tcp.join()
124
125
    # Call the user's system routine to instantiate the processes
126
    await system()
127
128
    hub_tasks = []
129
    hub_peripheral_listen_tasks = [] # Need to cancel these at the end
130
131
    # Run the bluetooth listen queue
132
    task_ble_q = await spawn(ble_q.run())
133
134
    # Connect all the hubs first before enabling any of them
135
    for hub in Hub.hubs:
136
        hub.web_queue_out = web_out_queue
137
        task_connect = await spawn(ble_q.connect(hub))
138
        await task_connect.join()
139
140
141
    for hub in Hub.hubs:
142
        # Start the peripheral listening loop in each hub
143
        task_listen = await spawn(hub.peripheral_message_loop())
144
        hub_peripheral_listen_tasks.append(task_listen)
145
146
        # Need to wait here until all the ports are set
147
        # Use a faster timeout the first time (for speeding up testing)
148
        first_delay = True
149
        for name, peripheral in hub.peripherals.items():
150
            while peripheral.port is None:
151
                hub.message_info(f"Waiting for peripheral {name} to attach to a port")
152
                if first_delay:
153
                    first_delay = False
154
                    await sleep(0.1)
155
                else:
156
                    await sleep(1)
157
158
        # Start each hub
159
        task_run = await spawn(hub.run())
160
        hub_tasks.append(task_run)
161
162
163
    # Now wait for the tasks to finish
164
    ble_q.message_info(f'Waiting for hubs to end')
165
166
    for task in hub_tasks:
167
        await task.join()
168
    ble_q.message_info(f'Hubs end')
169
170
    for task in hub_peripheral_listen_tasks:
171
        await task.cancel()
172
    await task_ble_q.cancel()
173
174
    # Print out the port information in debug mode
175
    for hub in Hub.hubs:
176
        if hub.query_port_info:
177
            hub.message_info(pprint.pformat(hub.port_info))
178
        
179
180
181
def _curio_event_run(ble, system):
182
    """ One line function to start the Curio Event loop, 
183
        starting all the hubs with the message queue to the bluetooth
184
        communcation thread loop.
185
186
        Args:
187
            ble : The Adafruit_BluefruitLE interface object
188
            system :  Coroutine that the user provided to instantate their system
189
190
    """
191
    run(_run_all(ble, system), with_monitor=False)
192
193
def start(user_system_setup_func): #pragma: no cover
194
    """
195
        Main entry point into running everything.
196
197
        Just pass in the async co-routine that instantiates all your hubs, and this
198
        function will take care of the rest.  This includes:
199
200
        - Initializing the Adafruit bluetooth interface object
201
        - Starting a run loop inside this bluetooth interface for executing the
202
          Curio event loop
203
        - Starting up the user async co-routines inside the Curio event loop
204
    """
205
206
    if USE_BLEAK:
207
        from .bleak_interface import Bleak
208
        ble = Bleak()
209
        # Run curio in a thread
210
        curry_curio_event_run = partial(_curio_event_run, ble=ble, system=user_system_setup_func)
211
        t = threading.Thread(target=curry_curio_event_run)
212
        t.start()
213
        print('started thread for curio')
214
        ble.run()
215
    else:
216
        ble = Adafruit_BluefruitLE.get_provider()
217
        ble.initialize()
218
        # run_mainloop_with call does not accept function args.  So let's curry
219
        # the my_run with the ble arg as curry_my_run
220
        curry_curio_event_run = partial(_curio_event_run, ble=ble, system=user_system_setup_func)
221
        
222
        ble.run_mainloop_with(curry_curio_event_run)
223
224