Controller.get_fixtures_by_profile()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 12
rs 10
c 0
b 0
f 0
cc 3
nop 2
"""
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
 *                Featuring fixture profiles, built-in effects and a web control panel.
 *  <https://github.com/MattIPv4/PyDMXControl/>
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
"""
7
8
import re
9
from importlib import import_module
10
from json import load, dumps, JSONDecodeError
11
from time import sleep
12
from typing import Type, List, Union, Dict, Tuple, Callable
13
from warnings import warn
14
15
from .utils.debug import Debugger
16
from .. import Colors, name, DEFAULT_INTERVAL
17
from ..profiles.defaults import Fixture_Channel, Fixture
18
from ..utils.exceptions import JSONConfigLoadException, LTPCollisionException
19
from ..utils.timing import Ticker
20
from ..web import WebController
21
22
23
class ControllerHelpers:
    """Convenience methods that apply one action to every registered fixture.

    The host class must provide ``get_all_fixtures()``; each helper simply
    walks that list and forwards the call to every fixture in turn.
    """

    def all_on(self, milliseconds: int = 0):
        """Dim every fixture to 255, passing ``milliseconds`` through to ``dim``."""
        for unit in self.get_all_fixtures():
            unit.dim(255, milliseconds)

    def all_off(self, milliseconds: int = 0):
        """Dim every fixture to 0, passing ``milliseconds`` through to ``dim``."""
        for unit in self.get_all_fixtures():
            unit.dim(0, milliseconds)

    def all_locate(self):
        """Call ``locate()`` on every fixture."""
        for unit in self.get_all_fixtures():
            unit.locate()

    def all_dim(self, value: int, milliseconds: int = 0):
        """Dim every fixture to ``value``, passing ``milliseconds`` through to ``dim``."""
        for unit in self.get_all_fixtures():
            unit.dim(value, milliseconds)

    def all_color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """Call ``color(color, milliseconds)`` on every fixture."""
        for unit in self.get_all_fixtures():
            unit.color(color, milliseconds)

    def clear_all_effects(self):
        """Call ``clear_effects()`` on every fixture."""
        for unit in self.get_all_fixtures():
            unit.clear_effects()
