Passed
Push — master ( 207a08...b801a7 )
by Matt
02:53
created

PyDMXControl.utils.timing._Ticker.Ticker.paused()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2021 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
from threading import Thread
9
from time import sleep, time
10
from typing import Callable
11
12
from ... import DMXMINWAIT
13
14
15
class Ticker:
16
17
    @staticmethod
18
    def millis_now() -> float:
19
        return time() * 1000.0
20
21
    def __init__(self):
22
        self.__callbacks = []
23
        self.__paused = False
24
        self.__ticking = False
25
        self.thread = None
26
27
    def __ticker(self):
28
        # Loop over each callback
29
        for callback in self.__callbacks:
30
            # New
31
            if callback["last"] is None:
32
                callback["last"] = self.millis_now()
33
34
            # If diff in milliseconds is interval
35
            if self.millis_now() - callback["last"] >= callback["interval"]:
36
                # Check is valid callback
37
                if callback["callback"] and callable(callback["callback"]):
38
                    callback["callback"]()
39
40
                    # Finished, update last tick time
41
                    callback["last"] = self.millis_now()
42
43
    def __ticker__loop(self):
44
        # Reset
45
        for callback in self.__callbacks:
46
            callback["last"] = None
47
        self.__paused = False
48
        # Use a variable so loop can be stopped
49
        self.__ticking = True
50
        while self.__ticking:
51
            # Allow for pausing
52
            if not self.__paused:
53
                # Call ticker
54
                self.__ticker()
55
            # Sleep DMX delay time
56
            sleep(DMXMINWAIT)
57
58
    def add_callback(self, callback: Callable, interval_millis: float = 1000.0):
59
        self.__callbacks.append({
60
            "callback": callback,
61
            "interval": interval_millis,
62
            "last": None
63
        })
64
65
    def remove_callback(self, callback: Callable):
66
        idx = [i for i, cb in enumerate(self.__callbacks) if cb["callback"] == callback]
67
        if len(idx):
68
            del self.__callbacks[idx[0]]
69
70
    def clear_callbacks(self):
71
        self.__callbacks = []
72
73
    def stop(self):
74
        # Stop the threaded loop
75
        self.__ticking = False
76
77
    @property
78
    def paused(self) -> bool:
79
        return self.__paused
80
81
    def pause(self) -> bool:
82
        # Toggle pause state
83
        self.__paused = not self.__paused
84
        return self.paused
85
86
    def start(self):
87
        if not self.__ticking:
88
            # Create the thread and run loop
89
            self.thread = Thread(target=self.__ticker__loop)
90
            self.thread.daemon = True
91
            self.thread.start()
92