GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#3)
by
unknown
02:08 queued 01:10
created

Driver.read()   F

Complexity

Conditions 10

Size

Total Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 11
Bugs 0 Features 0
Metric Value
cc 10
c 11
b 0
f 0
dl 0
loc 39
rs 3.1304

1 Method

Rating   Name   Duplication   Size   Complexity  
A Driver.splitN() 0 2 2

How to fix   Complexity   

Complexity

Complex classes like Driver.read() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from abc import abstractmethod
2
from queue import Queue, Empty
3
from threading import Lock, Thread, Event
4
5
import math
6
from serial import Serial, SerialException, SerialTimeoutException
7
8
import usb
9
import time
10
from struct import *
11
import os
12
13
from libAnt.constants import MESSAGE_TX_SYNC, MESSAGE_CHANNEL_BROADCAST_DATA
14
from libAnt.message import Message, SystemResetMessage
15
16
17
class DriverException(Exception):
18
    pass
19
20
21
class Driver:
22
    """
23
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device
24
    """
25
26
    def __init__(self, debug=False):
27
        self._lock = Lock()
28
        self._debug = debug
29
        self.logfile = 'log.pcap'
30
        self._openTime = None
31
32
    def __enter__(self):
33
        self.open()
34
        return self
35
36
    def __exit__(self, exc_type, exc_val, exc_tb):
37
        self.close()
38
39
    def isOpen(self) -> bool:
40
        with self._lock:
41
            return self._isOpen()
42
43
    def open(self) -> None:
44
        with self._lock:
45
            if not self._isOpen():
46
                self._open()
47
                self._openTime = time.time()
48
                if self._debug:
49
                    # write pcap global header
50
                    magic_number = b'\xD4\xC3\xB2\xA1'
51
                    version_major = 2
52
                    version_minor = 4
53
                    thiszone = b'\x00\x00\x00\x00'
54
                    sigfigs = b'\x00\x00\x00\x00'
55
                    snaplen = b'\xFF\x00\x00\x00'
56
                    network = b'\x01\x00\x00\x00'
57
58
                    pcap_global_header = Struct('<4shh4s4s4s4s')
59
60
                    with open(self.logfile, 'wb') as log:
61
                        log.write(pcap_global_header.pack(magic_number, version_major, version_minor, thiszone, sigfigs, snaplen, network))
62
63
    def close(self) -> None:
64
        with self._lock:
65
            if self._isOpen:
66
                self._close()
67
68
    def reOpen(self) -> None:
69
        with self._lock:
70
            if self._isOpen():
71
                self._close()
72
            self._open()
73
74
    def read(self, timeout=None) -> Message:
75
        # Splits the string into a list of tokens every n characters
76
        def splitN(str1, n):
77
            return [str1[start:start + n] for start in range(0, len(str1), n)]
78
79
        if not self.isOpen():
80
            raise DriverException("Device is closed")
81
82
        with self._lock:
83
            while True:
84
                sync = self._read(1, timeout=timeout)[0]
85
                if sync is not MESSAGE_TX_SYNC:
86
                    continue
87
                length = self._read(1, timeout=timeout)[0]
88
                type = self._read(1, timeout=timeout)[0]
89
                data = self._read(length, timeout=timeout)
90
                chk = self._read(1, timeout=timeout)[0]
91
                msg = Message(type, data)
92
                if self._debug:
93
                    logMsg = bytearray([sync, length, type])
94
                    logMsg.extend(data)
95
                    logMsg.append(chk)
96
                    timestamp = time.time() - self._openTime
97
                    frac, whole = math.modf(timestamp)
98
                    print(timestamp)
99
100
                    ts_sec = int(whole).to_bytes(4, byteorder='little')
101
                    ts_usec = int(frac * 1000 * 1000).to_bytes(4, byteorder='little')
102
                    incl_len = len(logMsg)
103
                    orig_len = incl_len
104
105
                    pcap_packet_header = Struct('<4s4sll')
106
107
                    with open(self.logfile, 'ab') as log:
108
                        log.write(pcap_packet_header.pack(ts_sec, ts_usec, incl_len, orig_len))
109
                        log.write(logMsg)
110
111
                if msg.checksum() == chk:
112
                    return msg
113
114
    def write(self, msg: Message) -> None:
115
        if not self.isOpen():
116
            raise DriverException("Device is closed")
117
118
        with self._lock:
119
            self._write(msg.encode())
120
121
    @abstractmethod