48
49
50
class Controller(ControllerHelpers):
    """Registry of fixtures that merges their channel values into a DMX frame.

    The ``run`` method here is a no-op; the source comment says it is used by
    transmitting controllers.
    """

    def __init__(self, *,
                 ltp: bool = True,
                 dynamic_frame: bool = False,
                 suppress_dmx_value_warnings: bool = False,
                 suppress_ticker_behind_warnings: bool = False,
                 ticker_interval_millis: float = DEFAULT_INTERVAL * 1000.0,
                 ):
        """Create the controller and start its ticker.

        :param ltp: Latest takes priority when two fixtures share a channel;
            disable for highest takes priority.
        :param dynamic_frame: When true, ``get_frame`` only spans the channels
            in use instead of a fixed 512 slots.
        :param suppress_dmx_value_warnings: Inverted and exposed through the
            ``dmx_value_warnings`` property.
        :param suppress_ticker_behind_warnings: Inverted and passed to the
            ``Ticker`` as its second argument.
        :param ticker_interval_millis: Passed to the ``Ticker`` as its first
            argument.
        """
        # Store all registered fixtures, keyed by fixture id
        self.__fixtures = {}

        # LTP (default) (Latest takes priority, disable for Highest takes priority)
        self.__ltp = ltp

        # Frame data
        self.__frame = []
        self.__dynamic_frame = dynamic_frame

        # Ticker for callback
        self.ticker = Ticker(ticker_interval_millis, not suppress_ticker_behind_warnings)
        self.ticker.start()

        # Web control attr (populated by web_control)
        self.web = None

        # JSON load/save
        self.json = JSONLoadSave(self)

        # Warning data
        self.__dmx_value_warnings = not suppress_dmx_value_warnings

    def add_fixture(self, fixture: Union[Fixture, Type[Fixture]], *args, **kwargs) -> Fixture:
        """Register a fixture and return it.

        ``fixture`` may be an instance, or a class that is instantiated with
        ``*args``/``**kwargs``. When a class is given without a
        ``start_channel`` keyword, ``next_channel`` is supplied automatically.
        """
        # Handle auto inserting
        if isinstance(fixture, type):
            if "start_channel" not in kwargs:
                kwargs["start_channel"] = self.next_channel
            fixture = fixture(*args, **kwargs)

        # Next id is one past the highest id currently registered (1 if none)
        fixture_id = max(self.__fixtures, default=0) + 1

        # Tell the fixture its id and give it this controller
        fixture.set_id(fixture_id)
        fixture.set_controller(self)

        # Store and return the fixture
        self.__fixtures[fixture_id] = fixture
        return fixture

    def del_fixture(self, fixture_id: int) -> bool:
        """Remove the fixture with this id; return whether it existed."""
        try:
            del self.__fixtures[fixture_id]
        except KeyError:
            # Nothing registered under that id
            return False
        return True

    def get_fixture(self, fixture_id: int) -> Union[Fixture, None]:
        """Return the fixture registered under ``fixture_id``, or ``None``."""
        return self.__fixtures.get(fixture_id)

    def get_fixtures_by_profile(self, profile: Type[Fixture]) -> List[Fixture]:
        """Return every fixture that is an instance of ``profile``."""
        return [fixture for fixture in self.__fixtures.values()
                if isinstance(fixture, profile)]

    def get_fixtures_by_name(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name equals ``fixture_name``, ignoring case."""
        return [fixture for fixture in self.__fixtures.values()
                if fixture.name.lower() == fixture_name.lower()]

    def get_fixtures_by_name_include(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name contains ``fixture_name``, ignoring case."""
        return [fixture for fixture in self.__fixtures.values()
                if fixture_name.lower() in fixture.name.lower()]

    def get_all_fixtures(self) -> List[Fixture]:
        """Return a new list holding all registered fixtures."""
        return list(self.__fixtures.values())

    @staticmethod
    def sleep_till_enter():
        """Block until the user presses Enter."""
        input("Press Enter to end sleep...")

    @staticmethod
    def sleep_till_interrupt():
        """Block until a ``KeyboardInterrupt`` is received."""
        try:
            while True:
                sleep(0.1)
        except KeyboardInterrupt:
            # We're done
            return None

    def get_frame(self) -> List[int]:
        """Build, store and return the current frame of channel values.

        The frame has 512 slots starting at channel 1, or, when
        ``dynamic_frame`` is enabled, one slot per channel from the lowest to
        the highest channel in use. Channels that fall outside the frame are
        skipped.
        """
        # Merge the fixtures once so the frame size and its contents come from
        # the same snapshot (the merge walks every fixture each time it is read)
        channels = self.channels

        if self.__dynamic_frame:
            first = min(channels, default=1)
            size = max(channels, default=0) + 1 - first
        else:
            first = 1
            size = 512

        frame = [0] * size
        for chanid, chanval in channels.items():
            offset = chanid - first
            # Bound both ends: a negative offset would otherwise wrap around
            # and overwrite a slot at the tail of the frame
            if 0 <= offset < size:
                frame[offset] = chanval[0]

        # Keep the populated frame on the instance and return it
        self.__frame = frame
        return self.__frame

    @property
    def dynamic_frame(self) -> bool:
        """Whether ``get_frame`` sizes the frame to the channels in use."""
        return self.__dynamic_frame

    @property
    def dmx_value_warnings(self) -> bool:
        """True unless ``suppress_dmx_value_warnings`` was set at construction."""
        return self.__dmx_value_warnings

    @property
    def channels(self) -> Dict[int, Fixture_Channel]:
        """Merge the ``'value'`` entry of every fixture channel into one dict.

        When two fixtures use the same channel id, LTP mode keeps the entry
        whose element ``[1]`` is greater, and HTP mode keeps the entry whose
        element ``[0]`` is greater.
        NOTE(review): element ``[0]`` looks like the DMX value and ``[1]`` a
        timestamp - confirm against ``Fixture``.

        :raises LTPCollisionException: In LTP mode, when two entries for one
            channel have equal ``[1]`` but different ``[0]``.
        """
        merged = {}

        # Channels for each registered fixture (iterating over a copy)
        for fixture in list(self.__fixtures.values()):
            for chanid, chandata in fixture.channels.items():
                chanval = chandata['value']

                # First time this channel id is seen
                if chanid not in merged:
                    merged[chanid] = chanval
                    continue

                current = merged[chanid]
                if self.__ltp:
                    # Handle collision
                    if chanval[1] == current[1] and chanval[0] != current[0]:
                        raise LTPCollisionException(chanid)

                    # LTP
                    if chanval[1] > current[1]:
                        merged[chanid] = chanval
                elif chanval[0] > current[0]:
                    # HTP
                    merged[chanid] = chanval

        # Return all the channels
        return merged

    @property
    def next_channel(self) -> int:
        """One past the highest channel id in use, or 1 when there are none."""
        return max(self.channels, default=0) + 1

    @property
    def first_channel(self) -> int:
        """The lowest channel id in use, or 1 when there are none."""
        return min(self.channels, default=1)

    def debug_control(self, callbacks: Dict[str, Callable] = None):
        """Run a ``Debugger`` for this controller with the given callbacks."""
        if callbacks is None:
            callbacks = {}
        Debugger(self, callbacks).run()

    def web_control(self, *args, **kwargs):
        """Create a ``WebController`` in ``self.web``, stopping any previous one."""
        if self.web is not None:
            self.web.stop()
        self.web = WebController(self, *args, **kwargs)

    def run(self):
        """Do nothing; method used in transmitting controllers."""
        pass

    def close(self):
        """Stop the ticker, clear all fixture effects and stop the web controller."""
        # Stop the ticker
        self.ticker.stop()
        print("CLOSE: ticker stopped")

        # Stop any effect tickers
        for fixture in self.__fixtures.values():
            fixture.clear_effects()
        print("CLOSE: all effects cleared")

        # Stop web, if one was ever started
        if getattr(self, "web", None):
            self.web.stop()
            print("CLOSE: web controller stopped")
280
281
282
class JSONLoadSave:
    """Load fixtures into a controller from JSON and save them back out."""

    # Optional dotted module path, then the profile class name. The module part
    # is a single optional group: repeating it (as a ``*`` group whose inner
    # character class also contains '.') matches the same strings but
    # backtracks exponentially on dotted input that ends up not matching.
    # NOTE(review): the module part may start with or contain consecutive dots,
    # which lets a config resolve modules outside the profiles package -
    # confirm whether that is intended.
    _TYPE_PATTERN = re.compile(r"^(([\w\d.]+)\.)?([\w\d]+)$", re.IGNORECASE)

    def __init__(self, controller: Controller):
        """Remember the controller that loaded fixtures are added to."""
        self.controller = controller

    @staticmethod
    def validate_item(index: int, item) -> Tuple[bool, Union[None, Type[Fixture]]]:
        """Resolve one JSON entry to the profile it names.

        Every failure emits a warning rather than raising.

        :param index: Position of the entry, used only in warning messages.
        :param item: The decoded JSON entry; expected to be a dict with a
            string ``type`` of the form ``module.path.ClassName``.
        :returns: ``(True, profile)`` on success, otherwise ``(False, None)``.
        """
        if not isinstance(item, dict):
            warn("Failed to load item {} from JSON, expected dict, got {}".format(index, type(item)))
            return False, None

        if 'type' not in item:
            warn("Failed to load item {} from JSON, expected a type property".format(index))
            return False, None

        # A non-string type cannot be matched; treat it as unparseable instead
        # of letting the regex raise TypeError
        type_name = item['type']
        match = JSONLoadSave._TYPE_PATTERN.match(type_name) if isinstance(type_name, str) else None
        if not match:
            warn("Failed to load item {} from JSON, failed to parse type '{}'".format(index, type_name))
            return False, None

        # Group 2 is the module path (None when the type has no dot), group 3
        # the attribute to fetch from that module
        module_name, class_name = match.group(2), match.group(3)

        try:
            module = import_module(".{}".format(module_name), name + '.profiles')
        except ModuleNotFoundError:
            warn("Failed to load item {} from JSON, profile module '{}' not found".format(index, module_name))
            return False, None

        try:
            profile = getattr(module, class_name)
        except AttributeError:
            warn("Failed to load item {} from JSON, profile type '{}' not found in '{}'".format(
                index, class_name, module_name))
            return False, None

        return True, profile

    def load_config(self, filename: str) -> List[Fixture]:
        """Add every valid fixture described in a JSON file to the controller.

        The file must contain a list. Each entry's ``type`` selects the
        profile, an optional ``args`` list supplies positional arguments, and
        all remaining keys are passed as keyword arguments. Entries that fail
        validation are skipped (a warning has already been emitted for them).

        :returns: The fixtures that were added, in file order.
        :raises JSONConfigLoadException: If the file cannot be opened, cannot
            be decoded or parsed, or does not contain a list.
        """
        # Get data
        try:
            with open(filename) as f:
                data = load(f)
        except OSError as err:
            # FileNotFoundError is an OSError, so this covers both
            raise JSONConfigLoadException(filename) from err
        except (JSONDecodeError, UnicodeDecodeError) as err:
            raise JSONConfigLoadException(filename, "unable to parse contents") from err

        if not isinstance(data, list):
            raise JSONConfigLoadException(filename, "expected list of dicts, got {}".format(type(data)))

        # Parse data
        fixtures = []
        for index, item in enumerate(data):
            # Validate entry
            success, profile = self.validate_item(index, item)
            if not success or not profile:
                continue

            # Split the entry into positional and keyword arguments
            kwargs = dict(item)
            del kwargs['type']
            args = kwargs.pop('args', [])

            # Create
            fixtures.append(self.controller.add_fixture(profile, *args, **kwargs))

        return fixtures

    def save_config(self, filename: Union[str, None] = None, pretty_print: bool = True) -> str:
        """Serialise every fixture's ``json_data`` to a JSON string.

        :param filename: When given, the JSON is also written to this file.
        :param pretty_print: Indent the output by four spaces when true.
        :returns: The JSON text.
        """
        # Generate data
        data = [fixture.json_data for fixture in self.controller.get_all_fixtures()]

        # JSON-ify
        text = dumps(data, indent=4) if pretty_print else dumps(data)

        # Save
        if filename:
            with open(filename, "w+") as f:
                f.write(text)
        return text
368