Passed
Pull Request — master (#18)
by Matt
118:47 queued 117:45
created

JSONLoadSave.validate_item()   B

Complexity

Conditions 6

Size

Total Lines 30
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 30
rs 8.3466
c 0
b 0
f 0
cc 6
nop 2
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX via Python. Featuring fixture profiles and working with uDMX.
3
 *  <https://github.com/MattIPv4/PyDMXControl/>
4
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
5
"""
6
7
import re
8
from importlib import import_module
9
from json import load, dumps, JSONDecodeError
10
from time import sleep
11
from typing import Type, List, Union, Dict, Tuple, Callable
12
from warnings import warn
13
14
from .utils.debug import Debugger
15
from .. import Colors, name
16
from ..profiles.defaults import Fixture_Channel, Fixture
17
from ..utils.exceptions import JSONConfigLoadException, LTPCollisionException
18
from ..utils.timing import DMXMINWAIT, Ticker
19
from ..web import WebController
20
21
22
class ControllerHelpers:
    """Bulk operations that apply one action to every registered fixture.

    The host class is expected to provide ``get_all_fixtures()``; each helper
    simply forwards its arguments to the matching method of every fixture.
    """

    def all_on(self, milliseconds: int = 0):
        """Call ``dim(255, milliseconds)`` on every fixture."""
        fixtures = self.get_all_fixtures()
        for each in fixtures:
            each.dim(255, milliseconds)

    def all_off(self, milliseconds: int = 0):
        """Call ``dim(0, milliseconds)`` on every fixture."""
        fixtures = self.get_all_fixtures()
        for each in fixtures:
            each.dim(0, milliseconds)

    def all_locate(self):
        """Call ``locate()`` on every fixture."""
        fixtures = self.get_all_fixtures()
        for each in fixtures:
            each.locate()

    def all_dim(self, value: int, milliseconds: int = 0):
        """Call ``dim(value, milliseconds)`` on every fixture."""
        fixtures = self.get_all_fixtures()
        for each in fixtures:
            each.dim(value, milliseconds)

    def all_color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """Call ``color(color, milliseconds)`` on every fixture."""
        fixtures = self.get_all_fixtures()
        for each in fixtures:
            each.color(color, milliseconds)

    def clear_all_effects(self):
        """Call ``clear_effects()`` on every fixture."""
        fixtures = self.get_all_fixtures()
        for each in fixtures:
            each.clear_effects()
47
48
49
class Controller(ControllerHelpers):
    """Base controller: keeps a registry of fixtures and builds the DMX frame.

    Fixtures are stored by an integer id handed out by ``add_fixture``. The
    frame is assembled on demand from the ``channels`` of every registered
    fixture; overlapping channels are resolved either latest-takes-priority
    (``ltp=True``, the default) or highest-takes-priority (``ltp=False``).

    ``run`` does nothing here; it is the hook used by transmitting controllers.
    """

    def __init__(self, *, ltp: bool = True, dynamic_frame: bool = False, suppress_dmx_value_warnings: bool = False):
        """Create the controller and start its ticker.

        :param ltp: resolve channel overlaps by latest value when True,
            by highest value when False.
        :param dynamic_frame: when True the frame is only as long as the
            highest channel in use instead of a fixed 512 slots.
        :param suppress_dmx_value_warnings: stored inverted as
            ``dmx_value_warnings``.
        """
        # Store all registered fixtures (id -> fixture)
        self.__fixtures = {}

        # LTP (default) (Latest takes priority, disable for Highest takes priority)
        self.__ltp = ltp

        # Frame data
        self.__frame = []
        self.__dynamic_frame = dynamic_frame

        # Ticker for callback
        self.ticker = Ticker()
        self.ticker.start()

        # Web control attr
        self.web = None

        # JSON load/save
        self.json = JSONLoadSave(self)

        # Warning data
        self.dmx_value_warnings = not suppress_dmx_value_warnings

    def add_fixture(self, fixture: Union[Fixture, Type[Fixture]], *args, **kwargs) -> Fixture:
        """Register a fixture and return it.

        :param fixture: a fixture instance, or a fixture class that is
            instantiated with ``*args``/``**kwargs``. When a class is given
            without ``start_channel``, the controller's next free channel
            is used.
        :return: the registered fixture, after it has been told its id and
            given this controller.
        """
        # Handle auto inserting
        if isinstance(fixture, type):
            if "start_channel" not in kwargs:
                kwargs["start_channel"] = self.next_channel
            fixture = fixture(*args, **kwargs)

        # Get the next id: one above the highest id currently registered
        fixture_id = max(self.__fixtures, default=0) + 1

        # Tell the fixture its id and give it this controller
        fixture.set_id(fixture_id)
        fixture.set_controller(self)

        # Store and return the fixture
        self.__fixtures[fixture_id] = fixture
        return fixture

    def del_fixture(self, fixture_id: int) -> bool:
        """Remove the fixture with this id; return whether it existed."""
        if fixture_id not in self.__fixtures:
            return False

        del self.__fixtures[fixture_id]
        return True

    def get_fixture(self, fixture_id: int) -> Union[Fixture, None]:
        """Return the fixture with this id, or None when it is not registered."""
        return self.__fixtures.get(fixture_id)

    def get_fixtures_by_profile(self, profile: Type[Fixture]) -> List[Fixture]:
        """Return every registered fixture that is an instance of ``profile``."""
        return [fixture for fixture in self.__fixtures.values() if isinstance(fixture, profile)]

    def get_fixtures_by_name(self, fixture_name: str) -> List[Fixture]:
        """Return every registered fixture whose name matches, ignoring case."""
        return [fixture for fixture in self.__fixtures.values()
                if fixture.name.lower() == fixture_name.lower()]

    def get_all_fixtures(self) -> List[Fixture]:
        """Return a new list holding all registered fixtures."""
        return list(self.__fixtures.values())

    @staticmethod
    def sleep_till_enter():
        """Block until the user presses Enter."""
        input("Press Enter to end sleep...")

    @staticmethod
    def sleep_till_interrupt():
        """Block, sleeping in ``DMXMINWAIT`` steps, until a KeyboardInterrupt."""
        try:
            while True:
                sleep(DMXMINWAIT)
        except KeyboardInterrupt:
            # We're done
            return None

    def get_frame(self) -> List[int]:
        """Build and return the current frame as a list of channel values.

        The frame has 512 slots, or (with ``dynamic_frame``) exactly as many
        slots as the highest channel in use. Channel ``n`` is written to
        index ``n - 1``; channels outside ``1..len(frame)`` are ignored.

        :raises LTPCollisionException: propagated from ``channels``.
        """
        # Resolve the channels once; the dynamic size is derived from the
        # same snapshot instead of recomputing it through next_channel
        channels = self.channels

        size = max(channels, default=0) if self.__dynamic_frame else 512
        self.__frame = [0] * size

        for key, val in channels.items():
            # Lower bound matters too: key < 1 would turn into a negative
            # index and silently overwrite a slot at the end of the frame
            if 1 <= key <= size:
                self.__frame[key - 1] = val[0]

        # Return populated frame
        return self.__frame

    @property
    def channels(self) -> Dict[int, Fixture_Channel]:
        """Merge the channels of all fixtures into one ``{channel id: value}`` dict.

        Each fixture's ``channels`` maps a channel id to a dict whose
        ``'value'`` entry is indexed here as ``[0]`` (the level) and ``[1]``
        (compared to decide which value is the latest; presumably a
        timestamp, to be confirmed against the fixture implementation).

        When two fixtures share a channel id:

        * LTP: the entry with the greater ``[1]`` wins; equal ``[1]`` with
          different levels raises.
        * HTP: the entry with the greater level wins.

        :raises LTPCollisionException: in LTP mode, when two entries for the
            same channel have equal ``[1]`` but different levels.
        """
        channels = {}

        for fixture in self.__fixtures.values():
            for chanid, chandata in fixture.channels.items():
                chanval = chandata['value']

                # A level of -1 is sent as 0.
                # NOTE(review): this writes into the object handed out by the
                # fixture, so the fixture's own value may change too - confirm
                # that is intended.
                if chanval[0] == -1:
                    chanval[0] = 0

                # First fixture to claim this channel
                if chanid not in channels:
                    channels[chanid] = chanval
                    continue

                current = channels[chanid]
                if self.__ltp:
                    # Handle collision
                    if chanval[1] == current[1] and chanval[0] != current[0]:
                        raise LTPCollisionException(chanid)

                    # LTP
                    if chanval[1] > current[1]:
                        channels[chanid] = chanval
                elif chanval[0] > current[0]:
                    # HTP
                    channels[chanid] = chanval

        # Return all the channels
        return channels

    @property
    def next_channel(self) -> int:
        """Return one above the highest channel in use (1 when none are)."""
        return max(self.channels, default=0) + 1

    def debug_control(self, callbacks: Union[Dict[str, Callable], None] = None):
        """Run the debugger for this controller with the given callbacks."""
        if callbacks is None:
            callbacks = {}
        Debugger(self, callbacks).run()

    def web_control(self, *args, **kwargs):
        """Create a WebController for this controller, stopping any previous one."""
        if self.web is not None:
            self.web.stop()
        self.web = WebController(self, *args, **kwargs)

    def run(self):
        """Do nothing; method used in transmitting controllers."""
        pass

    def close(self):
        """Stop the ticker, clear all fixture effects and stop the web controller."""
        # Stop the ticker
        self.ticker.stop()
        print("CLOSE: ticker stopped")

        # Stop any effect tickers
        for fixture in self.__fixtures.values():
            fixture.clear_effects()
        print("CLOSE: all effects cleared")

        # Stop web
        if hasattr(self, "web") and self.web:
            self.web.stop()
            print("CLOSE: web controller stopped")
245
246
247
class JSONLoadSave:
    """Loads fixtures into a controller from JSON and saves them back out."""

    # Optional dotted module path followed by the profile class name, e.g.
    # "package.module.ClassName". The two parts cannot overlap, so a string
    # that fails to match is rejected without exponential backtracking.
    _TYPE_PATTERN = re.compile(r"^(?:([\w.]+)\.)?(\w+)$")

    def __init__(self, controller: Controller):
        """Remember the controller that fixtures are loaded into / saved from."""
        self.controller = controller

    @staticmethod
    def validate_item(index: int, item) -> Tuple[bool, Union[None, Type[Fixture]]]:
        """Resolve one JSON entry to the profile class named by its ``type``.

        Problems are reported with ``warn`` rather than raised, so one bad
        entry does not abort the whole load.

        :param index: position of the entry in the file, used in warnings.
        :param item: the decoded JSON entry; must be a dict whose ``type``
            is a string of the form ``<module path>.<class name>``, resolved
            relative to the package's ``profiles`` sub-package.
        :return: ``(True, profile_class)`` on success, ``(False, None)``
            otherwise.
        """
        if not isinstance(item, dict):
            warn("Failed to load item {} from JSON, expected dict, got {}".format(index, type(item)))
            return False, None

        if 'type' not in item:
            warn("Failed to load item {} from JSON, expected a type property".format(index))
            return False, None

        # A non-string type (number, list, null, ...) cannot be parsed;
        # treat it like any other malformed type instead of raising
        type_name = item['type']
        match = JSONLoadSave._TYPE_PATTERN.match(type_name) if isinstance(type_name, str) else None
        if not match:
            warn("Failed to load item {} from JSON, failed to parse type '{}'".format(index, type_name))
            return False, None

        module_path, class_name = match.groups()
        if module_path is None:
            # Bare class name: there is no module to import it from
            warn("Failed to load item {} from JSON, type '{}' does not name a profile module".format(
                index, type_name))
            return False, None

        try:
            module = import_module(".{}".format(module_path), name + '.profiles')
        except ModuleNotFoundError:
            warn("Failed to load item {} from JSON, profile module '{}' not found".format(index, module_path))
            return False, None

        try:
            profile = getattr(module, class_name)
        except AttributeError:
            warn("Failed to load item {} from JSON, profile type '{}' not found in '{}'".format(
                index, class_name, module_path))
            return False, None

        return True, profile

    def load_config(self, filename: str) -> List[Fixture]:
        """Create fixtures on the controller from a JSON file.

        The file must contain a list. Each entry is checked with
        ``validate_item``; invalid entries are skipped with a warning. For a
        valid entry, ``args`` (optional list) supplies positional arguments
        and every other key except ``type`` is passed as a keyword argument
        to ``controller.add_fixture``.

        :param filename: path of the JSON file to read.
        :return: the fixtures that were added, in file order.
        :raises JSONConfigLoadException: when the file cannot be opened or
            parsed, or its top level is not a list.
        """
        # Get data
        try:
            with open(filename) as f:
                data = load(f)
        except OSError as err:
            # FileNotFoundError is an OSError, so this covers both
            raise JSONConfigLoadException(filename) from err
        except JSONDecodeError as err:
            raise JSONConfigLoadException(filename, "unable to parse contents") from err

        if not isinstance(data, list):
            raise JSONConfigLoadException(filename, "expected list of dicts, got {}".format(type(data)))

        # Parse data
        fixtures = []
        for index, item in enumerate(data):
            # Validate entry
            success, module = self.validate_item(index, item)
            if not success or not module:
                continue

            # Positional arguments must be a real list; unpacking a string
            # or dict here would silently pass its characters / keys
            args = item.get('args', [])
            if not isinstance(args, (list, tuple)):
                warn("Failed to load item {} from JSON, expected args to be a list, got {}".format(
                    index, type(args)))
                continue

            # Everything that is not 'type' or 'args' is a keyword argument
            kwargs = {key: value for key, value in item.items() if key not in ('type', 'args')}

            # Create
            fixtures.append(self.controller.add_fixture(module, *args, **kwargs))

        return fixtures

    def save_config(self, filename: Union[str, None] = None, pretty_print: bool = True) -> str:
        """Serialise every fixture's ``json_data`` to a JSON string.

        :param filename: when truthy, the JSON is also written to this path.
        :param pretty_print: indent the output by 4 spaces when True.
        :return: the JSON text.
        """
        # Generate data and JSON-ify
        data = dumps(
            [fixture.json_data for fixture in self.controller.get_all_fixtures()],
            indent=4 if pretty_print else None,
        )

        # Save
        if filename:
            with open(filename, "w") as f:
                f.write(data)
        return data
333