1 | #!/usr/bin/env python |
||
2 | # -*- coding: utf-8 -*- |
||
3 | |||
4 | import logging |
||
5 | logger = logging.getLogger(__name__) |
||
6 | logger.debug("%s loaded", __name__) |
||
7 | |||
8 | import os |
||
9 | import ntpath |
||
10 | from time import sleep |
||
# NOTE(review): the code-analysis tool flagged the `sleep` import above as "Unused Code".
|||
11 | |||
12 | from watchdog.observers import Observer |
||
13 | from watchdog.events import FileSystemEventHandler |
||
14 | |||
15 | from doorpi.keyboard.AbstractBaseClass import KeyboardAbstractBaseClass, HIGH_LEVEL, LOW_LEVEL |
||
16 | import doorpi |
||
17 | |||
def path_leaf(path):
    """Return the last component of *path*.

    ``ntpath`` is used for splitting, so both ``/`` and ``\\`` count as
    separators.  When *path* ends with a separator the split yields an empty
    tail; in that case the name of the directory in front of it is returned.
    """
    parent, name = ntpath.split(path)
    if name:
        return name
    # Empty tail: the path ended with a separator, so take the leaf of the
    # remaining head instead.
    return ntpath.basename(parent)
||
21 | |||
class MissingMandatoryParameter(Exception):
    """Raised when a mandatory keyboard configuration value is empty."""
||
23 | |||
def get(**kwargs):
    """Build a ``FileSystem`` keyboard from the given keyword arguments."""
    keyboard = FileSystem(**kwargs)
    return keyboard
