Passed
Push — web-timed ( cfeda2...1d9e3e )
by Matt
01:11
created

PyDMXControl.utils.timing._Ticker.Ticker.start()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX via Python. Featuring fixture profiles and working with uDMX.
3
 *  <https://github.com/MattIPv4/PyDMXControl/>
4
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
5
"""
6
7
from threading import Thread
8
from time import sleep, time
9
from typing import Callable
10
11
from ... import DMXMINWAIT
12
13
14
class Ticker:
    """Fire registered callbacks at a fixed interval from a background thread.

    Once started, a daemon thread polls every ``DMXMINWAIT`` seconds. When at
    least ``interval`` milliseconds have passed since the previous tick, every
    registered callback is invoked (in registration order) on that thread.
    The interval is measured from the moment the previous round of callbacks
    finished, so the time spent inside callbacks is not counted.
    """

    @staticmethod
    def millis_now() -> float:
        """Return the current ``time.time()`` value converted to milliseconds."""
        return time() * 1000.0

    def __init__(self):
        # Milliseconds that must elapse between two ticks
        self.__interval = 1000.0
        # Millisecond timestamp of the previous tick; None until the first poll
        self.__last = None
        # Callables invoked on every tick
        self.__callbacks = []
        # While True the loop keeps polling but skips the tick logic
        self.__paused = False
        # Loop-control flag: the worker loop runs for as long as this is True
        self.__ticking = False
        # Most recently created worker thread (None before the first start)
        self.thread = None

    def __ticker(self):
        """Run a single poll, firing the callbacks if the interval has elapsed."""
        # First poll after a (re)start: begin timing from now
        if self.__last is None:
            self.__last = self.millis_now()

        # If diff in milliseconds is interval
        if self.millis_now() - self.__last >= self.__interval:
            # Iterate over a snapshot: a callback may add/remove callbacks
            # (e.g. a one-shot callback removing itself), and mutating the
            # live list mid-iteration would skip the entries that follow.
            for callback in list(self.__callbacks):
                # Check is valid callback
                if callback and callable(callback):
                    callback()
            # Finished, update last tick time
            self.__last = self.millis_now()

    def __ticker__loop(self):
        """Thread target: poll the ticker until the running flag is cleared."""
        # Reset timing and pause state for this run
        self.__last = None
        self.__paused = False
        # Use a variable so loop can be stopped
        self.__ticking = True
        while self.__ticking:
            # Allow for pausing
            if not self.__paused:
                # Call ticker
                self.__ticker()
            # Sleep DMX delay time
            sleep(DMXMINWAIT)

    def set_interval(self, milliseconds: float):
        """Set the number of milliseconds between ticks."""
        self.__interval = milliseconds

    def get_interval(self) -> float:
        """Return the number of milliseconds between ticks."""
        return self.__interval

    def set_callback(self, callback: Callable):
        """Replace all registered callbacks with the single given callback."""
        self.__callbacks = [callback]

    def add_callback(self, callback: Callable):
        """Register an additional callback to be invoked on every tick."""
        self.__callbacks.append(callback)

    def remove_callback(self, callback: Callable):
        """Unregister one occurrence of the callback; do nothing if absent."""
        # Single remove attempt instead of check-then-remove, so there is no
        # window in which the entry can vanish between the two steps.
        try:
            self.__callbacks.remove(callback)
        except ValueError:
            # Not registered (any more): nothing to remove
            pass

    def clear_callbacks(self):
        """Unregister every callback."""
        self.__callbacks = []

    def stop(self):
        """Clear the running flag so the worker loop exits.

        The loop only re-checks the flag after its current poll and sleep,
        and this method does not wait for the thread to finish.
        """
        # Stop the threaded loop
        self.__ticking = False

    @property
    def paused(self) -> bool:
        """Whether ticking is currently paused."""
        return self.__paused

    def pause(self) -> bool:
        """Toggle the paused state and return the new state.

        Note that the paused state is reset to False whenever the worker
        loop (re)starts.
        """
        # Toggle pause state
        self.__paused = not self.__paused
        return self.paused

    def start(self):
        """Start the worker thread unless the loop is flagged as running.

        NOTE: the running flag is raised by the worker thread itself, not by
        this method, so a second call made before the new thread has begun
        executing will also pass the check below.
        """
        if not self.__ticking:
            # Create the thread and run loop
            self.thread = Thread(target=self.__ticker__loop, daemon=True)
            self.thread.start()
97