Passed
Push — master ( bc3893...622e69 )
by Matt
02:55
created

PyDMXControl.profiles.defaults._Fixture   F

Complexity

Total Complexity 93

Size/Duplication

Total Lines 353
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 93
eloc 243
dl 0
loc 353
rs 2
c 0
b 0
f 0

38 Methods

Rating   Name   Duplication   Size   Complexity  
A Channel.unpark() 0 3 1
A Channel.__init__() 0 5 3
A Channel.park() 0 3 1
A Channel.__updated() 0 2 1
A Channel.get_value() 0 2 2
A Channel.set_value() 0 4 2
A Fixture.title() 0 7 2
A Fixture.start_channel() 0 3 1
A Fixture._register_channel_aliases() 0 17 5
A FixtureHelpers.locate() 0 3 1
A Fixture.channel_usage() 0 6 1
A Fixture.controller() 0 3 1
A FixtureHelpers.dim() 0 21 2
A FixtureHelpers.off() 0 2 1
A Fixture.clear_effects() 0 8 2
A Fixture.remove_effect() 0 6 2
A Fixture._valid_channel_value() 0 7 4
A Fixture._register_channel() 0 11 3
A Fixture.get_channel_value() 0 5 3
A Fixture.__init__() 0 19 5
A Fixture.id() 0 3 2
A Fixture.__str__() 0 2 1
A Fixture.set_controller() 0 4 2
B Fixture.set_channels() 0 18 8
A FixtureHelpers.color() 0 14 3
A Fixture.channels() 0 6 2
A FixtureHelpers.on() 0 2 1
B FixtureHelpers.get_color() 0 14 6
A Fixture._set_name() 0 2 1
A Fixture.set_id() 0 4 2
B Fixture.get_channel_id() 0 17 6
A Fixture.get_effect_by_effect() 0 12 3
A Fixture.next_channel() 0 3 1
A Fixture.set_channel() 0 10 3
A Fixture.name() 0 3 1
A Fixture.add_effect() 0 9 1
A FixtureHelpers.anim() 0 3 2
A Fixture.json_data() 0 16 5

How to fix   Complexity   

Complexity

Complex classes like PyDMXControl.profiles.defaults._Fixture often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2021 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from datetime import datetime
10
from time import sleep, time
11
from typing import Union, List, Tuple, Type
12
from warnings import warn
13
14
from ... import Colors
15
from ...effects.defaults import Effect
16
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException
17
18
19
class Channel:
20
21
    def __init__(self, name: str, parked: Union[bool, int]):
22
        self.name = name
23
        self.value = 0
24
        self.updated = datetime.utcnow()
25
        self.parked = parked if parked is False else (0 if parked is True else parked)
26
27
    def __updated(self):
28
        self.updated = datetime.utcnow()
29
30
    def set_value(self, value: int):
31
        if self.parked is False:
32
            self.value = value
33
            self.__updated()
34
35
    def get_value(self) -> Tuple[int, datetime]:
36
        return (self.value if self.parked is False else self.parked), self.updated
37
38
    def park(self, value: int = 0):
39
        self.parked = value
40
        self.__updated()
41
42
    def unpark(self):
43
        self.parked = False
44
        self.__updated()
45
46
47
class FixtureHelpers:
48
49
    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
50
51
        # Calculate what we need
52
        current = self.get_channel_value(self.get_channel_id(channel))[0]
53
        start = time() * 1000.0
54
        gap = target_value - current
55
        millis = max(milliseconds, 0)
56
57
        # Create the callback for ticker
58
        def callback():
59
            if (time() * 1000.0) - start <= millis:
60
                diff = gap * (((time() * 1000.0) - start) / millis)
61
                self.set_channel(channel, int(current + diff))
62
            else:
63
                self.set_channel(channel, int(target_value))
64
                self.controller.ticker.remove_callback(callback)
65
66
        # Start the callback
67
        self.controller.ticker.add_callback(callback, 0)
68
69
        return self
70
71
    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
72
        for channel_value in channels_values:
73
            self.dim(channel_value[1], milliseconds, channel_value[0])