122
    def _isOpen(self) -> bool:
123
        pass
124
125
    @abstractmethod
126
    def _open(self) -> None:
127
        pass
128
129
    @abstractmethod
130
    def _close(self) -> None:
131
        pass
132
133
    @abstractmethod
134
    def _read(self, count: int, timeout=None) -> bytes:
135
        pass
136
137
    @abstractmethod
138
    def _write(self, data: bytes) -> None:
139
        pass
140
141
142
class SerialDriver(Driver):
143
    """
144
    An implementation of a serial ANT+ device driver
145
    """
146
147
    def __init__(self, device: str, baudRate: int = 115200, debug=False):
148
        super().__init__(debug=debug)
149
        self._device = device
150
        self._baudRate = baudRate
151
        self._serial = None
152
153
    def __str__(self):
154
        if self.isOpen():
155
            return self._device + " @ " + str(self._baudRate)
156
        return None
157
158
    def _isOpen(self) -> bool:
159
        return self._serial is None
160
161
    def _open(self) -> None:
162
        try:
163
            self._serial = Serial(self._device, self._baudRate)
164
        except SerialException as e:
165
            raise DriverException(str(e))
166
167
        if not self._serial.isOpen():
168
            raise DriverException("Could not open specified device")
169
170
    def _close(self) -> None:
171
        self._serial.close()
172
        self._serial = None
173
174
    def _read(self, count: int, timeout=None) -> bytes:
175
        return self._serial.read(count, timeout=timeout)
176
177
    def _write(self, data: bytes) -> None:
178
        try:
179
            self._serial.write(data)
180
            self._serial.flush()
181
        except SerialTimeoutException as e:
182
            raise DriverException(str(e))
183
184
185
class USBDriver(Driver):
186
    """
187
    An implementation of a USB ANT+ device driver
188
    """
189
190
    def __init__(self, vid, pid, debug=False):
191
        super().__init__(debug=debug)
192
        self._idVendor = vid
193
        self._idProduct = pid
194
        self._dev = None
195
        self._epOut = None
196
        self._epIn = None
197
        self._interfaceNumber = None
198
        self._packetSize = 0x20
199
        self._queue = None
200
        self._loop = None
201
        self._driver_open = False
202
203
    def __str__(self):
204
        if self.isOpen():
205
            return str(self._dev)
206
        return "Closed"
207
208
    def _isOpen(self) -> bool:
209
        return self._driver_open
210
211
    def _open(self) -> None:
212
        print('USB OPEN START')
213
        try:
214
            # find the first USB device that matches the filter
215
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)
216
217
            if self._dev is None:
218
                raise DriverException("Could not open specified device")
219
220
            # Detach kernel driver
221
            try:
222
                if self._dev.is_kernel_driver_active(0):
223
                    try:
224
                        self._dev.detach_kernel_driver(0)
225
                    except usb.USBError as e:
226
                        raise DriverException("Could not detach kernel driver")
227
            except NotImplementedError:
228
                pass  # for non unix systems
229
230
            # set the active configuration. With no arguments, the first
231
            # configuration will be the active one
232
            self._dev.set_configuration()
233
234
            # get an endpoint instance
235
            cfg = self._dev.get_active_configuration()
236
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
237
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
238
                                                 bAlternateSetting=usb.control.get_interface(self._dev,
239
                                                                                             self._interfaceNumber))
240
            usb.util.claim_interface(self._dev, self._interfaceNumber)
241
242
            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
243
                e.bEndpointAddress) == usb.ENDPOINT_OUT)
244
245
            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
246
                e.bEndpointAddress) == usb.ENDPOINT_IN)
247
248
            if self._epOut is None or self._epIn is None:
249
                raise DriverException("Could not initialize USB endpoint")
250
251
            self._queue = Queue()
252
            self._loop = USBLoop(self._epIn, self._packetSize, self._queue)
253
            self._loop.start()
254
            self._driver_open = True
255
            print('USB OPEN SUCCESS')
256
        except IOError as e:
257
            self._close()
258
            raise DriverException(str(e))
259
260
    def _close(self) -> None:
261
        print('USB CLOSE START')
262
        if self._loop is not None:
263
            if self._loop.is_alive():
264
                self._loop.stop()
265
                self._loop.join()
266
        self._loop = None
267
        try:
268
            self._dev.reset()
269
            usb.util.dispose_resources(self._dev)
270
        except:
271
            pass
272
        self._dev = self._epOut = self._epIn = None
