"""
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
 *                Featuring fixture profiles, built-in effects and a web control panel.
 *  <https://github.com/MattIPv4/PyDMXControl/>
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
"""
||
7 | |||
8 | import re |
||
9 | from importlib import import_module |
||
10 | from json import load, dumps, JSONDecodeError |
||
11 | from time import sleep |
||
12 | from typing import Type, List, Union, Dict, Tuple, Callable |
||
13 | from warnings import warn |
||
14 | |||
15 | from .utils.debug import Debugger |
||
16 | from .. import Colors, name, DEFAULT_INTERVAL |
||
17 | from ..profiles.defaults import Fixture_Channel, Fixture |
||
18 | from ..utils.exceptions import JSONConfigLoadException, LTPCollisionException |
||
19 | from ..utils.timing import Ticker |
||
20 | from ..web import WebController |
||
21 | |||
22 | |||
class ControllerHelpers:
    """Bulk helpers that apply one action to every registered fixture.

    Each method walks ``self.get_all_fixtures()``.  That method is not
    defined here, so the class using these helpers has to provide it.
    """

    def all_on(self, milliseconds: int = 0):
        """Call ``dim(255, milliseconds)`` on every fixture."""
        for light in self.get_all_fixtures():
            light.dim(255, milliseconds)

    def all_off(self, milliseconds: int = 0):
        """Call ``dim(0, milliseconds)`` on every fixture."""
        for light in self.get_all_fixtures():
            light.dim(0, milliseconds)

    def all_locate(self):
        """Call ``locate()`` on every fixture."""
        for light in self.get_all_fixtures():
            light.locate()

    def all_dim(self, value: int, milliseconds: int = 0):
        """Call ``dim(value, milliseconds)`` on every fixture."""
        for light in self.get_all_fixtures():
            light.dim(value, milliseconds)

    def all_color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """Call ``color(color, milliseconds)`` on every fixture."""
        for light in self.get_all_fixtures():
            light.color(color, milliseconds)

    def clear_all_effects(self):
        """Call ``clear_effects()`` on every fixture."""
        for light in self.get_all_fixtures():
            light.clear_effects()
