GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Pull Request — master (#3)
by
unknown
02:12 queued 01:06
created

Driver.splitN()   A

Complexity

Conditions 2

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
from abc import abstractmethod
2
from queue import Queue, Empty
3
from threading import Lock, Thread, Event
4
5
import math
6
from serial import Serial, SerialException, SerialTimeoutException
7
8
import usb
9
import time
10
import binascii
11
from struct import *
12
13
from libAnt.constants import MESSAGE_TX_SYNC, MESSAGE_CHANNEL_BROADCAST_DATA
14
from libAnt.message import Message, SystemResetMessage
15
16
17
class DriverException(Exception):
    """Error raised by the drivers in this module when a device operation fails."""
19
20
21
class Driver:
    """
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device

    Subclasses supply the transport by implementing ``_isOpen``, ``_open``,
    ``_close``, ``_read`` and ``_write``. The public methods wrap those hooks
    in a lock and, when ``debug`` is enabled, append every frame received by
    ``read`` to ``self.logfile`` as a pcap record.

    NOTE: this class does not use ``ABCMeta``, so the ``@abstractmethod``
    markers below are not enforced when a subclass is instantiated.
    """

    def __init__(self, debug=False):
        """
        :param debug: when true, received frames are logged to ``self.logfile``
        """
        self._lock = Lock()  # not re-entrant: never call a public method while holding it
        self._debug = debug
        self.logfile = 'log.pcap'
        self._openTime = None  # time.time() of the first successful open

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def isOpen(self) -> bool:
        """Return whether the underlying device is currently open."""
        with self._lock:
            return self._isOpen()

    def open(self) -> None:
        """Open the device if it is not open yet and start a new log."""
        with self._lock:
            if not self._isOpen():
                self._open()
                self._startLog()

    def close(self) -> None:
        """Close the device if it is open."""
        with self._lock:
            # _isOpen must be *called*; the bound method itself is always truthy
            if self._isOpen():
                self._close()

    def reOpen(self) -> None:
        """Close the device if necessary and open it again."""
        with self._lock:
            if self._isOpen():
                self._close()
            self._open()
            # If the device was never opened through open(), the log has not
            # been started yet and read() would have no reference time.
            if self._openTime is None:
                self._startLog()

    def read(self, timeout=None) -> Message:
        """
        Block until a frame with a valid checksum has been received.

        Bytes are discarded until the sync byte is seen; frames whose checksum
        does not match are dropped and the search continues.

        :param timeout: passed through unchanged to every ``_read`` call
        :return: the decoded message
        :raises DriverException: if the device is closed
        """
        if not self.isOpen():
            raise DriverException("Device is closed")

        with self._lock:
            while True:
                sync = self._read(1, timeout=timeout)[0]
                # compare by value; identity of int objects is an implementation detail
                if sync != MESSAGE_TX_SYNC:
                    continue
                length = self._read(1, timeout=timeout)[0]
                msgType = self._read(1, timeout=timeout)[0]
                data = self._read(length, timeout=timeout)
                chk = self._read(1, timeout=timeout)[0]
                msg = Message(msgType, data)
                if self._debug:
                    self._logFrame(sync, length, msgType, data, chk)

                if msg.checksum() == chk:
                    return msg

    def write(self, msg: Message) -> None:
        """
        Encode ``msg`` and send it to the device.

        :raises DriverException: if the device is closed
        """
        if not self.isOpen():
            raise DriverException("Device is closed")

        with self._lock:
            self._write(msg.encode())

    def _startLog(self) -> None:
        """Record the open time and, in debug mode, start a fresh pcap log file."""
        self._openTime = time.time()
        if not self._debug:
            return

        # write pcap global header
        pcap_global_header = Struct('<4shh4s4s4s4s')
        header = pcap_global_header.pack(
            b'\xD4\xC3\xB2\xA1',  # magic_number
            2,                    # version_major
            4,                    # version_minor
            b'\x00\x00\x00\x00',  # thiszone
            b'\x00\x00\x00\x00',  # sigfigs
            b'\xFF\x00\x00\x00',  # snaplen
            b'\x01\x00\x00\x00',  # network
        )
        with open(self.logfile, 'wb') as log:
            log.write(header)

    def _logFrame(self, sync, length, msgType, data, chk) -> None:
        """Append one received frame to the log as a pcap record."""
        logMsg = bytearray([sync, length, msgType])
        logMsg.extend(data)
        logMsg.append(chk)

        # timestamps are relative to the moment the log was started
        frac, whole = math.modf(time.time() - self._openTime)
        ts_sec = int(whole).to_bytes(4, byteorder='little')
        ts_usec = int(frac * 1000 * 1000).to_bytes(4, byteorder='little')
        incl_len = orig_len = len(logMsg)

        pcap_packet_header = Struct('<4s4sll')
        with open(self.logfile, 'ab') as log:
            log.write(pcap_packet_header.pack(ts_sec, ts_usec, incl_len, orig_len))
            log.write(logMsg)

    @abstractmethod
    def _isOpen(self) -> bool:
        pass

    @abstractmethod
    def _open(self) -> None:
        pass

    @abstractmethod
    def _close(self) -> None:
        pass

    @abstractmethod
    def _read(self, count: int, timeout=None) -> bytes:
        pass

    @abstractmethod
    def _write(self, data: bytes) -> None:
        pass
139
140
141
class SerialDriver(Driver):
    """
    An implementation of a serial ANT+ device driver
    """

    def __init__(self, device: str, baudRate: int = 115200, debug=False):
        """
        :param device: name of the serial port, handed to ``Serial``
        :param baudRate: baud rate, handed to ``Serial``
        :param debug: forwarded to ``Driver``
        """
        super().__init__(debug=debug)
        self._device = device
        self._baudRate = baudRate
        self._serial = None  # the open Serial instance, or None while closed

    def __str__(self):
        if self.isOpen():
            return self._device + " @ " + str(self._baudRate)
        # __str__ must return a string; USBDriver uses the same text when closed
        return "Closed"

    def _isOpen(self) -> bool:
        # the port object only exists between _open() and _close()
        return self._serial is not None

    def _open(self) -> None:
        try:
            port = Serial(self._device, self._baudRate)
        except SerialException as e:
            raise DriverException(str(e)) from e

        if not port.isOpen():
            raise DriverException("Could not open specified device")

        # store the port only once it is known to be usable so that
        # _isOpen() never reports a half-opened device
        self._serial = port

    def _close(self) -> None:
        self._serial.close()
        self._serial = None

    def _read(self, count: int, timeout=None) -> bytes:
        # pySerial's read() takes only a size; the read timeout is a port attribute
        self._serial.timeout = timeout
        return self._serial.read(count)

    def _write(self, data: bytes) -> None:
        try:
            self._serial.write(data)
            self._serial.flush()
        except SerialTimeoutException as e:
            raise DriverException(str(e)) from e
182
183
184
class USBDriver(Driver):
    """
    An implementation of a USB ANT+ device driver

    Incoming bytes are collected by a ``USBLoop`` thread and handed over
    through a queue; ``_read`` consumes that queue one byte at a time.
    """

    def __init__(self, vid, pid, debug=False):
        """
        :param vid: USB vendor id used to find the device
        :param pid: USB product id used to find the device
        :param debug: forwarded to ``Driver``
        """
        super().__init__(debug=debug)
        self._idVendor = vid
        self._idProduct = pid
        self._dev = None
        self._epOut = None
        self._epIn = None
        self._interfaceNumber = None
        self._packetSize = 0x20
        self._queue = None
        self._loop = None
        self._driver_open = False

    def __str__(self):
        if self.isOpen():
            return str(self._dev)
        return "Closed"

    def _isOpen(self) -> bool:
        return self._driver_open

    def _open(self) -> None:
        """
        Find and claim the device, then start the reader thread.

        :raises DriverException: if the device cannot be found or set up; any
            partially acquired state is released first
        """
        print('USB OPEN START')
        try:
            # find the first USB device that matches the filter
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)

            if self._dev is None:
                raise DriverException("Could not open specified device")

            # Detach kernel driver
            try:
                if self._dev.is_kernel_driver_active(0):
                    try:
                        self._dev.detach_kernel_driver(0)
                    except usb.USBError as e:
                        raise DriverException("Could not detach kernel driver") from e
            except NotImplementedError:
                pass  # for non unix systems

            # set the active configuration. With no arguments, the first
            # configuration will be the active one
            self._dev.set_configuration()

            # get an endpoint instance
            cfg = self._dev.get_active_configuration()
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
            altSetting = usb.control.get_interface(self._dev, self._interfaceNumber)
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
                                                 bAlternateSetting=altSetting)
            usb.util.claim_interface(self._dev, self._interfaceNumber)

            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
                e.bEndpointAddress) == usb.ENDPOINT_OUT)

            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
                e.bEndpointAddress) == usb.ENDPOINT_IN)

            if self._epOut is None or self._epIn is None:
                raise DriverException("Could not initialize USB endpoint")

            self._queue = Queue()
            self._loop = USBLoop(self._epIn, self._packetSize, self._queue)
            self._loop.start()
            self._driver_open = True
            print('USB OPEN SUCCESS')
        except DriverException:
            # our own failures need the same cleanup as I/O errors
            self._close()
            raise
        except IOError as e:
            self._close()
            raise DriverException(str(e)) from e

    def _close(self) -> None:
        """Stop the reader thread and release the device (best effort)."""
        print('USB CLOSE START')
        if self._loop is not None:
            if self._loop.is_alive():
                self._loop.stop()
                self._loop.join()
        self._loop = None
        if self._dev is not None:
            try:
                self._dev.reset()
                usb.util.dispose_resources(self._dev)
            except Exception:
                # Releasing the device is best effort: it may already be gone.
                # Unlike a bare except this lets KeyboardInterrupt/SystemExit through.
                pass
        self._dev = self._epOut = self._epIn = None
        self._driver_open = False
        print('USB CLOSE END')

    def _read(self, count: int, timeout=None) -> bytes:
        """
        Take ``count`` bytes from the reader thread's queue.

        ``timeout`` applies to each byte separately; ``queue.Empty`` from the
        queue propagates to the caller when it expires.

        :raises DriverException: if the reader thread has signalled that it stopped
        """
        data = bytearray()
        for _ in range(count):
            b = self._queue.get(timeout=timeout)
            if b is None:
                # None is the marker USBLoop enqueues when it terminates
                self._close()
                raise DriverException("Device is closed!")
            data.append(b)
        return bytes(data)

    def _write(self, data: bytes) -> None:
        self._epOut.write(data)
