Debugger.run_callbacks()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 34
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 34
rs 8.4666
c 0
b 0
f 0
cc 6
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
from collections import namedtuple
9
from inspect import signature, Parameter
10
from re import compile as re_compile
11
from typing import List, Tuple, Union, Dict, Callable
12
13
from ... import Colors
14
from ...profiles.defaults import Fixture, Vdim
15
from ...utils.exceptions import ChannelNotFoundException
16
17
18
class Debugger:
    """Interactive, console-driven debugger for a DMX controller.

    All interaction happens through ``input()`` prompts and ``print()``
    output.  ``run()`` is the top-level menu; from there the user can pick a
    fixture to poke at (``run_fixture``) or invoke one of the registered
    callbacks (``run_callbacks``).
    """

    def __init__(self, controller: 'Controller', callbacks: Dict[str, Callable] = None):
        """Store the controller and the name -> callable callback mapping.

        :param controller: The controller being debugged.  This class reads
            ``next_channel`` from it and calls ``all_on``, ``all_off``,
            ``all_locate``, ``get_fixture``, ``get_fixtures_by_name`` and
            ``get_all_fixtures`` on it.
        :param callbacks: Optional mapping of callback name to callable.  The
            dict is used as-is (not copied), so the default entries added and
            the invalid entries removed by ``run_callbacks`` are visible to
            the caller.
        """
        self.cont = controller
        self.cbs = {} if callbacks is None else callbacks

    def __default_callbacks(self):
        """Register the built-in callbacks for any name the user did not supply."""
        # (callback name, controller attribute) pairs; each controller method
        # is exposed under both a long and a short name.
        defaults = (
            ('all_on', 'all_on'),
            ('on', 'all_on'),
            ('all_off', 'all_off'),
            ('off', 'all_off'),
            ('all_locate', 'all_locate'),
            ('locate', 'all_locate'),
        )
        for name, attribute in defaults:
            # Never override a callback the user registered under this name
            if name not in self.cbs:
                self.cbs[name] = getattr(self.cont, attribute)

    def __check_callbacks(self):
        """Drop every registered callback that is falsy or not callable."""
        # Collect the offending keys first: deleting from a dict while
        # iterating over it raises "dictionary changed size during iteration".
        invalid = [key for key, cb in self.cbs.items() if not cb or not callable(cb)]
        for key in invalid:
            del self.cbs[key]

    @staticmethod
    def __callbacks_parameters(parameters: List[Parameter]) -> Tuple[list, dict]:
        """Prompt the user for a value for each parameter of a callback.

        A blank answer selects the parameter's default when it has one and
        otherwise re-prompts.  A non-blank answer is passed through the
        parameter's annotation (e.g. ``int``); when that is missing or the
        conversion fails, the raw input string is used instead.

        :param parameters: The ``inspect.Parameter`` objects of the callback.
        :return: ``(positional, keyword)`` arguments to call the callback
            with.  Only positional-only parameters go in the positional list.
        """
        ordered_params = []
        keyword_params = {}

        for param in parameters:
            # *args / **kwargs cannot be answered with a single named value;
            # leave them out so the callback is simply called without extras.
            if param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD):
                continue

            # Basic param information (identity checks: `empty` is a sentinel)
            has_default = param.default is not Parameter.empty
            has_type = param.annotation is not Parameter.empty
            param_type = str(param.annotation) if has_type else "Unknown"
            param_default = (", leave blank for default " + str(param.default)) if has_default else ""
            prompt = "[Callbacks Debug] Parameter '" + param.name + "' (expects " + param_type + param_default + "): "

            # Keep asking until a usable answer is given.  The loop exits via
            # `break`, not by comparing the answer with a marker value, so
            # falsy defaults such as False or 0 are accepted.
            while True:
                answer = input(prompt)

                # Blank: use the default if there is one, else ask again
                if answer.strip() == "":
                    if has_default:
                        given_param = param.default
                        break
                    continue

                # Normal: convert through the annotation when possible.
                # Annotations are arbitrary objects (possibly not even
                # callable), so any failure falls back to the raw string.
                given_param = answer
                if has_type:
                    try:
                        given_param = param.annotation(answer)
                    except Exception:
                        given_param = answer
                break

            # Add to return
            if param.kind == Parameter.POSITIONAL_ONLY:
                ordered_params.append(given_param)
            else:
                keyword_params[param.name] = given_param

        return ordered_params, keyword_params

    def run_callbacks(self):
        """Run the interactive callback menu until the user enters 'exit'.

        Default callbacks are registered and invalid ones removed first.
        Unknown names are ignored and the prompt is shown again.  Any
        exception raised while collecting parameters or running the callback
        is printed and reported as a failure; it does not end the loop.
        """
        # Defaults
        self.__default_callbacks()
        self.__check_callbacks()

        # Give callbacks
        print("\n[Callbacks Debug] Available callbacks:",
              ", ".join(["'" + name + "'" for name in self.cbs]))
        while True:
            # Selection
            callback = input("\n[Callbacks Debug] Callback Name (or 'exit'): ").strip()

            # Allow exit
            if callback == 'exit':
                break

            # Check it exists
            if callback not in self.cbs:
                continue

            # Run the callback
            cb = self.cbs[callback]
            try:
                # Get all parameters
                params = signature(cb).parameters.values()
                ordered, keyword = self.__callbacks_parameters(params)

                # Run
                res = cb(*ordered, **keyword)
            except Exception as e:
                # Debug console: a failing callback must not kill the session
                print(e)
                print("[Callbacks Debug] '" + callback + "' failed.")
            else:
                print("[Callbacks Debug] Callback '" + callback + "' succeed and returned:", res)

    @staticmethod
    def __fixture_channels(fixture: 'Fixture') -> List[str]:
        """Return the quoted channel names of a fixture.

        ``'dimmer'`` is appended when the fixture is a ``Vdim``.
        """
        names = ["'" + channel['name'] + "'" for channel in fixture.channels.values()]
        if isinstance(fixture, Vdim):
            names.append("'dimmer'")
        return names

    @staticmethod
    def __fixture_channel_value(fixture: 'Fixture', channel: Union[str, int]) -> int:
        """Return the first element of the fixture's ``get_channel_value`` result.

        ``Vdim`` fixtures are queried with an extra ``False`` argument.
        """
        if isinstance(fixture, Vdim):
            return fixture.get_channel_value(channel, False)[0]
        return fixture.get_channel_value(channel)[0]

    @staticmethod
    def run_fixture_color(fixture: 'Fixture'):
        """Prompt for a color and apply it to the fixture.

        The input is first matched case-insensitively against the names in
        ``Colors``.  Failing that, it is parsed as three or more numbers of up
        to three digits each, separated by commas or spaces.  Returns without
        doing anything when neither form matches.
        """
        # Select
        select = input("\n[Fixture Debug] Color Name: ").strip().lower()

        # Try finding enum
        matches = [c for c in Colors if c.name.lower() == select]

        if matches:
            color = matches[0]
        else:
            # Not a known name, try a raw list of numbers
            pattern = re_compile(
                r"^\s*(\d{1,3})\s*[, ]\s*(\d{1,3})\s*[, ]\s*(\d{1,3})\s*(?:[, ]\s*(\d{1,3})\s*)*$")
            match = pattern.match(select)
            if not match:
                return
            # Mimic the name/value shape of a Colors member
            values = [int(group) for group in match.groups() if group]
            color = namedtuple("Color", ["name", "value"])("User Input", values)

        # Apply
        fixture.color(color.value)
        print("[Fixture Debug] Color set to " + color.name + " (" + Colors.to_print(color.value) + ")")

    def run_fixture_channel(self, fixture: 'Fixture'):
        """Prompt for a channel of the fixture and a new value, then set it.

        Returns silently when the channel cannot be found, or when the value
        is blank or not a non-negative whole number.
        """
        # Select
        channel = input("\n[Channel Debug] Channel Number/Name: ").strip()

        # Find
        try:
            channel = fixture.get_channel_id(channel)
        except ChannelNotFoundException:
            return

        # Value
        current = str(self.__fixture_channel_value(fixture, channel))
        value = input("[Channel Debug] Channel Value (Currently " + current + ", leave blank to abort): ").strip()

        # Abort if bad value.  isdecimal() (unlike isdigit()) only accepts
        # characters int() can parse, and is False for the empty string.
        if not value.isdecimal():
            return

        # Apply
        fixture.set_channel(channel, int(value))
        print("[Channel Debug] Channel '" + str(channel) + "' set to " + str(
            self.__fixture_channel_value(fixture, channel)))

    def run_fixture(self):
        """Prompt for a fixture by ID or name and run its debug menu.

        All-digit input is looked up with ``get_fixture``; anything else goes
        to ``get_fixtures_by_name`` and the first match is used.  Returns
        without showing the menu when nothing is found.
        """
        fixture = input("\n[Fixture Debug] Fixture ID/Name: ").strip()

        # Find the fixture
        if fixture.isdigit():
            fixture = self.cont.get_fixture(int(fixture))
        else:
            fixture = self.cont.get_fixtures_by_name(fixture)
            if fixture:
                fixture = fixture[0]
        if not fixture:
            return

        # Fixture debug control
        print("\n[Fixture Debug] Fixture selected:", fixture)
        while True:

            # Selection (unrecognised input just shows the menu again)
            select = input("\n[Fixture Debug] '1': Channel Select by Number/Name"
                           "\n            '2': Channel List"
                           "\n            '3': Color (if fixture supports)"
                           "\n            '4': Color List"
                           "\n            '5': Exit"
                           "\nSelection: ").strip()

            # Channel number/name
            if select == '1':
                self.run_fixture_channel(fixture)
                continue

            # Channel list
            if select == '2':
                print("\n[Fixture Debug] Channel List:", ", ".join(self.__fixture_channels(fixture)))
                continue

            # Color select
            if select == '3':
                self.run_fixture_color(fixture)
                continue

            # Color list
            if select == '4':
                print("\n[Fixture Debug] Color List:", ", ".join([color.name for color in Colors]))
                continue

            # Exit
            if select == '5':
                break

    def run(self):
        """Run the top-level debug menu until the user selects exit."""
        # DMX debug control
        print("[DMX Debug] Currently operating in channels: {}".format("1->{}.".format(self.cont.next_channel - 1) if
                                                                       self.cont.next_channel > 1 else "None"))
        while True:
            # Selection (unrecognised input just shows the menu again)
            select = input("\n[DMX Debug] '1': Fixture Select by ID/Name"
                           "\n            '2': Fixture List"
                           "\n            '3': Callbacks"
                           "\n            '4': Exit"
                           "\nSelection: ").strip()

            # Fixture id/name
            if select == '1':
                self.run_fixture()
                continue

            # Fixture list
            if select == '2':
                # Compile list, one fixture per output line
                fixtures = []
                for f in self.cont.get_all_fixtures():
                    fixtures.extend(["\n", f])
                # Output
                print("\n[DMX Debug] Fixture List:", *fixtures)
                continue

            # Callbacks
            if select == '3':
                self.run_callbacks()
                continue

            # Exit
            if select == '4':
                break
275