GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Pull Request — master (#4)
by Benjamin
02:19 queued 01:07
created

SerialDriver._abort()   A

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 6
rs 9.4285
1
from queue import Queue
2
from threading import Thread, Event
3
4
from serial import Serial
5
from serial import SerialTimeoutException, SerialException
6
7
from libAnt.drivers.driver import Driver, DriverException
8
from libAnt.loggers.logger import Logger
9
10
11
class SerialDriver(Driver):
    """
    An implementation of a serial ANT+ device driver

    The driver wraps a pyserial ``Serial`` port. ``self._serial`` holds the
    port object while the driver is open and is ``None`` otherwise.
    """

    def __init__(self, device: str, baudRate: int = 115200, logger: Logger = None):
        """
        Store the connection parameters; the port itself is not opened here.

        :param device: port name handed to ``Serial`` as ``port``
        :param baudRate: baud rate handed to ``Serial`` as ``baudrate``
        :param logger: optional logger forwarded to the ``Driver`` base class
        """
        super().__init__(logger=logger)
        self._device = device
        self._baudRate = baudRate
        self._serial = None

    def __str__(self) -> str:
        """
        Return "<device> @ <baud rate>", suffixed with " (closed)" while the
        driver is not open.

        ``__str__`` must always return a string; returning ``None`` makes
        ``str(driver)`` raise ``TypeError``.
        """
        description = self._device + " @ " + str(self._baudRate)
        if self.isOpen():
            return description
        return description + " (closed)"

    def _isOpen(self) -> bool:
        """
        Report whether a serial port object is currently held.
        """
        return self._serial is not None

    def _open(self) -> None:
        """
        Open the serial port described by the constructor arguments.

        :raises DriverException: if pyserial reports an error while opening,
            or the port object is not open afterwards
        """
        try:
            port = Serial(port=self._device, baudrate=self._baudRate)
        except SerialException as e:
            raise DriverException(str(e)) from e

        # Only publish the port once it is known to be open, so a failed
        # open never leaves _isOpen() reporting True.
        if not port.is_open:
            raise DriverException("Could not open specified device")
        self._serial = port

    def _close(self) -> None:
        """
        Close the serial port and forget it. Does nothing if no port is held.
        """
        # Drop the reference first so the driver counts as closed even if
        # close() itself raises.
        port, self._serial = self._serial, None
        if port is not None:
            port.close()

    def _read(self, count: int, timeout=None) -> bytes:
        """
        Read ``count`` bytes from the serial port.

        :param count: number of bytes requested from the port
        :param timeout: accepted for interface compatibility but not used
        :return: the bytes returned by ``Serial.read``
        """
        # NOTE(review): `timeout` is not applied to the port; the read uses
        # whatever timeout the Serial object was created with. Confirm with
        # the Driver base class before changing this.
        return self._serial.read(count)

    def _write(self, data: bytes) -> None:
        """
        Write ``data`` to the serial port and flush it.

        :param data: bytes to send
        :raises DriverException: if the write or flush times out
        """
        try:
            self._serial.write(data)
            self._serial.flush()
        except SerialTimeoutException as e:
            raise DriverException(str(e)) from e

    def _abort(self) -> None:
        """
        Cancel pending read/write calls and discard both port buffers.
        Does nothing if no port is held.
        """
        # Work on a local reference so the None check and the calls below
        # all see the same object.
        port = self._serial
        if port is not None:
            port.cancel_read()
            port.cancel_write()
            port.reset_input_buffer()
            port.reset_output_buffer()