GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( ca815e...75484e )
by Benjamin
11:24
created

USB2Driver.__init__()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 8
rs 9.4285
cc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A SerialDriver._read() 0 2 1
1
from abc import abstractmethod
2
from threading import Lock
3
4
from serial import Serial, SerialException, SerialTimeoutException
5
6
import usb
7
8
9
class DriverException(Exception):
10
    pass
11
12
13
class Driver:
14
    """
15
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device
16
    """
17
18
    def __init__(self):
19
        self._lock = Lock()
20
21
    def __enter__(self):
22
        self.open()
23
24
    def __exit__(self, exc_type, exc_val, exc_tb):
25
        self.close()
26
27
    def isOpen(self):
28
        with self._lock:
29
            return self._isOpen
30
31
    def open(self):
32
        with self._lock:
33
            if not self._isOpen:
34
                self._open()
35
36
    def close(self):
37
        with self._lock:
38
            if self._isOpen:
39
                self._close()
40
41
    def reOpen(self):
42
        with self._lock:
43
            if self._isOpen:
44
                self._close()
45
                self._open()
46
47
    def read(self, count):
48
        if count <= 0:
49
            raise DriverException("Count must be > 0")
50
        if not self.isOpen():
51
            raise DriverException("Device is closed")
52
53
        with self._lock:
54
            return self._read(count)
55
56
    def write(self, msg):
57
        if not self.isOpen():
58
            raise DriverException("Device is closed")
59
60
        with self._lock:
61
            self.write(msg)
62
63
    @abstractmethod
64
    def _isOpen(self):
65
        pass
66
67
    @abstractmethod
68
    def _open(self):
69
        pass
70
71
    @abstractmethod
72
    def _close(self):
73
        pass
74
75
    @abstractmethod
76
    def _read(self, count):
77
        pass
78
79
    @abstractmethod
80
    def _write(self, data):
81
        pass
82
83
84
class SerialDriver(Driver):
85
    """
86
    An implementation of a serial ANT+ device driver
87
    """
88
89
    def __init__(self, device, baudRate=115200):
90
        super().__init__()
91
        self._device = device
92
        self._baudRate = baudRate
93
        self._serial = None
94
95
    def _isOpen(self):
96
        return self._serial is None
97
98
    def _open(self):
99
        try:
100
            self._dev = Serial(self._device, self._baudRate)
101
        except SerialException as e:
102
            raise DriverException(str(e))
103
104
        if not self._dev.isOpen():
105
            raise DriverException("Could not open specified device")
106
107
    def _close(self):
108
        self._serial.close()
109
        self._serial = None
110
111
    def _read(self, count):
112
        return self._serial.read(count)
113
114
    def _write(self, data):
115
        try:
116
            count = self._serial.write(data)
117
            self._serial.flush()
118
        except SerialTimeoutException as e:
119
            raise DriverException(str(e))
120
121
122
class USBDriver(Driver):
123
    """
124
    An implementation of a USB ANT+ device driver
125
    """
126
127
    def __init__(self, idVendor, idProduct):
128
        Driver.__init__(self)
129
        self._idVendor = idVendor
130
        self._idProduct = idProduct
131
        self._dev = None
132
        self._epOut = None
133
        self._epIn = None
134
        self._interfaceNumber = None
135
136
    def _isOpen(self):
137
        return self._dev is not None
138
139
    def _open(self):
140
        try:
141
            # find the first USB device that matches the filter
142
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)
143
144
            if self._dev is None:
145
                raise DriverException("Could not open specified device")
146
147
            # Detach kernel driver
148
            if self._dev.is_kernel_driver_active(0):
149
                try:
150
                    self._dev.detach_kernel_driver(0)
151
                except usb.USBError as e:
152
                    raise DriverException("Could not detach kernel driver")
153
154
            # set the active configuration. With no arguments, the first
155
            # configuration will be the active one
156
            self._dev.set_configuration()
157
158
            # get an endpoint instance
159
            cfg = self._dev.get_active_configuration()
160
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
161
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
162
                                                 bAlternateSetting=usb.control.get_interface(self._dev,
163
                                                                                             self._interfaceNumber))
164
            usb.util.claim_interface(self._dev, self._interfaceNumber)
165
166
            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
167
                e.bEndpointAddress) == usb.ENDPOINT_OUT)
168
169
            self._ep_in = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
170
                e.bEndpointAddress) == usb.ENDPOINT_IN)
171
172
            if self._epOut is None or self._ep_in is None:
173
                raise DriverException("Could not initialize USB endpoint")
174
        except IOError as e:
175
            raise DriverException(str(e))
176
177
    def _close(self):
178
        usb.util.release_interface(self._dev, self._interfaceNumber)
179
        usb.util.dispose_resources(self._dev)
180
        self._dev = self._epOut = self._epIn = None
181
182
    def _read(self, count):
183
        return self._epIn.read(count)
184
185
    def _write(self, data):
186
        return self._epOut.write(data)
187