Passed
Push — update-web-timed-events ( 173c1f...5e0b33 )
by Matt
01:08
created

PyDMXControl.utils.timing._TimedEvents   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 133
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 31
eloc 90
dl 0
loc 133
rs 9.92
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A TimedEvents.toggle_debug_messages() 0 3 1
A TimedEvents.__init__() 0 6 1
A TimedEvents.progress() 0 5 2
A TimedEvents.clear_run_callbacks() 0 2 1
A TimedEvents.add_event() 0 5 2
A TimedEvents.data() 0 6 1
A TimedEvents.add_run_callback() 0 2 1
A TimedEvents.clear_stop_callbacks() 0 2 1
A TimedEvents.add_stop_callback() 0 2 1
A TimedEvents.remove_event() 0 4 2
C TimedEvents.__run() 0 34 10
A TimedEvents.stop() 0 10 3
A TimedEvents.run() 0 10 2
A TimedEvents.sleep_till_done() 0 4 2
A TimedEvents.running() 0 3 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
from collections import OrderedDict
9
from threading import Thread
10
from time import sleep, time
11
from typing import Dict, Any
12
13
from ._TimedEvent import TimedEvent
14
from ..exceptions import EventAlreadyExistsException
15
from ... import DMXMINWAIT
16
17
18
class TimedEvents:
19
20
    def __init__(self, debug_messages: bool = False):
21
        self.__events = {}
22
        self.__started = None
23
        self.__messages = debug_messages
24
        self.__run_cbs = []
25
        self.__stop_cbs = []
26
27
    def __run(self, start_millis):
28
        # Don't allow to run more that once simultaneously
29
        if self.__started is not None:
30
            return
31
32
        # Set starting params
33
        self.__started = (time() * 1000.0) - start_millis
34
        events_left = OrderedDict(sorted(self.__events.items()))
35
36
        # Skip events in the past
37
        for timestamp, event in events_left.copy().items():
38
            if timestamp < start_millis:
39
                del events_left[timestamp]
40
41
        # Keep looping until last event timestamp
42
        end = self.__started + max(self.__events.keys()) + 1000
43
        while end > time() * 1000.0 and self.__started is not None:
44
            # Find all events to run
45
            for timestamp, event in events_left.copy().items():
46
                # Look into the past so we don't ever miss any
47
                if timestamp <= self.progress:
48
                    msg = event.run(self.__started)  # Run
49
                    if self.__messages:  # Debug if needed
50
                        print(msg)
51
                    del events_left[timestamp]  # Remove - we're done with it
52
                else:
53
                    # We're into the future
54
                    break
55
            sleep(0.000001)
56
57
        # Let everyone know we're done
58
        self.__started = None
59
        if self.__messages:
60
            print("Timed events playback completed")
61
62
    def run(self, start_millis: int = 0):
63
        # Create the thread and run loop
64
        thread = Thread(target=self.__run, args=[start_millis])
65
        thread.daemon = True
66
        thread.start()
67
68
        for cb in self.__run_cbs:
69
            thread = Thread(target=cb)
70
            thread.daemon = True
71
            thread.start()
72
73
    def stop(self):
74
        self.__started = None
75
76
        for cb in self.__stop_cbs:
77
            thread = Thread(target=cb)
78
            thread.daemon = True
79
            thread.start()
80
81
        for event in self.__events.values():
82
            event.reset_fired()
83
84
    def toggle_debug_messages(self) -> bool:
85
        self.__messages = not self.__messages
86
        return self.__messages
87
88
    def add_event(self, milliseconds_in: int, callback: callable, *args, name: str = ""):
89
        milliseconds_in = int(milliseconds_in)
90
        if milliseconds_in in self.__events:
91
            raise EventAlreadyExistsException(milliseconds_in)
92
        self.__events[milliseconds_in] = TimedEvent(milliseconds_in, callback, args, name)
93
94
    def remove_event(self, milliseconds_in: int):
95
        milliseconds_in = int(milliseconds_in)
96
        if milliseconds_in in self.__events:
97
            del self.__events[milliseconds_in]
98
99
    def add_run_callback(self, callback: callable):
100
        self.__run_cbs.append(callback)
101
102
    def clear_run_callbacks(self):
103
        self.__run_cbs = []
104
105
    def add_stop_callback(self, callback: callable):
106
        self.__stop_cbs.append(callback)
107
108
    def clear_stop_callbacks(self):
109
        self.__stop_cbs = []
110
111
    @property
112
    def progress(self) -> float:
113
        if self.__started is None:
114
            return 0
115
        return (time() * 1000.0) - self.__started
116
117
    @property
118
    def data(self) -> Dict[str, Any]:
119
        return {
120
            "events": {k: v.data for k, v in OrderedDict(sorted(self.__events.items())).items()},
121
            "progress": "{}ms".format("{:,.4f}".format(self.progress).rstrip("0").rstrip(".")),
122
            "progress_raw": self.progress
123
        }
124
125
    @property
126
    def running(self) -> bool:
127
        return self.__started is not None
128
129
    def sleep_till_done(self):
130
        # Hold until all events completed
131
        while self.__running:
132
            sleep(DMXMINWAIT)
133