GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#3)
by
unknown
02:10 queued 01:04
created

Driver.read()   F

Complexity

Conditions 10

Size

Total Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 9
Bugs 0 Features 0
Metric Value
cc 10
c 9
b 0
f 0
dl 0
loc 47
rs 3.4285

1 Method

Rating   Name   Duplication   Size   Complexity  
A Driver.splitN() 0 2 2

How to fix   Complexity   

Complexity

Complex classes like Driver.read() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from abc import abstractmethod
2
from queue import Queue, Empty
3
from threading import Lock, Thread, Event
4
5
import math
6
from serial import Serial, SerialException, SerialTimeoutException
7
8
import usb
9
import time
10
import binascii
11
from struct import *
12
13
from libAnt.constants import MESSAGE_TX_SYNC, MESSAGE_CHANNEL_BROADCAST_DATA
14
from libAnt.message import Message, SystemResetMessage
15
16
17
class DriverException(Exception):
18
    pass
19
20
21
class Driver:
22
    """
23
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device
24
    """
25
26
    def __init__(self, debug=False):
27
        self._lock = Lock()
28
        self._debug = debug
29
        self.logfile = 'log.pcap'
30
        self._openTime = None
31
32
    def __enter__(self):
33
        self.open()
34
        return self
35
36
    def __exit__(self, exc_type, exc_val, exc_tb):
37
        self.close()
38
39
    def isOpen(self) -> bool:
40
        with self._lock:
41
            return self._isOpen()
42
43
    def open(self) -> None:
44
        with self._lock:
45
            if not self._isOpen():
46
                self._open()
47
                self._openTime = time.time()
48
                if self._debug:
49
                    # write pcap global header
50
                    magic_number = b'\xD4\xC3\xB2\xA1'
51
                    version_major = 2
52
                    version_minor = 4
53
                    thiszone = b'\x00\x00\x00\x00'
54
                    sigfigs = b'\x00\x00\x00\x00'
55
                    snaplen = b'\xFF\x00\x00\x00'
56
                    network = b'\x01\x00\x00\x00'
57
58
                    pcap_global_header = Struct('<4shh4s4s4s4s')
59
60
                    with open(self.logfile, 'wb') as log:
61
                        log.write(pcap_global_header.pack(magic_number, version_major, version_minor, thiszone, sigfigs, snaplen, network))
62
63
    def close(self) -> None:
64
        with self._lock:
65
            if self._isOpen:
66
                self._close()
67
68
    def reOpen(self) -> None:
69
        with self._lock:
70
            if self._isOpen():
71
                self._close()
72
            self._open()
73
74
    def read(self, timeout=None) -> Message:
75
        # Splits the string into a list of tokens every n characters
76
        def splitN(str1, n):
77
            return [str1[start:start + n] for start in range(0, len(str1), n)]
78
79
        if not self.isOpen():
80
            raise DriverException("Device is closed")
81
82
        with self._lock:
83
            while True:
84
                # TODO: fix index out of range error
85
                # sync = self._read(1, timeout=timeout)
86
                # print(sync)
87
                # if sync == b'':
88
                #     print("EOF")
89
                #     continue
90
                # else:
91
                #     sync = sync[0]
92
93
                sync = self._read(1, timeout=timeout)[0]
94
                if sync is not MESSAGE_TX_SYNC:
95
                    continue
96
                length = self._read(1, timeout=timeout)[0]
97
                type = self._read(1, timeout=timeout)[0]
98
                data = self._read(length, timeout=timeout)
99
                chk = self._read(1, timeout=timeout)[0]
100
                msg = Message(type, data)
101
                if self._debug:
102
                    logMsg = bytearray([sync, length, type])
103
                    logMsg.extend(data)
104
                    logMsg.append(chk)
105
                    timestamp = time.time() - self._openTime
106
                    frac, whole = math.modf(timestamp)
107
108
                    ts_sec = int(whole).to_bytes(4, byteorder='little')
109
                    ts_usec = int(frac * 1000 * 1000).to_bytes(4, byteorder='little')
110
                    incl_len = len(logMsg)
111
                    orig_len = incl_len
112
113
                    pcap_packet_header = Struct('<4s4sll')
114
115
                    with open(self.logfile, 'ab') as log:
116
                        log.write(pcap_packet_header.pack(ts_sec, ts_usec, incl_len, orig_len))
