Passed
Push — master ( bbd02a...7ebed9 )
by Matt
02:12
created

Ticker.__ticker()   A

Complexity

Conditions 4

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 4
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
from inspect import getframeinfo, stack
9
from threading import Thread
10
from time import sleep, time
11
from typing import Callable
12
from warnings import warn
13
14
from ..exceptions import InvalidArgumentException
15
from ... import DMXMINWAIT
16
17
18
class Callback:
    """A callable registered with the ticker, plus its scheduling state.

    Instances are simple mutable records: ``Ticker`` reads ``interval`` and
    rewrites ``last`` each time the callback is run or the loop is reset.
    """

    def __init__(self, callback, interval, last, source):
        """Validate and store the callback and its timing metadata.

        :param callback: Object to invoke; must be callable.
        :param interval: Minimum gap between invocations. ``Ticker`` compares
            it against differences of millisecond timestamps.
        :param last: Millisecond timestamp of the previous invocation, or
            ``None`` if the callback has not been scheduled yet.
        :param source: Information about where the callback was registered
            (``Ticker.add_callback`` passes the caller's frame info).
        :raises InvalidArgumentException: If ``callback`` is not callable.
        """
        if not callable(callback):
            raise InvalidArgumentException('callback', 'Not callable')

        self.callback = callback
        self.interval = interval
        self.last = last
        self.source = source
28
29
30
class Ticker:
    """Invoke registered callbacks at millisecond intervals on a daemon thread.

    Callbacks are registered with :meth:`add_callback`. Once :meth:`start`
    has been called, a background loop checks every callback roughly once per
    ``DMXMINWAIT`` seconds and runs those whose interval has elapsed. The loop
    can be paused/resumed with :meth:`pause` and ended with :meth:`stop`.
    """

    @staticmethod
    def millis_now() -> float:
        """Return the current wall-clock time (``time.time()``) in milliseconds."""
        return time() * 1000.0

    def __init__(self):
        """Create a ticker with no callbacks, not paused and not running."""
        self.__callbacks = []
        self.__paused = False
        self.__ticking = False

    def __ticker(self):
        """Run every registered callback whose interval has elapsed."""
        # Iterate over a snapshot: callbacks may add or remove callbacks
        # (including themselves) while running, and mutating a list during
        # iteration makes the iterator skip entries.
        for callback in tuple(self.__callbacks):
            # Skip entries that were removed or cleared earlier in this pass
            if callback not in self.__callbacks:
                continue

            # New
            if callback.last is None:
                callback.last = self.millis_now()

            # If diff in milliseconds is interval, run
            if self.millis_now() - callback.last >= callback.interval:
                callback.callback()
                callback.last = self.millis_now()

    def __ticker__loop(self):
        """Thread body: tick repeatedly until :meth:`stop` clears the run flag."""
        # Reset
        for callback in self.__callbacks:
            callback.last = None
        self.__paused = False

        # The run flag is raised by start() before this thread is launched,
        # so a stop() issued immediately after start() is honoured here.
        while self.__ticking:
            # Track start time
            loop_start = self.millis_now()

            # Call ticker
            if not self.__paused:
                self.__ticker()

            # Get end time and duration
            loop_end = self.millis_now()
            loop_dur = loop_end - loop_start
            wait_dur = DMXMINWAIT * 1000.0 - loop_dur

            # Handle negative wait: the pass overran its slot, so go again
            # immediately instead of sleeping
            if wait_dur < 0:
                warn("Ticker loop behind by {:,}ms, took {:,}ms".format(-wait_dur, loop_dur))
                continue

            # Sleep DMX delay time
            sleep(wait_dur / 1000.0)

    def add_callback(self, callback: Callable, interval_millis: float = 1000.0):
        """Register ``callback`` to be run every ``interval_millis`` milliseconds.

        The caller's frame info is recorded as the callback's source.

        :raises InvalidArgumentException: If ``callback`` is not callable.
        """
        self.__callbacks.append(Callback(callback, interval_millis, None, getframeinfo(stack()[1][0])))

    def remove_callback(self, callback: Callable):
        """Remove the first registration of ``callback``; do nothing if absent."""
        for idx, registered in enumerate(self.__callbacks):
            if registered.callback == callback:
                del self.__callbacks[idx]
                return

    def clear_callbacks(self):
        """Remove every registered callback."""
        self.__callbacks = []

    def stop(self):
        """Ask the background loop to exit after its current pass."""
        # Stop the threaded loop
        self.__ticking = False

    @property
    def paused(self) -> bool:
        """Whether callback execution is currently paused."""
        return self.__paused

    def pause(self) -> bool:
        """Toggle the paused state and return the new state."""
        # Toggle pause state
        self.__paused = not self.__paused
        return self.paused

    def start(self):
        """Start the background loop unless it is already running."""
        if self.__ticking:
            return

        # Raise the run flag before launching the thread so that a second
        # start() cannot spawn a duplicate loop while the first is starting.
        self.__ticking = True

        # Create the thread and run loop
        thread = Thread(target=self.__ticker__loop, daemon=True)
        thread.start()
111