287
288
289
class USBLoop(Thread):
    """
    Thread that keeps reading packets from an endpoint and pushes the
    received bytes, one by one, onto a queue.

    When the thread ends it enqueues a single ``None`` so that consumers of
    the queue can tell that no more data will arrive.
    """

    def __init__(self, ep, packetSize: int, queue: Queue):
        """
        :param ep: endpoint object whose ``read(size, timeout=...)`` is polled
        :param packetSize: number of bytes requested per read
        :param queue: destination for the received bytes
        """
        super().__init__()
        self._stopper = Event()
        self._ep = ep
        self._packetSize = packetSize
        self._queue = queue

    def stop(self) -> None:
        """Ask the thread to finish after the read currently in progress."""
        self._stopper.set()

    def run(self) -> None:
        while True:
            if self._stopper.is_set():
                break
            try:
                chunk = self._ep.read(self._packetSize, timeout=1000)
            except usb.core.USBError as err:
                # Timeout errors are expected while idle; anything else ends the loop
                timedOut = err.errno in (60, 110) or err.backend_error_code == -116
                if not timedOut:
                    self._stopper.set()
                continue
            for value in chunk:
                self._queue.put(value)
        #  We put in an invalid byte so threads will realize the device is stopped
        self._queue.put(None)
311
312
class DummyDriver(Driver):
    """
    A driver without hardware: it serves the bytes of two pre-encoded
    broadcast data messages from an in-memory queue and discards writes.
    Debug logging is always enabled.
    """

    def __init__(self):
        self._isopen = False
        self._data = Queue()
        payloads = (
            b'\x00\x10\x20\x30\x40\x50\x60\x70',
            b'\x00\x01\x02\x03\x04\x05\x06\x07',
        )
        for payload in payloads:
            encoded = Message(MESSAGE_CHANNEL_BROADCAST_DATA, payload).encode()
            for octet in encoded:
                self._data.put(octet)
        super().__init__(debug=True)

    def _isOpen(self) -> bool:
        return self._isopen

    def _open(self) -> None:
        self._isopen = True

    def _close(self) -> None:
        self._isopen = False

    def _read(self, count: int, timeout=None) -> bytes:
        # each byte is fetched separately, so the timeout applies per byte
        return bytes(self._data.get(timeout=timeout) for _ in range(count))

    def _write(self, data: bytes) -> None:
        # nothing to send to; written data is dropped
        pass
