GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 5dafb7...650a9f )
by Benjamin
01:07
created

USBLoop   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 23
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 8
c 1
b 0
f 0
dl 0
loc 23
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 6 1
A stop() 0 2 1
B run() 0 9 5
A stopped() 0 2 1
1
from abc import abstractmethod
2
from queue import Queue
3
from threading import Lock, Thread, Event
4
5
from serial import Serial, SerialException, SerialTimeoutException
6
7
import usb
8
9
from libAnt.constants import MESSAGE_TX_SYNC
10
11
12
class DriverException(Exception):
    """
    Raised by the drivers in this module when a device cannot be opened, is closed, or an I/O request fails
    """
14
15
16
class Driver:
    """
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device

    Every public method serialises access through one lock and delegates to the ``_isOpen`` / ``_open`` /
    ``_close`` / ``_read`` / ``_write`` hooks implemented by concrete drivers. The hooks are always called
    with the lock held; because the lock is a plain (non re-entrant) ``threading.Lock`` a hook must never
    call a public method of the same instance.

    The hooks carry ``@abstractmethod`` but this class does not use ``ABCMeta``, so the decorator is only a
    marker: nothing prevents instantiating a driver with missing hooks.
    """

    def __init__(self):
        self._lock = Lock()

    def __enter__(self):
        """Open the device and return the driver itself so it can be used in a ``with`` statement."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the device; exceptions raised inside the ``with`` body are not suppressed."""
        self.close()

    def isOpen(self) -> bool:
        """Return whatever the concrete driver's ``_isOpen`` hook reports."""
        with self._lock:
            return self._isOpen()

    def open(self) -> None:
        """Open the device if it is not open yet; a no-op otherwise."""
        with self._lock:
            if not self._isOpen():
                self._open()

    def close(self) -> None:
        """Close the device if it is open; a no-op otherwise."""
        with self._lock:
            if self._isOpen():
                self._close()

    def reOpen(self) -> None:
        """Close and immediately re-open the device. Does nothing when the device is currently closed."""
        with self._lock:
            if self._isOpen():
                self._close()
                self._open()

    def read(self, count: int) -> bytearray:
        """
        Read ``count`` bytes from the device via the ``_read`` hook.

        :raises DriverException: if ``count`` is not positive or the device is closed
        """
        if count <= 0:
            raise DriverException("Count must be > 0")

        # The open-check and the read share one critical section so that a concurrent close() cannot slip
        # in between them and leave _read() operating on a device that has just been torn down.
        with self._lock:
            if not self._isOpen():
                raise DriverException("Device is closed")
            return self._read(count)

    def write(self, type: int, msg: bytearray) -> None:
        """
        Frame ``msg`` and send it to the device via the ``_write`` hook.

        The frame is: ``MESSAGE_TX_SYNC``, ``len(msg)``, ``type``, the bytes of ``msg``, and finally the XOR
        of all preceding frame bytes as checksum. The length and type are stored in a single byte each, so
        values outside 0..255 make ``bytearray.append`` raise ``ValueError``.

        :raises DriverException: if the device is closed
        """
        # Same reasoning as in read(): check and I/O happen under a single lock acquisition.
        with self._lock:
            if not self._isOpen():
                raise DriverException("Device is closed")

            print('HOST ({:02X}): '.format(type) + ' '.join('{:02X}'.format(x) for x in msg))

            payload = bytearray()
            payload.append(MESSAGE_TX_SYNC)
            payload.append(len(msg))
            payload.append(type)
            payload.extend(msg)

            checksum = 0
            for b in payload:
                checksum ^= b
            payload.append(checksum)

            self._write(payload)

    @abstractmethod
    def _isOpen(self):
        """Hook: report whether the underlying device is currently open."""
        pass

    @abstractmethod
    def _open(self):
        """Hook: open the underlying device. Only called while the device is reported closed."""
        pass

    @abstractmethod
    def _close(self):
        """Hook: close the underlying device. Only called while the device is reported open."""
        pass

    @abstractmethod
    def _read(self, count):
        """Hook: read ``count`` bytes from the underlying device."""
        pass

    @abstractmethod
    def _write(self, data):
        """Hook: write the fully framed ``data`` to the underlying device."""
        pass
99
100
101
class SerialDriver(Driver):
    """
    An implementation of a serial ANT+ device driver
    """

    def __init__(self, device, baudRate=115200):
        """
        :param device: port identifier handed to ``Serial`` as its first argument
        :param baudRate: baud rate handed to ``Serial`` as its second argument
        """
        super().__init__()
        self._device = device
        self._baudRate = baudRate
        self._serial = None  # the Serial object while the port is open, None while it is closed

    def __str__(self):
        # __str__ must always return a string; returning None makes str(driver) raise TypeError.
        if self.isOpen():
            return "{} @ {}".format(self._device, self._baudRate)
        return "Closed"

    def _isOpen(self):
        # Open means "we hold a Serial object" (the original test was inverted).
        return self._serial is not None

    def _open(self):
        try:
            port = Serial(self._device, self._baudRate)
        except SerialException as e:
            raise DriverException(str(e)) from e

        if not port.isOpen():
            raise DriverException("Could not open specified device")

        # Only publish the port once it is known to be usable, so a failed open leaves the driver closed.
        self._serial = port

    def _close(self):
        try:
            self._serial.close()
        finally:
            # Forget the port even if closing raised, so the driver does not keep reporting itself as open.
            self._serial = None

    def _read(self, count):
        return self._serial.read(count)

    def _write(self, data):
        try:
            self._serial.write(data)
            self._serial.flush()
        except SerialTimeoutException as e:
            raise DriverException(str(e)) from e