74
75
    def color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
76
        # Handle string color names
77
        if isinstance(color, str):
78
            if color in Colors.__members__:
79
                color = Colors[color]
80
            else:
81
                raise ValueError("Color '" + color + "' not defined in Colors enum."
82
                                                     " Supply valid Colors enum or List/Tuple of integers.")
83
84
        # Get a tuple
85
        color = Colors.to_tuples(color)
86
87
        # Apply
88
        self.anim(milliseconds, *color)
89
90
    def get_color(self) -> Union[None, List[int]]:
91
        red = self.get_channel_value(self.get_channel_id("r"))
92
        green = self.get_channel_value(self.get_channel_id("g"))
93
        blue = self.get_channel_value(self.get_channel_id("b"))
94
        white = self.get_channel_value(self.get_channel_id("w"))
95
        amber = self.get_channel_value(self.get_channel_id("a"))
96
        if red[0] == -1 or green[0] == -1 or blue[0] == -1:
97
            return None
98
        color = [red[0], green[0], blue[0]]
99
        if white[0] != -1:
100
            color.append(white[0])
101
            if amber[0] != -1:
102
                color.append(amber[0])
103
        return color
104
105
    def on(self):
106
        self.dim(255)
107
108
    def off(self):
109
        self.dim(0)
110
111
    def locate(self):
112
        self.color([255, 255, 255, 255, 255])
113
        self.dim(255)
114
115
116
class Fixture(FixtureHelpers):
117
118
    def __init__(self, *args, **kwargs):
119
        if "start_channel" not in kwargs:
120
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")
121
122
        if kwargs["start_channel"] < 1 or kwargs["start_channel"] > 512:
123
            raise ValueError('Start Channel must be between 1 and 512.')
124
125
        if "name" not in kwargs:
126
            kwargs["name"] = ""
127
128
        self.__start_channel = kwargs["start_channel"]
129
        self.__channels = []
130
        self.__effects = []
131
        self.__id = None
132
        self.__controller = None
133
        self.__name = kwargs["name"]
134
        self.__channel_aliases = {}
135
        self.__kwargs = kwargs
136
        self.__args = args
137
138
    def __str__(self):
139
        return self.title
140
141
    # Internal
142
143
    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
144
        if self.__start_channel + len(self.__channels) > 512:
145
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))
146
147
        used_names = [f.name for f in self.__channels]
148
        used_names.extend([f for f in self.__channel_aliases])
149
        if name.lower().strip() in used_names:
150
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))
151
152
        self.__channels.append(Channel(name.lower().strip(), parked))
153
        return len(self.__channels) - 1
154
155
    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
156
        if not aliases:
157
            return False
158
159
        channel = channel.lower().strip()
160
161
        used_names = [f.name for f in self.__channels]
162
        if channel not in used_names:
163
            warn('Channel name `{}` is not registered.'.format(channel))
164
            return False
165
166
        for alias in aliases:
167
            if alias in self.__channel_aliases.keys():
168
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[alias]))
169
                continue
170
            self.__channel_aliases[alias] = channel
171
        return True
172
173
    def set_id(self, fixture_id: int):
174
        # Only ever set once
175
        if self.__id is None:
176
            self.__id = fixture_id
177
178
    def set_controller(self, controller: 'Controller'):
179
        # Only ever set once
180
        if self.__controller is None:
181
            self.__controller = controller
182
183
    def _set_name(self, name: str):
184
        self.__name = name
185
186
    # Properties
187
188
    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
189
        if value < 0 or value > 255:
190
            if self.__controller.dmx_value_warnings:
191
                warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
192
                    self.title, value, channel))
193
            return False
194
        return True
195
196
    @property
197
    def id(self) -> int:
198
        return self.__id if self.__id is not None else 0
199
200
    @property
201
    def name(self) -> str:
202
        return self.__name
203
204
    @property
205
    def start_channel(self) -> int:
206
        return self.__start_channel
207
208
    @property
209
    def next_channel(self) -> int:
210
        return len(self.__channels) + 1
211
212
    @property
