UsbPlain.last_received_chars()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 1
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
import logging
5
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
6
# Emit a debug trace as soon as the module is imported.
logger.debug("%s loaded", __name__)
7
8
import threading
9
import serial 
10
import time
0 ignored issues
show
Unused Code introduced by
The import time seems to be unused.
Loading history...
11
from os import linesep as OS_LINESEP
12
13
from doorpi.keyboard.AbstractBaseClass import KeyboardAbstractBaseClass, HIGH_LEVEL, LOW_LEVEL
0 ignored issues
show
Unused Code introduced by
Unused LOW_LEVEL imported from doorpi.keyboard.AbstractBaseClass
Loading history...
Unused Code introduced by
Unused HIGH_LEVEL imported from doorpi.keyboard.AbstractBaseClass
Loading history...
14
import doorpi
15
16
# Module-level handle to the DoorPi configuration object; it is looked up
# once, at the moment this module is imported.
CONFIG = doorpi.DoorPi().config
17
18
def get(**kwargs):
    """Create a UsbPlain keyboard, forwarding all keyword arguments to it."""
    keyboard = UsbPlain(**kwargs)
    return keyboard
19
class UsbPlain(KeyboardAbstractBaseClass):
    """Keyboard backend that exchanges plain text over a serial (USB) port.

    A daemon thread reads the port one ``read()`` call at a time and appends
    the result to a receive buffer. Whenever the buffer ends with one of the
    configured input pins, the key-down / key-pressed / key-up events are
    fired for that pin. The buffer is cleared when it ends with the input
    stop flag or grows beyond the configured maximum size. Outputs are sent
    as the pin text followed by the output stop flag.
    """

    name = 'UsbPlain Keyboard'

    # Serial connection; stays None until __init__ has created it.
    _ser = None

    def __init__(self, input_pins, output_pins, conf_pre, conf_post, keyboard_name, *args, **kwargs):
        """Read the configuration, open the serial port and start the reader.

        Args:
            input_pins: Iterable of input pin identifiers; each is converted
                with ``str`` and later matched against the end of the
                received text.
            output_pins: Iterable of output pin identifiers, converted with
                ``str``.
            conf_pre: Text placed before ``'keyboard'`` in the config
                section name.
            conf_post: Text placed after ``'keyboard'`` in the config
                section name.
            keyboard_name: Stored on the instance as ``keyboard_name``.
            *args: Accepted but not used by this class.
            **kwargs: Accepted but not used by this class.
        """
        logger.debug("UsbPlain.__init__(input_pins = %s, output_pins = %s)", input_pins, output_pins)
        self.keyboard_name = keyboard_name

        # Materialise the pins as lists. A lazy map() object (Python 3) would
        # be used up by the registration loop below, leaving the reader
        # thread with no pins to match against.
        self._InputPins = list(map(str, input_pins))
        self._OutputPins = list(map(str, output_pins))

        self._last_received_chars = ""
        self.last_key = ""

        for input_pin in self._InputPins:
            self._register_EVENTS_for_pin(input_pin, __name__)

        # Call set_output once per output pin (value 0, logging disabled).
        # NOTE(review): self._ser is still None at this point, so set_output
        # takes its "not open" branch and nothing is written to the device.
        # Confirm whether an initial write after opening the port was intended.
        for output_pin in self._OutputPins:
            self.set_output(output_pin, 0, False)

        # Settings are taken from the config file if present there; the last
        # argument of each call is presumably the fallback value.
        section_name = conf_pre + 'keyboard' + conf_post

        port = CONFIG.get(section_name, 'port', "/dev/ttyUSB0")
        baudrate = CONFIG.get_int(section_name, 'baudrate', 9600)

        self._input_stop_flag = CONFIG.get(section_name, 'input_stop_flag', OS_LINESEP)
        self._input_max_size = CONFIG.get_int(section_name, 'input_max_size', 255)
        self._output_stop_flag = CONFIG.get(section_name, 'output_stop_flag', OS_LINESEP)

        self._ser = serial.Serial(port, baudrate)

        # Read timeout: 0 means non-blocking, > 0 blocks for at most that
        # long. With a timeout the reader loop regularly gets back to its
        # shutdown check instead of waiting on the port indefinitely.
        self._ser.timeout = 1
        # The port is closed and reopened once before use. Byte size, parity,
        # stop bits and flow control are not set here and keep the values the
        # Serial constructor chose.
        self._ser.close()
        self._ser.open()

        self._shutdown = False
        self._thread = threading.Thread(target=self.read_usb_plain)
        self._thread.daemon = True
        self._thread.start()

        doorpi.DoorPi().event_handler.register_action('OnShutdown', self.destroy)

    @property
    def last_received_chars(self):
        """Text received since the receive buffer was last cleared."""
        return self._last_received_chars

    def read_usb_plain(self):
        """Reader loop; runs until shutdown is requested or the port closes.

        Used as the target of the background thread started in ``__init__``.
        """
        self._last_received_chars = ""
        while not self._shutdown and self._ser.isOpen():
            # Fetch the next character from the serial buffer.
            new_char = self._ser.read()
            # Nothing arrived within the timeout. A falsy test covers both
            # "" and b"", unlike a comparison against "" alone.
            if not new_char:
                continue

            self._last_received_chars += str(new_char)
            logger.debug("new char %s read and is now %s", new_char, self._last_received_chars)

            for input_pin in self._InputPins:
                if self._last_received_chars.endswith(input_pin):
                    self.last_key = input_pin
                    self._fire_OnKeyDown(input_pin, __name__)
                    self._fire_OnKeyPressed(input_pin, __name__)
                    self._fire_OnKeyUp(input_pin, __name__)

            if self._last_received_chars.endswith(self._input_stop_flag):
                logger.debug("found input stop flag -> clear received chars")
                self._last_received_chars = ""
            if len(self._last_received_chars) > self._input_max_size:
                logger.debug("received chars bigger then input max size -> clear received chars")
                self._last_received_chars = ""

    def destroy(self):
        """Stop the reader loop, close the port and unregister this source.

        Does nothing when ``is_destroyed`` is already true.
        """
        if self.is_destroyed:
            return
        logger.debug("destroy")
        self._shutdown = True
        if self._ser and self._ser.isOpen():
            self._ser.close()
        doorpi.DoorPi().event_handler.unregister_source(__name__, True)
        # NOTE(review): inside this class the attribute name is mangled to
        # _UsbPlain__destroyed. Whether the inherited is_destroyed reads that
        # attribute is not visible from this file -- confirm.
        self.__destroyed = True

    def status_input(self, input_pin):
        """Return True if ``input_pin`` equals the last matched input pin."""
        logger.debug("status_input for tag %s", input_pin)
        return input_pin == self.last_key

    def set_output(self, pin, value, log_output=True):
        """Write ``pin`` followed by the output stop flag to the serial port.

        Args:
            pin: Text to send; concatenated with the output stop flag.
            value: Accepted but not used by this implementation.
            log_output: When true, log the write attempt or the failure.

        Returns:
            True if the data was handed to the open port, False if the port
            does not exist or is not open.
        """
        if self._ser and self._ser.isOpen():
            if log_output:
                logger.debug('try to write %s to serial usb plain', pin)
            self._ser.flushOutput()
            self._ser.write(pin + self._output_stop_flag)
            self._ser.flush()
            return True

        if log_output:
            logger.warning("couldn't write to serial usb plain, because it's not open")
        return False
127