142
143
144
class USBDriver(Driver):
    """
    An implementation of a USB ANT+ device driver

    While the device is open a ``USBLoop`` thread reads from the IN endpoint and feeds the received bytes
    into a queue; ``_read`` consumes that queue.
    """

    def __init__(self, vid, pid):
        """
        :param vid: USB vendor id used to look the device up
        :param pid: USB product id used to look the device up
        """
        super().__init__()
        self._idVendor = vid
        self._idProduct = pid
        self._dev = None
        self._epOut = None
        self._epIn = None
        self._interfaceNumber = None
        self._packetSize = 0x20
        self._queue = None
        self._loop = None

    def __str__(self):
        if self.isOpen():
            return str(self._dev)
        return "Closed"

    def _isOpen(self):
        return self._dev is not None

    def _open(self):
        try:
            # find the first USB device that matches the filter
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)

            if self._dev is None:
                raise DriverException("Could not open specified device")

            # Detach kernel driver
            try:
                if self._dev.is_kernel_driver_active(0):
                    try:
                        self._dev.detach_kernel_driver(0)
                    except usb.USBError as e:
                        raise DriverException("Could not detach kernel driver") from e
            except NotImplementedError:
                pass  # for non unix systems

            # set the active configuration. With no arguments, the first
            # configuration will be the active one
            self._dev.set_configuration()

            # get an endpoint instance
            cfg = self._dev.get_active_configuration()
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
            alternate = usb.control.get_interface(self._dev, self._interfaceNumber)
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
                                                 bAlternateSetting=alternate)
            usb.util.claim_interface(self._dev, self._interfaceNumber)

            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
                e.bEndpointAddress) == usb.ENDPOINT_OUT)

            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
                e.bEndpointAddress) == usb.ENDPOINT_IN)

            if self._epOut is None or self._epIn is None:
                raise DriverException("Could not initialize USB endpoint")

            self._queue = Queue()
            self._loop = USBLoop(self._epIn, self._packetSize, self._queue)
            self._loop.start()
        except (DriverException, IOError) as e:
            # A failure half-way through must not leave _dev set, otherwise _isOpen() would report an
            # open device and open() could never be retried.
            try:
                self._teardown()
            except IOError:
                pass  # best effort; the original failure is the one worth reporting
            if isinstance(e, DriverException):
                raise
            raise DriverException(str(e)) from e

    def _close(self):
        self._teardown()

    def _teardown(self):
        # Stop the reader thread, give the USB resources back and reset all per-connection state.
        # Tolerates a partially initialised driver so it can also be used when _open() fails.
        loop, self._loop = self._loop, None
        try:
            if loop is not None and loop.is_alive():
                loop.stop()
                # The reader only notices the stop request between two endpoint reads, so this waits
                # for the read that is currently in flight to return or time out.
                loop.join()
            if self._dev is not None:
                if self._interfaceNumber is not None:
                    usb.util.release_interface(self._dev, self._interfaceNumber)
                usb.util.dispose_resources(self._dev)
        finally:
            self._dev = self._epOut = self._epIn = None
            self._interfaceNumber = None
            self._queue = None

    def _read(self, count):
        # Queue.get() blocks without a timeout until the reader thread has delivered enough bytes.
        return bytearray(self._queue.get() for _ in range(count))

    def _write(self, data):
        return self._epOut.write(data)
231
232
233
class USBLoop(Thread):
    """
    Background thread that keeps reading from a USB endpoint and puts every received byte, one item per
    byte, onto a queue until it is asked to stop
    """

    def __init__(self, ep, packetSize, queue):
        """
        :param ep: endpoint object; its ``read(size, timeout)`` method is called in a loop
        :param packetSize: size passed to every ``ep.read`` call
        :param queue: queue that receives the individual bytes
        """
        super().__init__()
        # Deliberately NOT called ``_stop``: that name belongs to a private method of threading.Thread
        # which several CPython 3 releases invoke while joining a thread. Shadowing it with an Event
        # makes join() fail with "TypeError: 'Event' object is not callable".
        self._stopEvent = Event()
        self._ep = ep
        self._packetSize = packetSize
        self._queue = queue

    def stop(self):
        """Ask the loop to finish; it exits once the endpoint read in progress has returned."""
        self._stopEvent.set()

    def stopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stopEvent.is_set()

    def run(self):
        while not self._stopEvent.is_set():
            try:
                # Second argument is the read timeout (milliseconds in PyUSB); it bounds how long a
                # stop request can go unnoticed.
                data = self._ep.read(self._packetSize, 10000)
            except usb.USBError as e:
                # 60 and 110 are presumably the ETIMEDOUT values on macOS/BSD and Linux: a timed-out
                # read just means "nothing received", so poll again. Anything else ends the thread.
                if e.errno not in (60, 110):
                    raise
                continue

            for d in data:
                self._queue.put(d)
256