Passed
Push — master ( 0be0f8...9b5308 )
by Matt
02:56
created

PyDMXControl.controllers._Controller   F

Complexity

Total Complexity 79

Size/Duplication

Total Lines 347
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 79
eloc 203
dl 0
loc 347
rs 2.08
c 0
b 0
f 0

27 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.get_fixture() 0 8 2
A Controller.add_fixture() 0 21 3
A ControllerHelpers.all_color() 0 3 2
A Controller.del_fixture() 0 10 2
A ControllerHelpers.all_locate() 0 3 2
A Controller.get_fixtures_by_profile() 0 12 3
A ControllerHelpers.all_dim() 0 3 2
A ControllerHelpers.all_on() 0 3 2
A Controller.get_fixtures_by_name() 0 12 3
A ControllerHelpers.all_off() 0 3 2
A Controller.__init__() 0 23 1
A ControllerHelpers.clear_all_effects() 0 3 2
A Controller.get_fixtures_by_name_include() 0 12 3
C Controller.channels() 0 31 10
A Controller.sleep_till_interrupt() 0 9 3
A Controller.close() 0 14 4
B JSONLoadSave.validate_item() 0 30 6
A Controller.sleep_till_enter() 0 4 1
A Controller.web_control() 0 4 2
A Controller.run() 0 3 1
A Controller.get_frame() 0 14 4
A Controller.debug_control() 0 4 2
C JSONLoadSave.load_config() 0 32 9
A JSONLoadSave.save_config() 0 17 5
A Controller.get_all_fixtures() 0 3 1
A Controller.next_channel() 0 7 1
A JSONLoadSave.__init__() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like PyDMXControl.controllers._Controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from importlib import import_module
10
from json import load, dumps, JSONDecodeError
11
from time import sleep
12
from typing import Type, List, Union, Dict, Tuple, Callable
13
from warnings import warn
14
15
from .utils.debug import Debugger
16
from .. import Colors, name, DMXMINWAIT
17
from ..profiles.defaults import Fixture_Channel, Fixture
18
from ..utils.exceptions import JSONConfigLoadException, LTPCollisionException
19
from ..utils.timing import Ticker
20
from ..web import WebController
21
22
23
class ControllerHelpers:
    """
    Convenience operations applied to every fixture of a controller.

    Each helper walks the list returned by `self.get_all_fixtures()`, which
    the class using these helpers is expected to provide.
    """

    def all_on(self, milliseconds: int = 0):
        """Call `dim(255, milliseconds)` on every fixture."""
        for light in self.get_all_fixtures():
            light.dim(255, milliseconds)

    def all_off(self, milliseconds: int = 0):
        """Call `dim(0, milliseconds)` on every fixture."""
        for light in self.get_all_fixtures():
            light.dim(0, milliseconds)

    def all_locate(self):
        """Call `locate()` on every fixture."""
        for light in self.get_all_fixtures():
            light.locate()

    def all_dim(self, value: int, milliseconds: int = 0):
        """Call `dim(value, milliseconds)` on every fixture."""
        for light in self.get_all_fixtures():
            light.dim(value, milliseconds)

    def all_color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """Call `color(color, milliseconds)` on every fixture."""
        for light in self.get_all_fixtures():
            light.color(color, milliseconds)

    def clear_all_effects(self):
        """Call `clear_effects()` on every fixture."""
        for light in self.get_all_fixtures():
            light.clear_effects()
