DoorPi.parse_string()   F
last analyzed

Complexity

Conditions 16

Size

Total Lines 55

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 16
dl 0
loc 55
rs 3.4798
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the body into its own well-named method.

Complexity

Complex methods like DoorPi.parse_string() often do a lot of different things. To break such a method (and the class that owns it) down, we need to identify a cohesive component within the class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
import logging
5
logger = logging.getLogger(__name__)
6
logger.debug("%s loaded", __name__)
7
8
import sys
0 ignored issues
show
Unused Code introduced by Thomas Meissner
The import sys seems to be unused.
Loading history...
9
import argparse
0 ignored issues
show
Unused Code introduced by Thomas Meissner
The import argparse seems to be unused.
Loading history...
10
11
import time  # used by: DoorPi.run
12
import os  # used by: DoorPi.load_config
13
14
import datetime  # used by: parse_string
15
import cgi  # used by: parse_string
16
import tempfile
0 ignored issues
show
Unused Code introduced by Thomas
The import tempfile seems to be unused.
Loading history...
17
18
import metadata
19
from keyboard.KeyboardInterface import load_keyboard
20
from sipphone.SipphoneInterface import load_sipphone
21
from status.webserver import load_webserver
22
from conf.config_object import ConfigObject
23
from action.handler import EventHandler
24
from status.status_class import DoorPiStatus
25
#from status.webservice import run_webservice, WebService
26
from action.base import SingleAction
27
28
29
class DoorPiShutdownAction(SingleAction):
    """Action type used by DoorPi to schedule its own shutdown (see DoorPi.prepare)."""
30
class DoorPiNotExistsException(Exception):
    """Exception type declared for a missing DoorPi instance."""
31
class DoorPiEventHandlerNotExistsException(Exception):
    """Exception type declared for a missing or unprepared event handler."""
32
class DoorPiRestartException(Exception):
    """Raised by DoorPi.restart() when destroy() reports failure."""
