Passed
Push — master ( 7edf63...91ee3f )
by Matt
03:10
created

PyDMXControl.utils.timing._Ticker   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 114
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 23
eloc 71
dl 0
loc 114
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A Callback.__init__() 0 8 2
A Ticker.millis_now() 0 3 1
A Ticker.__init__() 0 6 1
A Ticker.remove_callback() 0 4 2
A Ticker.paused() 0 3 1
A Ticker.__ticker() 0 11 4
A Ticker.clear_callbacks() 0 2 1
A Ticker.start() 0 5 2
A Ticker.stop() 0 3 1
A Ticker.pause() 0 4 1
B Ticker.__ticker__loop() 0 29 6
A Ticker.add_callback() 0 2 1
"""
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
 *                Featuring fixture profiles, built-in effects and a web control panel.
 *  <https://github.com/MattIPv4/PyDMXControl/>
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
"""
7
8
from inspect import getframeinfo, stack
9
from threading import Thread
10
from time import sleep, time
11
from typing import Callable
12
from warnings import warn
13
14
from ..exceptions import InvalidArgumentException
15
from ... import DEFAULT_INTERVAL
16
17
18
class Callback:
    """A registered callable together with the scheduling values kept for it."""

    def __init__(self, callback, interval, last, source):
        """Validate the callable and store all four values unchanged.

        :param callback: the object to invoke; must be callable.
        :param interval: stored as ``self.interval``; Ticker compares it with
            an elapsed time in milliseconds.
        :param last: stored as ``self.last``; Ticker treats ``None`` as
            "no reference time recorded yet".
        :param source: stored as ``self.source``; Ticker.add_callback passes
            the frame info of the code that registered the callback.
        :raises InvalidArgumentException: if ``callback`` is not callable.
        """
        if not callable(callback):
            raise InvalidArgumentException('callback', 'Not callable')

        self.callback = callback
        self.interval = interval
        self.last = last
        self.source = source
28
29
30
class Ticker:
    """Invoke registered callbacks at their own intervals from a daemon thread.

    The loop wakes roughly every ``interval_millis`` milliseconds; on each
    wake-up every callback whose own interval has elapsed is called, in
    registration order, on the loop thread.
    """

    @staticmethod
    def millis_now() -> float:
        """Return ``time.time()`` converted to milliseconds."""
        return time() * 1000.0

    def __init__(self, interval_millis: float = DEFAULT_INTERVAL * 1000.0, warn_on_behind: bool = True):
        """Create a stopped ticker with no callbacks.

        :param interval_millis: target duration of one loop iteration, in
            milliseconds.
        :param warn_on_behind: when true, emit a warning whenever one loop
            iteration takes longer than ``interval_millis``.
        """
        self.__callbacks = []
        self.__paused = False
        self.__ticking = False
        self.__interval = interval_millis
        self.__warn_on_behind = warn_on_behind

    def __ticker(self):
        """Run one pass over the callbacks, calling those that are due."""
        # Iterate over a snapshot: a callback may call remove_callback()
        # (on itself or another entry) while we are looping, and deleting
        # from the list being iterated would make the iterator skip entries.
        for callback in tuple(self.__callbacks):
            # Honour removals made earlier in this same pass.
            if callback not in self.__callbacks:
                continue

            # First sighting: start measuring this callback's interval now.
            if callback.last is None:
                callback.last = self.millis_now()

            # Run once at least one full interval has elapsed.
            if self.millis_now() - callback.last >= callback.interval:
                callback.callback()
                callback.last = self.millis_now()

    def __ticker__loop(self):
        """Thread body: tick until stop() clears the running flag.

        The running flag itself is raised by start() before this thread is
        created, so it is deliberately not set again here; doing so would
        overwrite a stop() issued before the thread got to run.
        """
        # Reset per-run state so intervals are measured from this start.
        for callback in self.__callbacks:
            callback.last = None
        self.__paused = False

        while self.__ticking:
            # Track start time
            loop_start = self.millis_now()

            # Call ticker unless paused (the loop keeps its cadence meanwhile)
            if not self.__paused:
                self.__ticker()

            # Work out how much of this iteration's time budget is left
            loop_end = self.millis_now()
            loop_dur = loop_end - loop_start
            wait_dur = self.__interval - loop_dur

            # Over budget: optionally warn, then start the next pass at once
            if wait_dur < 0:
                if self.__warn_on_behind:
                    warn("Ticker loop behind by {:,}ms, took {:,}ms".format(-wait_dur, loop_dur))
                continue

            # Sleep for the remainder of the interval
            sleep(wait_dur / 1000.0)

    def add_callback(self, callback: Callable, interval_millis: float = 1000.0):
        """Register ``callback`` to be called every ``interval_millis`` ms.

        The frame info of the immediate caller is recorded as the
        callback's source.

        :raises InvalidArgumentException: if ``callback`` is not callable.
        """
        self.__callbacks.append(Callback(callback, interval_millis, None, getframeinfo(stack()[1][0])))

    def remove_callback(self, callback: Callable):
        """Unregister the first entry whose callable equals ``callback``.

        Does nothing when no entry matches.
        """
        for idx, registered in enumerate(self.__callbacks):
            if registered.callback == callback:
                del self.__callbacks[idx]
                break

    def clear_callbacks(self):
        """Unregister every callback."""
        self.__callbacks = []

    def stop(self):
        """Ask the loop thread to exit; it stops at its next flag check."""
        self.__ticking = False

    @property
    def paused(self) -> bool:
        """Whether callback execution is currently paused."""
        return self.__paused

    def pause(self) -> bool:
        """Toggle the paused state and return the new state."""
        self.__paused = not self.__paused
        return self.paused

    def start(self):
        """Start the loop in a daemon thread; no-op if already running."""
        if self.__ticking:
            return

        # Raise the flag before the thread exists so that a second start()
        # issued immediately afterwards sees it and does not spawn a
        # duplicate loop.
        self.__ticking = True
        thread = Thread(target=self.__ticker__loop, daemon=True)
        thread.start()
114