GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#4)
by Benjamin
03:31 queued 02:14
created

Driver._abort()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 3
rs 10
1
import time
2
from abc import abstractmethod
3
from queue import Empty
4
from threading import Lock
5
6
from libAnt.constants import MESSAGE_TX_SYNC
7
from libAnt.loggers.logger import Logger
8
from libAnt.message import Message
9
10
11
class DriverException(Exception):
12
    pass
13
14
15
class Driver:
16
    """
17
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device
18
    """
19
20
    def __init__(self, logger: Logger = None):
21
        self._lock = Lock()
22
        self._logger = logger
23
        self._openTime = None
24
25
    def __enter__(self):
26
        self.open()
27
        return self
28
29
    def __exit__(self, exc_type, exc_val, exc_tb):
30
        self.close()
31
32
    def isOpen(self) -> bool:
33
        with self._lock:
34
            return self._isOpen()
35
36
    def open(self) -> None:
37
        with self._lock:
38
            if not self._isOpen():
39
                self._openTime = time.time()
40
                if self._logger is not None:
41
                    self._logger.open()
42
                self._open()
43
44
    def close(self) -> None:
45
        with self._lock:
46
            if self._isOpen:
47
                self._close()
48
                if self._logger is not None:
49
                    self._logger.close()
50
51
    def reOpen(self) -> None:
52
        with self._lock:
53
            if self._isOpen():
54
                self._close()
55
            self._open()
56
57
    def read(self, timeout=None) -> Message:
58
        if not self.isOpen():
59
            raise DriverException("Device is closed")
60
61
        with self._lock:
62
            while True:
63
                try:
64
                    sync = self._read(1, timeout=timeout)[0]
65
                    if sync is not MESSAGE_TX_SYNC:
66
                        continue
67
                    length = self._read(1, timeout=timeout)[0]
68
                    type = self._read(1, timeout=timeout)[0]
69
                    data = self._read(length, timeout=timeout)
70
                    chk = self._read(1, timeout=timeout)[0]
71
                    msg = Message(type, data)
72
73
                    if self._logger:
74
                        logMsg = bytearray([sync, length, type])
75
                        logMsg.extend(data)
76
                        logMsg.append(chk)
77
78
                        self._logger.log(bytes(logMsg))
79
80
                    if msg.checksum() == chk:
81
                        return msg
82
                except IndexError:
83
                    raise Empty
84
85
    def write(self, msg: Message) -> None:
86
        if not self.isOpen():
87
            raise DriverException("Device is closed")
88
89
        with self._lock:
90
            self._write(msg.encode())
91
92
    def abort(self) -> None:
93
        self._abort()
94
95
    @abstractmethod
96
    def _isOpen(self) -> bool:
97
        pass
98
99
    @abstractmethod
100
    def _open(self) -> None:
101
        pass
102
103
    @abstractmethod
104
    def _close(self) -> None:
105
        pass
106
107
    @abstractmethod
108
    def _read(self, count: int, timeout=None) -> bytes:
109
        pass
110
111
    @abstractmethod
112
    def _write(self, data: bytes) -> None:
113
        pass
114
115
    @abstractmethod
116
    def _abort(self) -> None:
117
        pass