33
34
class Singleton(type):
    """Metaclass that hands out one shared instance per class.

    The first call to a class using this metaclass constructs the instance
    with the given arguments; every later call returns that same object and
    ignores its arguments.
    """

    # class -> its one and only instance
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of *cls*, creating it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        registry[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return registry[cls]
40
41
class DoorPi(object):
42
    __metaclass__ = Singleton
43
44
    __prepared = False
45
46
    __config = None
47
    @property
48
    def config(self):
        """Return the configuration object (``None`` until prepare() loads it)."""
        return self.__config
49
50
    __keyboard = None
51
    @property
52
    def keyboard(self):
        """Return the keyboard module (``None`` until prepare() loads it)."""
        return self.__keyboard
53
    #def get_keyboard(self): return self.__keyboard
54
55
    __sipphone = None
56
    @property
57
    def sipphone(self):
        """Return the SIP phone module (``None`` until prepare() loads it)."""
        return self.__sipphone
58
59
    @property
60
    def additional_informations(self):
        """Return the event handler's additional informations.

        Without an event handler a new, empty dict is returned on every access.
        """
        handler = self.event_handler
        if handler is not None:
            return handler.additional_informations
        return {}
63
64
    __event_handler = None
65
    @property
66
    def event_handler(self):
        """Return the event handler (``None`` until prepare() creates it)."""
        return self.__event_handler
67
68
    __webserver = None
69
    @property
70
    def webserver(self):
        """Return the webserver module (``None`` until prepare() loads it)."""
        return self.__webserver
71
72
    @property
73
    def status(self):
        """Return a new DoorPiStatus built from this instance with default filters."""
        return DoorPiStatus(self)
74
    def get_status(self, modules = '', value= '', name = ''):
        """Return a new DoorPiStatus for this instance.

        *modules*, *value* and *name* are passed through unchanged to the
        DoorPiStatus constructor.
        """
        return DoorPiStatus(self, modules, value, name)
75
76
    @property
77
    def epilog(self):
        """Return the epilog text defined in the metadata module."""
        return metadata.epilog
78
79
    @property
80
    def name(self):
        """Return the package name from the metadata module as a string."""
        return str(metadata.package)
81
    @property
82
    def name_and_version(self):
        """Return the package name and version as '<package> - version: <version>'."""
        package_name = str(metadata.package)
        return package_name + " - version: " + metadata.version
83
84
    __shutdown = False
85
    @property
86
    def shutdown(self):
        """Return the shutdown flag (set to True by doorpi_shutdown())."""
        return self.__shutdown
87
88
    _base_path = metadata.doorpi_path
89
    @property
90
    def base_path(self):
        """Return the base directory used by DoorPi.

        If no base path is set yet, ``~/<metadata.package>`` is tried first.
        When that directory is not writable (or building the path fails),
        the error is logged and the system temp directory is used instead.
        The result is stored in ``_base_path`` so the lookup runs only once.
        """
        if self._base_path is None:
            try:
                candidate = os.path.join(os.path.expanduser('~'), metadata.package)
                # Explicit check instead of ``assert``: asserts are removed
                # when Python runs with -O, which would silently skip the
                # fallback and keep an unusable path.
                if not os.access(candidate, os.W_OK):
                    raise OSError('use fallback for base_path (see tmp path)')
                self._base_path = candidate
            except Exception as exp:
                logger.error(exp)
                # tempfile is imported at module level; no local re-import needed
                self._base_path = tempfile.gettempdir()
        return self._base_path
100
101
    def __init__(self, parsed_arguments = None):
        """Store the parsed arguments and set the daemon defaults.

        *parsed_arguments* is kept for a later prepare() call made by run().
        """
        self.__parsed_arguments = parsed_arguments

        # Only relevant when started as daemon; harmless when run as app.
        self.stdin_path = self.stdout_path = self.stderr_path = '/dev/null'
        self.pidfile_path = '/var/run/doorpi.pid'
        self.pidfile_timeout = 5

        # timestamp of the most recent time tick (updated in run())
        self.__last_tick = time.time()
111
112
    def doorpi_shutdown(self, time_until_shutdown=10):
        """Sleep *time_until_shutdown* seconds, then set the shutdown flag.

        The main loop in run() ends once the flag is set.
        """
        time.sleep(time_until_shutdown)
        self.__shutdown = True
115
116
    def prepare(self, parsed_arguments):
        """Load the configuration and modules and register events and actions.

        *parsed_arguments* must provide ``configfile``; an optional ``test``
        attribute set to True registers a shutdown action after startup.

        Returns this instance so the call can be chained.
        """
        logger.debug("prepare")
        logger.debug("given arguments argv: %s", parsed_arguments)

        self.__config = ConfigObject.load_config(parsed_arguments.configfile)
        self._base_path = self.config.get('DoorPi', 'base_path', self.base_path)
        self.__event_handler = EventHandler()

        if self.config.config_file is None:
            # no config file in use: save the config once startup is done
            self.event_handler.register_action('AfterStartup', self.config.save_config)
            # NOTE(review): presumably seeds a default OnStartup action - confirm
            self.config.get('EVENT_OnStartup', '10', 'sleep:1')

        if 'test' in parsed_arguments and parsed_arguments.test is True:
            logger.warning('using only test-mode and destroy after 5 seconds')
            self.event_handler.register_action('AfterStartup', DoorPiShutdownAction(self.doorpi_shutdown))

        # register own events
        for own_event in (
            'BeforeStartup', 'OnStartup', 'AfterStartup',
            'BeforeShutdown', 'OnShutdown', 'AfterShutdown',
            'OnTimeTick', 'OnTimeTickRealtime',
        ):
            self.event_handler.register_event(own_event, __name__)

        # register base actions
        self.event_handler.register_action('OnTimeTick', 'time_tick:!last_tick!')

        # register modules
        self.__webserver = load_webserver()
        self.__keyboard = load_keyboard()
        self.__sipphone = load_sipphone()
        self.sipphone.start()

        # register eventbased actions from configfile
        for event_section in self.config.get_sections('EVENT_'):
            logger.info("found EVENT_ section '%s' in configfile", event_section)
            event_name = event_section[len('EVENT_'):]
            for action in sorted(self.config.get_keys(event_section)):
                logger.info("registering action '%s' for event '%s'", action, event_name)
                self.event_handler.register_action(event_name, self.config.get(event_section, action))

        # register actions for inputpins
        if 'KeyboardHandler' not in self.keyboard.name:
            section_name = 'InputPins'
            for input_pin in sorted(self.config.get_keys(section_name)):
                self.event_handler.register_action('OnKeyPressed_'+input_pin, self.config.get(section_name, input_pin))
        else:
            # several keyboards: each one has its own '<name>_InputPins' section
            for keyboard_name in self.keyboard.loaded_keyboards:
                section_name = keyboard_name+'_InputPins'
                for input_pin in self.config.get_keys(section_name, log = False):
                    self.event_handler.register_action(
                        'OnKeyPressed_'+keyboard_name+'.'+input_pin,
                        self.config.get(section_name, input_pin)
                    )

        # register actions for DTMF
        section_name = 'DTMF'
        for dtmf in sorted(self.config.get_keys(section_name)):
            self.event_handler.register_action('OnDTMF_'+dtmf, self.config.get(section_name, dtmf))

        # register keep_alive_led
        is_alive_led = self.config.get('DoorPi', 'is_alive_led', '')
        # Compare by value: ``is not ''`` tests object identity, which only
        # works by accident of string interning.
        if is_alive_led != '':
            self.event_handler.register_action('OnTimeSecondEvenNumber', 'out:%s,HIGH,False'%is_alive_led)
            self.event_handler.register_action('OnTimeSecondUnevenNumber', 'out:%s,LOW,False'%is_alive_led)

        self.__prepared = True
        return self
186
187
    def __del__(self):
        """Run destroy() when the instance is finalized and return its result."""
        result = self.destroy()
        return result
189
190
    @property
191
    def modules_destroyed(self):
        """Report whether the event handler has wound down.

        Returns False while more than one event source is registered,
        otherwise the event handler's ``idle`` value.
        """
        handler = self.event_handler
        return False if len(handler.sources) > 1 else handler.idle
194
195
    def destroy(self):
        """Fire the shutdown events and wait for the modules to finish.

        Returns False (after logging a warning) when there is no event
        handler or it has no threads, i.e. DoorPi was never prepared.
        Otherwise fires BeforeShutdown / OnShutdown / AfterShutdown, waits
        up to about five seconds for ``modules_destroyed`` and returns True.
        """
        logger.debug('destroy doorpi')

        if not self.event_handler or self.event_handler.threads is None:
            # Previously an exception object was created here and thrown
            # away, so the condition went unnoticed. It is logged instead of
            # raised because restart() and __del__ rely on the False return.
            logger.warning("don't try to stop, when not prepared")
            return False

        logger.debug("Threads before starting shutdown: %s", self.event_handler.threads)

        self.event_handler.fire_event('BeforeShutdown', __name__)
        self.event_handler.fire_event_synchron('OnShutdown', __name__)
        self.event_handler.fire_event('AfterShutdown', __name__)

        timeout = 5
        waiting_between_checks = 0.5
        time.sleep(waiting_between_checks)
        while timeout > 0 and self.modules_destroyed is not True:
            logger.debug('wait %s seconds for threads %s and %s event',
                         timeout, len(self.event_handler.threads[1:]), len(self.event_handler.sources))
            # logger.trace is not a stdlib logging method; presumably a
            # custom level installed elsewhere in the project
            logger.trace('still existing threads:       %s', self.event_handler.threads[1:])
            logger.trace('still existing event sources: %s', self.event_handler.sources)
            time.sleep(waiting_between_checks)
            timeout -= waiting_between_checks

        if timeout <= 0:
            logger.warning("waiting for threads to time out - there are still threads: %s", self.event_handler.threads[1:])

        logger.info('======== DoorPi successfully shutdown ========')
        return True
225
226
    def restart(self):
        """Destroy and run again.

        Raises DoorPiRestartException when destroy() does not succeed.
        """
        if not self.destroy():
            raise DoorPiRestartException()
        self.run()
229
230
    def run(self):
        """Start DoorPi and loop until the shutdown flag is set.

        Calls prepare() first if that has not happened yet, fires the
        startup events, then checks the time critical threads every 0.05
        seconds and fires 'OnTimeTick' roughly every half second.

        Returns this instance once the loop has ended.
        """
        logger.debug("run")
        if not self.__prepared:
            self.prepare(self.__parsed_arguments)

        handler = self.event_handler
        handler.fire_event('BeforeStartup', __name__)
        handler.fire_event_synchron('OnStartup', __name__)
        handler.fire_event('AfterStartup', __name__)

        logger.info('DoorPi started successfully')
        logger.info('BasePath is %s', self.base_path)
        webserver = self.__webserver
        if not webserver:
            logger.info('no Webserver loaded')
        else:
            logger.info('Weburl is %s', webserver.own_url)

        # seconds accumulated since the last OnTimeTick event
        elapsed = 0

        while not self.__shutdown:
            elapsed += 0.05
            self.check_time_critical_threads()
            if elapsed > 0.5:
                self.__last_tick = time.time()
                handler.fire_event_asynchron('OnTimeTick', __name__)
                elapsed = 0
            time.sleep(0.05)
        return self
258
259
    def check_time_critical_threads(self):
        """Run the SIP phone's self check if a SIP phone is loaded."""
        phone = self.sipphone
        if phone:
            phone.self_check()
261
262
    def parse_string(self, input_string):
        """Expand date directives and ``!KEY!`` placeholders in *input_string*.

        The input is first passed through ``strftime`` with the current time.
        Afterwards every ``!KEY!`` is replaced, where KEY is one of:
        INFOS_PLAIN, INFOS, BASEPATH, last_tick, the upper-cased name of any
        string attribute of the metadata module, LAST_SNAPSHOT and the
        configured output pin values (only with a config), and finally every
        key of the additional informations.

        Returns the expanded string.
        """
        parsed_string = datetime.datetime.now().strftime(str(input_string))

        # Read the property once: without an event handler it returns a new
        # dict on every access, which would drop the LastKey entry set below.
        infos = self.additional_informations
        if self.keyboard is None or self.keyboard.last_key is None:
            infos['LastKey'] = "NotSetYet"
        else:
            infos['LastKey'] = str(self.keyboard.last_key)

        mapping_table = {
            'INFOS_PLAIN':      str(infos),
            'INFOS':            self._infos_as_html(infos),
            'BASEPATH':         self.base_path,
            'last_tick':        str(self.__last_tick)
        }

        for key, value in metadata.__dict__.items():
            if isinstance(value, str):
                mapping_table[key.upper()] = value

        # Everything below reads the config, so it is skipped when no config
        # is loaded (the pin lookup used to fail on a None config).
        if self.config:
            mapping_table['LAST_SNAPSHOT'] = str(
                self.config.get_string('DoorPi', 'last_snapshot', log=False)
            )
            mapping_table.update(self._output_pin_mapping())

        for key, value in mapping_table.items():
            parsed_string = parsed_string.replace("!"+key+"!", value)

        for key, value in infos.items():
            parsed_string = parsed_string.replace("!"+key+"!", str(value))

        return parsed_string

    @staticmethod
    def _infos_as_html(infos):
        """Render the *infos* dict as an HTML table with one row per entry."""
        rows = []
        for key, value in infos.items():
            # Escape first, then insert the line breaks; the other way round
            # the <br /> tags themselves get escaped and show up as text.
            html_value = cgi.escape(str(value)).replace("\r\n", "<br />")
            rows.append(
                '<tr><td><b>' + cgi.escape(str(key)) + '</b></td>'
                '<td><i>' + html_value + '</i></td></tr>'
            )
        return '<table>' + ''.join(rows) + '</table>'

    def _output_pin_mapping(self):
        """Return {configured value: output pin} for all output pin sections."""
        mapping = {}
        if not self.keyboard:
            return mapping
        if 'KeyboardHandler' not in self.keyboard.name:
            sections = ['OutputPins']
        else:
            sections = self.config.get_sections('_OutputPins', False)
        for section in sections:
            for output_pin in self.config.get_keys(section, log = False):
                mapping[self.config.get(section, output_pin, log = False)] = output_pin
        return mapping
317
318
if __name__ == '__main__':
    # This module is not an entry point; it refuses to run as a script.
    raise Exception('use main.py to start DoorPi')