Passed
Push — web-timed ( cfeda2 )
by Matt
02:14
created

PyDMXControl.utils.timing.TimedEvent.time()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX via Python. Featuring fixture profiles and working with uDMX.
3
 *  <https://github.com/MattIPv4/PyDMXControl/>
4
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
5
"""
6
7
from threading import Thread
8
from time import sleep, time
9
from typing import Callable, Dict
10
from collections import OrderedDict
11
12
from .exceptions import EventAlreadyExistsException
13
14
# DMXMINWAIT = 0.000001 * 92
15
# Delay, in seconds, handed to sleep() between polling iterations in this
# module (the Ticker loop and TimedEvents.sleep_till_done).
DMXMINWAIT = 0.01  # Provides far smoother animation
16
17
18
class Ticker:
    """Invoke registered callbacks at a fixed interval from a background thread.

    The interval defaults to 1000 milliseconds. Once started, a daemon thread
    polls every ``DMXMINWAIT`` seconds and, whenever at least one interval has
    elapsed since the previous tick, calls every registered callback with no
    arguments.
    """

    @staticmethod
    def millis_now() -> float:
        """Return the current ``time()`` value converted to milliseconds."""
        return time() * 1000.0

    def __init__(self):
        self.__interval = 1000.0  # Milliseconds between ticks
        self.__last = None  # millis_now() at the previous tick; None until first poll
        self.__callbacks = []
        self.__paused = False
        self.__ticking = False
        self.thread = None

    def __ticker(self):
        """Run one poll: fire all callbacks if the interval has elapsed."""
        # First poll since (re)start: begin measuring from now
        if self.__last is None:
            self.__last = self.millis_now()

        # If diff in milliseconds is interval
        if self.millis_now() - self.__last >= self.__interval:
            # Iterate over a snapshot: a callback may add or remove callbacks
            # (including itself) while the tick is in progress, and mutating
            # the live list during iteration would skip entries.
            for callback in list(self.__callbacks):
                # Check is valid callback
                if callback and callable(callback):
                    callback()
            # Finished, update last tick time
            self.__last = self.millis_now()

    def __ticker__loop(self):
        """Thread target: poll until ``stop()`` clears the ticking flag."""
        # Reset
        self.__last = None
        self.__paused = False
        # Use a variable so loop can be stopped
        self.__ticking = True
        while self.__ticking:
            # Allow for pausing
            if not self.__paused:
                self.__ticker()
            # Sleep DMX delay time
            sleep(DMXMINWAIT)

    def set_interval(self, milliseconds: float):
        """Set the time between ticks, in milliseconds."""
        self.__interval = milliseconds

    def get_interval(self) -> float:
        """Return the time between ticks, in milliseconds."""
        return self.__interval

    def set_callback(self, callback: Callable):
        """Replace all registered callbacks with the single given callback."""
        self.__callbacks = [callback]

    def add_callback(self, callback: Callable):
        """Register an additional callback to be called on every tick."""
        self.__callbacks.append(callback)

    def remove_callback(self, callback: Callable):
        """Unregister a callback; does nothing if it is not registered."""
        if callback in self.__callbacks:
            self.__callbacks.remove(callback)

    def clear_callbacks(self):
        """Unregister every callback."""
        self.__callbacks = []

    def stop(self):
        """Ask the background loop to exit after its current iteration."""
        self.__ticking = False

    @property
    def paused(self) -> bool:
        """Whether ticking is currently paused."""
        return self.__paused

    def pause(self) -> bool:
        """Toggle the paused state and return the new state."""
        self.__paused = not self.__paused
        return self.paused

    def start(self):
        """Start the background ticking thread unless the loop is already running."""
        if not self.__ticking:
            # Create the thread and run loop
            self.thread = Thread(target=self.__ticker__loop)
            self.thread.daemon = True
            self.thread.start()
101
102
103
class TimedEvent:
    """A callback bound to a point in time, with string views for display.

    ``run_time`` is an offset in milliseconds. The properties return
    human-readable strings describing the event rather than raw values.
    """

    def __init__(self, run_time: int, callback: Callable, args: tuple = (), name: str = ""):
        self.__time = run_time
        self.__cb = callback
        self.__args = args
        self.__name = name
        self.__fired = None  # Milliseconds after start at which run() completed

    @property
    def time(self) -> str:
        """Scheduled offset, e.g. ``"1500ms"`` (up to 4 decimals, trailing zeros trimmed)."""
        trimmed = "{:.4f}".format(self.__time).rstrip("0").rstrip(".")
        return "{}ms".format(trimmed)

    @property
    def name(self) -> str:
        """The event's name as a string."""
        return "{}".format(self.__name)

    @property
    def func(self) -> str:
        """Description of the callback, e.g. ``"<func my_callback>"``.

        Callables without a ``__name__`` (``functools.partial`` objects,
        instances defining ``__call__``) are described by their type name
        instead of raising ``AttributeError``.
        """
        label = getattr(self.__cb, "__name__", type(self.__cb).__name__)
        return "<func {}>".format(label)

    @property
    def args(self) -> str:
        """The callback arguments rendered as ``"[a, b, ...]"``."""
        return "[{}]".format(", ".join(["{}".format(f) for f in self.__args]))

    @property
    def fired(self) -> str:
        """When the event fired and how late it was; empty string if not yet run."""
        if self.__fired is None:
            return ""
        return "{:.4f}ms ({:.4f}ms late)".format(self.__fired, self.__fired - self.__time)

    @property
    def data(self) -> Dict[str, str]:
        """All string views of this event collected in one dictionary."""
        return {
            "time": self.time,
            "name": self.name,
            "func": self.func,
            "args": self.args,
            "fired": self.fired
        }

    def __str__(self) -> str:
        return "Event {} (\"{}\") {}".format(self.__time, self.name, self.func)

    def run(self, start_time) -> str:
        """Call the callback with its arguments and record when it fired.

        :param start_time: reference time in milliseconds (same clock as
            ``time() * 1000.0``); the fired time is stored relative to it.
        :return: a message describing the event and when it fired.
        """
        self.__cb(*self.__args)
        self.__fired = (time() * 1000.0) - start_time
        return "{} fired at {}".format(str(self), self.fired)
