GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — add-logging ( b5daf2...d977ae )
by
unknown
01:08
created

DummyDriver._write()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
from abc import abstractmethod
2
from queue import Queue, Empty
3
from threading import Lock, Thread, Event
4
5
from serial import Serial, SerialException, SerialTimeoutException
6
7
import usb
8
import time
9
10
from libAnt.constants import MESSAGE_TX_SYNC, MESSAGE_CHANNEL_BROADCAST_DATA
11
from libAnt.message import Message, SystemResetMessage
12
13
14
class DriverException(Exception):
15
    pass
16
17
18
class Driver:
19
    """
20
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device
21
    """
22
23
    def __init__(self, debug=False):
24
        self._lock = Lock()
25
        self._debug = debug
26
27
    def __enter__(self):
28
        self.open()
29
        return self
30
31
    def __exit__(self, exc_type, exc_val, exc_tb):
32
        self.close()
33
34
    def isOpen(self) -> bool:
35
        with self._lock:
36
            return self._isOpen()
37
38
    def open(self) -> None:
39
        with self._lock:
40
            if not self._isOpen():
41
                self._open()
42
43
    def close(self) -> None:
44
        with self._lock:
45
            if self._isOpen:
46
                self._close()
47
48
    def reOpen(self) -> None:
49
        with self._lock:
50
            if self._isOpen():
51
                self._close()
52
            self._open()
53
54
    def read(self, timeout=None) -> Message:
55
        if not self.isOpen():
56
            raise DriverException("Device is closed")
57
58
        with self._lock:
59
            while True:
60
                sync = self._read(1, timeout=timeout)[0]
61
                if sync is not MESSAGE_TX_SYNC:
62
                    continue
63
                len = self._read(1, timeout=timeout)[0]
64
                type = self._read(1, timeout=timeout)[0]
65
                data = self._read(len, timeout=timeout)
66
                chk = self._read(1, timeout=timeout)[0]
67
                msg = Message(type, data)
68
                if self._debug:
69
                    logMsg = bytes([sync, len, type, data, chk])
70
                    timestamp = time.time()
71
                    # TODO: save to file
72
                if msg.checksum() == chk:
73
                    return msg
74
75
    def write(self, msg: Message) -> None:
76
        if not self.isOpen():
77
            raise DriverException("Device is closed")
78
79
        with self._lock:
80
            self._write(msg.encode())
81
82
    @abstractmethod
83
    def _isOpen(self) -> bool:
84
        pass
85
86
    @abstractmethod
87
    def _open(self) -> None:
88
        pass
89
90
    @abstractmethod
91
    def _close(self) -> None:
92
        pass
93
94
    @abstractmethod
95
    def _read(self, count: int, timeout=None) -> bytes:
96
        pass
97
98
    @abstractmethod
99
    def _write(self, data: bytes) -> None:
100
        pass
101
102
103
class SerialDriver(Driver):
104
    """
105
    An implementation of a serial ANT+ device driver
106
    """
107
108
    def __init__(self, device: str, baudRate: int = 115200, debug=False):
109
        super().__init__(debug=debug)
110
        self._device = device
111
        self._baudRate = baudRate
112
        self._serial = None
113
114
    def __str__(self):
115
        if self.isOpen():
116
            return self._device + " @ " + str(self._baudRate)
117
        return None
118
119
    def _isOpen(self) -> bool:
120
        return self._serial is None
121
122
    def _open(self) -> None:
123
        try:
124
            self._serial = Serial(self._device, self._baudRate)
125
        except SerialException as e:
126
            raise DriverException(str(e))
127
128
        if not self._serial.isOpen():
129
            raise DriverException("Could not open specified device")
130
131
    def _close(self) -> None:
132
        self._serial.close()
133
        self._serial = None
134
135
    def _read(self, count: int, timeout=None) -> bytes:
136
        return self._serial.read(count, timeout=timeout)
137
138
    def _write(self, data: bytes) -> None:
139
        try:
140
            self._serial.write(data)
141
            self._serial.flush()
142
        except SerialTimeoutException as e:
143
            raise DriverException(str(e))
144
145
146
class USBDriver(Driver):
147
    """
148
    An implementation of a USB ANT+ device driver
149
    """
150
151
    def __init__(self, vid, pid, debug=False):
152
        super().__init__(debug=debug)
153
        self._idVendor = vid
154
        self._idProduct = pid
155
        self._dev = None
156
        self._epOut = None
157
        self._epIn = None
158
        self._interfaceNumber = None
