GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( 4aaa2e...925db5 )
by Benjamin
01:12
created

Driver.write()   B

Complexity

Conditions 5

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 21
rs 8.439
cc 5
1
from abc import abstractmethod
2
from threading import Lock
3
4
from serial import Serial, SerialException, SerialTimeoutException
5
6
import usb
7
8
from libAnt.constants import MESSAGE_TX_SYNC
9
10
11
class DriverException(Exception):
12
    pass
13
14
15
class Driver:
16
    """
17
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device
18
    """
19
20
    def __init__(self):
21
        self._lock = Lock()
22
23
    def __enter__(self):
24
        self.open()
25
        return self
26
27
    def __exit__(self, exc_type, exc_val, exc_tb):
28
        self.close()
29
30
    def isOpen(self) -> bool:
31
        with self._lock:
32
            return self._isOpen()
33
34
    def open(self) -> None:
35
        with self._lock:
36
            if not self._isOpen():
37
                self._open()
38
39
    def close(self) -> None:
40
        with self._lock:
41
            if self._isOpen():
42
                self._close()
43
44
    def reOpen(self) -> None:
45
        with self._lock:
46
            if self._isOpen():
47
                self._close()
48
                self._open()
49
50
    def read(self, count: int) -> bytearray:
51
        if count <= 0:
52
            raise DriverException("Count must be > 0")
53
        if not self.isOpen():
54
            raise DriverException("Device is closed")
55
56
        with self._lock:
57
            return self._read(count)
58
59
    def write(self, type: int, msg: bytearray) -> None:
60
        if not self.isOpen():
61
            raise DriverException("Device is closed")
62
        payload = bytearray()
63
        payload.append(MESSAGE_TX_SYNC)
64
        payload.append(len(msg))
65
        payload.append(type)
66
        payload.extend(msg)
67
68
        checksum = 0
69
        for b in payload:
70
            checksum ^= b
71
72
        payload.append(checksum)
73
74
        for p in payload:
75
            print(hex(p), end="")
76
77
78
        with self._lock:
79
            self._write(payload)
80
81
    @abstractmethod
82
    def _isOpen(self):
83
        pass
84
85
    @abstractmethod
86
    def _open(self):
87
        pass
88
89
    @abstractmethod
90
    def _close(self):
91
        pass
92
93
    @abstractmethod
94
    def _read(self, count):
95
        pass
96
97
    @abstractmethod
98
    def _write(self, data):
99
        pass
100
101
102
class SerialDriver(Driver):
103
    """
104
    An implementation of a serial ANT+ device driver
105
    """
106
107
    def __init__(self, device, baudRate=115200):
108
        super().__init__()
109
        self._device = device
110
        self._baudRate = baudRate
111
        self._serial = None
112
113
    def __str__(self):
114
        if self.isOpen():
115
            return self._device + " @ " + str(self._baudRate)
116
        return None
117
118
    def _isOpen(self):
119
        return self._serial is None
120
121
    def _open(self):
122
        try:
123
            self._serial = Serial(self._device, self._baudRate)
124
        except SerialException as e:
125
            raise DriverException(str(e))
126
127
        if not self._serial.isOpen():
128
            raise DriverException("Could not open specified device")
129
130
    def _close(self):
131
        self._serial.close()
132
        self._serial = None
133
134
    def _read(self, count):
135
        return self._serial.read(count)
136
137
    def _write(self, data):
138
        try:
139
            count = self._serial.write(data)
140
            self._serial.flush()
141
        except SerialTimeoutException as e:
142
            raise DriverException(str(e))
143
144
145
class USBDriver(Driver):
146
    """
147
    An implementation of a USB ANT+ device driver
148
    """
149
150
    def __init__(self, vid, pid):
151
        Driver.__init__(self)
152
        self._idVendor = vid
153
        self._idProduct = pid
154
        self._dev = None
155
        self._epOut = None
156
        self._epIn = None
157
        self._interfaceNumber = None
158
159
    def __str__(self):
160
        if self.isOpen():
161
            return str(self._dev)
162
        return "Closed"
163
164
    def _isOpen(self):
165
        return self._dev is not None
166
167
    def _open(self):
168
        try:
169
            # find the first USB device that matches the filter
170
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)
171
172
            if self._dev is None:
173
                raise DriverException("Could not open specified device")
174
175
            # Detach kernel driver
176
            try:
177
                if self._dev.is_kernel_driver_active(0):
178
                    try:
179
                        self._dev.detach_kernel_driver(0)
180
                    except usb.USBError as e:
181
                        raise DriverException("Could not detach kernel driver")
182
            except NotImplementedError:
183
                pass  # for non unix systems
184
185
            # set the active configuration. With no arguments, the first
186
            # configuration will be the active one
187
            self._dev.set_configuration()
188
189
            # get an endpoint instance
190
            cfg = self._dev.get_active_configuration()
191
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
192
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
193
                                                 bAlternateSetting=usb.control.get_interface(self._dev,
194
                                                                                             self._interfaceNumber))
195
            usb.util.claim_interface(self._dev, self._interfaceNumber)
196
197
            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
198
                e.bEndpointAddress) == usb.ENDPOINT_OUT)
199
200
            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
201
                e.bEndpointAddress) == usb.ENDPOINT_IN)
202
203
            if self._epOut is None or self._epIn is None:
204
                raise DriverException("Could not initialize USB endpoint")
205
        except IOError as e:
206
            raise DriverException(str(e))
207
208
    def _close(self):
209
        usb.util.release_interface(self._dev, self._interfaceNumber)
210
        usb.util.dispose_resources(self._dev)
211
        self._dev = self._epOut = self._epIn = None
212
213
    def _read(self, count):
214
        return self._epIn.read(count, timeout=-1)  # wait as long as it takes :)
215
216
    def _write(self, data):
217
        return self._epOut.write(data)
218