GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( a63176...af9207 )
by Benjamin
02:18
created

USB2Driver._read()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 1
b 0
f 0
1
from abc import abstractmethod
2
from threading import Lock
3
4
from serial import Serial, SerialException, SerialTimeoutException
5
6
import usb
7
8
9
class DriverException(Exception):
10
    pass
11
12
13
class Driver:
14
    def __init__(self):
15
        self._lock = Lock()
16
17
    def __enter__(self):
18
        self.open()
19
20
    def __exit__(self, exc_type, exc_val, exc_tb):
21
        self.close()
22
23
    def isOpen(self):
24
        with self._lock:
25
            return self._isOpen
26
27
    def open(self):
28
        with self._lock:
29
            if not self._isOpen:
30
                self._open()
31
32
    def close(self):
33
        with self._lock:
34
            if self._isOpen:
35
                self._close()
36
37
    def read(self, count):
38
        if count <= 0:
39
            raise DriverException("Count must be > 0")
40
        if not self.isOpen():
41
            raise DriverException("Device is closed")
42
43
        with self._lock:
44
            return self._read(count)
45
46
    def write(self, msg):
47
        if not self.isOpen():
48
            raise DriverException("Device is closed")
49
50
        with self._lock:
51
            self.write(msg)
52
53
    @abstractmethod
54
    def _isOpen(self):
55
        pass
56
57
    @abstractmethod
58
    def _open(self):
59
        pass
60
61
    @abstractmethod
62
    def _close(self):
63
        pass
64
65
    @abstractmethod
66
    def _read(self, count):
67
        pass
68
69
    @abstractmethod
70
    def _write(self, data):
71
        pass
72
73
74
class SerialDriver(Driver):
75
    def __init__(self, device, baudRate=115200):
76
        super().__init__()
77
        self._device = device
78
        self._baudRate = baudRate
79
        self._serial = None
80
81
    def _isOpen(self):
82
        return self._serial is None
83
84
    def _open(self):
85
        try:
86
            self._dev = Serial(self._device, self._baudRate)
87
        except SerialException as e:
88
            raise DriverException(str(e))
89
90
        if not self._dev.isOpen():
91
            raise DriverException("Could not open specified device")
92
93
    def _close(self):
94
        self._serial.close()
95
        self._serial = None
96
97
    def _read(self, count):
98
        return self._serial.read(count)
99
100
    def _write(self, data):
101
        try:
102
            count = self._serial.write(data)
103
            self._serial.flush()
104
        except SerialTimeoutException as e:
105
            raise DriverException(str(e))
106
107
108
class USB2Driver(Driver):
109
    def __init__(self, idVendor, idProduct):
110
        Driver.__init__(self)
111
        self._idVendor = idVendor
112
        self._idProduct = idProduct
113
        self._dev = None
114
        self._epOut = None
115
        self._epIn = None
116
        self._interfaceNumber = None
117
118
    def _isOpen(self):
119
        return self._dev is not None
120
121
    def _open(self):
122
        try:
123
            # find the first USB device that matches the filter
124
            self._dev = usb.core.find(idVendor=self._idVendor, idProduct=self._idProduct)
125
126
            if self._dev is None:
127
                raise DriverException("Could not open specified device")
128
129
            # Detach kernel driver
130
            if self._dev.is_kernel_driver_active(0):
131
                try:
132
                    self._dev.detach_kernel_driver(0)
133
                except usb.USBError as e:
134
                    raise DriverException("Could not detach kernel driver")
135
136
            # set the active configuration. With no arguments, the first
137
            # configuration will be the active one
138
            self._dev.set_configuration()
139
140
            # get an endpoint instance
141
            cfg = self._dev.get_active_configuration()
142
            self._interfaceNumber = cfg[(0, 0)].bInterfaceNumber
143
            interface = usb.util.find_descriptor(cfg, bInterfaceNumber=self._interfaceNumber,
144
                                                 bAlternateSetting=usb.control.get_interface(self._dev,
145
                                                                                             self._interfaceNumber))
146
            usb.util.claim_interface(self._dev, self._interfaceNumber)
147
148
            self._epOut = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
149
                e.bEndpointAddress) == usb.ENDPOINT_OUT)
150
151
            self._ep_in = usb.util.find_descriptor(interface, custom_match=lambda e: usb.util.endpoint_direction(
152
                e.bEndpointAddress) == usb.ENDPOINT_IN)
153
154
            if self._epOut is None or self._ep_in is None:
155
                raise DriverException("Could not initialize USB endpoint")
156
        except IOError as e:
157
            raise DriverException(str(e))
158
159
    def _close(self):
160
        usb.util.release_interface(self._dev, self._interfaceNumber)
161
        usb.util.dispose_resources(self._dev)
162
        self._dev = self._epOut = self._epIn = None
163
164
    def _read(self, count):
165
        return self._epIn.read(count)
166
167
    def _write(self, data):
168
        return self._epOut.write(data)
169