Passed
Push — web-timed ( cfeda2...1d9e3e )
by Matt
01:11
created

PyDMXControl.utils.timing._TimedEvents   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 103
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 24
eloc 68
dl 0
loc 103
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A TimedEvents.toggle_debug_messages() 0 3 1
A TimedEvents.__init__() 0 5 1
A TimedEvents.clear_run_callbacks() 0 2 1
A TimedEvents.data() 0 3 1
A TimedEvents.add_event() 0 5 2
A TimedEvents.sleep_till_done() 0 4 2
A TimedEvents.add_run_callback() 0 2 1
A TimedEvents.stop() 0 2 1
C TimedEvents.__run() 0 34 10
A TimedEvents.remove_event() 0 4 2
A TimedEvents.run() 0 10 2
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX via Python. Featuring fixture profiles and working with uDMX.
3
 *  <https://github.com/MattIPv4/PyDMXControl/>
4
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
5
"""
6
7
from collections import OrderedDict
8
from threading import Thread
9
from time import sleep, time
10
from typing import Dict
11
12
from ... import DMXMINWAIT
13
from ._TimedEvent import TimedEvent
14
from ..exceptions import EventAlreadyExistsException
15
16
17
class TimedEvents:
    """Registry of events keyed by millisecond offset, played back on a thread.

    Events are stored in a dict keyed by an integer millisecond offset.
    ``run`` starts a daemon thread that fires each event once the elapsed
    playback time reaches the event's offset, and keeps the playback marked
    as running until one second past the latest registered offset (or until
    ``stop`` is called).
    """

    def __init__(self, debug_messages: bool = False):
        """Create an empty schedule.

        :param debug_messages: when true, print the value returned by each
            event's ``run`` call and a final completion message.
        """
        self.__events = {}  # millisecond offset -> event object
        self.__running = False  # True only while a playback loop is active
        self.__messages = debug_messages
        self.__run_cbs = []  # callables started (one thread each) by run()

    def __run(self, start_millis):
        """Playback loop executed on the worker thread started by ``run``.

        :param start_millis: offset (ms) to start playback from; events with
            a smaller offset are skipped.
        """
        # Don't allow more than one playback to run simultaneously
        if self.__running:
            return
        self.__running = True

        try:
            # Wall-clock time (ms) that corresponds to offset 0 of the schedule
            start = (time() * 1000.0) - start_millis

            # Work on a snapshot so add/remove calls made during playback
            # cannot change the schedule mid-iteration.
            events = dict(self.__events)

            # Nothing registered: there is no "last event" to wait for.
            if events:
                # Events still to fire, earliest first, skipping the past
                pending = sorted(
                    (item for item in events.items() if item[0] >= start_millis),
                    key=lambda item: item[0],
                )
                position = 0  # index of the next event that has not fired yet

                # Keep looping until one second after the last event timestamp
                end = start + max(events) + 1000
                while end > time() * 1000.0 and self.__running:
                    # Fire everything whose offset has been reached; comparing
                    # with "<=" looks into the past so no event is ever missed.
                    while (position < len(pending)
                           and pending[position][0] <= (time() * 1000.0) - start):
                        msg = pending[position][1].run(start)  # Run
                        if self.__messages:  # Debug if needed
                            print(msg)
                        position += 1  # Done with it
                    sleep(0.000001)
        finally:
            # Always release the flag, even if an event raised, so that
            # sleep_till_done() returns and run() can be used again.
            self.__running = False

        # Let debug know we're done
        if self.__messages:
            print("Timed events playback completed")

    def run(self, start_millis: int = 0):
        """Start playback on a daemon thread, then start every run callback.

        Each registered run callback is started on its own daemon thread.
        If a playback is already in progress the new playback thread exits
        immediately; the run callbacks are started regardless.

        :param start_millis: offset (ms) to start playback from.
        """
        # Create the thread and run loop
        thread = Thread(target=self.__run, args=[start_millis])
        thread.daemon = True
        thread.start()

        for cb in self.__run_cbs:
            thread = Thread(target=cb)
            thread.daemon = True
            thread.start()

    def stop(self):
        """Ask the playback loop to finish by clearing the running flag."""
        self.__running = False

    def toggle_debug_messages(self) -> bool:
        """Flip debug printing on/off and return the new setting."""
        self.__messages = not self.__messages
        return self.__messages

    def add_event(self, milliseconds_in: int, callback: callable, *args, name: str = ""):
        """Register an event at the given offset.

        :param milliseconds_in: offset in milliseconds (coerced with ``int``).
        :param callback: passed to ``TimedEvent`` together with ``args``.
        :param args: positional arguments stored with the event.
        :param name: keyword-only name passed to ``TimedEvent``.
        :raises EventAlreadyExistsException: if an event is already registered
            at that offset.
        """
        milliseconds_in = int(milliseconds_in)
        if milliseconds_in in self.__events:
            raise EventAlreadyExistsException(milliseconds_in)
        self.__events[milliseconds_in] = TimedEvent(milliseconds_in, callback, args, name)

    def remove_event(self, milliseconds_in: int):
        """Remove the event at the given offset; do nothing if there is none."""
        milliseconds_in = int(milliseconds_in)
        if milliseconds_in in self.__events:
            del self.__events[milliseconds_in]

    def add_run_callback(self, callback: callable):
        """Register a callable to be started on its own thread by ``run``."""
        self.__run_cbs.append(callback)

    def clear_run_callbacks(self):
        """Forget every registered run callback."""
        self.__run_cbs = []

    @property
    def data(self) -> Dict[int, Dict[str, str]]:
        """Map each registered offset to that event's ``data`` attribute."""
        return {k: v.data for k, v in self.__events.items()}

    def sleep_till_done(self):
        """Block while the running flag is set, polling every ``DMXMINWAIT``.

        The flag is set by the worker thread itself, so a call made
        immediately after ``run`` may see it before the worker has set it.
        """
        # Hold until all events completed
        while self.__running:
            sleep(DMXMINWAIT)
103