GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#3)
by
unknown
59s
created

Driver.splitN()   A

Complexity

Conditions 2

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
from abc import abstractmethod
2
from queue import Queue, Empty
3
from threading import Lock, Thread, Event
4
5
from serial import Serial, SerialException, SerialTimeoutException
6
7
import usb
8
import time
9
import binascii
10
11
from libAnt.constants import MESSAGE_TX_SYNC, MESSAGE_CHANNEL_BROADCAST_DATA
12
from libAnt.message import Message, SystemResetMessage
13
14
15
class DriverException(Exception):
16
    pass
17
18
19
class Driver:
20
    """
21
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device
22
    """
23
24
    def __init__(self, debug=False):
25
        self._lock = Lock()
26
        self._debug = debug
27
28
    def __enter__(self):
29
        self.open()
30
        return self
31
32
    def __exit__(self, exc_type, exc_val, exc_tb):
33
        self.close()
34
35
    def byteString2String(self, byteString) -> str:
36
        return binascii.a2b_hex(''.join(byteString.split()))
37
38
39
    def isOpen(self) -> bool:
40
        with self._lock:
41
            return self._isOpen()
42
43
    def open(self) -> None:
44
        with self._lock:
45
            if not self._isOpen():
46
                self._open()
47
            if self._debug:
48
                # write pcap global header
49
                # Global header for pcap 2.4
50
                pcap_global_header = ('D4 C3 B2 A1'
51
                                      '02 00'  # File format major revision (i.e. pcap <2>.4)
52
                                      '04 00'  # File format minor revision (i.e. pcap 2.<4>)
53
                                      '00 00 00 00'
54
                                      '00 00 00 00'
55
                                      'FF 00 00 00'
56
                                      '01 00 00 00')
57
                self.logfile = open('log.pcap', 'wb')
58
                self.logfile.write(self.byteString2String(pcap_global_header))
59
60
61
62
63
    def close(self) -> None:
64
        with self._lock:
65
            if self._isOpen:
66
                if self.logfile:
67
                        self.logfile.close()
68
                self._close()
69
70
    def reOpen(self) -> None:
71
        with self._lock:
72
            if self._isOpen():
73
                self._close()
74
            self._open()
75
76
    def read(self, timeout=None) -> Message:
77
        # Splits the string into a list of tokens every n characters
78
        def splitN(str1, n):
79
            return [str1[start:start + n] for start in range(0, len(str1), n)]
80
81
        if not self.isOpen():
82
            raise DriverException("Device is closed")
83
84
        with self._lock:
85
            while True:
86
                sync = self._read(1, timeout=timeout)[0]
87
                if sync is not MESSAGE_TX_SYNC:
88
                    continue
89
                length = self._read(1, timeout=timeout)[0]
90
                type = self._read(1, timeout=timeout)[0]
91
                data = self._read(length, timeout=timeout)
92
                chk = self._read(1, timeout=timeout)[0]
93
                msg = Message(type, data)
94
                if self._debug:
95
                    logMsg = bytearray([sync, length, type])
96
                    logMsg.extend(data)
97
                    logMsg.append(chk)
98
                    timestamp = time.time()
99
100
                    # calculate frame size
101
                    print(logMsg.hex())
102
                    print(len(self.byteString2String(logMsg.hex())))
103
                    print(len(logMsg.hex()))
104
105
                    # pcap packet header that must preface every packet
106
                    pcap_packet_header = ('AA 77 9F 47'
107
                                          '90 A2 04 00'
108
                                          'XX XX XX XX'  # Frame Size (little endian)
109
                                          'YY YY YY YY')  # Frame Size (little endian)
110
111
                    pcap_len = len(self.byteString2String(logMsg.hex()))
112
                    print(pcap_len)
113
                    hex_str = "%08x" % pcap_len
114
                    reverse_hex_str = hex_str[6:] + hex_str[4:6] + hex_str[2:4] + hex_str[:2]
115
                    pcaph = pcap_packet_header.replace('XX XX XX XX', reverse_hex_str)
116
                    pcaph = pcaph.replace('YY YY YY YY', reverse_hex_str)
117
118
                    self.logfile.write(self.byteString2String(pcaph))
119
                    self.logfile.write(self.byteString2String(logMsg.hex()))
120
121
                if msg.checksum() == chk:
122
                    return msg
123
124
    def write(self, msg: Message) -> None:
125
        if not self.isOpen():
126
            raise DriverException("Device is closed")
127
128
        with self._lock:
129
            self._write(msg.encode())
130
131
    @abstractmethod
132
    def _isOpen(self) -> bool:
133
        pass
134
135
    @abstractmethod
136
    def _open(self) -> None:
137
        pass
138
139
    @abstractmethod
140
    def _close(self) -> None:
141
        pass
142
143
    @abstractmethod
144
    def _read(self, count: int, timeout=None) -> bytes:
145
        pass
146
147
    @abstractmethod
148
    def _write(self, data: bytes) -> None:
149
        pass
150
151
152
class SerialDriver(Driver):
153
    """
154
    An implementation of a serial ANT+ device driver
155
    """
156
157
    def __init__(self, device: str, baudRate: int = 115200, debug=False):
158
        super().__init__(debug=debug)
159
        self._device = device
160
        self._baudRate = baudRate
161
        self._serial = None
162
163
    def __str__(self):
164
        if self.isOpen():
165
            return self._device + " @ " + str(self._baudRate)
166
        return None
167
168
    def _isOpen(self) -> bool:
169
        return self._serial is None
170
171
    def _open(self) -> None:
172
        try:
173
            self._serial = Serial(self._device, self._baudRate)
