Passed
Push — master ( bbd02a...7ebed9 )
by Matt
02:12
created

PyDMXControl.controllers._Controller   F

Complexity

Total Complexity 81

Size/Duplication

Total Lines 361
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 81
eloc 212
dl 0
loc 361
rs 2
c 0
b 0
f 0

29 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.get_fixture() 0 8 2
A Controller.add_fixture() 0 21 3
A ControllerHelpers.all_color() 0 3 2
A Controller.del_fixture() 0 10 2
A ControllerHelpers.all_locate() 0 3 2
A Controller.get_fixtures_by_profile() 0 12 3
A ControllerHelpers.all_dim() 0 3 2
A ControllerHelpers.all_on() 0 3 2
A Controller.get_fixtures_by_name() 0 12 3
A ControllerHelpers.all_off() 0 3 2
A Controller.__init__() 0 23 1
A ControllerHelpers.clear_all_effects() 0 3 2
A Controller.get_fixtures_by_name_include() 0 12 3
A Controller.sleep_till_interrupt() 0 9 3
A Controller.sleep_till_enter() 0 4 1
A Controller.get_all_fixtures() 0 3 1
C Controller.channels() 0 31 10
A Controller.close() 0 14 4
B JSONLoadSave.validate_item() 0 30 6
A Controller.web_control() 0 4 2
A Controller.run() 0 3 1
A Controller.get_frame() 0 16 4
A Controller.debug_control() 0 4 2
A Controller.first_channel() 0 7 1
C JSONLoadSave.load_config() 0 32 9
A JSONLoadSave.save_config() 0 17 5
A Controller.next_channel() 0 7 1
A JSONLoadSave.__init__() 0 2 1
A Controller.dynamic_frame() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like PyDMXControl.controllers._Controller often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find one is to look for fields and methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from importlib import import_module
10
from json import load, dumps, JSONDecodeError
11
from time import sleep
12
from typing import Type, List, Union, Dict, Tuple, Callable
13
from warnings import warn
14
15
from .utils.debug import Debugger
16
from .. import Colors, name
17
from ..profiles.defaults import Fixture_Channel, Fixture
18
from ..utils.exceptions import JSONConfigLoadException, LTPCollisionException
19
from ..utils.timing import Ticker
20
from ..web import WebController
21
22
23
class ControllerHelpers:
    """Bulk convenience operations applied to every registered fixture.

    This class holds no state of its own. Each method calls
    ``self.get_all_fixtures()``, which must be supplied by the class this
    one is combined with, and forwards one call to every fixture returned.
    """

    def all_on(self, milliseconds: int = 0):
        """Call ``dim(255, milliseconds)`` on every fixture."""
        for each in self.get_all_fixtures():
            each.dim(255, milliseconds)

    def all_off(self, milliseconds: int = 0):
        """Call ``dim(0, milliseconds)`` on every fixture."""
        for each in self.get_all_fixtures():
            each.dim(0, milliseconds)

    def all_locate(self):
        """Call ``locate()`` on every fixture."""
        for each in self.get_all_fixtures():
            each.locate()

    def all_dim(self, value: int, milliseconds: int = 0):
        """Call ``dim(value, milliseconds)`` on every fixture."""
        for each in self.get_all_fixtures():
            each.dim(value, milliseconds)

    def all_color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """Call ``color(color, milliseconds)`` on every fixture."""
        for each in self.get_all_fixtures():
            each.color(color, milliseconds)

    def clear_all_effects(self):
        """Call ``clear_effects()`` on every fixture."""
        for each in self.get_all_fixtures():
            each.clear_effects()
