GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Pull Request — master (#3)
by
unknown
02:19 queued 01:11
created

PcapDriver._write()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
from abc import abstractmethod
2
from queue import Queue, Empty
3
from threading import Lock, Thread, Event
4
5
from serial import Serial, SerialException, SerialTimeoutException
6
7
import usb
8
import time
9
import binascii
10
from struct import *
11
12
from libAnt.constants import MESSAGE_TX_SYNC, MESSAGE_CHANNEL_BROADCAST_DATA
13
from libAnt.message import Message, SystemResetMessage
14
15
16
class DriverException(Exception):
    """Error type raised by the drivers in this module when a device operation fails."""
18
19
20
class Driver:
    """
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device

    The public methods (``isOpen``, ``open``, ``close``, ``reOpen``, ``read``, ``write``) take an
    internal lock and delegate to the ``_isOpen``/``_open``/``_close``/``_read``/``_write`` hooks,
    which concrete drivers override. When ``debug`` is true, ``open()`` (re)creates ``self.logfile``
    with a pcap global header and ``read()`` appends every frame it receives to that file.
    """

    # Pre-compiled layouts of the pcap global header and per-packet record header (little endian).
    _pcapGlobalHeader = Struct('<4shh4s4s4s4s')
    _pcapPacketHeader = Struct('<4s4sll')

    def __init__(self, debug=False):
        """
        :param debug: when true, received frames are logged to ``self.logfile`` in pcap format
        """
        self._lock = Lock()
        self._debug = debug
        self.logfile = 'log.pcap'

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def isOpen(self) -> bool:
        """Return whether the underlying device is currently open."""
        with self._lock:
            return self._isOpen()

    def open(self) -> None:
        """
        Open the device if it is not open yet.

        In debug mode the log file is truncated and a fresh pcap global header is written on
        every call, whether or not the device had to be opened.
        """
        with self._lock:
            if not self._isOpen():
                self._open()
            if self._debug:
                self._writePcapGlobalHeader()

    def close(self) -> None:
        """Close the device if it is open; do nothing otherwise."""
        with self._lock:
            # The hook must be *called*: the bound method object itself is always truthy.
            if self._isOpen():
                self._close()

    def reOpen(self) -> None:
        """Close the device if it is open, then open it again."""
        with self._lock:
            if self._isOpen():
                self._close()
            self._open()

    def read(self, timeout=None) -> Message:
        """
        Block until one frame with a valid checksum has been received and return it.

        Bytes are discarded until a sync byte is seen; then length, type, payload and checksum
        are read. Frames whose checksum does not match are dropped (but still logged in debug
        mode) and reading continues with the next frame.

        :param timeout: forwarded unchanged to every ``_read`` call
        :return: the decoded message
        :raises DriverException: if the device is closed
        """
        if not self.isOpen():
            raise DriverException("Device is closed")

        with self._lock:
            while True:
                sync = self._read(1, timeout=timeout)[0]
                # Compare by value; identity comparison of ints only works by accident.
                if sync != MESSAGE_TX_SYNC:
                    continue
                length = self._read(1, timeout=timeout)[0]
                msgType = self._read(1, timeout=timeout)[0]
                data = self._read(length, timeout=timeout)
                chk = self._read(1, timeout=timeout)[0]
                msg = Message(msgType, data)

                if self._debug:
                    frame = bytearray([sync, length, msgType])
                    frame.extend(data)
                    frame.append(chk)
                    self._logFrame(frame)

                if msg.checksum() == chk:
                    return msg

    def write(self, msg: Message) -> None:
        """
        Encode ``msg`` and send it to the device.

        :raises DriverException: if the device is closed
        """
        if not self.isOpen():
            raise DriverException("Device is closed")

        with self._lock:
            self._write(msg.encode())

    def _writePcapGlobalHeader(self) -> None:
        """Truncate the log file and write the pcap global header to it."""
        magic_number = b'\xD4\xC3\xB2\xA1'
        version_major = 2
        version_minor = 4
        thiszone = b'\x00\x00\x00\x00'
        sigfigs = b'\x00\x00\x00\x00'
        snaplen = b'\xFF\x00\x00\x00'
        network = b'\x01\x00\x00\x00'

        with open(self.logfile, 'wb') as log:
            log.write(self._pcapGlobalHeader.pack(magic_number, version_major, version_minor,
                                                  thiszone, sigfigs, snaplen, network))

    def _logFrame(self, frame) -> None:
        """Append one raw frame, preceded by a pcap record header, to the log file."""
        # NOTE(review): the timestamp is a fixed placeholder, so every record carries the
        # same time; the real capture time is not recorded yet.
        ts_sec = b'\xAA\x77\x9F\x47'
        ts_usec = b'\x90\xA2\x04\x00'
        incl_len = len(frame)
        orig_len = incl_len

        with open(self.logfile, 'ab') as log:
            log.write(self._pcapPacketHeader.pack(ts_sec, ts_usec, incl_len, orig_len))
            log.write(frame)

    @abstractmethod
    def _isOpen(self) -> bool:
        """Return whether the device is open. Called with the lock held."""
        pass

    @abstractmethod
    def _open(self) -> None:
        """Open the device. Called with the lock held."""
        pass

    @abstractmethod
    def _close(self) -> None:
        """Close the device. Called with the lock held."""
        pass

    @abstractmethod
    def _read(self, count: int, timeout=None) -> bytes:
        """Read ``count`` bytes from the device. Called with the lock held."""
        pass

    @abstractmethod
    def _write(self, data: bytes) -> None:
        """Write raw bytes to the device. Called with the lock held."""
        pass
135
136
137
class SerialDriver(Driver):
    """
    An implementation of a serial ANT+ device driver
    """

    def __init__(self, device: str, baudRate: int = 115200, debug=False):
        """
        :param device: name of the serial port to open
        :param baudRate: baud rate passed to the serial port
        :param debug: forwarded to ``Driver``
        """
        super().__init__(debug=debug)
        self._device = device
        self._baudRate = baudRate
        self._serial = None

    def __str__(self):
        if self.isOpen():
            return self._device + " @ " + str(self._baudRate)
        # __str__ must return a string; mirror what USBDriver reports when closed.
        return "Closed"

    def _isOpen(self) -> bool:
        # The port object only exists between _open() and _close().
        return self._serial is not None

    def _open(self) -> None:
        """
        Open the serial port.

        :raises DriverException: if the port cannot be opened
        """
        try:
            self._serial = Serial(self._device, self._baudRate)
        except SerialException as e:
            raise DriverException(str(e)) from e

        if not self._serial.isOpen():
            # Do not keep a half-initialised port around, or _isOpen() would report True.
            self._serial = None
            raise DriverException("Could not open specified device")

    def _close(self) -> None:
        self._serial.close()
        self._serial = None

    def _read(self, count: int, timeout=None) -> bytes:
        """
        Read exactly ``count`` bytes from the port.

        :param timeout: read timeout in seconds, ``None`` blocks until all bytes arrived
        :raises Empty: if the timeout expired before ``count`` bytes were received, matching
                       the behaviour of the queue based drivers in this module
        """
        # Serial.read() has no timeout argument; the timeout is a property of the port.
        self._serial.timeout = timeout
        data = self._serial.read(count)
        if len(data) < count:
            raise Empty
        return data

    def _write(self, data: bytes) -> None:
        """
        Write ``data`` to the port and flush it.

        :raises DriverException: if the write times out
        """
        try:
            self._serial.write(data)
            self._serial.flush()
        except SerialTimeoutException as e:
            raise DriverException(str(e)) from e
178
179
180
class USBDriver(Driver):
    """
    An implementation of a USB ANT+ device driver

    Incoming bytes are collected by a background ``USBLoop`` thread and handed over through a
    queue; ``_read`` consumes that queue.
    """

    def __init__(self, vid, pid, debug=False):
        """
        :param vid: USB vendor id used to find the device
        :param pid: USB product id used to find the device
        :param debug: forwarded to ``Driver``
        """
        super().__init__(debug=debug)
        self._idVendor = vid
        self._idProduct = pid
        self._dev = None
        self._epOut = None
        self._epIn = None
        self._interfaceNumber = None
        self._packetSize = 0x20
        self._queue = None
        self._loop = None
        self._driver_open = False

    def __str__(self):
        if self.isOpen():
            return str(self._dev)
        return "Closed"

    def _isOpen(self) -> bool:
        return self._driver_open

    def _open(self) -> None:
        """
        Find the device, claim its interface, locate both endpoints and start the reader thread.

        :raises DriverException: if the device is not found, the kernel driver cannot be
                                 detached, an endpoint is missing or a USB I/O error occurs
        """
        print('USB OPEN START')
        try:
            # find the first USB device that matches the filter
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)

            if self._dev is None:
                raise DriverException("Could not open specified device")

            # Detach kernel driver
            try:
                if self._dev.is_kernel_driver_active(0):
                    try:
                        self._dev.detach_kernel_driver(0)
                    except usb.USBError as e:
                        raise DriverException("Could not detach kernel driver") from e
            except NotImplementedError:
                pass  # for non unix systems

            # set the active configuration. With no arguments, the first
            # configuration will be the active one
            self._dev.set_configuration()

            # get an endpoint instance
            cfg = self._dev.get_active_configuration()
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
                                                 bAlternateSetting=usb.control.get_interface(self._dev,
                                                                                             self._interfaceNumber))
            usb.util.claim_interface(self._dev, self._interfaceNumber)

            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
                e.bEndpointAddress) == usb.ENDPOINT_OUT)

            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
                e.bEndpointAddress) == usb.ENDPOINT_IN)

            if self._epOut is None or self._epIn is None:
                # The interface is already claimed at this point; give it back before failing.
                self._close()
                raise DriverException("Could not initialize USB endpoint")

            self._queue = Queue()
            self._loop = USBLoop(self._epIn, self._packetSize, self._queue)
            self._loop.start()
            self._driver_open = True
            print('USB OPEN SUCCESS')
        except IOError as e:
            self._close()
            raise DriverException(str(e)) from e

    def _close(self) -> None:
        """Stop the reader thread, reset and release the device and mark the driver closed."""
        print('USB CLOSE START')
        if self._loop is not None:
            if self._loop.is_alive():
                self._loop.stop()
                self._loop.join()
        self._loop = None
        if self._dev is not None:
            try:
                self._dev.reset()
                usb.util.dispose_resources(self._dev)
            except Exception:
                # Best effort: the device may already be gone. Unlike a bare except this
                # still lets KeyboardInterrupt and SystemExit propagate.
                pass
        self._dev = self._epOut = self._epIn = None
        self._driver_open = False
        print('USB CLOSE END')

    def _read(self, count: int, timeout=None) -> bytes:
        """
        Take ``count`` bytes from the reader thread's queue.

        :param timeout: passed to every ``Queue.get`` call, so ``queue.Empty`` is raised when
                        no byte arrives in time
        :raises DriverException: if the reader thread signalled that it stopped
        """
        data = bytearray()
        for _ in range(count):
            b = self._queue.get(timeout=timeout)
            if b is None:
                # None is the marker USBLoop enqueues when it terminates.
                self._close()
                raise DriverException("Device is closed!")
            data.append(b)
        return bytes(data)

    def _write(self, data: bytes) -> None:
        return self._epOut.write(data)
