GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Pull Request — master (#11)
by
unknown
01:53
created

bricknil.bricknil.main()   C

Complexity

Conditions 10

Size

Total Lines 64
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 33
nop 1
dl 0
loc 64
rs 5.9999
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like bricknil.bricknil.main() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright 2019 Virantha N. Ekanayake
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
# http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""Utility functions to attach sensors/motors and start the whole event loop
16
17
    #. The decorator :class:`attach` to specify peripherals that
18
       connect to a hub (which enables sensing and motor control functions),
19
    #. The function :func:`start` that starts running the BLE communication queue, and all the hubs, in the event-loop system
20
21
"""
22
23
import logging
24
import pprint
25
from asyncio import run, sleep, Queue, get_event_loop
26
from asyncio import create_task as spawn
27
from functools import partial, wraps
28
import uuid
29
30
# Local imports
31
from .process import Process
32
from .ble_queue import BLEventQ
33
from .hub import PoweredUpHub, BoostHub, Hub
34
from .sockets import bricknil_socket_server
35
36
import threading
37
38
# Actual decorator that sets up the peripheral classes
39
# noinspection PyPep8Naming
40
class attach:
41
    """ Class-decorator to attach peripherals onto a Hub
42
43
        Injects sub-classes of `Peripheral` as instance variables on a Hub
44
        such as the PoweredUp Hub, akin to "attaching" a physical sensor or
45
        motor onto the Hub.
46
47
        Before you attach a peripheral with sensing capabilities,
48
        you need to ensure your `Peripheral` sub-class has the matching
49
        call-back method 'peripheralname_change'.
50
51
        Examples::
52
53
            @attach(PeripheralType,
54
                    name="instance name",
55
                    port='port',
56
                    capabilities=[])
57
58
        Warnings:
59
            - No support for checking to make sure user put in correct parameters
60
            - Identifies capabilities that need a callback update handler based purely on
61
              checking if the capability name starts with the string "sense*"
62
63
    """
64
    def __init__(self, peripheral_type, **kwargs):
65
        # TODO: check here to make sure parameters were entered
66
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
67
            print(f'decorating with {peripheral_type}')
68
        self.peripheral_type = peripheral_type
69
        self.kwargs = kwargs
70
71
    def __call__ (self, cls):
72
        """
73
            Since the actual Hub sub-class being decorated can have __init__ params,
74
            we need to have a wrapper function inside here to capture the arguments
75
            going into that __init__ call.
76
77
            Inside that wrapper, we do the following:
78
79
            # Instance the peripheral that was decorated with the saved **kwargs
80
            # Check that any `sense_*` capabiilities in the peripheral have an
81
              appropriate handler method in the hub class being decorated.
82
            # Instance the Hub
83
            # Set the peripheral instance as an instance variable on the hub via the
84
              `Hub.attach_sensor` method
85
86
        """
87
        # Define a wrapper function to capture the actual instantiation and __init__ params
88
        @wraps(cls)
89
        def wrapper_f(*args, **kwargs):
90
            #print(f'type of cls is {type(cls)}')
91
            peripheral = self.peripheral_type(**self.kwargs)
92
93
            # Ugly, but scan through and check if any of the capabilities are sense_*
94
            if any([cap.name.startswith('sense') for cap in peripheral.capabilities]):
95
                handler_name = f'{peripheral.name}_change'
96
                assert hasattr(cls, handler_name), f'{cls.__name__} needs a handler {handler_name}'
97
            # Create the hub process and attach this peripheral
98
            o = cls(*args, **kwargs)
99
            o.message_debug(f"Decorating class {cls.__name__} with {self.peripheral_type.__name__}")
100
            o.attach_sensor(peripheral)
101
            return o
102
        return wrapper_f
103
104
async def main(system):
105
    """
106
    Entry-point coroutine that handles everything. This is to be run run
107
    in bricknil's main loop.
108
109
    You normally don't need to use this directly, instead use start()
110
    """
111
112
    # Instantiate the Bluetooth LE handler/queue
113
    ble_q = BLEventQ()
114
    # The web client out_going queue
115
    web_out_queue = Queue()
116
    # Instantiate socket listener
117
    # task_socket = await spawn(socket_server, web_out_queue, ('',25000))
118
    # task_tcp = await spawn(bricknil_socket_server, web_out_queue, ('',25000))
119
    # await task_tcp.join()
120
121
    # Call the user's system routine to instantiate the processes
122
    await system()
123
124
    hub_tasks = []
125
    hub_peripheral_listen_tasks = [] # Need to cancel these at the end
126
127
    # Connect all the hubs first before enabling any of them
128
    for hub in Hub.hubs:
129
        hub.web_queue_out = web_out_queue
130
        task_connect = spawn(ble_q.connect(hub))
131
        await task_connect
132
133
    for hub in Hub.hubs:
134
        # Start the peripheral listening loop in each hub
135
        task_listen = spawn(hub.peripheral_message_loop())
136
        hub_peripheral_listen_tasks.append(task_listen)
137
138
        # Need to wait here until all the ports are set
139
        # Use a faster timeout the first time (for speeding up testing)
140
        first_delay = True
141
        for name, peripheral in hub.peripherals.items():
142
            while peripheral.port is None:
143
                hub.message_info(f"Waiting for peripheral {name} to attach to a port")
144
                if first_delay:
145
                    first_delay = False
146
                    await sleep(0.1)
147
                else:
148
                    await sleep(1)
149
150
        # Start each hub
151
        task_run = spawn(hub.run())
152
        hub_tasks.append(task_run)
153
154
    # Now wait for the tasks to finish
155
    ble_q.message_info(f'Waiting for hubs to end')
156
157
    for task in hub_tasks:
158
        await task
159
    ble_q.message_info(f'Hubs end')
160
    await ble_q.disconnect()
161
    for task in hub_peripheral_listen_tasks:
162
        task.cancel()
163
164
    # Print out the port information in debug mode
165
    for hub in Hub.hubs:
166
        if hub.query_port_info:
167
            hub.message_info(pprint.pformat(hub.port_info))
168
169
def start(user_system_setup_func): #pragma: no cover
170
    """
171
        Main entry point into running everything.
172
173
        Just pass in the async co-routine that instantiates all your hubs, and this
174
        function will take care of the rest.  This includes:
175
176
        - Initializing the bluetooth interface object
177
        - Starting up the user async co-routines inside the asyncio event loop
178
    """
179
    loop = get_event_loop()
180
    loop.run_until_complete(main(user_system_setup_func))
181
182
183