GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 75484e...6f72c9 )
by Benjamin
01:10
created

SerialDriver._close()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 2
b 0
f 0
1
from abc import abstractmethod
2
from threading import Lock
3
4
from serial import Serial, SerialException, SerialTimeoutException
5
6
import usb
7
8
9
class DriverException(Exception):
    """Error raised by the drivers in this module for device-level failures."""
11
12
13
class Driver:
    """
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device

    Public methods (``open``, ``close``, ``reOpen``, ``read``, ``write``,
    ``isOpen``) serialise access through a single lock and delegate the
    actual work to the underscore-prefixed hooks that subclasses implement.

    The hooks are always invoked with the lock held, so they must not call
    the public methods of the same instance (the lock is not reentrant).

    Instances can be used as context managers: the device is opened on entry
    and closed on exit.

    NOTE(review): the hooks are marked ``@abstractmethod`` but the class does
    not derive from ``abc.ABC``, so the markers are informational only.
    """

    def __init__(self):
        # Guards every access to the underlying device.
        self._lock = Lock()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def isOpen(self):
        """Return whether the underlying device is currently open."""
        with self._lock:
            # Call the hook; the bound method object itself is always truthy.
            return self._isOpen()

    def open(self):
        """Open the device; does nothing if it is already open."""
        with self._lock:
            if not self._isOpen():
                self._open()

    def close(self):
        """Close the device; does nothing if it is already closed."""
        with self._lock:
            if self._isOpen():
                self._close()

    def reOpen(self):
        """Close and immediately re-open the device if it is currently open."""
        with self._lock:
            if self._isOpen():
                self._close()
                self._open()

    def read(self, count):
        """
        Read raw data from the device.

        :param count: amount of data to request from the device, must be > 0
        :return: whatever the subclass' ``_read`` hook returns
        :raises DriverException: if ``count`` is not positive or the device is closed
        """
        if count <= 0:
            raise DriverException("Count must be > 0")

        # Check the state and read under the same lock acquisition so the
        # device cannot be closed in between.
        with self._lock:
            if not self._isOpen():
                raise DriverException("Device is closed")
            return self._read(count)

    def write(self, msg):
        """
        Write raw data to the device.

        :param msg: the data handed to the subclass' ``_write`` hook
        :return: whatever the subclass' ``_write`` hook returns
        :raises DriverException: if the device is closed
        """
        with self._lock:
            if not self._isOpen():
                raise DriverException("Device is closed")
            # Delegate to the hook; calling self.write() here would try to
            # re-acquire the non-reentrant lock and never return.
            return self._write(msg)

    @abstractmethod
    def _isOpen(self):
        """Hook: return True if the device is open. Called with the lock held."""
        pass

    @abstractmethod
    def _open(self):
        """Hook: open the device. Called with the lock held."""
        pass

    @abstractmethod
    def _close(self):
        """Hook: close the device. Called with the lock held."""
        pass

    @abstractmethod
    def _read(self, count):
        """Hook: read from the device. Called with the lock held."""
        pass

    @abstractmethod
    def _write(self, data):
        """Hook: write ``data`` to the device. Called with the lock held."""
        pass
83
84
85
class SerialDriver(Driver):
    """
    An implementation of a serial ANT+ device driver
    """

    def __init__(self, device, baudRate=115200):
        """
        :param device: name of the serial port, passed to ``serial.Serial``
        :param baudRate: baud rate passed to ``serial.Serial``
        """
        super().__init__()
        self._device = device
        self._baudRate = baudRate
        # The open serial.Serial instance, or None while the driver is closed.
        self._serial = None

    def __str__(self):
        # __str__ must always return a string; returning None makes str() raise.
        description = "{} @ {}".format(self._device, self._baudRate)
        if self.isOpen():
            return description
        return description + " (closed)"

    def _isOpen(self):
        # The driver is open exactly while it holds a Serial instance.
        return self._serial is not None

    def _open(self):
        """
        Open the serial port.

        :raises DriverException: if the port cannot be opened
        """
        try:
            port = Serial(self._device, self._baudRate)
        except SerialException as e:
            raise DriverException(str(e)) from e

        if not port.isOpen():
            raise DriverException("Could not open specified device")

        # Only publish the port once it is known to be usable, so a failed
        # open never leaves the driver reporting itself as open.
        self._serial = port

    def _close(self):
        """Close the serial port and forget it, even if closing raises."""
        try:
            self._serial.close()
        finally:
            self._serial = None

    def _read(self, count):
        """Read via ``Serial.read(count)`` and return its result."""
        return self._serial.read(count)

    def _write(self, data):
        """
        Write ``data`` to the port and flush it.

        :return: the value returned by ``Serial.write``
        :raises DriverException: if the write times out
        """
        try:
            count = self._serial.write(data)
            self._serial.flush()
        except SerialTimeoutException as e:
            raise DriverException(str(e)) from e
        return count