||
class FileSystem(KeyboardAbstractBaseClass, FileSystemEventHandler):
    """Keyboard backend that represents every pin as a file on disk.

    Each input pin is a file below ``base_path_input``.  A watchdog observer
    watches that directory and ``on_modified`` turns file changes into key
    events.  Each output pin is a file below ``base_path_output`` that is
    rewritten by ``set_output``.
    """

    # Path of the input file remembered by on_modified; the next modification
    # event for exactly this path is swallowed instead of firing a key event.
    __reset_file = None

    def __init__(self, input_pins, output_pins, conf_pre, conf_post, keyboard_name, polarity=0, *args, **kwargs):
        """Create the pin files, start watching the inputs and reset the outputs.

        :param input_pins: iterable of input pin names (converted to ``str``).
        :param output_pins: iterable of output pin names (converted to ``str``).
        :param conf_pre: prefix of the configuration section name.
        :param conf_post: suffix of the configuration section name.
        :param keyboard_name: name stored on the instance as ``keyboard_name``.
        :param polarity: ``0`` compares/writes values as they are, ``1``
            inverts what is written and reads against ``LOW_LEVEL``.
        :raises MissingMandatoryParameter: if ``base_path_input`` or
            ``base_path_output`` is empty in the configuration section.
        """
        logger.debug("FileSystem.__init__(input_pins = %s, output_pins = %s, polarity = %s)",
                     input_pins, output_pins, polarity)
        self.keyboard_name = keyboard_name
        self._polarity = polarity
        # Real lists, not map(): the pins are iterated and membership-tested
        # several times, and a lazy Python 3 map object would be exhausted
        # after the first loop below.
        self._InputPins = [str(pin) for pin in input_pins]
        self._OutputPins = [str(pin) for pin in output_pins]

        section_name = conf_pre+'keyboard'+conf_post
        # NOTE(review): reset_input is read here but not used anywhere in the
        # visible code - confirm whether on_modified was meant to honour it.
        self.__reset_input = doorpi.DoorPi().config.get_bool(section_name, 'reset_input', True)
        self.__base_path_input = doorpi.DoorPi().config.get_string_parsed(section_name, 'base_path_input')
        self.__base_path_output = doorpi.DoorPi().config.get_string_parsed(section_name, 'base_path_output')

        if self.__base_path_input == '':
            raise MissingMandatoryParameter('base_path_input in %s '%section_name)
        if self.__base_path_output == '':
            raise MissingMandatoryParameter('base_path_output in %s '%section_name)

        # Create the base directories themselves (not merely their parents):
        # the pin files are joined onto them and the observer watches the
        # input directory.
        self.__ensure_directory(self.__base_path_input)
        self.__ensure_directory(self.__base_path_output)

        for input_pin in self._InputPins:
            self.__set_input(os.path.join(self.__base_path_input, input_pin))
            self._register_EVENTS_for_pin(input_pin, __name__)

        self.__observer = Observer()
        self.__observer.schedule(self, self.__base_path_input)
        self.__observer.start()

        # use set_output to register status @ dict self.__OutputStatus
        for output_pin in self._OutputPins:
            self.set_output(output_pin, 0, False)

        self.register_destroy_action()

    @staticmethod
    def __ensure_directory(path):
        """Create directory *path* (including parents) if it does not exist."""
        if not os.path.exists(path):
            logger.info('Path %s does not exist - creating it now', path)
            os.makedirs(path)

    @staticmethod
    def __remove_file(file_path):
        """Delete *file_path*; log instead of raising if that fails."""
        try:
            os.remove(file_path)
        except OSError as error:
            # Best effort: a pin file that is already gone must not stop the
            # rest of the teardown.
            logger.warning('could not remove %s: %s', file_path, error)

    def destroy(self):
        """Stop the observer, delete all pin files and unregister the event source."""
        if self.is_destroyed:
            return
        logger.debug("destroy")

        self.__observer.stop()
        self.__observer.join()

        for input_pin in self._InputPins:
            self.__remove_file(os.path.join(self.__base_path_input, input_pin))
        for output_pin in self._OutputPins:
            self.__remove_file(os.path.join(self.__base_path_output, output_pin))

        doorpi.DoorPi().event_handler.unregister_source(__name__, True)
        # NOTE(review): this name is mangled to _FileSystem__destroyed; confirm
        # that the base-class is_destroyed property actually reads it.
        self.__destroyed = True

    def status_input(self, pin):
        """Read the file of input *pin* and return whether it is active.

        The first line of the file is stripped and lower-cased; with polarity
        ``0`` it is looked up in ``HIGH_LEVEL``, otherwise in ``LOW_LEVEL``.
        """
        with open(os.path.join(self.__base_path_input, pin), 'r') as pin_file:
            plain_value = pin_file.readline().rstrip()
        if self._polarity == 0:
            return str(plain_value).lower() in HIGH_LEVEL
        return str(plain_value).lower() in LOW_LEVEL

    def __write_file(self, file_path, value=False):
        """Write *value* as a boolean line to *file_path* and return what was written.

        *value* counts as true when its lower-cased string form is in
        ``HIGH_LEVEL``; with polarity ``1`` the result is inverted before it
        is written.
        """
        value = str(value).lower() in HIGH_LEVEL
        if self._polarity == 1:
            value = not value
        with open(file_path, 'w') as pin_file:
            pin_file.write(str(value)+'\r\n')
        return value

    def __set_input(self, file_path, value=False):
        """Write *value* to an input file and make the file world read/writable."""
        self.__write_file(file_path, value)
        os.chmod(file_path, 0o666)

    def set_output(self, pin, value, log_output=True):
        """Write *value* to the file of output *pin*.

        :param pin: output pin name; it is first passed through
            ``DoorPi().parse_string`` wrapped in ``!`` and replaced by the
            result if the parser changed it.
        :param value: counts as true when its lower-cased string form is in
            ``HIGH_LEVEL``.
        :param log_output: same truth test as *value*; enables the debug log line.
        :return: ``False`` if *pin* is not a configured output pin, else ``True``.
        """
        parsed_pin = doorpi.DoorPi().parse_string("!"+str(pin)+"!")
        if parsed_pin != "!"+str(pin)+"!":
            pin = parsed_pin

        value = str(value).lower() in HIGH_LEVEL
        log_output = str(log_output).lower() in HIGH_LEVEL

        if pin not in self._OutputPins:
            return False
        written_value = self.__write_file(os.path.join(self.__base_path_output, pin), value)
        if log_output:
            logger.debug("out(pin = %s, value = %s, log_output = %s)", pin, written_value, log_output)

        # The logical value is stored, not the (possibly inverted) written one.
        self._OutputStatus[pin] = value
        return True

    def on_modified(self, event):
        """Translate a file modification below the input path into key events.

        Events whose string form does not contain ``FileModifiedEvent`` are
        ignored.  For a configured input pin, an active status fires
        OnKeyPressed and OnKeyDown, an inactive status fires OnKeyUp.
        """
        if 'FileModifiedEvent' not in str(event):
            return
        # NOTE(review): the nesting of this block was reconstructed; the two
        # statements after the inner ``if`` are assumed to belong to the outer
        # ``if`` - confirm against the upstream file.
        if self.__reset_file:
            if self.__reset_file == event.src_path:
                # This is the event for the remembered file: forget it and do
                # not fire anything.
                self.__reset_file = None
                logger.debug('reset inputfile will not fire event (%s)', event.src_path)
                return
            self.__reset_file = event.src_path
            self.__set_input(event.src_path, 'false')

        input_pin = path_leaf(event.src_path)
        if input_pin not in self._InputPins:
            return

        if self.status_input(input_pin):
            # Remember the file so that the next event for it is swallowed.
            self.__reset_file = event.src_path
            self._fire_OnKeyPressed(input_pin, __name__)
            self._fire_OnKeyDown(input_pin, __name__)
        else:
            self._fire_OnKeyUp(input_pin, __name__)
||
137 |