||
48 | |||
49 | |||
class Controller(ControllerHelpers):
    """Fixture registry and DMX frame builder.

    Keeps the registered fixtures (keyed by an integer id), merges their
    channel values into a single frame, and owns a ``Ticker``, an optional
    ``WebController`` and a ``JSONLoadSave`` helper.  ``run`` does nothing
    here; it is the hook used by transmitting controllers.
    """

    def __init__(self, *,
                 ltp: bool = True,
                 dynamic_frame: bool = False,
                 suppress_dmx_value_warnings: bool = False,
                 suppress_ticker_behind_warnings: bool = False,
                 ticker_interval_millis: float = DEFAULT_INTERVAL * 1000.0,
                 ):
        """Create the controller and start its ticker.

        :param ltp: resolve overlapping channels by latest-takes-priority
            when True, highest-takes-priority when False.
        :param dynamic_frame: size the frame to the span of channels in use
            instead of a fixed 512 slots.
        :param suppress_dmx_value_warnings: stored inverted and exposed as
            ``dmx_value_warnings``.
        :param suppress_ticker_behind_warnings: passed inverted as the
            second argument to ``Ticker``.
        :param ticker_interval_millis: passed as the first argument to
            ``Ticker``.
        """
        # Store all registered fixtures, keyed by fixture id
        self.__fixtures = {}

        # LTP (default) (Latest takes priority, disable for Highest takes priority)
        self.__ltp = ltp

        # Frame data
        self.__frame = []
        self.__dynamic_frame = dynamic_frame

        # Ticker for callback
        self.ticker = Ticker(ticker_interval_millis, not suppress_ticker_behind_warnings)
        self.ticker.start()

        # Web control attr
        self.web = None

        # JSON load/save
        self.json = JSONLoadSave(self)

        # Warning data
        self.__dmx_value_warnings = not suppress_dmx_value_warnings

    def add_fixture(self, fixture: Union[Fixture, Type[Fixture]], *args, **kwargs) -> Fixture:
        """Register a fixture and return it.

        ``fixture`` may be an instance or a class.  A class is instantiated
        with ``*args``/``**kwargs``; if no ``start_channel`` keyword is
        given, ``next_channel`` is supplied for it.
        """
        # Handle auto inserting
        if isinstance(fixture, type):
            if "start_channel" not in kwargs:
                kwargs["start_channel"] = self.next_channel
            fixture = fixture(*args, **kwargs)

        # Get the next id (one above the highest id in use, starting at 1)
        fixture_id = max(self.__fixtures, default=0) + 1

        # Tell the fixture its id
        fixture.set_id(fixture_id)

        # Give the fixture this controller
        fixture.set_controller(self)

        # Store the fixture
        self.__fixtures[fixture_id] = fixture

        # Return the updated fixture
        return fixture

    def del_fixture(self, fixture_id: int) -> bool:
        """Remove the fixture with ``fixture_id``; return whether it existed."""
        if fixture_id not in self.__fixtures:
            return False

        del self.__fixtures[fixture_id]
        return True

    def get_fixture(self, fixture_id: int) -> Union[Fixture, None]:
        """Return the fixture with ``fixture_id``, or None if there is none."""
        return self.__fixtures.get(fixture_id)

    def get_fixtures_by_profile(self, profile: Type[Fixture]) -> List[Fixture]:
        """Return every fixture that is an instance of ``profile``."""
        return [fixture for fixture in self.__fixtures.values()
                if isinstance(fixture, profile)]

    def get_fixtures_by_name(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name equals ``fixture_name``, ignoring case."""
        return [fixture for fixture in self.__fixtures.values()
                if fixture.name.lower() == fixture_name.lower()]

    def get_fixtures_by_name_include(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name contains ``fixture_name``, ignoring case."""
        return [fixture for fixture in self.__fixtures.values()
                if fixture_name.lower() in fixture.name.lower()]

    def get_all_fixtures(self) -> List[Fixture]:
        """Return a new list holding all registered fixtures."""
        return list(self.__fixtures.values())

    @staticmethod
    def sleep_till_enter():
        """Block until the user presses Enter."""
        input("Press Enter to end sleep...")

    @staticmethod
    def sleep_till_interrupt():
        """Block until a KeyboardInterrupt arrives, then return None."""
        try:
            while True:
                sleep(0.1)
        except KeyboardInterrupt:
            # We're done
            return None

    def get_frame(self) -> List[int]:
        """Build and return the current frame of channel values.

        The frame has 512 slots starting at channel 1, or, when
        ``dynamic_frame`` is set, one slot per channel from the lowest to
        the highest channel in use.  Channels outside the frame are skipped.
        """
        # Merge the channels once; first/last and the values all come from
        # the same snapshot rather than three separate merges.
        channels = self.channels

        if self.__dynamic_frame:
            first = min(channels, default=1)
            frame = [0] * (max(channels, default=0) + 1 - first)
        else:
            first = 1
            frame = [0] * 512

        # Get all channels values
        for key, val in channels.items():
            offset = key - first
            # The lower bound matters: a channel below ``first`` would give a
            # negative offset and silently overwrite the end of the frame.
            if 0 <= offset < len(frame):
                frame[offset] = val[0]

        # Store and return populated frame
        self.__frame = frame
        return frame

    @property
    def dynamic_frame(self) -> bool:
        """Whether the frame is sized to the channels in use."""
        return self.__dynamic_frame

    @property
    def dmx_value_warnings(self) -> bool:
        """True unless ``suppress_dmx_value_warnings`` was set at construction."""
        return self.__dmx_value_warnings

    @property
    def channels(self) -> Dict[int, Fixture_Channel]:
        """Merge every fixture's channels into one ``{channel id: value}`` map.

        Each stored value is the fixture channel's ``'value'`` entry; index 0
        is what ends up in the frame and index 1 is what LTP compares
        (presumably a timestamp - confirm against ``Fixture``).

        :raises LTPCollisionException: in LTP mode, when two fixtures hold
            different values for the same channel with an equal index-1 field.
        """
        channels = {}

        # Iterate a copy of the registered fixtures
        for fixture in list(self.__fixtures.values()):
            for chanid, chandata in fixture.channels.items():
                chanval = chandata['value']

                # First fixture to claim this channel
                if chanid not in channels:
                    channels[chanid] = chanval
                    continue

                current = channels[chanid]
                if self.__ltp:
                    # Handle collision
                    if chanval[1] == current[1] and chanval[0] != current[0]:
                        raise LTPCollisionException(chanid)

                    # LTP
                    if chanval[1] > current[1]:
                        channels[chanid] = chanval
                elif chanval[0] > current[0]:
                    # HTP
                    channels[chanid] = chanval

        # Return all the channels
        return channels

    @property
    def next_channel(self) -> int:
        """One above the highest channel in use (1 when there are none)."""
        return max(self.channels, default=0) + 1

    @property
    def first_channel(self) -> int:
        """The lowest channel in use (1 when there are none)."""
        return min(self.channels, default=1)

    def debug_control(self, callbacks: Union[Dict[str, Callable], None] = None):
        """Run a ``Debugger`` for this controller with the given callbacks."""
        if callbacks is None:
            callbacks = {}
        Debugger(self, callbacks).run()

    def web_control(self, *args, **kwargs):
        """Replace ``self.web`` with a new ``WebController``, stopping any old one."""
        if self.web is not None:
            self.web.stop()
        self.web = WebController(self, *args, **kwargs)

    def run(self):
        """Do nothing; method used in transmitting controllers."""
        pass

    def close(self):
        """Stop the ticker, clear every fixture's effects and stop the web controller."""
        # Stop the ticker
        self.ticker.stop()
        print("CLOSE: ticker stopped")

        # Stop any effect tickers
        for fixture in self.__fixtures.values():
            fixture.clear_effects()
        print("CLOSE: all effects cleared")

        # Stop web
        if hasattr(self, "web") and self.web:
            self.web.stop()
            print("CLOSE: web controller stopped")
||
280 | |||
281 | |||
class JSONLoadSave:
    """Load fixtures into a controller from JSON, and save them back out."""

    # "<module path>.<ClassName>".  The module path may contain dots and is
    # optional in the pattern.  There is deliberately no repetition nested
    # inside another repetition, so a malformed type string cannot trigger
    # exponential backtracking.
    _TYPE_PATTERN = re.compile(r"^(?:([\w.]+)\.)?(\w+)$")

    def __init__(self, controller: Controller):
        """Remember the controller that fixtures are added to and read from."""
        self.controller = controller

    @staticmethod
    def validate_item(index: int, item) -> Tuple[bool, Union[None, Type[Fixture]]]:
        """Resolve the profile class named by one JSON entry.

        :param index: position of the entry, used only in warning messages.
        :param item: the decoded JSON entry; expected to be a dict with a
            ``'type'`` string of the form ``module.ClassName``.
        :return: ``(True, profile_class)`` on success, otherwise
            ``(False, None)`` after issuing a warning.
        """
        if not isinstance(item, dict):
            warn("Failed to load item {} from JSON, expected dict, got {}".format(index, type(item)))
            return False, None

        if 'type' not in item:
            warn("Failed to load item {} from JSON, expected a type property".format(index))
            return False, None

        # A non-string type (number, list, ...) cannot be matched at all;
        # treat it as unparseable rather than letting re raise TypeError.
        type_path = item['type']
        match = JSONLoadSave._TYPE_PATTERN.match(type_path) if isinstance(type_path, str) else None
        if not match:
            warn("Failed to load item {} from JSON, failed to parse type '{}'".format(index, type_path))
            return False, None

        # module_path is None when the type has no dotted prefix; the import
        # below is still attempted with that value, as before.
        module_path, class_name = match.groups()

        try:
            module = import_module(".{}".format(module_path), name + '.profiles')
        except ModuleNotFoundError:
            warn("Failed to load item {} from JSON, profile module '{}' not found".format(index, module_path))
            return False, None

        try:
            profile = getattr(module, class_name)
        except AttributeError:
            warn("Failed to load item {} from JSON, profile type '{}' not found in '{}'".format(
                index, class_name, module_path))
            return False, None

        return True, profile

    def load_config(self, filename: str) -> List[Fixture]:
        """Add the fixtures described in a JSON file to the controller.

        The file must contain a list of dicts.  Each dict needs a ``'type'``
        and may carry an ``'args'`` list; every other key is passed to
        ``add_fixture`` as a keyword argument.  Entries that fail validation
        are skipped with a warning.

        :return: the fixtures that were added, in file order.
        :raises JSONConfigLoadException: if the file cannot be opened or
            decoded, or its top level is not a list.
        """
        # Get data
        try:
            with open(filename) as f:
                data = load(f)
        except (JSONDecodeError, UnicodeDecodeError) as err:
            raise JSONConfigLoadException(filename, "unable to parse contents") from err
        except OSError as err:
            raise JSONConfigLoadException(filename) from err

        if not isinstance(data, list):
            raise JSONConfigLoadException(filename, "expected list of dicts, got {}".format(type(data)))

        # Parse data
        fixtures = []
        for index, item in enumerate(data):
            # Validate entry
            success, profile = self.validate_item(index, item)
            if not success or not profile:
                continue

            # Parse args: everything except 'type' and 'args' becomes kwargs
            kwargs = dict(item)
            del kwargs['type']
            args = kwargs.pop('args', [])
            if not isinstance(args, list):
                # Star-expanding a string or number would pass nonsense
                # positional arguments (or raise); skip the entry instead.
                warn("Failed to load item {} from JSON, expected args to be a list, got {}".format(
                    index, type(args)))
                continue

            # Create
            fixtures.append(self.controller.add_fixture(profile, *args, **kwargs))

        return fixtures

    def save_config(self, filename: Union[str, None] = None, pretty_print: bool = True) -> str:
        """Serialise every fixture's ``json_data`` to a JSON string.

        :param filename: if given (and non-empty), the JSON is also written
            to this path.
        :param pretty_print: indent the output by 4 spaces when True.
        :return: the JSON text.
        """
        # Generate data
        entries = [fixture.json_data for fixture in self.controller.get_all_fixtures()]

        # JSON-ify
        text = dumps(entries, indent=4 if pretty_print else None)

        # Save
        if filename:
            with open(filename, "w") as f:
                f.write(text)
        return text
||
368 |