126
127
128
class USBDriver(Driver):
    """
    An implementation of a USB ANT+ device driver
    """

    def __init__(self, idVendor, idProduct):
        """
        :param idVendor: vendor id used to look the device up
        :param idProduct: product id used to look the device up
        """
        super().__init__()
        self._idVendor = idVendor
        self._idProduct = idProduct
        # All of the following stay None while the driver is closed.
        self._dev = None
        self._epOut = None
        self._epIn = None
        self._interfaceNumber = None

    def __str__(self):
        # __str__ must always return a string; returning None makes str() raise.
        if self.isOpen():
            return str(self._dev)
        return "USB device {}:{} (closed)".format(self._idVendor, self._idProduct)

    def _isOpen(self):
        return self._dev is not None

    def _open(self):
        """
        Locate the device, claim its first interface and look up its
        IN and OUT endpoints.

        On any failure the driver is returned to the closed state before the
        exception propagates.

        :raises DriverException: if the device is not found, the kernel driver
            cannot be detached, an endpoint is missing, or an IOError occurs
        """
        try:
            # find the first USB device that matches the filter
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)

            if self._dev is None:
                raise DriverException("Could not open specified device")

            # Detach kernel driver
            if self._dev.is_kernel_driver_active(0):
                try:
                    self._dev.detach_kernel_driver(0)
                except usb.USBError as e:
                    raise DriverException("Could not detach kernel driver") from e

            # set the active configuration. With no arguments, the first
            # configuration will be the active one
            self._dev.set_configuration()

            # get an endpoint instance
            cfg = self._dev.get_active_configuration()
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
            alternateSetting = usb.control.get_interface(self._dev, self._interfaceNumber)
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
                                                 bAlternateSetting=alternateSetting)
            usb.util.claim_interface(self._dev, self._interfaceNumber)

            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
                e.bEndpointAddress) == usb.ENDPOINT_OUT)

            # Must be stored under the same attribute name that _read() and
            # _close() use.
            self._epIn = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
                e.bEndpointAddress) == usb.ENDPOINT_IN)

            if self._epOut is None or self._epIn is None:
                raise DriverException("Could not initialize USB endpoint")
        except IOError as e:
            self._abortOpen()
            raise DriverException(str(e)) from e
        except DriverException:
            self._abortOpen()
            raise

    def _abortOpen(self):
        """Return to the closed state after a failed _open()."""
        dev = self._dev
        self._dev = self._epOut = self._epIn = None
        self._interfaceNumber = None
        if dev is not None:
            try:
                usb.util.dispose_resources(dev)
            except IOError:
                # Best effort: the original failure is the one worth reporting.
                pass

    def _close(self):
        """Release the claimed interface and the device's resources."""
        try:
            usb.util.release_interface(self._dev, self._interfaceNumber)
            usb.util.dispose_resources(self._dev)
        finally:
            # Forget the device even if releasing it raised.
            self._dev = self._epOut = self._epIn = None

    def _read(self, count):
        """Read via the IN endpoint's ``read(count)`` and return its result."""
        return self._epIn.read(count)

    def _write(self, data):
        """Write ``data`` via the OUT endpoint and return its result."""
        return self._epOut.write(data)
198