GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Pull Request — master (#4)
by Benjamin
03:04 queued 01:48
created

Driver._abort()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 3
rs 10
1
import time
2
from abc import abstractmethod
3
from threading import Lock
4
5
from libAnt.constants import MESSAGE_TX_SYNC
6
from libAnt.loggers.logger import Logger
7
from libAnt.message import Message
8
9
10
class DriverException(Exception):
    """Error raised by a Driver, e.g. when reading or writing a closed device."""
12
13
14
class Driver:
    """
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device

    Public methods serialise access to the device through a single lock and
    delegate the actual work to the underscore-prefixed hooks, which concrete
    drivers are expected to override.

    NOTE: this class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` markers below are not enforced at instantiation time.
    """

    def __init__(self, logger: Logger = None):
        """
        :param logger: optional logger that receives every raw frame read
                       from the device; ``None`` disables logging
        """
        self._lock = Lock()
        self._logger = logger
        self._openTime = None  # time.time() of the most recent open()

    def __enter__(self):
        """Open the driver and return it for use in a ``with`` block."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the driver when leaving the ``with`` block."""
        self.close()

    def isOpen(self) -> bool:
        """Return whether the device is currently open."""
        with self._lock:
            return self._isOpen()

    def open(self) -> None:
        """Open the logger (if any) and the device; no-op when already open."""
        with self._lock:
            if not self._isOpen():
                self._openTime = time.time()
                if self._logger is not None:
                    self._logger.open()
                self._open()

    def close(self) -> None:
        """Close the device and the logger (if any); no-op when already closed."""
        with self._lock:
            # Call the hook: the bare attribute ``self._isOpen`` is a bound
            # method and therefore always truthy.
            if self._isOpen():
                self._close()
                if self._logger is not None:
                    self._logger.close()

    def reOpen(self) -> None:
        """Close the device if it is open, then open it again.

        Only the device hooks are invoked; the logger is left untouched.
        """
        with self._lock:
            if self._isOpen():
                self._close()
            self._open()

    def read(self, timeout=None) -> Message:
        """
        Read the next message with a valid checksum from the device.

        Bytes are consumed in the order: sync, length, type, ``length`` bytes
        of payload, checksum. Bytes that are not the sync marker are skipped.
        Every complete frame is passed to the logger (if any); frames whose
        checksum does not match are dropped and reading continues.

        :param timeout: forwarded unchanged to every ``_read`` call
        :return: the decoded message
        :raises DriverException: if the device is closed
        """
        if not self.isOpen():
            raise DriverException("Device is closed")

        with self._lock:
            while True:
                sync = self._read(1, timeout=timeout)[0]
                # Compare by value; ``is`` would test object identity.
                if sync != MESSAGE_TX_SYNC:
                    continue
                length = self._read(1, timeout=timeout)[0]
                msgType = self._read(1, timeout=timeout)[0]
                data = self._read(length, timeout=timeout)
                chk = self._read(1, timeout=timeout)[0]
                msg = Message(msgType, data)

                if self._logger is not None:
                    # Log the frame exactly as it was read, checksum included.
                    logMsg = bytearray([sync, length, msgType])
                    logMsg.extend(data)
                    logMsg.append(chk)

                    self._logger.log(bytes(logMsg))

                if msg.checksum() == chk:
                    return msg

    def write(self, msg: Message) -> None:
        """
        Encode ``msg`` and write it to the device.

        :raises DriverException: if the device is closed
        """
        if not self.isOpen():
            raise DriverException("Device is closed")

        with self._lock:
            self._write(msg.encode())

    def abort(self) -> None:
        """Invoke the subclass ``_abort`` hook while holding the lock."""
        with self._lock:
            self._abort()

    @abstractmethod
    def _isOpen(self) -> bool:
        """Subclass hook: report whether the underlying device is open."""

    @abstractmethod
    def _open(self) -> None:
        """Subclass hook: open the underlying device."""

    @abstractmethod
    def _close(self) -> None:
        """Subclass hook: close the underlying device."""

    @abstractmethod
    def _read(self, count: int, timeout=None) -> bytes:
        """Subclass hook: read ``count`` bytes from the underlying device."""

    @abstractmethod
    def _write(self, data: bytes) -> None:
        """Subclass hook: write ``data`` to the underlying device."""

    @abstractmethod
    def _abort(self) -> None:
        """Subclass hook: abort the operation in progress."""