48
49
50
class Controller(ControllerHelpers):
    """Keeps a registry of fixtures and merges their channel values into a frame."""

    def __init__(self, *, ltp: bool = True, dynamic_frame: bool = False, suppress_dmx_value_warnings: bool = False):
        """
        Create an empty controller and start its ticker.

        ltp: True (default) resolves channels shared by several fixtures with
            latest-takes-priority, False with highest-takes-priority.
        dynamic_frame: True sizes generated frames to the highest channel in
            use rather than a fixed 512 slots.
        suppress_dmx_value_warnings: stored inverted as `dmx_value_warnings`.
        """
        # Registered fixtures, keyed by fixture id
        self.__fixtures = {}

        # Merge strategy for channels claimed by more than one fixture
        self.__ltp = ltp

        # Last generated frame and how it is sized
        self.__frame = []
        self.__dynamic_frame = dynamic_frame

        # Ticker for callbacks, running from construction onwards
        self.ticker = Ticker()
        self.ticker.start()

        # Web controller, only created by web_control()
        self.web = None

        # JSON load/save helper bound to this controller
        self.json = JSONLoadSave(self)

        # Whether DMX value warnings are enabled
        self.dmx_value_warnings = not suppress_dmx_value_warnings

    def add_fixture(self, fixture: Union[Fixture, Type[Fixture]], *args, **kwargs) -> Fixture:
        """
        Register a fixture and return it.

        A fixture class is instantiated with the given args/kwargs first; when
        no `start_channel` keyword is supplied it is placed at `next_channel`.
        """
        if isinstance(fixture, type):
            # Only work out the next free channel when the caller gave none
            if "start_channel" not in kwargs:
                kwargs["start_channel"] = self.next_channel
            fixture = fixture(*args, **kwargs)

        # Ids continue from the highest id currently registered
        new_id = max(self.__fixtures, default=0) + 1

        # Let the fixture know its id and owner before storing it
        fixture.set_id(new_id)
        fixture.set_controller(self)
        self.__fixtures[new_id] = fixture

        return fixture

    def del_fixture(self, fixture_id: int) -> bool:
        """Unregister the fixture with this id; return whether it existed."""
        try:
            del self.__fixtures[fixture_id]
        except KeyError:
            # Nothing registered under that id
            return False
        return True

    def get_fixture(self, fixture_id: int) -> Union[Fixture, None]:
        """Return the fixture registered under this id, or None."""
        return self.__fixtures.get(fixture_id)

    def get_fixtures_by_profile(self, profile: Type[Fixture]) -> List[Fixture]:
        """Return every registered fixture that is an instance of `profile`."""
        return [candidate for candidate in self.__fixtures.values()
                if isinstance(candidate, profile)]

    def get_fixtures_by_name(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name equals `fixture_name`, ignoring case."""
        return [candidate for candidate in self.__fixtures.values()
                if candidate.name.lower() == fixture_name.lower()]

    def get_fixtures_by_name_include(self, fixture_name: str) -> List[Fixture]:
        """Return every fixture whose name contains `fixture_name`, ignoring case."""
        return [candidate for candidate in self.__fixtures.values()
                if fixture_name.lower() in candidate.name.lower()]

    def get_all_fixtures(self) -> List[Fixture]:
        """Return a new list holding all registered fixtures."""
        return [*self.__fixtures.values()]

    @staticmethod
    def sleep_till_enter():
        """Block until the user presses Enter."""
        input("Press Enter to end sleep...")

    @staticmethod
    def sleep_till_interrupt():
        """Block, sleeping DMXMINWAIT at a time, until a KeyboardInterrupt arrives."""
        try:
            while True:
                sleep(DMXMINWAIT)
        except KeyboardInterrupt:
            # The interrupt is the expected way out of the loop
            pass

    def get_frame(self) -> List[int]:
        """
        Build and return the current frame as a list of channel values.

        The frame has 512 slots, or one slot per channel up to the highest
        channel in use when the controller was created with dynamic_frame.
        """
        self.__frame = [0] * 512
        if self.__dynamic_frame:
            self.__frame = [0] * (self.next_channel - 1)
        frame = self.__frame

        for channel_id, channel_value in self.channels.items():
            # Channel ids are one above their frame index; skip ids past the end
            slot = channel_id - 1
            if slot < len(frame):
                frame[slot] = channel_value[0]

        return frame

    @property
    def channels(self) -> Dict[int, Fixture_Channel]:
        """
        Merge the channels of all fixtures into one mapping of id to value.

        Each value is indexable: index 0 is the level, index 1 is what LTP
        compares to decide which fixture wins (presumably a timestamp).
        Raises LTPCollisionException in LTP mode when two fixtures hold
        different levels for a channel with an equal index-1 marker.
        """
        merged = {}

        # Read every fixture's channels up front, then merge them in order
        per_fixture = [registered.channels for registered in self.__fixtures.values()]
        for channel_map in per_fixture:
            for channel_id, channel in channel_map.items():
                candidate = channel['value']
                # A level of -1 is reported as 0
                if candidate[0] == -1:
                    candidate[0] = 0

                # First claim on this channel wins outright
                if channel_id not in merged:
                    merged[channel_id] = candidate
                    continue

                current = merged[channel_id]
                if self.__ltp:
                    # Same marker but different levels cannot be ordered
                    if candidate[1] == current[1] and candidate[0] != current[0]:
                        raise LTPCollisionException(channel_id)

                    # LTP: the greater marker takes the channel
                    if candidate[1] > current[1]:
                        merged[channel_id] = candidate
                elif candidate[0] > current[0]:
                    # HTP: the greater level takes the channel
                    merged[channel_id] = candidate

        return merged

    @property
    def next_channel(self) -> int:
        """Return one more than the highest channel id in use (1 when none are)."""
        return max(self.channels, default=0) + 1

    def debug_control(self, callbacks: Dict[str, Callable] = None):
        """Run a Debugger for this controller with the given callbacks (default none)."""
        Debugger(self, {} if callbacks is None else callbacks).run()

    def web_control(self, *args, **kwargs):
        """Replace any existing web controller with a new WebController(self, ...)."""
        if self.web is not None:
            self.web.stop()
        self.web = WebController(self, *args, **kwargs)

    def run(self):
        """Do nothing here; method used in transmitting controllers."""

    def close(self):
        """Stop the ticker, clear every fixture's effects and stop the web controller."""
        self.ticker.stop()
        print("CLOSE: ticker stopped")

        # Effects run their own tickers, so clear them per fixture
        for registered in self.__fixtures.values():
            registered.clear_effects()
        print("CLOSE: all effects cleared")

        # The web controller may never have been created
        web = getattr(self, "web", None)
        if web:
            web.stop()
            print("CLOSE: web controller stopped")
259
260
261
class JSONLoadSave:
    """Loads fixtures into a controller from JSON and saves them back out."""

    # "<module path>.<ProfileName>": one or more dot-separated module names,
    # then the attribute to fetch from that module. The pattern has no nested
    # quantifiers, so a malformed type is rejected in linear time, and it
    # rejects empty path segments (leading or doubled dots).
    _TYPE_PATTERN = re.compile(r"^(\w+(?:\.\w+)*)\.(\w+)$")

    def __init__(self, controller: Controller):
        # Controller that loaded fixtures are added to and saved fixtures are read from
        self.controller = controller

    @staticmethod
    def validate_item(index: int, item) -> Tuple[bool, Union[None, Type[Fixture]]]:
        """
        Resolve the profile named by one JSON config entry.

        index: position of the entry in the config list, used in warnings.
        item: the decoded entry; must be a dict with a string 'type' of the
            form "<module path>.<ProfileName>", resolved relative to the
            package's profiles subpackage.

        Returns (True, profile) on success. Every failure emits a warning and
        returns (False, None) so the caller can skip the entry.
        """
        if not isinstance(item, dict):
            warn("Failed to load item {} from JSON, expected dict, got {}".format(index, type(item)))
            return False, None

        if 'type' not in item:
            warn("Failed to load item {} from JSON, expected a type property".format(index))
            return False, None

        # A non-string type cannot be matched; skip it like any other bad entry
        if not isinstance(item['type'], str):
            warn("Failed to load item {} from JSON, expected type property to be a str, got {}".format(
                index, type(item['type'])))
            return False, None

        match = JSONLoadSave._TYPE_PATTERN.match(item['type'])
        if not match:
            warn("Failed to load item {} from JSON, failed to parse type '{}'".format(index, item['type']))
            return False, None
        module_path, profile_name = match.groups()

        try:
            module = import_module(".{}".format(module_path), name + '.profiles')
        except ModuleNotFoundError:
            warn("Failed to load item {} from JSON, profile module '{}' not found".format(index, module_path))
            return False, None

        try:
            profile = getattr(module, profile_name)
        except AttributeError:
            warn("Failed to load item {} from JSON, profile type '{}' not found in '{}'".format(
                index, profile_name, module_path))
            return False, None

        return True, profile

    def load_config(self, filename: str) -> List[Fixture]:
        """
        Add the fixtures described by a JSON file to the controller.

        The file must hold a list of dicts. Each dict needs a 'type', may
        carry positional 'args', and passes its remaining keys to the profile
        as keyword arguments. Entries that fail validation are skipped.

        Returns the fixtures that were added, in file order.
        Raises JSONConfigLoadException when the file cannot be opened or
        parsed, or when its top level is not a list.
        """
        try:
            with open(filename) as f:
                data = load(f)
        except JSONDecodeError as err:
            raise JSONConfigLoadException(filename, "unable to parse contents") from err
        except OSError as err:
            # Covers FileNotFoundError, which is an OSError subclass
            raise JSONConfigLoadException(filename) from err

        if not isinstance(data, list):
            raise JSONConfigLoadException(filename, "expected list of dicts, got {}".format(type(data)))

        fixtures = []
        for index, item in enumerate(data):
            success, profile = self.validate_item(index, item)
            if not success or not profile:
                continue

            # Split the entry into positional args and keyword args for the profile
            kwargs = dict(item)
            del kwargs['type']
            args = kwargs.pop('args', [])

            fixtures.append(self.controller.add_fixture(profile, *args, **kwargs))

        return fixtures

    def save_config(self, filename: Union[str, None] = None, pretty_print: bool = True) -> str:
        """
        Serialise the `json_data` of every fixture on the controller.

        filename: when given, the JSON text is also written to this path.
        pretty_print: indent the output by four spaces when True.

        Returns the JSON text.
        """
        data = [fixture.json_data for fixture in self.controller.get_all_fixtures()]
        text = dumps(data, indent=4 if pretty_print else None)

        if filename:
            with open(filename, "w") as f:
                f.write(text)
        return text
347