174
        except SerialException as e:
175
            raise DriverException(str(e))
176
177
        if not self._serial.isOpen():
178
            raise DriverException("Could not open specified device")
179
180
    def _close(self) -> None:
181
        self._serial.close()
182
        self._serial = None
183
184
    def _read(self, count: int, timeout=None) -> bytes:
185
        return self._serial.read(count, timeout=timeout)
186
187
    def _write(self, data: bytes) -> None:
188
        try:
189
            self._serial.write(data)
190
            self._serial.flush()
191
        except SerialTimeoutException as e:
192
            raise DriverException(str(e))
193
194
195
class USBDriver(Driver):
196
    """
197
    An implementation of a USB ANT+ device driver
198
    """
199
200
    def __init__(self, vid, pid, debug=False):
201
        super().__init__(debug=debug)
202
        self._idVendor = vid
203
        self._idProduct = pid
204
        self._dev = None
205
        self._epOut = None
206
        self._epIn = None
207
        self._interfaceNumber = None
208
        self._packetSize = 0x20
209
        self._queue = None
210
        self._loop = None
211
        self._driver_open = False
212
213
    def __str__(self):
214
        if self.isOpen():
215
            return str(self._dev)
216
        return "Closed"
217
218
    def _isOpen(self) -> bool:
219
        return self._driver_open
220
221
    def _open(self) -> None:
222
        print('USB OPEN START')
223
        try:
224
            # find the first USB device that matches the filter
225
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)
226
227
            if self._dev is None:
228
                raise DriverException("Could not open specified device")
229
230
            # Detach kernel driver
231
            try:
232
                if self._dev.is_kernel_driver_active(0):
233
                    try:
234
                        self._dev.detach_kernel_driver(0)
235
                    except usb.USBError as e:
236
                        raise DriverException("Could not detach kernel driver")
237
            except NotImplementedError:
238
                pass  # for non unix systems
239
240
            # set the active configuration. With no arguments, the first
241
            # configuration will be the active one
242
            self._dev.set_configuration()
243
244
            # get an endpoint instance
245
            cfg = self._dev.get_active_configuration()
246
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
247
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
248
                                                 bAlternateSetting=usb.control.get_interface(self._dev,
249
                                                                                             self._interfaceNumber))
250
            usb.util.claim_interface(self._dev, self._interfaceNumber)
251
252
            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
253
                e.bEndpointAddress) == usb.ENDPOINT_OUT)
254
255
            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
256
                e.bEndpointAddress) == usb.ENDPOINT_IN)
257
258
            if self._epOut is None or self._epIn is None:
259
                raise DriverException("Could not initialize USB endpoint")
260
261
            self._queue = Queue()
262
            self._loop = USBLoop(self._epIn, self._packetSize, self._queue)
263
            self._loop.start()
264
            self._driver_open = True
265
            print('USB OPEN SUCCESS')
266
        except IOError as e:
267
            self._close()
268
            raise DriverException(str(e))
269
270
    def _close(self) -> None:
271
        print('USB CLOSE START')
272
        if self._loop is not None:
273
            if self._loop.is_alive():
274
                self._loop.stop()
275
                self._loop.join()
276
        self._loop = None
277
        try:
278
            self._dev.reset()
279
            usb.util.dispose_resources(self._dev)
280
        except:
281
            pass
282
        self._dev = self._epOut = self._epIn = None
283
        self._driver_open = False
284
        print('USB CLOSE END')
285
286
    def _read(self, count: int, timeout=None) -> bytes:
287
        data = bytearray()
288
        for i in range(0, count):
289
            b = self._queue.get(timeout=timeout)
290
            if b is None:
291
                self._close()
292
                raise DriverException("Device is closed!")
293
            data.append(b)
294
        return bytes(data)
295
296
    def _write(self, data: bytes) -> None:
297
        return self._epOut.write(data)
298
299
300
class USBLoop(Thread):
301
    def __init__(self, ep, packetSize: int, queue: Queue):
302
        super().__init__()
303
        self._stopper = Event()
304
        self._ep = ep
305
        self._packetSize = packetSize
306
        self._queue = queue
307
308
    def stop(self) -> None:
309
        self._stopper.set()
310
311
    def run(self) -> None:
312
        while not self._stopper.is_set():
313
            try:
314
                data = self._ep.read(self._packetSize, timeout=1000)
315
                for d in data:
316
                    self._queue.put(d)
317
            except usb.core.USBError as e:
318
                if e.errno not in (60, 110) and e.backend_error_code != -116: # Timout errors
319
                    self._stopper.set()
320
        #  We Put in an invalid byte so threads will realize the device is stopped
321
        self._queue.put(None)
322
323
class DummyDriver(Driver):
324
    def __init__(self):
325
        self._isopen = False
326
        self._data = Queue()
327
        msg1 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\x10\x20\x30\x40\x50\x60\x70').encode()
328
        for b in msg1:
329
            self._data.put(b)
330
        msg2 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\x01\x02\x03\x04\x05\x06\x07').encode()
331
        for b in msg2:
332
            self._data.put(b)
333
        super().__init__(debug=True)
334
335
    def _isOpen(self) -> bool:
336
        return self._isopen
337
338
    def _close(self) -> None:
339
        self._isopen = False
340
341
    def _read(self, count: int, timeout=None) -> bytes:
342
        data = bytearray()
343
        for i in range(0, count):
344
            data.append(self._data.get(timeout=timeout))
345
        return bytes(data)
346
347
    def _open(self) -> None:
348
        self._isopen = True
349
350
    def _write(self, data: bytes) -> None:
351
        pass
352