GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#4)
by Benjamin
02:19 queued 01:07
created

USBDriver

Size/Duplication

Total Lines 129
Duplicated Lines 0 %

Importance

Changes 10
Bugs 0 Features 0
Metric Value
c 10
b 0
f 0
dl 0
loc 129

11 Methods

Rating   Name   Duplication   Size   Complexity  
A USBLoop.__init__() 0 6 1
A USBLoop.stop() 0 2 1
A _isOpen() 0 2 1
B USBLoop.run() 0 11 6
A _read() 0 9 3
A _write() 0 2 1
A _close() 0 15 4
A __str__() 0 4 2
A __init__() 0 12 1
F _open() 0 48 10
A _abort() 0 2 1
1
from queue import Queue
2
from threading import Event, Thread
3
4
from usb import USBError, ENDPOINT_OUT, ENDPOINT_IN
5
from usb.control import get_interface
6
from usb.core import find
7
from usb.util import find_descriptor, endpoint_direction, claim_interface, dispose_resources
8
9
from libAnt.drivers.driver import Driver, DriverException
10
from libAnt.loggers.logger import Logger
11
12
13
class USBDriver(Driver):
14
    """
15
    An implementation of a USB ANT+ device driver
16
    """
17
18
    def __init__(self, vid, pid, logger: Logger = None):
19
        super().__init__(logger=logger)
20
        self._idVendor = vid
21
        self._idProduct = pid
22
        self._dev = None
23
        self._epOut = None
24
        self._epIn = None
25
        self._interfaceNumber = None
26
        self._packetSize = 0x20
27
        self._queue = None
28
        self._loop = None
29
        self._driver_open = False
30
31
    def __str__(self):
32
        if self.isOpen():
33
            return str(self._dev)
34
        return "Closed"
35
36
    class USBLoop(Thread):
37
        def __init__(self, ep, packetSize: int, queue: Queue):
38
            super().__init__()
39
            self._stopper = Event()
40
            self._ep = ep
41
            self._packetSize = packetSize
42
            self._queue = queue
43
44
        def stop(self) -> None:
45
            self._stopper.set()
46
47
        def run(self) -> None:
48
            while not self._stopper.is_set():
49
                try:
50
                    data = self._ep.read(self._packetSize, timeout=1000)
51
                    for d in data:
52
                        self._queue.put(d)
53
                except USBError as e:
54
                    if e.errno not in (60, 110) and e.backend_error_code != -116:  # Timout errors
55
                        self._stopper.set()
56
            # We Put in an invalid byte so threads will realize the device is stopped
57
            self._queue.put(None)
58
59
    def _isOpen(self) -> bool:
60
        return self._driver_open
61
62
    def _open(self) -> None:
63
        print('USB OPEN START')
64
        try:
65
            # find the first USB device that matches the filter
66
            self._dev = find(idVendor=self._idVendor, idProduct=self._idProduct)
67
68
            if self._dev is None:
69
                raise DriverException("Could not open specified device")
70
71
            # Detach kernel driver
72
            try:
73
                if self._dev.is_kernel_driver_active(0):
74
                    try:
75
                        self._dev.detach_kernel_driver(0)
76
                    except USBError as e:
77
                        raise DriverException("Could not detach kernel driver")
78
            except NotImplementedError:
79
                pass  # for non unix systems
80
81
            # set the active configuration. With no arguments, the first
82
            # configuration will be the active one
83
            self._dev.set_configuration()
84
85
            # get an endpoint instance
86
            cfg = self._dev.get_active_configuration()
87
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
88
            interface = find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
89
                                        bAlternateSetting=get_interface(self._dev,
90
                                                                        self._interfaceNumber))
91
            claim_interface(self._dev, self._interfaceNumber)
92
93
            self._epOut = find_descriptor(interface, custom_match=lambda e: endpoint_direction(
94
                e.bEndpointAddress) == ENDPOINT_OUT)
95
96
            self._epIn = find_descriptor(interface, custom_match=lambda e: endpoint_direction(
97
                e.bEndpointAddress) == ENDPOINT_IN)
98
99
            if self._epOut is None or self._epIn is None:
100
                raise DriverException("Could not initialize USB endpoint")
101
102
            self._queue = Queue()
103
            self._loop = self.USBLoop(self._epIn, self._packetSize, self._queue)
104
            self._loop.start()
105
            self._driver_open = True
106
            print('USB OPEN SUCCESS')
107
        except IOError as e:
108
            self._close()
109
            raise DriverException(str(e))
110
111
    def _close(self) -> None:
112
        print('USB CLOSE START')
113
        if self._loop is not None:
114
            if self._loop.is_alive():
115
                self._loop.stop()
116
                self._loop.join()
117
        self._loop = None
118
        try:
119
            self._dev.reset()
120
            dispose_resources(self._dev)
121
        except:
122
            pass
123
        self._dev = self._epOut = self._epIn = None
124
        self._driver_open = False
125
        print('USB CLOSE END')
126
127
    def _read(self, count: int, timeout=None) -> bytes:
128
        data = bytearray()
129
        for i in range(0, count):
130
            b = self._queue.get(timeout=timeout)
131
            if b is None:
132
                self._close()
133
                raise DriverException("Device is closed!")
134
            data.append(b)
135
        return bytes(data)
136
137
    def _write(self, data: bytes) -> None:
138
        return self._epOut.write(data)
139
140
    def _abort(self) -> None:
141
        pass  # not implemented for USB driver, use timeouts instead
142