path_leaf()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
import logging
5
logger = logging.getLogger(__name__)
6
logger.debug("%s loaded", __name__)
7
8
import os
9
import ntpath
10
from time import sleep
0 ignored issues
show
Unused Code introduced by
Unused sleep imported from time
Loading history...
11
12
from watchdog.observers import Observer
13
from watchdog.events import FileSystemEventHandler
14
15
from doorpi.keyboard.AbstractBaseClass import KeyboardAbstractBaseClass, HIGH_LEVEL, LOW_LEVEL
16
import doorpi
17
18
def path_leaf(path):
    """Return the last component of *path*.

    ``ntpath`` is used for splitting, so both ``/`` and ``\\`` are accepted
    as separators. A trailing separator is ignored: the component in front
    of it is returned instead of an empty string.
    """
    directory, leaf = ntpath.split(path)
    if leaf:
        return leaf
    # The path ended with a separator, so the leaf is the end of the head.
    return ntpath.basename(directory)
21
22
class MissingMandatoryParameter(Exception):
    """Raised when a required configuration value is empty."""
23
24
def get(**kwargs):
    """Create a FileSystem keyboard, forwarding all keyword arguments."""
    keyboard = FileSystem(**kwargs)
    return keyboard
25
class FileSystem(KeyboardAbstractBaseClass, FileSystemEventHandler):
    """Keyboard implementation that represents every pin as a file.

    Each input pin is a file below ``base_path_input``; that directory is
    watched with a watchdog ``Observer`` and modifications of the pin files
    are translated into key events (see :meth:`on_modified`). Each output pin
    is a file below ``base_path_output`` that is rewritten by
    :meth:`set_output`.
    """

    # Path of the input file whose next modification event must be skipped
    # (see on_modified); None while nothing is pending.
    __reset_file = None

    def __init__(self, input_pins, output_pins, conf_pre, conf_post, keyboard_name, polarity = 0, *args, **kwargs):
        """Create the pin files and start watching the input directory.

        :param input_pins: iterable of input pin names (converted with str()).
        :param output_pins: iterable of output pin names (converted with str()).
        :param conf_pre: text put in front of ``'keyboard'`` to build the
            name of the config section that is read.
        :param conf_post: text appended to ``'keyboard'`` for the same purpose.
        :param keyboard_name: stored as ``self.keyboard_name``.
        :param polarity: 0 compares read values against HIGH_LEVEL, any other
            value against LOW_LEVEL; 1 additionally inverts written values.
        :param args: accepted but not used here.
        :param kwargs: accepted but not used here.
        :raises MissingMandatoryParameter: if ``base_path_input`` or
            ``base_path_output`` is configured as an empty string.
        """
        logger.debug("FileSystem.__init__(input_pins = %s, output_pins = %s, polarity = %s)",
                     input_pins, output_pins, polarity)
        self.keyboard_name = keyboard_name
        self._polarity = polarity
        # Materialise the pin names as lists: on Python 3 map() returns a
        # one-shot iterator, which would be exhausted by the first loop below
        # and break every later membership test and the loops in destroy().
        self._InputPins = list(map(str, input_pins))
        self._OutputPins = list(map(str, output_pins))

        section_name = conf_pre+'keyboard'+conf_post
        config = doorpi.DoorPi().config
        # NOTE(review): reset_input is read here but not used anywhere in this class.
        self.__reset_input = config.get_bool(section_name, 'reset_input', True)
        self.__base_path_input = config.get_string_parsed(section_name, 'base_path_input')
        self.__base_path_output = config.get_string_parsed(section_name, 'base_path_output')

        if self.__base_path_input == '': raise MissingMandatoryParameter('base_path_input in %s '%section_name)
        if self.__base_path_output == '': raise MissingMandatoryParameter('base_path_output in %s '%section_name)

        # NOTE(review): os.path.dirname() is the configured path itself only
        # when that path ends with a separator; otherwise the parent directory
        # is created. Confirm the config convention.
        for base_path in (self.__base_path_input, self.__base_path_output):
            directory = os.path.dirname(base_path)
            if not os.path.exists(directory):
                logger.info('Path %s does not exist - creating it now', directory)
                os.makedirs(directory)

        # Create every input file in its inactive state before watching starts.
        for input_pin in self._InputPins:
            self.__set_input(os.path.join(self.__base_path_input, input_pin))
            self._register_EVENTS_for_pin(input_pin, __name__)

        # This object is its own watchdog event handler.
        self.__observer = Observer()
        self.__observer.schedule(self, self.__base_path_input)
        self.__observer.start()

        # use set_output to register status @ dict self.__OutputStatus
        for output_pin in self._OutputPins:
            self.set_output(output_pin, 0, False)

        self.register_destroy_action()

    def destroy(self):
        """Stop the observer, delete all pin files and unregister the events.

        Does nothing if ``self.is_destroyed`` is already true.
        """
        if self.is_destroyed: return
        logger.debug("destroy")

        self.__observer.stop()
        self.__observer.join()

        # Removal is best effort: a pin file that is already gone must not
        # prevent the event source from being unregistered below.
        for base_path, pins in ((self.__base_path_input, self._InputPins),
                                (self.__base_path_output, self._OutputPins)):
            for pin in pins:
                pin_file = os.path.join(base_path, pin)
                try:
                    os.remove(pin_file)
                except OSError as exp:
                    logger.warning("could not remove %s: %s", pin_file, exp)

        doorpi.DoorPi().event_handler.unregister_source(__name__, True)
        # NOTE(review): inside this class the name is mangled to
        # _FileSystem__destroyed - confirm that is what is_destroyed reads.
        self.__destroyed = True

    def status_input(self, pin):
        """Read the file of input *pin* and return whether it counts as active.

        Only the first line is evaluated. With polarity 0 the value is checked
        against HIGH_LEVEL, otherwise against LOW_LEVEL.
        """
        with open(os.path.join(self.__base_path_input, pin), 'r') as f:
            plain_value = f.readline().rstrip()
        if self._polarity == 0:
            return plain_value.lower() in HIGH_LEVEL
        return plain_value.lower() in LOW_LEVEL

    def __write_file(self, file, value = False):
        """Write *value* as ``True``/``False`` to *file* and return what was written.

        *value* is first reduced to a bool by checking its lower-cased string
        form against HIGH_LEVEL; with polarity 1 the bool is inverted.
        """
        value = str(value).lower() in HIGH_LEVEL
        if self._polarity == 1: value = not value
        with open(file, 'w') as f:
            f.write(str(value)+'\r\n')
        return value

    def __set_input(self, file, value = False):
        """Write *value* to the input file and make it read/writable for everyone."""
        self.__write_file(file, value)
        os.chmod(file, 0o666)

    def set_output(self, pin, value, log_output = True):
        """Write *value* to the file of output *pin*.

        :param pin: output pin name; it is first passed to
            ``DoorPi().parse_string`` wrapped in ``!`` and the parsed result
            replaces it if parsing changed the text.
        :param value: counts as true if its lower-cased string form is in HIGH_LEVEL.
        :param log_output: evaluated like *value*; enables the debug log line.
        :return: False if *pin* is not a configured output pin, True otherwise.
        """
        placeholder = "!"+str(pin)+"!"
        parsed_pin = doorpi.DoorPi().parse_string(placeholder)
        if parsed_pin != placeholder:
            pin = parsed_pin

        value = str(value).lower() in HIGH_LEVEL
        log_output = str(log_output).lower() in HIGH_LEVEL

        if pin not in self._OutputPins: return False
        written_value = self.__write_file(os.path.join(self.__base_path_output, pin), value)
        if log_output: logger.debug("out(pin = %s, value = %s, log_output = %s)", pin, written_value, log_output)

        # The status keeps the requested value, not the polarity-adjusted one
        # that was written to the file.
        self._OutputStatus[pin] = value
        return True

    def on_modified(self, event):
        """Watchdog callback: turn a modified input file into key events.

        Events whose string form does not contain ``FileModifiedEvent`` are
        ignored. An active pin file fires OnKeyPressed and OnKeyDown, an
        inactive one fires OnKeyUp.
        """
        if 'FileModifiedEvent' not in str(event): return
        if self.__reset_file:
            if self.__reset_file == event.src_path:
                # The file remembered after the last key press changed again:
                # swallow this one event and forget the file.
                self.__reset_file = None
                logger.debug('reset inputfile will not fire event (%s)', event.src_path)
                return
            # NOTE(review): a different file changed while a reset was pending;
            # that file is remembered and overwritten with 'false' before it
            # is evaluated below. Confirm this is the intended reset flow.
            self.__reset_file = event.src_path
            self.__set_input(event.src_path, 'false')

        input_pin = path_leaf(event.src_path)
        if input_pin not in self._InputPins: return

        if self.status_input(input_pin):
            # Remember the file so that its next modification is skipped.
            self.__reset_file = event.src_path
            self._fire_OnKeyPressed(input_pin, __name__)
            self._fire_OnKeyDown(input_pin, __name__)
        else:
            self._fire_OnKeyUp(input_pin, __name__)
137