159
        self._packetSize = 0x20
160
        self._queue = None
161
        self._loop = None
162
        self._driver_open = False
163
164
    def __str__(self):
165
        if self.isOpen():
166
            return str(self._dev)
167
        return "Closed"
168
169
    def _isOpen(self) -> bool:
170
        return self._driver_open
171
172
    def _open(self) -> None:
173
        print('USB OPEN START')
174
        try:
175
            # find the first USB device that matches the filter
176
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)
177
178
            if self._dev is None:
179
                raise DriverException("Could not open specified device")
180
181
            # Detach kernel driver
182
            try:
183
                if self._dev.is_kernel_driver_active(0):
184
                    try:
185
                        self._dev.detach_kernel_driver(0)
186
                    except usb.USBError as e:
187
                        raise DriverException("Could not detach kernel driver")
188
            except NotImplementedError:
189
                pass  # for non unix systems
190
191
            # set the active configuration. With no arguments, the first
192
            # configuration will be the active one
193
            self._dev.set_configuration()
194
195
            # get an endpoint instance
196
            cfg = self._dev.get_active_configuration()
197
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
198
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
199
                                                 bAlternateSetting=usb.control.get_interface(self._dev,
200
                                                                                             self._interfaceNumber))
201
            usb.util.claim_interface(self._dev, self._interfaceNumber)
202
203
            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
204
                e.bEndpointAddress) == usb.ENDPOINT_OUT)
205
206
            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
207
                e.bEndpointAddress) == usb.ENDPOINT_IN)
208
209
            if self._epOut is None or self._epIn is None:
210
                raise DriverException("Could not initialize USB endpoint")
211
212
            self._queue = Queue()
213
            self._loop = USBLoop(self._epIn, self._packetSize, self._queue)
214
            self._loop.start()
215
            self._driver_open = True
216
            print('USB OPEN SUCCESS')
217
        except IOError as e:
218
            self._close()
219
            raise DriverException(str(e))
220
221
    def _close(self) -> None:
222
        print('USB CLOSE START')
223
        if self._loop is not None:
224
            if self._loop.is_alive():
225
                self._loop.stop()
226
                self._loop.join()
227
        self._loop = None
228
        try:
229
            self._dev.reset()
230
            usb.util.dispose_resources(self._dev)
231
        except:
232
            pass
233
        self._dev = self._epOut = self._epIn = None
234
        self._driver_open = False
235
        print('USB CLOSE END')
236
237
    def _read(self, count: int, timeout=None) -> bytes:
238
        data = bytearray()
239
        for i in range(0, count):
240
            b = self._queue.get(timeout=timeout)
241
            if b is None:
242
                self._close()
243
                raise DriverException("Device is closed!")
244
            data.append(b)
245
        return bytes(data)
246
247
    def _write(self, data: bytes) -> None:
248
        return self._epOut.write(data)
249
250
251
class USBLoop(Thread):
252
    def __init__(self, ep, packetSize: int, queue: Queue):
253
        super().__init__()
254
        self._stopper = Event()
255
        self._ep = ep
256
        self._packetSize = packetSize
257
        self._queue = queue
258
259
    def stop(self) -> None:
260
        self._stopper.set()
261
262
    def run(self) -> None:
263
        while not self._stopper.is_set():
264
            try:
265
                data = self._ep.read(self._packetSize, timeout=1000)
266
                for d in data:
267
                    self._queue.put(d)
268
            except usb.core.USBError as e:
269
                if e.errno not in (60, 110) and e.backend_error_code != -116: # Timout errors
270
                    self._stopper.set()
271
        #  We Put in an invalid byte so threads will realize the device is stopped
272
        self._queue.put(None)
273
274
class DummyDriver(Driver):
275
    def __init__(self):
276
        self._isopen = False
277
        self._data = Queue()
278
        msg1 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\x00\x00\x00\x00\x00\x00\x00').encode()
279
        for b in msg1:
280
            self._data.put(b)
281
        msg2 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\x00\x00\x00\x00\x00\x00\x00').encode()
282
        for b in msg2:
283
            self._data.put(b)
284
        super().__init__(debug=True)
285
286
    def _isOpen(self) -> bool:
287
        return self._isopen
288
289
    def _close(self) -> None:
290
        self._isopen = False
291
292
    def _read(self, count: int, timeout=None) -> bytes:
293
        return self._data.get(timeout=timeout)
294
295
    def _open(self) -> None:
296
        self._isopen = True
297
298
    def _write(self, data: bytes) -> None:
299
        pass
300