GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Pull Request — master (#4)
by Benjamin
02:19 queued 01:07
created

Driver.abort()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
import time
2
from abc import abstractmethod
3
from threading import Lock
4
5
from libAnt.constants import MESSAGE_TX_SYNC
6
from libAnt.loggers.logger import Logger
7
from libAnt.message import Message
8
9
10
class DriverException(Exception):
    """Error raised by the driver layer, e.g. when a closed device is read from or written to."""
12
13
14
class Driver:
    """
    The driver provides an interface to read and write raw data to and from an ANT+ capable hardware device.

    Subclasses supply the device-specific primitives (``_isOpen``, ``_open``,
    ``_close``, ``_read``, ``_write`` and ``_abort``). The public methods wrap
    those primitives with a shared lock and, when a logger was given, logging.

    The instance can be used as a context manager: entering opens the device,
    leaving closes it.
    """

    def __init__(self, logger: Logger = None):
        """
        :param logger: optional logger; when given it is opened and closed together
                       with the device and receives every frame read by ``read()``.
        """
        # threading.Lock is not reentrant: public methods must never call another
        # public method that takes the lock while already holding it.
        self._lock = Lock()
        self._logger = logger
        # Wall-clock time (time.time()) of the most recent successful open() call.
        self._openTime = None

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def isOpen(self) -> bool:
        """Return whether the underlying device reports itself as open."""
        with self._lock:
            return self._isOpen()

    def open(self) -> None:
        """Open the logger (if any) and the device; does nothing when already open."""
        with self._lock:
            if not self._isOpen():
                self._openTime = time.time()
                if self._logger is not None:
                    self._logger.open()
                self._open()

    def close(self) -> None:
        """Close the device and the logger (if any); does nothing when not open."""
        with self._lock:
            # Call the predicate: the bare attribute ``self._isOpen`` is a bound
            # method and therefore always truthy.
            if self._isOpen():
                self._close()
                if self._logger is not None:
                    self._logger.close()

    def reOpen(self) -> None:
        """Close the device if it is open, then open it again.

        Only the device primitives are cycled; the logger is left untouched.
        """
        with self._lock:
            if self._isOpen():
                self._close()
            self._open()

    def read(self, timeout=None) -> Message:
        """
        Read from the device until a frame with a matching checksum was received.

        A frame is read as: sync byte, length byte, type byte, ``length`` data
        bytes, checksum byte. Bytes preceding a sync byte are skipped. Every
        complete frame is passed to the logger (if any) before its checksum is
        verified; frames whose checksum does not match are discarded.

        :param timeout: forwarded unchanged to every underlying ``_read`` call
        :return: the first message whose ``checksum()`` equals the received checksum byte
        :raises DriverException: if the device is not open
        """
        # Checked before taking the lock because isOpen() acquires it itself.
        if not self.isOpen():
            raise DriverException("Device is closed")

        with self._lock:
            while True:
                sync = self._read(1, timeout=timeout)[0]
                # Compare by value; an identity test on integers is
                # implementation-dependent.
                if sync != MESSAGE_TX_SYNC:
                    continue
                length = self._read(1, timeout=timeout)[0]
                msgType = self._read(1, timeout=timeout)[0]
                data = self._read(length, timeout=timeout)
                chk = self._read(1, timeout=timeout)[0]
                msg = Message(msgType, data)

                if self._logger is not None:
                    # Reassemble the raw frame exactly as it was received.
                    logMsg = bytearray([sync, length, msgType])
                    logMsg.extend(data)
                    logMsg.append(chk)

                    self._logger.log(bytes(logMsg))

                if msg.checksum() == chk:
                    return msg

    def write(self, msg: Message) -> None:
        """
        Encode ``msg`` and write it to the device.

        :param msg: the message to send; its ``encode()`` result is written
        :raises DriverException: if the device is not open
        """
        # Checked before taking the lock because isOpen() acquires it itself.
        if not self.isOpen():
            raise DriverException("Device is closed")

        with self._lock:
            self._write(msg.encode())

    def abort(self) -> None:
        """Delegate to the subclass' ``_abort``.

        Unlike the other public methods this one does not acquire the lock.
        NOTE(review): presumably so that a pending read/write holding the lock
        can be interrupted - confirm against the concrete drivers.
        """
        self._abort()

    # NOTE: the class does not derive from abc.ABC, so the @abstractmethod markers
    # below are not enforced when instantiating; they document what subclasses
    # are expected to override.

    @abstractmethod
    def _isOpen(self) -> bool:
        """Return whether the device is currently open."""
        pass

    @abstractmethod
    def _open(self) -> None:
        """Open the device."""
        pass

    @abstractmethod
    def _close(self) -> None:
        """Close the device."""
        pass

    @abstractmethod
    def _read(self, count: int, timeout=None) -> bytes:
        """Read ``count`` bytes from the device; ``read()`` indexes the result."""
        pass

    @abstractmethod
    def _write(self, data: bytes) -> None:
        """Write ``data`` to the device."""
        pass

    @abstractmethod
    def _abort(self) -> None:
        """Device-specific implementation of ``abort()``."""
        pass