Passed
Push — master ( 7edf63...91ee3f )
by Matt
03:10
created

PyDMXControl.controllers._Controller   F

Complexity

Total Complexity 82

Size/Duplication

Total Lines 371
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 82
eloc 220
dl 0
loc 371
rs 2
c 0
b 0
f 0

30 Methods

Rating   Name   Duplication   Size   Complexity  
A ControllerHelpers.all_color() 0 3 2
A ControllerHelpers.all_locate() 0 3 2
A ControllerHelpers.all_dim() 0 3 2
A ControllerHelpers.all_on() 0 3 2
A ControllerHelpers.all_off() 0 3 2
A ControllerHelpers.clear_all_effects() 0 3 2
A Controller.get_fixtures_by_name_include() 0 12 3
C Controller.channels() 0 31 10
A Controller.get_fixture() 0 8 2
A Controller.sleep_till_interrupt() 0 9 3
A Controller.close() 0 14 4
A Controller.add_fixture() 0 21 3
A Controller.del_fixture() 0 10 2
B JSONLoadSave.validate_item() 0 30 6
A Controller.sleep_till_enter() 0 4 1
A Controller.get_fixtures_by_profile() 0 12 3
A Controller.web_control() 0 4 2
A Controller.run() 0 3 1
A Controller.get_frame() 0 16 4
A Controller.debug_control() 0 4 2
A Controller.first_channel() 0 7 1
C JSONLoadSave.load_config() 0 32 9
A JSONLoadSave.save_config() 0 17 5
A Controller.get_all_fixtures() 0 3 1
A Controller.get_fixtures_by_name() 0 12 3
A Controller.dmx_value_warnings() 0 3 1
A Controller.next_channel() 0 7 1
A Controller.__init__() 0 29 1
A JSONLoadSave.__init__() 0 2 1
A Controller.dynamic_frame() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like PyDMXControl.controllers._Controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from importlib import import_module
10
from json import load, dumps, JSONDecodeError
11
from time import sleep
12
from typing import Type, List, Union, Dict, Tuple, Callable
13
from warnings import warn
14
15
from .utils.debug import Debugger
16
from .. import Colors, name, DEFAULT_INTERVAL
17
from ..profiles.defaults import Fixture_Channel, Fixture
18
from ..utils.exceptions import JSONConfigLoadException, LTPCollisionException
19
from ..utils.timing import Ticker
20
from ..web import WebController
21
22
23
class ControllerHelpers:
    """Bulk convenience operations applied to every fixture of a controller.

    This class is a mixin: every method iterates ``self.get_all_fixtures()``,
    which is not defined here, so the inheriting class has to provide it.
    """

    def all_on(self, milliseconds: int = 0):
        """Dim every fixture to 255, handing ``milliseconds`` to each fixture's ``dim``."""
        self.all_dim(255, milliseconds)

    def all_off(self, milliseconds: int = 0):
        """Dim every fixture to 0, handing ``milliseconds`` to each fixture's ``dim``."""
        self.all_dim(0, milliseconds)

    def all_locate(self):
        """Call ``locate()`` on every fixture."""
        for fixture in self.get_all_fixtures():
            fixture.locate()

    def all_dim(self, value: int, milliseconds: int = 0):
        """Call ``dim(value, milliseconds)`` on every fixture."""
        for fixture in self.get_all_fixtures():
            fixture.dim(value, milliseconds)

    def all_color(self, color: Union[Colors, List[int], Tuple[int, ...], str], milliseconds: int = 0):
        """Call ``color(color, milliseconds)`` on every fixture.

        ``color`` is passed through untouched; how each accepted form is
        interpreted is decided by the fixture, not by this helper.
        """
        for fixture in self.get_all_fixtures():
            fixture.color(color, milliseconds)

    def clear_all_effects(self):
        """Call ``clear_effects()`` on every fixture."""
        for fixture in self.get_all_fixtures():
            fixture.clear_effects()
