GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — serial-driver ( 0ebe12...675509 )
by Benjamin
01:14
created

Driver.read()   D

Complexity

Conditions 8

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 17
Bugs 1 Features 0
Metric Value
cc 8
c 17
b 1
f 0
dl 0
loc 27
rs 4
1
import time
2
from abc import abstractmethod
3
from threading import Lock
4
5
from libAnt.constants import MESSAGE_TX_SYNC
6
from libAnt.loggers.logger import Logger
7
from libAnt.message import Message
8
9
10
class DriverException(Exception):
    """
    Raised by the driver when an operation cannot be carried out, e.g. reading from or writing to a closed device
    """
12
13
14
class Driver:
    """
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device

    The public methods serialize access to the device through an internal lock
    and delegate the actual I/O to the underscore-prefixed hooks (``_isOpen``,
    ``_open``, ``_close``, ``_read``, ``_write``, ``_abort``), which concrete
    drivers are expected to provide.

    NOTE: this class does not derive from ``abc.ABC``, so the ``@abstractmethod``
    markers below are not enforced when a subclass is instantiated.
    """

    def __init__(self, logger: Logger = None):
        """
        :param logger: optional logger that receives every raw frame read from
            the device; ``None`` disables logging
        """
        self._lock = Lock()
        self._logger = logger
        # Timestamp of the most recent successful open() request.
        self._openTime = None

    def __enter__(self):
        """Open the device and return the driver for use in a ``with`` block."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the device on leaving the ``with`` block; exceptions propagate."""
        self.close()

    def isOpen(self) -> bool:
        """Return whether the underlying device is currently open."""
        with self._lock:
            return self._isOpen()

    def open(self) -> None:
        """Open the device (and the logger, if any) unless it is already open."""
        with self._lock:
            if not self._isOpen():
                self._openTime = time.time()
                if self._logger is not None:
                    self._logger.open()
                self._open()

    def close(self) -> None:
        """Close the device (and the logger, if any) if it is currently open."""
        with self._lock:
            # _isOpen must be *called*; the bare bound method is always truthy,
            # which made this guard a no-op.
            if self._isOpen():
                self._close()
                if self._logger is not None:
                    self._logger.close()

    def reOpen(self) -> None:
        """Close the device if it is open, then open it again.

        The logger is not touched by this operation.
        """
        with self._lock:
            if self._isOpen():
                self._close()
            self._open()

    def read(self, timeout=None) -> Message:
        """
        Block until a complete frame with a valid checksum has been read.

        Bytes are discarded until the sync byte is seen; then length, type,
        payload and checksum are read. Frames whose checksum does not match
        are dropped (after being logged) and reading continues.

        :param timeout: passed through unchanged to every ``_read`` call
        :return: the decoded message
        :raises DriverException: if the device is closed
        """
        with self._lock:
            # Checked under the same lock acquisition as the read itself so
            # the device cannot be closed in between.
            if not self._isOpen():
                raise DriverException("Device is closed")

            while True:
                try:
                    sync = self._read(1, timeout=timeout)[0]
                    # Value comparison: identity (`is`) on ints only works by
                    # accident of interpreter-level int caching.
                    if sync != MESSAGE_TX_SYNC:
                        continue
                    length = self._read(1, timeout=timeout)[0]
                    msgType = self._read(1, timeout=timeout)[0]
                    data = self._read(length, timeout=timeout)
                    chk = self._read(1, timeout=timeout)[0]
                    msg = Message(msgType, data)

                    if self._logger is not None:
                        # Log the raw frame exactly as received, checksum included.
                        logMsg = bytearray([sync, length, msgType])
                        logMsg.extend(data)
                        logMsg.append(chk)

                        self._logger.log(bytes(logMsg))

                    if msg.checksum() == chk:
                        return msg
                except IndexError:
                    # Indexing an empty read result lands here (presumably a
                    # timed-out read -- confirm against the concrete driver);
                    # start over and wait for the next sync byte.
                    continue

    def write(self, msg: Message) -> None:
        """
        Encode ``msg`` and write it to the device.

        :raises DriverException: if the device is closed
        """
        with self._lock:
            # Checked under the same lock acquisition as the write itself.
            if not self._isOpen():
                raise DriverException("Device is closed")
            self._write(msg.encode())

    def abort(self) -> None:
        """Forward to the driver's ``_abort`` hook; deliberately does not take the lock."""
        self._abort()

    @abstractmethod
    def _isOpen(self) -> bool:
        """Return whether the underlying device is open."""
        pass

    @abstractmethod
    def _open(self) -> None:
        """Open the underlying device."""
        pass

    @abstractmethod
    def _close(self) -> None:
        """Close the underlying device."""
        pass

    @abstractmethod
    def _read(self, count: int, timeout=None) -> bytes:
        """Read ``count`` bytes from the underlying device."""
        pass

    @abstractmethod
    def _write(self, data: bytes) -> None:
        """Write ``data`` to the underlying device."""
        pass

    @abstractmethod
    def _abort(self) -> None:
        """Abort the operation in progress on the underlying device."""
        pass