48
49
50
class Controller(ControllerHelpers):
    """Fixture registry that merges fixture channels into an output frame.

    Fixtures are stored under auto-incrementing integer ids. The ``channels``
    property merges the channels of every fixture (resolving overlaps by LTP
    or HTP) and ``get_frame`` flattens that mapping into a list of values.
    ``run`` is a no-op here; the source comment says it is used by
    transmitting controllers.
    """

    def __init__(self, *, ltp: bool = True, dynamic_frame: bool = False, suppress_dmx_value_warnings: bool = False):
        """Create an empty controller and start its ticker.

        :param ltp: ``True`` resolves overlapping channels by latest-takes-
            priority, ``False`` by highest-takes-priority (see ``channels``).
        :param dynamic_frame: when ``True``, ``get_frame`` returns only the
            span between the first and last used channel instead of 512 slots.
        :param suppress_dmx_value_warnings: stored inverted as
            ``self.dmx_value_warnings``; it is not read inside this class.
        """
        # Store all registered fixtures, keyed by fixture id
        self.__fixtures = {}

        # LTP (default) (Latest takes priority, disable for Highest takes priority)
        self.__ltp = ltp

        # Frame data
        self.__frame = []
        self.__dynamic_frame = dynamic_frame

        # Ticker for callback
        self.ticker = Ticker()
        self.ticker.start()

        # Web control attr, populated by web_control()
        self.web = None

        # JSON load/save
        self.json = JSONLoadSave(self)

        # Warning data
        self.dmx_value_warnings = not suppress_dmx_value_warnings

    def add_fixture(self, fixture: Union[Fixture, Type[Fixture]], *args, **kwargs) -> Fixture:
        """Register a fixture and return it.

        :param fixture: a fixture instance, or a fixture class. A class is
            instantiated with ``*args``/``**kwargs``; if ``start_channel`` is
            not given it defaults to ``self.next_channel``.
        :return: the registered fixture instance, after ``set_id`` and
            ``set_controller`` have been called on it.
        """
        # Handle auto inserting
        if isinstance(fixture, type):
            kwargs.setdefault("start_channel", self.next_channel)
            fixture = fixture(*args, **kwargs)

        # Next id is one above the highest id currently registered (1 if none)
        fixture_id = max(self.__fixtures, default=0) + 1

        # Tell the fixture its id and owning controller, then store it
        fixture.set_id(fixture_id)
        fixture.set_controller(self)
        self.__fixtures[fixture_id] = fixture

        return fixture

    def del_fixture(self, fixture_id: int) -> bool:
        """Remove the fixture with the given id; return whether it existed."""
        if fixture_id not in self.__fixtures:
            return False

        del self.__fixtures[fixture_id]
        return True

    def get_fixture(self, fixture_id: int) -> Union[Fixture, None]:
        """Return the fixture with the given id, or ``None`` if unknown."""
        return self.__fixtures.get(fixture_id)

    def get_fixtures_by_profile(self, profile: Type[Fixture]) -> List[Fixture]:
        """Return every fixture that is an instance of ``profile``."""
        return [fixture for fixture in self.__fixtures.values() if isinstance(fixture, profile)]

    def get_fixtures_by_name(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name equals ``fixture_name``, ignoring case."""
        return [fixture for fixture in self.__fixtures.values()
                if fixture.name.lower() == fixture_name.lower()]

    def get_fixtures_by_name_include(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name contains ``fixture_name``, ignoring case."""
        return [fixture for fixture in self.__fixtures.values()
                if fixture_name.lower() in fixture.name.lower()]

    def get_all_fixtures(self) -> List[Fixture]:
        """Return a new list of all registered fixtures."""
        return list(self.__fixtures.values())

    @staticmethod
    def sleep_till_enter():
        """Block until the user presses Enter on stdin."""
        input("Press Enter to end sleep...")

    @staticmethod
    def sleep_till_interrupt():
        """Block until a ``KeyboardInterrupt`` is received, then return ``None``."""
        try:
            while True:
                sleep(0.1)
        except KeyboardInterrupt:
            # We're done
            return None

    def get_frame(self) -> List[int]:
        """Build and return the current frame of channel values.

        With ``dynamic_frame`` disabled the frame has 512 slots and slot 0
        corresponds to channel 1. With it enabled the frame spans from the
        lowest to the highest channel currently in use.

        :return: list of values; slots with no channel are 0.
        :raises LTPCollisionException: propagated from ``channels``.
        """
        # Take one snapshot so the frame bounds and the values written into it
        # come from the same merged channel data.
        channels = self.channels

        if self.__dynamic_frame:
            first = min(channels, default=1)
            size = max(channels, default=0) + 1 - first
        else:
            first = 1
            size = 512

        frame = [0] * size
        for chan_id, chan_val in channels.items():
            offset = chan_id - first
            # Require offset >= 0 too: a negative offset would otherwise index
            # from the end of the list and corrupt an unrelated slot.
            if 0 <= offset < size:
                frame[offset] = chan_val[0]

        self.__frame = frame
        return frame

    @property
    def dynamic_frame(self) -> bool:
        """Whether ``get_frame`` sizes the frame to the channels in use."""
        return self.__dynamic_frame

    @property
    def channels(self) -> Dict[int, Fixture_Channel]:
        """Merge the channels of all fixtures into ``{channel id: value}``.

        Each value is the ``'value'`` entry of a fixture channel; index 0 is
        the level and index 1 is the key compared for recency (presumably a
        timestamp -- confirm against the Fixture implementation).

        :raises LTPCollisionException: in LTP mode, when two fixtures share a
            channel with equal recency keys but different levels.
        """
        merged = {}

        # Snapshot every fixture's channels up front, as the original did
        per_fixture = [fixture.channels for fixture in list(self.__fixtures.values())]

        for chans in per_fixture:
            for chanid, chan in chans.items():
                chanval = chan['value']
                # NOTE(review): this rewrites a level of -1 to 0 in place on
                # the object the fixture returned -- confirm that is intended.
                if chanval[0] == -1:
                    chanval[0] = 0

                # First fixture to claim this channel
                if chanid not in merged:
                    merged[chanid] = chanval
                    continue

                current = merged[chanid]
                if self.__ltp:
                    # Same recency but different level cannot be resolved
                    if chanval[1] == current[1] and chanval[0] != current[0]:
                        raise LTPCollisionException(chanid)

                    # LTP: the more recent value wins
                    if chanval[1] > current[1]:
                        merged[chanid] = chanval
                elif chanval[0] > current[0]:
                    # HTP: the higher level wins
                    merged[chanid] = chanval

        return merged

    @property
    def next_channel(self) -> int:
        """Return one above the highest channel in use (1 if none are in use)."""
        return max(self.channels, default=0) + 1

    @property
    def first_channel(self) -> int:
        """Return the lowest channel in use (1 if none are in use)."""
        return min(self.channels, default=1)

    def debug_control(self, callbacks: Dict[str, Callable] = None):
        """Run a ``Debugger`` for this controller with the given callbacks."""
        if callbacks is None:
            callbacks = {}
        Debugger(self, callbacks).run()

    def web_control(self, *args, **kwargs):
        """Create a ``WebController`` for this controller, stopping any existing one first."""
        if self.web is not None:
            self.web.stop()
        self.web = WebController(self, *args, **kwargs)

    def run(self):
        """Do nothing; method used in transmitting controllers."""
        pass

    def close(self):
        """Stop the ticker, clear every fixture's effects and stop the web controller."""
        # Stop the ticker
        self.ticker.stop()
        print("CLOSE: ticker stopped")

        # Stop any effect tickers
        for fixture in self.__fixtures.values():
            fixture.clear_effects()
        print("CLOSE: all effects cleared")

        # Stop web
        if hasattr(self, "web") and self.web:
            self.web.stop()
            print("CLOSE: web controller stopped")
273
274
275
class JSONLoadSave:
    """Load fixtures into a controller from JSON and dump them back out."""

    # "<dotted module path>.<attribute>": group 1 is the optional module path
    # (everything before the last dot), group 2 the final attribute name.
    # Single unambiguous split point, so matching cannot backtrack
    # exponentially on malformed input.
    _TYPE_PATTERN = re.compile(r"^(?:([\w.]+)\.)?(\w+)$")

    def __init__(self, controller: Controller):
        """Remember the controller that fixtures are loaded into and saved from."""
        self.controller = controller

    @staticmethod
    def validate_item(index: int, item) -> Tuple[bool, Union[None, Fixture]]:
        """Resolve the ``type`` of one JSON entry to an attribute of a profile module.

        Every failure emits a warning and returns ``(False, None)``; nothing
        is raised for malformed entries.

        :param index: position of the entry, used only in warning messages.
        :param item: the decoded JSON entry; expected to be a dict whose
            ``type`` is a string of the form ``"<module>.<name>"``.
        :return: ``(True, attribute)`` on success, where ``attribute`` is
            whatever ``getattr`` found in the profile module (expected to be
            a fixture class), otherwise ``(False, None)``.
        """
        if not isinstance(item, dict):
            warn("Failed to load item {} from JSON, expected dict, got {}".format(index, type(item)))
            return False, None

        if 'type' not in item:
            warn("Failed to load item {} from JSON, expected a type property".format(index))
            return False, None

        # A non-string type (number, list, ...) cannot be matched by the
        # regex; treat it as unparseable instead of raising TypeError.
        type_name = item['type']
        match = JSONLoadSave._TYPE_PATTERN.match(type_name) if isinstance(type_name, str) else None
        if not match:
            warn("Failed to load item {} from JSON, failed to parse type '{}'".format(index, type_name))
            return False, None

        module_name, attr_name = match.groups()

        # A bare name has no module part; report that directly rather than
        # attempting to import a module literally called "None".
        if module_name is None:
            warn("Failed to load item {} from JSON, type '{}' does not include a profile module".format(
                index, type_name))
            return False, None

        try:
            module = import_module(".{}".format(module_name), name + '.profiles')
        except ModuleNotFoundError:
            warn("Failed to load item {} from JSON, profile module '{}' not found".format(index, module_name))
            return False, None

        try:
            profile = getattr(module, attr_name)
        except AttributeError:
            warn("Failed to load item {} from JSON, profile type '{}' not found in '{}'".format(
                index, attr_name, module_name))
            return False, None

        return True, profile

    def load_config(self, filename: str) -> List[Fixture]:
        """Create fixtures on the controller from a JSON file.

        The file must contain a list. Each entry that passes
        ``validate_item`` is added via ``controller.add_fixture`` with its
        ``args`` list as positional arguments and every remaining key
        (except ``type``) as keyword arguments. Invalid entries are skipped
        with a warning.

        :param filename: path of the JSON file to read.
        :return: the fixtures that were added, in file order.
        :raises JSONConfigLoadException: if the file cannot be opened or
            parsed, or its top-level value is not a list.
        """
        # Get data
        try:
            with open(filename) as f:
                data = load(f)
        except OSError as err:
            # FileNotFoundError is a subclass of OSError
            raise JSONConfigLoadException(filename) from err
        except JSONDecodeError as err:
            raise JSONConfigLoadException(filename, "unable to parse contents") from err

        if not isinstance(data, list):
            raise JSONConfigLoadException(filename, "expected list of dicts, got {}".format(type(data)))

        # Parse data
        fixtures = []
        for index, item in enumerate(data):
            # Validate entry
            success, module = self.validate_item(index, item)
            if not success or not module:
                continue

            # Split the entry into positional and keyword arguments
            kwargs = {key: value for key, value in item.items() if key not in ('type', 'args')}
            args = item.get('args', [])
            if not isinstance(args, list):
                # Splatting a string or dict would silently pass nonsense
                # positionals; a number would raise TypeError.
                warn("Failed to load item {} from JSON, expected args to be a list, got {}".format(
                    index, type(args)))
                continue

            # Create
            fixtures.append(self.controller.add_fixture(module, *args, **kwargs))

        return fixtures

    def save_config(self, filename: Union[str, None] = None, pretty_print: bool = True) -> str:
        """Serialise every fixture's ``json_data`` to a JSON string.

        :param filename: if truthy, the JSON text is also written to this path.
        :param pretty_print: indent the output by 4 spaces when ``True``.
        :return: the JSON text.
        """
        # Generate data
        data = [fixture.json_data for fixture in self.controller.get_all_fixtures()]

        # JSON-ify
        text = dumps(data, indent=4) if pretty_print else dumps(data)

        # Save
        if filename:
            with open(filename, "w+") as f:
                f.write(text)
        return text
361