213
    def controller(self) -> 'Controller':
214
        return self.__controller
215
216
    @property
217
    def channels(self) -> dict:
218
        channels = {}
219
        for i, chan in enumerate(self.__channels):
220
            channels[self.start_channel + i] = {'name': chan.name, 'value': self.get_channel_value(i)}
221
        return channels
222
223
    @property
224
    def channel_usage(self) -> str:
225
        return "{}->{} ({})".format(
226
            self.start_channel,
227
            (self.start_channel + len(self.__channels) - 1),
228
            len(self.__channels)
229
        )
230
231
    @property
232
    def title(self) -> str:
233
        return "Fixture #{} {} of type {} using channels {}.".format(
234
            self.id,
235
            "('{}')".format(self.name) if self.name else "",
236
            self.__class__.__name__,
237
            self.channel_usage
238
        )
239
240
    @property
241
    def json_data(self) -> dict:
242
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
243
        match = pattern.match(self.__class__.__module__)
244
        if not match:
245
            raise JSONConfigSaveException("Failed to generate JSON data for fixture #{}".format(self.id))
246
        base = {
247
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
248
            "args": self.__args
249
        }
250
        for kwarg, val in self.__kwargs.items():
251
            if kwarg not in base:
252
                if kwarg == "name":
253
                    val = self.name
254
                base[kwarg] = val
255
        return base
256
257
    # Channels
258
259
    def get_channel_id(self, channel: Union[str, int]) -> int:
260
        channel = str(channel)
261
262
        if channel.isdigit():
263
            channel = int(channel)
264
            if channel < len(self.__channels):
265
                return channel
266
267
        channel = str(channel).lower().strip()
268
        if channel in self.__channel_aliases.keys():
269
            channel = self.__channel_aliases[channel]
270
271
        for i, chan in enumerate(self.__channels):
272
            if chan.name == channel:
273
                return i
274
275
        return -1
276
277
    def get_channel_value(self, channel: int) -> Tuple[int, datetime]:
278
        channel = self.get_channel_id(channel)
279
        if channel >= len(self.__channels) or channel < 0:
280
            return -1, datetime.utcnow()
281
        return self.__channels[channel].get_value()
282
283
    def set_channel(self, channel: [str, int], value: int) -> 'Fixture':
284
        if not self._valid_channel_value(value, channel):
285
            return self
286
287
        channel = self.get_channel_id(channel)
288
        if channel == -1:
289
            return self
290
291
        self.__channels[channel].set_value(value)
292
        return self
293
294
    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
295
        channel = 0
296
        if 'start' in kwargs and str(kwargs['start']).isdigit() and int(kwargs['start']) > 0:
297
            channel = int(kwargs['start'])
298
299
        def apply_values(fixture, values, chan=1):
300
            for value in values:
301
                if value is not None:
302
                    if isinstance(value, list):
303
                        chan = apply_values(fixture, value, chan)
304
                    elif str(value).isdigit():
305
                        fixture.set_channel(chan, int(value))
306
                        chan += 1
307
            return chan - 1
308
309
        apply_values(self, args, channel)
310
311
        return self
312
313
    # Effects
314
315
    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
316
        # Instantiate
317
        effect = effect(self, speed, *args, **kwargs)
318
        # Start
319
        effect.start()
320
        # Save
321
        self.__effects.append(effect)
322
323
        return self
324
325
    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
326
        matches = []
327
328
        # Iterate over each effect
329
        for this_effect in self.__effects:
330
            # If it matches the given effect
331
            if isinstance(this_effect, effect):
332
                # Store
333
                matches.append(this_effect)
334
335
        # Return any matches
336
        return matches
337
338
    def remove_effect(self, effect: Effect) -> 'Fixture':
339
        if effect in self.__effects:
340
            effect.stop()
341
            self.__effects.remove(effect)
342
343
        return self
344
345
    def clear_effects(self) -> 'Fixture':
346
        # Stop
347
        for effect in self.__effects:
348
            effect.stop()
349
        # Clear
350
        self.__effects = []
351
352
        return self
353