341
342
class PcapDriver(Driver):
    """
    A driver that replays received frames from a pcap file such as the one
    ``Driver`` writes in debug mode.

    The file is expected to have the layout produced there: a 24 byte global
    header followed by records made of a 16 byte little-endian record header
    and the raw frame bytes. Writes are discarded.
    """

    _GLOBAL_HEADER = Struct('<4shh4s4s4s4s')
    _RECORD_HEADER = Struct('<4s4sLL')  # ts_sec, ts_usec, incl_len, orig_len

    def __init__(self, logfile):
        """
        :param logfile: path of the pcap file to replay
        """
        super().__init__(debug=False)
        self._isopen = False
        # must be stored on the instance, otherwise the default set by
        # Driver.__init__ would be used instead of the requested file
        self.logfile = logfile
        self._log = None            # file object while open
        self._buffer = bytearray()  # frame bytes read from the file but not yet returned

    def _isOpen(self) -> bool:
        return self._isopen

    def _open(self) -> None:
        """
        Open the capture and position it after the global header.

        :raises DriverException: if the file cannot be opened or is too short
        """
        try:
            log = open(self.logfile, 'rb')
        except OSError as e:
            raise DriverException(str(e)) from e

        if len(log.read(self._GLOBAL_HEADER.size)) < self._GLOBAL_HEADER.size:
            log.close()
            raise DriverException("Could not open specified device")

        self._log = log
        self._buffer = bytearray()
        self._isopen = True

    def _close(self) -> None:
        if self._log is not None:
            self._log.close()
            self._log = None
        self._isopen = False

    def _read(self, count: int, timeout=None) -> bytes:
        """
        Return the next ``count`` frame bytes of the capture.

        Record headers are consumed internally and never returned.
        ``timeout`` is accepted for interface compatibility and ignored.

        :raises DriverException: when the capture holds fewer than ``count`` more bytes
        """
        while len(self._buffer) < count:
            header = self._log.read(self._RECORD_HEADER.size)
            if len(header) < self._RECORD_HEADER.size:
                raise DriverException("Device is closed!")
            _, _, incl_len, _ = self._RECORD_HEADER.unpack(header)
            self._buffer.extend(self._log.read(incl_len))

        data = bytes(self._buffer[:count])
        del self._buffer[:count]
        return data

    def _write(self, data: bytes) -> None:
        # a capture file cannot be written to; data is dropped
        pass
372