TimedEvents.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
from collections import OrderedDict
9
from threading import Thread
10
from time import sleep, time
11
from typing import Dict, Any
12
13
from ._TimedEvent import TimedEvent
14
from ..exceptions import EventAlreadyExistsException
15
from ... import DEFAULT_INTERVAL
16
17
18
class TimedEvents:
19
20
    def __init__(self, debug_messages: bool = False):
21
        self.__events = {}
22
        self.__started = None
23
        self.__messages = debug_messages
24
        self.__run_cbs = []
25
        self.__stop_cbs = []
26
27
    def __run(self, start_millis):
28
        # Don't allow to run more that once simultaneously
29
        if self.__started is not None:
30
            return
31
32
        # Set starting params
33
        self.__started = (time() * 1000.0) - start_millis
34
        events_left = OrderedDict(sorted(self.__events.items()))
35
36
        # Skip events in the past
37
        for timestamp, event in events_left.copy().items():
38
            if timestamp < start_millis:
39
                del events_left[timestamp]
40
41
        # Keep looping until last event timestamp
42
        end = self.__started + max(self.__events.keys()) + 1000
43
        while end > time() * 1000.0 and self.__started is not None:
44
            # Find all events to run
45
            for timestamp, event in events_left.copy().items():
46
                # Look into the past so we don't ever miss any
47
                if timestamp <= self.progress:
48
                    msg = event.run(self.__started)  # Run
49
                    if self.__messages:  # Debug if needed
50
                        print(msg)
51
                    del events_left[timestamp]  # Remove - we're done with it
52
                else:
53
                    # We're into the future
54
                    break
55
            sleep(0.000001)
56
57
        # Let everyone know we're done
58
        self.__started = None
59
        if self.__messages:
60
            print("Timed events playback completed")
61
62
    def run(self, start_millis: int = 0):
63
        # Create the thread and run loop
64
        thread = Thread(target=self.__run, args=[start_millis], daemon=True)
65
        thread.start()
66
67
        for cb in self.__run_cbs:
68
            thread = Thread(target=cb, daemon=True)
69
            thread.start()
70
71
    def stop(self):
72
        self.__started = None
73
74
        for cb in self.__stop_cbs:
75
            thread = Thread(target=cb, daemon=True)
76
            thread.start()
77
78
        for event in self.__events.values():
79
            event.reset_fired()
80
81
    def toggle_debug_messages(self) -> bool:
82
        self.__messages = not self.__messages
83
        return self.__messages
84
85
    def add_event(self, milliseconds_in: int, callback: callable, *args, name: str = ""):
86
        milliseconds_in = int(milliseconds_in)
87
        if milliseconds_in in self.__events:
88
            raise EventAlreadyExistsException(milliseconds_in)
89
        self.__events[milliseconds_in] = TimedEvent(milliseconds_in, callback, args, name)
90
91
    def remove_event(self, milliseconds_in: int):
92
        milliseconds_in = int(milliseconds_in)
93
        if milliseconds_in in self.__events:
94
            del self.__events[milliseconds_in]
95
96
    def add_run_callback(self, callback: callable):
97
        self.__run_cbs.append(callback)
98
99
    def clear_run_callbacks(self):
100
        self.__run_cbs = []
101
102
    def add_stop_callback(self, callback: callable):
103
        self.__stop_cbs.append(callback)
104
105
    def clear_stop_callbacks(self):
106
        self.__stop_cbs = []
107
108
    @property
109
    def progress(self) -> float:
110
        if self.__started is None:
111
            return 0
112
        return (time() * 1000.0) - self.__started
113
114
    @property
115
    def data(self) -> Dict[str, Any]:
116
        return {
117
            "events": {k: v.data for k, v in OrderedDict(sorted(self.__events.items())).items()},
118
            "progress": "{}ms".format("{:,.4f}".format(self.progress).rstrip("0").rstrip(".")),
119
            "progress_raw": self.progress
120
        }
121
122
    @property
123
    def running(self) -> bool:
124
        return self.__started is not None
125
126
    def sleep_till_done(self):
127
        # Hold until all events completed
128
        while self.__running:
129
            sleep(DEFAULT_INTERVAL)
130