117
                        log.write(logMsg)
118
119
                if msg.checksum() == chk:
120
                    return msg
121
122
    def write(self, msg: Message) -> None:
123
        if not self.isOpen():
124
            raise DriverException("Device is closed")
125
126
        with self._lock:
127
            self._write(msg.encode())
128
129
    @abstractmethod
130
    def _isOpen(self) -> bool:
131
        pass
132
133
    @abstractmethod
134
    def _open(self) -> None:
135
        pass
136
137
    @abstractmethod
138
    def _close(self) -> None:
139
        pass
140
141
    @abstractmethod
142
    def _read(self, count: int, timeout=None) -> bytes:
143
        pass
144
145
    @abstractmethod
146
    def _write(self, data: bytes) -> None:
147
        pass
148
149
150
class SerialDriver(Driver):
151
    """
152
    An implementation of a serial ANT+ device driver
153
    """
154
155
    def __init__(self, device: str, baudRate: int = 115200, debug=False):
156
        super().__init__(debug=debug)
157
        self._device = device
158
        self._baudRate = baudRate
159
        self._serial = None
160
161
    def __str__(self):
162
        if self.isOpen():
163
            return self._device + " @ " + str(self._baudRate)
164
        return None
165
166
    def _isOpen(self) -> bool:
167
        return self._serial is None
168
169
    def _open(self) -> None:
170
        try:
171
            self._serial = Serial(self._device, self._baudRate)
172
        except SerialException as e:
173
            raise DriverException(str(e))
174
175
        if not self._serial.isOpen():
176
            raise DriverException("Could not open specified device")
177
178
    def _close(self) -> None:
179
        self._serial.close()
180
        self._serial = None
181
182
    def _read(self, count: int, timeout=None) -> bytes:
183
        return self._serial.read(count, timeout=timeout)
184
185
    def _write(self, data: bytes) -> None:
186
        try:
187
            self._serial.write(data)
188
            self._serial.flush()
189
        except SerialTimeoutException as e:
190
            raise DriverException(str(e))
191
192
193
class USBDriver(Driver):
194
    """
195
    An implementation of a USB ANT+ device driver
196
    """
197
198
    def __init__(self, vid, pid, debug=False):
199
        super().__init__(debug=debug)
200
        self._idVendor = vid
201
        self._idProduct = pid
202
        self._dev = None
203
        self._epOut = None
204
        self._epIn = None
205
        self._interfaceNumber = None
206
        self._packetSize = 0x20
207
        self._queue = None
208
        self._loop = None
209
        self._driver_open = False
210
211
    def __str__(self):
212
        if self.isOpen():
213
            return str(self._dev)
214
        return "Closed"
215
216
    def _isOpen(self) -> bool:
217
        return self._driver_open
218
219
    def _open(self) -> None:
220
        print('USB OPEN START')
221
        try:
222
            # find the first USB device that matches the filter
223
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)
224
225
            if self._dev is None:
226
                raise DriverException("Could not open specified device")
227
228
            # Detach kernel driver
229
            try:
230
                if self._dev.is_kernel_driver_active(0):
231
                    try:
232
                        self._dev.detach_kernel_driver(0)
233
                    except usb.USBError as e:
234
                        raise DriverException("Could not detach kernel driver")
235
            except NotImplementedError:
236
                pass  # for non unix systems
237
238
            # set the active configuration. With no arguments, the first
239
            # configuration will be the active one
240
            self._dev.set_configuration()
241
242
            # get an endpoint instance
243
            cfg = self._dev.get_active_configuration()
244
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
245
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
246
                                                 bAlternateSetting=usb.control.get_interface(self._dev,
247
                                                                                             self._interfaceNumber))
248
            usb.util.claim_interface(self._dev, self._interfaceNumber)
249
250
            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
251
                e.bEndpointAddress) == usb.ENDPOINT_OUT)
252
253
            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
254
                e.bEndpointAddress) == usb.ENDPOINT_IN)
255
256
            if self._epOut is None or self._epIn is None:
257
                raise DriverException("Could not initialize USB endpoint")
258
259
            self._queue = Queue()
260
            self._loop = USBLoop(self._epIn, self._packetSize, self._queue)
261
            self._loop.start()
262
            self._driver_open = True
