Passed
Push — web-timed ( 1d9e3e...80b983 )
by Matt
01:14
created

TimedEvents.add_event()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 2
nop 5
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX via Python. Featuring fixture profiles and working with uDMX.
3
 *  <https://github.com/MattIPv4/PyDMXControl/>
4
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
5
"""
6
7
from collections import OrderedDict
8
from threading import Thread
9
from time import sleep, time
10
from typing import Dict, Any
11
12
from ._TimedEvent import TimedEvent
13
from ..exceptions import EventAlreadyExistsException
14
from ... import DMXMINWAIT
15
16
17
class TimedEvents:
18
19
    def __init__(self, debug_messages: bool = False):
20
        self.__events = {}
21
        self.__started = None
22
        self.__messages = debug_messages
23
        self.__run_cbs = []
24
        self.__stop_cbs = []
25
26
    def __run(self, start_millis):
27
        # Don't allow to run more that once simultaneously
28
        if self.__started is not None:
29
            return
30
31
        # Set starting params
32
        self.__started = (time() * 1000.0) - start_millis
33
        events_left = OrderedDict(sorted(self.__events.items()))
34
35
        # Skip events in the past
36
        for timestamp, event in events_left.copy().items():
37
            if timestamp < start_millis:
38
                del events_left[timestamp]
39
40
        # Keep looping until last event timestamp
41
        end = self.__started + max(self.__events.keys()) + 1000
42
        while end > time() * 1000.0 and self.__started is not None:
43
            # Find all events to run
44
            for timestamp, event in events_left.copy().items():
45
                # Look into the past so we don't ever miss any
46
                if timestamp <= self.progress:
47
                    msg = event.run(self.__started)  # Run
48
                    if self.__messages:  # Debug if needed
49
                        print(msg)
50
                    del events_left[timestamp]  # Remove - we're done with it
51
                else:
52
                    # We're into the future
53
                    break
54
            sleep(0.000001)
55
56
        # Let everyone know we're done
57
        self.__started = None
58
        if self.__messages:
59
            print("Timed events playback completed")
60
61
    def run(self, start_millis: int = 0):
62
        # Create the thread and run loop
63
        thread = Thread(target=self.__run, args=[start_millis])
64
        thread.daemon = True
65
        thread.start()
66
67
        for cb in self.__run_cbs:
68
            thread = Thread(target=cb)
69
            thread.daemon = True
70
            thread.start()
71
72
    def stop(self):
73
        self.__started = None
74
75
        for cb in self.__stop_cbs:
76
            thread = Thread(target=cb)
77
            thread.daemon = True
78
            thread.start()
79
80
        for event in self.__events.values():
81
            event.reset_fired()
82
83
    def toggle_debug_messages(self) -> bool:
84
        self.__messages = not self.__messages
85
        return self.__messages
86
87
    def add_event(self, milliseconds_in: int, callback: callable, *args, name: str = ""):
88
        milliseconds_in = int(milliseconds_in)
89
        if milliseconds_in in self.__events:
90
            raise EventAlreadyExistsException(milliseconds_in)
91
        self.__events[milliseconds_in] = TimedEvent(milliseconds_in, callback, args, name)
92
93
    def remove_event(self, milliseconds_in: int):
94
        milliseconds_in = int(milliseconds_in)
95
        if milliseconds_in in self.__events:
96
            del self.__events[milliseconds_in]
97
98
    def add_run_callback(self, callback: callable):
99
        self.__run_cbs.append(callback)
100
101
    def clear_run_callbacks(self):
102
        self.__run_cbs = []
103
104
    def add_stop_callback(self, callback: callable):
105
        self.__stop_cbs.append(callback)
106
107
    def clear_stop_callbacks(self):
108
        self.__stop_cbs = []
109
110
    @property
111
    def progress(self) -> float:
112
        if self.__started is None:
113
            return 0
114
        return (time() * 1000.0) - self.__started
115
116
    @property
117
    def data(self) -> Dict[str, Any]:
118
        return {
119
            "events": {k: v.data for k, v in OrderedDict(sorted(self.__events.items())).items()},
120
            "progress": "{}ms".format("{:,.4f}".format(self.progress).rstrip("0").rstrip(".")),
121
            "progress_raw": self.progress
122
        }
123
124
    def sleep_till_done(self):
125
        # Hold until all events completed
126
        while self.__running:
127
            sleep(DMXMINWAIT)
128