151
152
153
class TimedEvents:
    """A timeline of events keyed by millisecond offset, played back on a thread.

    Each offset holds at most one event. ``run()`` plays the timeline on a
    daemon thread; ``stop()`` aborts playback and ``sleep_till_done()`` blocks
    until playback has ended.
    """

    def __init__(self, debug_messages: bool = False):
        self.__events = {}
        self.__running = False
        self.__messages = debug_messages
        self.__run_cbs = []

    def __run(self, start_millis):
        """Play the timeline; this is the target of the playback thread.

        Events scheduled before ``start_millis`` are skipped. Playback keeps
        polling until one second after the last scheduled offset, or until
        ``stop()`` is called. The running flag is always cleared on exit, even
        if an event callback raises, so ``sleep_till_done()`` cannot hang and
        the timeline can be run again.
        """
        self.__running = True
        try:
            # Snapshot the timeline in offset order
            timeline = sorted(self.__events.items(), key=lambda item: item[0])

            # An empty timeline has nothing to play (and no last offset)
            if timeline:
                # Set starting params
                start = (time() * 1000.0) - start_millis

                # Skip events in the past
                pending = [item for item in timeline if item[0] >= start_millis]
                upcoming = 0  # Index of the next event still to fire

                # Keep looping until last event timestamp
                end = start + timeline[-1][0] + 1000
                while end > time() * 1000.0 and self.__running:
                    # Fire everything that is due; comparing with <= looks
                    # into the past so a slow poll never misses an event
                    while (upcoming < len(pending)
                           and pending[upcoming][0] <= (time() * 1000.0) - start):
                        msg = pending[upcoming][1].run(start)  # Run
                        upcoming += 1  # We're done with it
                        if self.__messages:  # Debug if needed
                            print(msg)
                    sleep(0.000001)
        finally:
            self.__running = False

        # Let debug know we're done
        if self.__messages:
            print("Timed events playback completed")

    def run(self, start_millis: int = 0):
        """Start playback on a daemon thread and fire the run callbacks.

        Playback is not started if it is already running (it cannot run more
        than once simultaneously); the run callbacks are started in their own
        daemon threads on every call regardless.

        :param start_millis: offset into the timeline, in milliseconds, at
            which playback begins.
        """
        if not self.__running:
            # Mark as running before the thread starts so an immediate
            # sleep_till_done() waits instead of returning straight away
            self.__running = True
            # Create the thread and run loop
            thread = Thread(target=self.__run, args=[start_millis])
            thread.daemon = True
            thread.start()

        for cb in self.__run_cbs:
            thread = Thread(target=cb)
            thread.daemon = True
            thread.start()

    def stop(self):
        """Ask the playback loop to exit."""
        self.__running = False

    def toggle_debug_messages(self) -> bool:
        """Toggle printing of debug messages and return the new setting."""
        self.__messages = not self.__messages
        return self.__messages

    def add_event(self, milliseconds_in: int, callback: Callable, *args, name: str = ""):
        """Schedule ``callback(*args)`` at the given offset.

        :param milliseconds_in: offset in milliseconds; converted with ``int()``.
        :param callback: callable to invoke.
        :param args: positional arguments passed to the callback.
        :param name: optional display name for the event.
        :raises EventAlreadyExistsException: if an event already exists at
            that offset.
        """
        milliseconds_in = int(milliseconds_in)
        if milliseconds_in in self.__events:
            raise EventAlreadyExistsException(milliseconds_in)
        self.__events[milliseconds_in] = TimedEvent(milliseconds_in, callback, args, name)

    def remove_event(self, milliseconds_in: int):
        """Remove the event at the given offset; does nothing if there is none."""
        self.__events.pop(int(milliseconds_in), None)

    def add_run_callback(self, callback: Callable):
        """Register a callback to be started in its own thread by ``run()``."""
        self.__run_cbs.append(callback)

    def clear_run_callbacks(self):
        """Unregister every run callback."""
        self.__run_cbs = []

    @property
    def data(self) -> Dict[int, Dict[str, str]]:
        """Mapping of each event's offset to that event's ``data``."""
        return {k: v.data for k, v in self.__events.items()}

    def sleep_till_done(self):
        """Block the calling thread until playback is no longer running."""
        # Hold until all events completed
        while self.__running:
            sleep(DMXMINWAIT)
239