Issues (158)

doorpi/keyboard/from_filesystem.py (4 issues)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
import logging
5
logger = logging.getLogger(__name__)
6
logger.debug("%s loaded", __name__)
7
8
import os
9
import ntpath
10
from time import sleep
0 ignored issues
show
Unused sleep imported from time
Loading history...
11
12
from watchdog.observers import Observer
13
from watchdog.events import FileSystemEventHandler
14
15
from doorpi.keyboard.AbstractBaseClass import KeyboardAbstractBaseClass, HIGH_LEVEL, LOW_LEVEL
16
import doorpi
17
18
def path_leaf(path):
19
    head, tail = ntpath.split(path)
20
    return tail or ntpath.basename(head)
21
22
class MissingMandatoryParameter(Exception): pass
23
24
def get(**kwargs): return FileSystem(**kwargs)
25
class FileSystem(KeyboardAbstractBaseClass, FileSystemEventHandler):
26
27
    __reset_file = None
28
    
29
    def __init__(self, input_pins, output_pins, conf_pre, conf_post, keyboard_name, polarity = 0, *args, **kwargs):
0 ignored issues
show
The argument kwargs seems to be unused.
Loading history...
The argument args seems to be unused.
Loading history...
30
        logger.debug("FileSystem.__init__(input_pins = %s, output_pins = %s, polarity = %s)",
31
                     input_pins, output_pins, polarity)
32
        self.keyboard_name = keyboard_name
33
        self._polarity = polarity
34
        self._InputPins = map(str, input_pins)
35
        self._OutputPins = map(str, output_pins)
36
37
        section_name = conf_pre+'keyboard'+conf_post
38
        self.__reset_input = doorpi.DoorPi().config.get_bool(section_name, 'reset_input', True)
39
        self.__base_path_input = doorpi.DoorPi().config.get_string_parsed(section_name, 'base_path_input')
40
        self.__base_path_output = doorpi.DoorPi().config.get_string_parsed(section_name, 'base_path_output')
41
42
        if self.__base_path_input == '': raise MissingMandatoryParameter('base_path_input in %s '%section_name)
43
        if self.__base_path_output == '': raise MissingMandatoryParameter('base_path_output in %s '%section_name)
44
45
        if not os.path.exists(os.path.dirname(self.__base_path_input)):
46
            logger.info('Path %s does not exist - creating it now', os.path.dirname(self.__base_path_input))
47
            os.makedirs(os.path.dirname(self.__base_path_input))
48
49
        if not os.path.exists(os.path.dirname(self.__base_path_output)):
50
            logger.info('Path %s does not exist - creating it now', os.path.dirname(self.__base_path_output))
51
            os.makedirs(os.path.dirname(self.__base_path_output))
52
53
        for input_pin in self._InputPins:
54
            self.__set_input(os.path.join(self.__base_path_input, input_pin))
55
            self._register_EVENTS_for_pin(input_pin, __name__)
56
57
        self.__observer = Observer()
58
        self.__observer.schedule(self, self.__base_path_input)
59
        self.__observer.start()
60
61
        # use set_output to register status @ dict self.__OutputStatus
62
        for output_pin in self._OutputPins:
63
            self.set_output(output_pin, 0, False)
64
65
        self.register_destroy_action()
66
67
    def destroy(self):
68
        if self.is_destroyed: return
69
        logger.debug("destroy")
70
71
        self.__observer.stop()
72
        self.__observer.join()
73
74
        for input_pin in self._InputPins:
75
            os.remove(os.path.join(self.__base_path_input, input_pin))
76
        for output_pin in self._OutputPins:
77
            os.remove(os.path.join(self.__base_path_output, output_pin))
78
79
        doorpi.DoorPi().event_handler.unregister_source(__name__, True)
80
        self.__destroyed = True
81
82
    def status_input(self, pin):
83
        f = open(os.path.join(self.__base_path_input, pin), 'r')
84
        plain_value = f.readline().rstrip()
85
        f.close()
86
        if self._polarity is 0:
87
            return str(plain_value).lower() in HIGH_LEVEL
88
        else:
89
            return str(plain_value).lower() in LOW_LEVEL
90
91
    def __write_file(self, file, value = False):
92
        f = open(file, 'w')
93
        value = str(value).lower() in HIGH_LEVEL
94
        if self._polarity is 1: value = not value
95
        f.write(str(value)+'\r\n')
96
        f.close()
97
        return value
98
99
    def __set_input(self, file, value = False):
100
        self.__write_file(file, value)
101
        os.chmod(file, 0o666)
102
103 View Code Duplication
    def set_output(self, pin, value, log_output = True):
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
104
        parsed_pin = doorpi.DoorPi().parse_string("!"+str(pin)+"!")
105
        if parsed_pin != "!"+str(pin)+"!":
106
            pin = parsed_pin
107
108
        value = str(value).lower() in HIGH_LEVEL
109
        log_output = str(log_output).lower() in HIGH_LEVEL
110
111
        if pin not in self._OutputPins: return False
112
        written_value = self.__write_file(os.path.join(self.__base_path_output, pin), value)
113
        if log_output: logger.debug("out(pin = %s, value = %s, log_output = %s)", pin, written_value, log_output)
114
115
        self._OutputStatus[pin] = value
116
        return True
117
118
    def on_modified(self, event):
119
        if 'FileModifiedEvent' not in str(event): return
120
        if self.__reset_file:
121
            if self.__reset_file == event.src_path:
122
                self.__reset_file = None
123
                logging.debug('reset inputfile will not fire event (%s)', event.src_path)
124
                return
125
            self.__reset_file = event.src_path
126
            self.__set_input(event.src_path, 'false')
127
128
        input_pin = path_leaf(event.src_path)
129
        if input_pin not in self._InputPins: return
130
131
        if self.status_input(input_pin):
132
            self.__reset_file = event.src_path
133
            self._fire_OnKeyPressed(input_pin, __name__)
134
            self._fire_OnKeyDown(input_pin, __name__)
135
        else:
136
            self._fire_OnKeyUp(input_pin, __name__)
137