273
        self._driver_open = False
274
        print('USB CLOSE END')
275
276
    def _read(self, count: int, timeout=None) -> bytes:
277
        data = bytearray()
278
        for i in range(0, count):
279
            b = self._queue.get(timeout=timeout)
280
            if b is None:
281
                self._close()
282
                raise DriverException("Device is closed!")
283
            data.append(b)
284
        return bytes(data)
285
286
    def _write(self, data: bytes) -> None:
287
        return self._epOut.write(data)
288
289
290
class USBLoop(Thread):
291
    def __init__(self, ep, packetSize: int, queue: Queue):
292
        super().__init__()
293
        self._stopper = Event()
294
        self._ep = ep
295
        self._packetSize = packetSize
296
        self._queue = queue
297
298
    def stop(self) -> None:
299
        self._stopper.set()
300
301
    def run(self) -> None:
302
        while not self._stopper.is_set():
303
            try:
304
                data = self._ep.read(self._packetSize, timeout=1000)
305
                for d in data:
306
                    self._queue.put(d)
307
            except usb.core.USBError as e:
308
                if e.errno not in (60, 110) and e.backend_error_code != -116: # Timout errors
309
                    self._stopper.set()
310
        #  We Put in an invalid byte so threads will realize the device is stopped
311
        self._queue.put(None)
312
313
class DummyDriver(Driver):
314
    def __init__(self):
315
        self._isopen = False
316
        self._data = Queue()
317
        msg1 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\x01\x02\x03\x04\x05\x06\x07').encode()
318
        for b in msg1:
319
            self._data.put(b)
320
        msg2 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\xF1\xF2\xF3\xF4\xF5\xF6\xF7').encode()
321
        for b in msg2:
322
            self._data.put(b)
323
        msg3 = Message(MESSAGE_CHANNEL_BROADCAST_DATA, b'\x00\xF9\xFA\xFB\xFC\xFD\xFE\xFF').encode()
324
        for b in msg3:
325
            self._data.put(b)
326
        super().__init__(debug=True)
327
328
    def _isOpen(self) -> bool:
329
        return self._isopen
330
331
    def _close(self) -> None:
332
        self._isopen = False
333
334
    def _read(self, count: int, timeout=None) -> bytes:
335
        data = bytearray()
336
        for i in range(0, count):
337
            data.append(self._data.get(timeout=timeout))
338
        return bytes(data)
339
340
    def _open(self) -> None:
341
        self._isopen = True
342
343
    def _write(self, data: bytes) -> None:
344
        pass
345
346
class PcapDriver(Driver):
347
    def __init__(self, logfile):
348
        super().__init__(debug=False)
349
        self._isopen = False
350
        self._logfile = logfile
351
        self._buffer = Queue()
352
353
    def _isOpen(self) -> bool:
354
        return self._isopen
355
356
    def _open(self) -> None:
357
        self._isopen = True
358
        self._log = open(self._logfile, 'rb')
359
        self._EOF = os.stat(self._logfile).st_size
360
        # move file pointer to first packet header
361
        global_header_length = 24
362
        self._log.seek(global_header_length, 0)
363
364
365
    def _close(self) -> None:
366
        self._isopen = False
367
        self._log.close()
368
369
    def _read(self, count: int, timeout=None) -> bytes:
370
        def read_next_packet_into_buffer():
371
            if self._log.tell() is self._EOF:
372
                print("EOF")
373
                return
374
375
            ts_sec, = unpack('i', self._log.read(4))
376
            ts_usec = unpack('i', self._log.read(4))[0]/1000000
377
            ts = ts_sec + ts_usec
378
            print("ts: ", ts)
379
            time.sleep(ts)
380
381
            packet_length = unpack('i', self._log.read(4))[0]
382
            print("packet length: ", packet_length)
383
            self._log.seek(4, 1)
384
            for i in range(0, packet_length):
385
                self._buffer.put(self._log.read(1))
386
387
        result = bytearray()
388
389
        print("Reading ", count, " byte(s)...")
390
        while len(result) < count:
391
            if self._buffer.empty():
392
                print("reading next packet into buffer...")
393
                read_next_packet_into_buffer()
394
            try:
395
                result.extend(self._buffer.get(block=True, timeout = timeout))
396
            except Empty:
397
                print("Timed out..")
398
399
        print("result: ", bytes(result))
400
        return bytes(result)
401
402
    def _write(self, data: bytes) -> None:
403
        pass
404