283
284
285
class USBLoop(Thread):
    """
    Background thread that reads packets from an endpoint and enqueues them byte by byte.

    When the loop ends, a single ``None`` is enqueued so that consumers can tell that no
    more data will arrive.
    """

    def __init__(self, ep, packetSize: int, queue: Queue):
        super().__init__()
        self._ep = ep
        self._packetSize = packetSize
        self._queue = queue
        self._stopper = Event()

    def stop(self) -> None:
        """Ask the loop to finish; it exits after the read in progress returns."""
        self._stopper.set()

    def run(self) -> None:
        while True:
            if self._stopper.is_set():
                break
            try:
                packet = self._ep.read(self._packetSize, timeout=1000)
                for byte in packet:
                    self._queue.put(byte)
            except usb.core.USBError as err:
                # Timeout errors just mean "nothing received yet"; anything else ends the loop
                timedOut = err.errno in (60, 110) or err.backend_error_code == -116
                if not timedOut:
                    self._stopper.set()
        # Put in an invalid byte so readers will realize the device is stopped
        self._queue.put(None)
307
308
class DummyDriver(Driver):
    """
    A driver without hardware: reads are served from a queue pre-filled with the encoded
    bytes of two broadcast data messages, writes are discarded. Debug mode is always on.
    """

    def __init__(self):
        self._isopen = False
        self._data = Queue()
        payloads = (
            b'\x00\x10\x20\x30\x40\x50\x60\x70',
            b'\x00\x01\x02\x03\x04\x05\x06\x07',
        )
        for payload in payloads:
            encoded = Message(MESSAGE_CHANNEL_BROADCAST_DATA, payload).encode()
            for byte in encoded:
                self._data.put(byte)
        super().__init__(debug=True)

    def _isOpen(self) -> bool:
        return self._isopen

    def _open(self) -> None:
        self._isopen = True

    def _close(self) -> None:
        self._isopen = False

    def _read(self, count: int, timeout=None) -> bytes:
        # Take the bytes one at a time; Queue.get raises queue.Empty once the canned data is used up
        return bytes(self._data.get(timeout=timeout) for _ in range(count))

    def _write(self, data: bytes) -> None:
        # Nothing to send to; outgoing data is dropped
        pass
337
338
class PcapDriver(Driver):
    """
    Driver stub built around the pcap log file.

    On construction the current content of ``self.logfile`` is read and printed; ``_read``
    and ``_write`` are placeholders.

    NOTE(review): the driver is created with ``debug=True``, so ``Driver.open()`` truncates
    ``self.logfile`` and writes a new header to it -- confirm this is intended for a driver
    that reads that same file.
    """

    def __init__(self):
        """
        :raises OSError: if ``self.logfile`` cannot be opened for reading
        """
        super().__init__(debug=True)
        self._isopen = False

        with open(self.logfile, 'rb') as log:
            data = log.read()

        print(data)

    def _isOpen(self) -> bool:
        return self._isopen

    def _close(self) -> None:
        # self.logfile is a path string, not a file object, and the handle used in
        # __init__ is already closed by its ``with`` block, so there is nothing to close.
        self._isopen = False

    def _read(self, count: int, timeout=None) -> bytes:
        # Placeholder: always returns a single zero byte, regardless of ``count``.
        return bytes(1)

    def _open(self) -> None:
        self._isopen = True

    def _write(self, data: bytes) -> None:
        # Placeholder: outgoing data is dropped.
        pass
364