263
            print('USB OPEN SUCCESS')
264
        except IOError as e:
265
            self._close()
266
            raise DriverException(str(e))
267
268
    def _close(self) -> None:
269
        print('USB CLOSE START')
270
        if self._loop is not None:
271
            if self._loop.is_alive():
272
                self._loop.stop()
273
                self._loop.join()
274
        self._loop = None
275
        try:
276
            self._dev.reset()
277
            usb.util.dispose_resources(self._dev)
278
        except:
279
            pass
280
        self._dev = self._epOut = self._epIn = None
281
        self._driver_open = False
282
        print('USB CLOSE END')
283
284
    def _read(self, count: int, timeout=None) -> bytes:
285
        data = bytearray()
286
        for i in range(0, count):
287
            b = self._queue.get(timeout=timeout)
288
            if b is None:
289
                self._close()
290
                raise DriverException("Device is closed!")
291
            data.append(b)
292
        return bytes(data)
293
294
    def _write(self, data: bytes) -> None:
295
        return self._epOut.write(data)
296
297
298
class USBLoop(Thread):
299
    def __init__(self, ep, packetSize: int, queue: Queue):
300
        super().__init__()
301
        self._stopper = Event()
302
        self._ep = ep
303
        self._packetSize = packetSize
304
        self._queue = queue
305
306
    def stop(self) -> None:
307
        self._stopper.set()
308
309
    def run(self) -> None:
310
        while not self._stopper.is_set():
311
            try:
312
                data = self._ep.read(self._packetSize, timeout=1000)
313
                for d in data:
314
                    self._queue.put(d)
315
            except usb.core.USBError as e:
316
                if e.errno not in (60, 110) and e.backend_error_code != -116: # Timout errors
317
                    self._stopper.set()
318
        #  We Put in an invalid byte so threads will realize the device is stopped
319
        self._queue.put(None)
320
321
class DummyDriver(Driver):
322
    def __init__(self):
323
        self._isopen = False
324
        self._data = Queue()
325
        msg1 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\x10\x20\x30\x40\x50\x60\x70').encode()
326
        for b in msg1:
327
            self._data.put(b)
328
        msg2 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\x01\x02\x03\x04\x05\x06\x07').encode()
329
        for b in msg2:
330
            self._data.put(b)
331
        super().__init__(debug=True)
332
333
    def _isOpen(self) -> bool:
334
        return self._isopen
335
336
    def _close(self) -> None:
337
        self._isopen = False
338
339
    def _read(self, count: int, timeout=None) -> bytes:
340
        data = bytearray()
341
        for i in range(0, count):
342
            data.append(self._data.get(timeout=timeout))
343
        return bytes(data)
344
345
    def _open(self) -> None:
346
        self._isopen = True
347
348
    def _write(self, data: bytes) -> None:
349
        pass
350
351
class PcapDriver(Driver):
352
    def __init__(self, logfile):
353
        super().__init__(debug=False)
354
        self._isopen = False
355
        self.logfile = logfile
356
        self.buffer = Queue()
357
358
    def _isOpen(self) -> bool:
359
        return self._isopen
360
361
    def _open(self) -> None:
362
        self._isopen = True
363
        self.log = open(self.logfile, 'rb')
364
365
    def _close(self) -> None:
366
        self._isopen = False
367
        self.log.close()
368
369
    def _read(self, count: int, timeout=None) -> bytes:
370
        while True:
371
            sync = self.log.read(1)
372
            if sync == b'':
373
                print("EOF")
374
                break
375
            else:
376
                sync = sync[0]
377
            if sync is not MESSAGE_TX_SYNC:
378
                continue
379
            length = self.log.read(1)[0]
380
            type = self.log.read(1)[0]
381
            data = self.log.read(length)
382
            chk = self.log.read(1)[0]
383
384
            packet = bytearray([sync, length, type])
385
            packet.extend(data)
386
            packet.append(chk)
387
            print("packet: ", packet)
388
            self.buffer.put(packet)
389
390
        items = bytearray()
391
        # while not self.buffer.empty():
392
        #     # print("reading from self.buffer")
393
        #     # items.append(self.buffer.get()[0])
394
        #     # print("items: ", items)
395
396
        return bytes(items)
397
398
399
    def _write(self, data: bytes) -> None:
400
        pass
401