48
49
50
class Controller(ControllerHelpers):
    """Fixture registry and DMX frame builder.

    Holds the registered fixtures keyed by an integer id, merges their
    channel values (latest-takes-priority or highest-takes-priority) and
    turns the result into a frame of channel values. ``run`` is a no-op here;
    the original comment notes it is used in transmitting controllers.
    """

    def __init__(self, *,
                 ltp: bool = True,
                 dynamic_frame: bool = False,
                 suppress_dmx_value_warnings: bool = False,
                 suppress_ticker_behind_warnings: bool = False,
                 ticker_interval_millis: float = DEFAULT_INTERVAL * 1000.0,
                 ):
        """Create the controller and start its ticker.

        :param ltp: latest takes priority when True (default); highest takes
            priority when False.
        :param dynamic_frame: when True, ``get_frame`` only spans the channels
            in use instead of a fixed 512 slots.
        :param suppress_dmx_value_warnings: stored inverted and exposed via
            the ``dmx_value_warnings`` property.
        :param suppress_ticker_behind_warnings: passed inverted as the second
            ``Ticker`` argument (presumably toggling its warnings; confirm
            against ``Ticker``).
        :param ticker_interval_millis: passed as the first ``Ticker`` argument.
        """
        # Store all registered fixtures, keyed by fixture id
        self.__fixtures = {}

        # LTP (default) (Latest takes priority, disable for Highest takes priority)
        self.__ltp = ltp

        # Frame data
        self.__frame = []
        self.__dynamic_frame = dynamic_frame

        # Ticker for callback
        self.ticker = Ticker(ticker_interval_millis, not suppress_ticker_behind_warnings)
        self.ticker.start()

        # Web control attr
        self.web = None

        # JSON load/save
        self.json = JSONLoadSave(self)

        # Warning data
        self.__dmx_value_warnings = not suppress_dmx_value_warnings

    def add_fixture(self, fixture: Union[Fixture, Type[Fixture]], *args, **kwargs) -> Fixture:
        """Register a fixture and return it.

        When ``fixture`` is a class it is instantiated with ``*args`` and
        ``**kwargs``; if no ``start_channel`` keyword was given, the current
        ``next_channel`` is used. The fixture is then told its new id and
        given this controller.
        """
        # Handle auto inserting
        if isinstance(fixture, type):
            if "start_channel" not in kwargs:
                kwargs["start_channel"] = self.next_channel
            fixture = fixture(*args, **kwargs)

        # Next id is one above the highest id in use (1 for the first fixture)
        fixture_id = max(self.__fixtures, default=0) + 1

        fixture.set_id(fixture_id)
        fixture.set_controller(self)
        self.__fixtures[fixture_id] = fixture
        return fixture

    def del_fixture(self, fixture_id: int) -> bool:
        """Remove the fixture with this id; return whether it existed."""
        if fixture_id not in self.__fixtures:
            return False

        del self.__fixtures[fixture_id]
        return True

    def get_fixture(self, fixture_id: int) -> Union[Fixture, None]:
        """Return the fixture with this id, or None when the id is unknown."""
        return self.__fixtures.get(fixture_id)

    def get_fixtures_by_profile(self, profile: Type[Fixture]) -> List[Fixture]:
        """Return every fixture that is an instance of ``profile``."""
        return [fixture for fixture in self.__fixtures.values() if isinstance(fixture, profile)]

    def get_fixtures_by_name(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name equals ``fixture_name``, ignoring case."""
        wanted = fixture_name.lower()
        return [fixture for fixture in self.__fixtures.values() if fixture.name.lower() == wanted]

    def get_fixtures_by_name_include(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name contains ``fixture_name``, ignoring case."""
        wanted = fixture_name.lower()
        return [fixture for fixture in self.__fixtures.values() if wanted in fixture.name.lower()]

    def get_all_fixtures(self) -> List[Fixture]:
        """Return a new list holding all registered fixtures."""
        return list(self.__fixtures.values())

    @staticmethod
    def sleep_till_enter():
        """Block until a line is read from standard input."""
        input("Press Enter to end sleep...")

    @staticmethod
    def sleep_till_interrupt():
        """Block, sleeping in 0.1 second steps, until KeyboardInterrupt is raised."""
        try:
            while True:
                sleep(0.1)
        except KeyboardInterrupt:
            # We're done
            return None

    def get_frame(self) -> List[int]:
        """Build, store and return the current frame of channel values.

        The frame has 512 slots starting at channel 1, or, when
        ``dynamic_frame`` is enabled, spans from the lowest to the highest
        channel in use. Channels that fall outside the frame are skipped.
        """
        # Merge the fixture channels once; the property rebuilds the whole
        # mapping on every access.
        channels = self.channels

        if self.__dynamic_frame:
            first = min(channels, default=1)
            size = max(channels, default=0) + 1 - first
        else:
            first = 1
            size = 512

        frame = [0] * size
        for key, val in channels.items():
            index = key - first
            # The lower bound stops a channel below `first` from wrapping
            # round to the end of the frame as a negative index.
            if 0 <= index < size:
                frame[index] = val[0]

        self.__frame = frame
        return frame

    @property
    def dynamic_frame(self) -> bool:
        """Whether ``get_frame`` sizes the frame to the channels in use."""
        return self.__dynamic_frame

    @property
    def dmx_value_warnings(self) -> bool:
        """True unless ``suppress_dmx_value_warnings`` was set at construction."""
        return self.__dmx_value_warnings

    @property
    def channels(self) -> Dict[int, Fixture_Channel]:
        """Merge the channels of all fixtures into one mapping keyed by channel id.

        Each mapped value is the fixture channel's ``'value'`` entry, indexed
        here as ``[0]`` (the level) and ``[1]`` (compared for recency;
        presumably a timestamp, confirm against ``Fixture``).

        :raises LTPCollisionException: in LTP mode, when two fixtures hold
            different levels for one channel with equal ``[1]`` entries.
        """
        merged = {}

        # The fixtures are copied into a list before iterating, as in the
        # original. NOTE(review): presumably guards against fixtures being
        # added or removed mid-iteration - confirm.
        for fixture_channels in [fixture.channels for fixture in list(self.__fixtures.values())]:
            for chanid, chan in fixture_channels.items():
                chanval = chan['value']

                # A level of -1 is replaced by 0, in place on the value object
                if chanval[0] == -1:
                    chanval[0] = 0

                # First claim on this channel
                if chanid not in merged:
                    merged[chanid] = chanval
                    continue

                current = merged[chanid]
                if self.__ltp:
                    # Handle collision
                    if chanval[1] == current[1] and chanval[0] != current[0]:
                        raise LTPCollisionException(chanid)

                    # LTP
                    if chanval[1] > current[1]:
                        merged[chanid] = chanval
                elif chanval[0] > current[0]:
                    # HTP
                    merged[chanid] = chanval

        return merged

    @property
    def next_channel(self) -> int:
        """One above the highest channel in use, or 1 when no channel is in use."""
        return max(self.channels, default=0) + 1

    @property
    def first_channel(self) -> int:
        """The lowest channel in use, or 1 when no channel is in use."""
        return min(self.channels, default=1)

    def debug_control(self, callbacks: Union[Dict[str, Callable], None] = None):
        """Run a ``Debugger`` for this controller; ``callbacks`` defaults to an empty dict."""
        if callbacks is None:
            callbacks = {}
        Debugger(self, callbacks).run()

    def web_control(self, *args, **kwargs):
        """Stop any existing web controller, then create a new ``WebController``.

        ``*args`` and ``**kwargs`` are forwarded to ``WebController`` after
        this controller.
        """
        if self.web is not None:
            self.web.stop()
        self.web = WebController(self, *args, **kwargs)

    def run(self):
        """Do nothing here; method used in transmitting controllers."""
        pass

    def close(self):
        """Stop the ticker, clear every fixture's effects and stop the web controller if set."""
        # Stop the ticker
        self.ticker.stop()
        print("CLOSE: ticker stopped")

        # Stop any effect tickers
        for fixture in self.__fixtures.values():
            fixture.clear_effects()
        print("CLOSE: all effects cleared")

        # Stop web
        if hasattr(self, "web") and self.web:
            self.web.stop()
            print("CLOSE: web controller stopped")
283
284
285
class JSONLoadSave:
    """Load fixtures into a controller from a JSON file and save them back out."""

    # Optional dotted module path, a dot, then the profile name. One optional
    # group captures everything before the last dot; unlike a repeated group
    # whose body also accepts dots, it cannot backtrack exponentially on long
    # dotted input that fails to match.
    _TYPE_PATTERN = re.compile(r"^(?:(?P<module>[\w.]+)\.)?(?P<profile>\w+)$", re.IGNORECASE)

    def __init__(self, controller: Controller):
        """Remember the controller that fixtures are added to and read from."""
        self.controller = controller

    @staticmethod
    def validate_item(index: int, item) -> Tuple[bool, Union[None, Fixture]]:
        """Resolve one JSON entry to the profile object named by its ``type``.

        A failure emits a warning and returns ``(False, None)``; nothing is
        raised for a malformed entry.

        :param index: position of the entry, used only in warning messages.
        :param item: the decoded JSON entry; expected to be a dict whose
            ``type`` is a string of the form ``module.path.ProfileName``.
        :return: ``(True, profile)`` on success, ``(False, None)`` otherwise.
        """
        if not isinstance(item, dict):
            warn("Failed to load item {} from JSON, expected dict, got {}".format(index, type(item)))
            return False, None

        if 'type' not in item:
            warn("Failed to load item {} from JSON, expected a type property".format(index))
            return False, None

        # A non-string type would make the regex match raise TypeError
        if not isinstance(item['type'], str):
            warn("Failed to load item {} from JSON, expected type property to be a string, got {}".format(
                index, type(item['type'])))
            return False, None

        match = JSONLoadSave._TYPE_PATTERN.match(item['type'])
        if not match:
            warn("Failed to load item {} from JSON, failed to parse type '{}'".format(index, item['type']))
            return False, None

        # None when the type has no dotted module part; the import below is
        # then attempted with the text "None", exactly as before.
        module_name = match.group('module')
        profile_name = match.group('profile')

        try:
            module = import_module(".{}".format(module_name), name + '.profiles')
        except ModuleNotFoundError:
            # ModuleNotFoundError is a built-in exception since Python 3.6,
            # so it needs no import.
            warn("Failed to load item {} from JSON, profile module '{}' not found".format(index, module_name))
            return False, None

        try:
            module = getattr(module, profile_name)
        except AttributeError:
            warn("Failed to load item {} from JSON, profile type '{}' not found in '{}'".format(
                index, profile_name, module_name))
            return False, None

        return True, module

    def load_config(self, filename: str) -> List[Fixture]:
        """Add a fixture to the controller for each valid entry in a JSON file.

        The file must decode to a list. Entries rejected by ``validate_item``
        are skipped. For the rest, ``type`` selects the profile, an optional
        ``args`` entry supplies positional arguments and every remaining key
        becomes a keyword argument to ``Controller.add_fixture``.

        :raises JSONConfigLoadException: when the file cannot be opened or
            parsed, or does not contain a list.
        :return: the fixtures that were added, in file order.
        """
        # Get data (FileNotFoundError is a subclass of OSError)
        try:
            with open(filename) as f:
                data = load(f)
        except OSError as err:
            raise JSONConfigLoadException(filename) from err
        except JSONDecodeError as err:
            raise JSONConfigLoadException(filename, "unable to parse contents") from err

        if not isinstance(data, list):
            raise JSONConfigLoadException(filename, "expected list of dicts, got {}".format(type(data)))

        # Parse data
        fixtures = []
        for index, item in enumerate(data):
            # Validate entry
            success, module = self.validate_item(index, item)
            if not success or not module:
                continue

            # Split the entry into positional and keyword arguments
            kwargs = dict(item)
            kwargs.pop('type')
            args = kwargs.pop('args', [])

            # Create
            fixtures.append(self.controller.add_fixture(module, *args, **kwargs))

        return fixtures

    def save_config(self, filename: Union[str, None] = None, pretty_print: bool = True) -> str:
        """Serialise every fixture's ``json_data`` to a JSON string.

        :param filename: when truthy, the JSON text is also written to this path.
        :param pretty_print: indent the output by 4 spaces when True.
        :return: the JSON text.
        """
        # Generate data
        data = dumps(
            [fixture.json_data for fixture in self.controller.get_all_fixtures()],
            indent=4 if pretty_print else None,
        )

        # Save
        if filename:
            with open(filename, "w+") as f:
                f.write(data)
        return data
371