Passed
Push — master ( bbd02a...7ebed9 )
by Matt
02:12
created

uDMXController.__drain()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 3
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
from queue import Queue, Empty
8
from threading import Thread
9
from typing import List
10
11
from pyudmx import pyudmx
12
13
from ._transmittingController import transmittingController
14
15
16
class uDMXController(transmittingController):
    """
    Transmitting controller that sends DMX frames to a uDMX device via pyudmx.

    Frames produced by ``_send_data`` are placed on a queue and transmitted by a
    background daemon thread. Frames are generated more frequently than the uDMX
    device can transmit them, so after each transmission any frames that piled up
    in the meantime are discarded.

    Keyword arguments consumed here (all others are passed to the parent):
        udmx_vendor_id: vendor id used to open the device (default ``0x16c0``).
        udmx_product_id: product id used to open the device (default ``0x5dc``).
        udmx_bus: bus passed to ``uDMXDevice.open`` (default ``None``).
        udmx_address: address passed to ``uDMXDevice.open`` (default ``None``).
    """

    def __init__(self, *args, **kwargs):
        # Device information
        self.__udmx_vendor_id = kwargs.pop("udmx_vendor_id", 0x16c0)
        self.__udmx_product_id = kwargs.pop("udmx_product_id", 0x5dc)
        self.__udmx_bus = kwargs.pop("udmx_bus", None)
        self.__udmx_address = kwargs.pop("udmx_address", None)

        # Store the device
        self.__udmx = None
        self.__connect()

        # Create the queue and the runner that consumes it.
        # The flag is cleared by close() or when the runner exits for any reason,
        # so frames are never queued when nothing is left to consume them.
        self.__running = True
        self.__queue = Queue()
        self.__thread = Thread(target=self.__runner, daemon=True)
        self.__thread.start()

        # Create the parent controller
        super().__init__(*args, **kwargs)

    def close(self):
        """Stop the transmit thread, close the uDMX device, then close the parent."""
        # uDMX: refuse new frames, discard pending ones and ask the runner to stop
        self.__running = False
        self.__drain()
        self.__queue.put(None)

        # Wait on the thread rather than the queue: this cannot hang if the runner
        # has already exited or if a frame was queued behind the stop sentinel
        self.__thread.join()
        self.__udmx.close()
        print("CLOSE: uDMX closed")

        # Parent
        super().close()

    def _send_data(self):
        """Queue the current frame, with its first channel, for transmission."""
        # Nothing will consume the frame once the runner has stopped
        if not self.__running:
            return

        frame = self.get_frame()
        first = self.first_channel if self.dynamic_frame else 1
        self.__queue.put_nowait((frame, first))

    def __runner(self):
        """Transmit queued frames until a ``None`` stop sentinel is received."""
        try:
            while True:
                # Get next item in queue, or wait for item
                item = self.__queue.get()

                try:
                    # None is passed to end the runner
                    if item is None:
                        break

                    # Transmit current frame
                    self.__transmit(*item)
                finally:
                    # Keep the queue's task accounting balanced even on failure
                    self.__queue.task_done()

                # Drain excess frames as they are generated more frequently than
                # uDMX can transmit; stop if the sentinel was among them
                if self.__drain():
                    break
        finally:
            # Whether stopped cleanly or by an error, stop accepting new frames
            self.__running = False

    def __transmit(self, frame: List[int], first: int):
        """
        Send ``frame`` to the device starting at channel ``first``.

        Up to 6 attempts are made on the current connection, then 2 more that each
        reconnect to the device first. The error from the final attempt is re-raised.
        """
        # Thanks to Dave Hocker (pyudmx author) for giving me this solution to the random usb errors
        plain_attempts = 6
        max_attempts = plain_attempts + 2

        for attempt in range(max_attempts):
            try:
                if attempt >= plain_attempts:
                    self.__connect()
                self.__udmx.send_multi_value(first, frame)
                return
            except Exception:
                if attempt == max_attempts - 1:
                    raise

    def __connect(self):
        """Close the current device handle, if any, and open a fresh one."""
        # Try to close if exists
        if self.__udmx is not None:
            try:
                self.__udmx.close()
            except Exception:
                # Best effort: the old handle is being replaced regardless
                pass

        # Get new device
        self.__udmx = pyudmx.uDMXDevice()
        self.__udmx.open(self.__udmx_vendor_id, self.__udmx_product_id, self.__udmx_bus, self.__udmx_address)

    def __drain(self) -> bool:
        """
        Discard everything currently in the queue.

        Returns True if a ``None`` stop sentinel was among the discarded items, so
        that the caller can honour the stop request instead of losing it.
        """
        stop_requested = False
        while True:
            try:
                item = self.__queue.get_nowait()
            except Empty:
                return stop_requested

            if item is None:
                stop_requested = True
            self.__queue.task_done()
103