Passed
Push — all-web-control ( c78a90...de24a5 )
by Matt
03:02
created

Fixture.set_controller()   A

Complexity

Conditions 2

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 2
nop 2
"""
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
 *                Featuring fixture profiles, built-in effects and a web control panel.
 *  <https://github.com/MattIPv4/PyDMXControl/>
 *  Copyright (C) 2021 Matt Cowley (MattIPv4) ([email protected])
"""
7
8
import re
9
from datetime import datetime
10
from time import sleep, time
11
from typing import Union, List, Tuple, Type
12
from warnings import warn
13
14
from ... import Colors
15
from ...effects.defaults import Effect
16
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException
17
18
19
class Channel:
    """
    One named channel of a fixture.

    Holds the live value, the time of the last change and an optional
    "parked" override. While parked, writes are ignored and reads report
    the parked value instead of the live one.
    """

    def __init__(self, name: str, parked: Union[bool, int]):
        # `False` -> not parked, `True` -> parked at 0, anything else -> parked at that value
        if parked is False:
            self.parked = False
        elif parked is True:
            self.parked = 0
        else:
            self.parked = parked

        self.updated = datetime.utcnow()
        self.value = 0
        self.name = name

    def __updated(self):
        # Stamp the moment of the latest change
        self.updated = datetime.utcnow()

    def set_value(self, value: int):
        """Store a new live value; silently ignored while the channel is parked."""
        # Identity check on purpose: a parked value of 0 must still count as parked
        if self.parked is not False:
            return
        self.value = value
        self.__updated()

    def get_value(self) -> Tuple[int, datetime]:
        """Return `(value, updated)`, where value is the parked value whenever the channel is parked."""
        if self.parked is False:
            effective = self.value
        else:
            effective = self.parked
        return effective, self.updated

    def park(self, value: int = 0):
        """Park the channel at `value` and refresh the updated timestamp."""
        self.parked = value
        self.__updated()

    def unpark(self):
        """Release the parked override and refresh the updated timestamp."""
        self.parked = False
        self.__updated()
45
46
47
class FixtureHelpers:
    """
    Convenience helpers built on the channel API of the concrete fixture.

    The methods here rely on `set_channel`, `get_channel_id`,
    `get_channel_value` and `controller` being provided by the class
    that inherits from this one.
    """

    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
        """
        Move `channel` to `target_value`, either at once or faded over `milliseconds`.

        A non-positive duration sets the value immediately. Otherwise a callback is
        registered on the controller's ticker; each run writes a linearly interpolated
        value, and once the duration has elapsed it writes the target and removes itself.
        """
        duration = max(milliseconds, 0)

        # Nothing to fade over: apply straight away
        if duration == 0:
            self.set_channel(channel, int(target_value))
            return self

        # Snapshot the starting point of the fade
        initial = self.get_channel_value(self.get_channel_id(channel))[0]
        began = time() * 1000.0
        delta = target_value - initial

        def callback():
            # Past the end of the fade: land on the target and stop ticking
            if (time() * 1000.0) - began > duration:
                self.set_channel(channel, int(target_value))
                self.controller.ticker.remove_callback(callback)
                return

            # Still fading: scale the total change by the elapsed fraction
            offset = delta * (((time() * 1000.0) - began) / duration)
            self.set_channel(channel, int(initial + offset))

        # Hand the callback to the ticker
        self.controller.ticker.add_callback(callback, 0)

        return self

    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
        """Dim several channels over the same duration; each item is `(channel, value)`."""
        for pair in channels_values:
            target_channel = pair[0]
            target_level = pair[1]
            self.dim(target_level, milliseconds, target_channel)

    def color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """
        Fade the fixture to `color` over `milliseconds`.

        A string is looked up by name in the `Colors` enum and raises `ValueError`
        when no such member exists. The result is converted with `Colors.to_tuples`
        and passed on to `anim`.
        """
        if isinstance(color, str):
            # Unknown names are rejected before any channel is touched
            if color not in Colors.__members__:
                raise ValueError("Color '" + color + "' not defined in Colors enum."
                                                     " Supply valid Colors enum or List/Tuple of integers.")
            color = Colors[color]

        # Convert to channel/value pairs and apply
        pairs = Colors.to_tuples(color)
        self.anim(milliseconds, *pairs)

    def get_color(self) -> Union[None, List[int]]:
        """
        Read the current color from the `r`, `g`, `b`, `w` and `a` channels.

        Returns `None` when any of red, green or blue is unavailable (-1). Otherwise
        returns `[r, g, b]`, extended with white when available, and with amber only
        when white is available too.
        """
        red, green, blue, white, amber = (
            self.get_channel_value(self.get_channel_id(key))[0]
            for key in ("r", "g", "b", "w", "a")
        )

        if -1 in (red, green, blue):
            return None

        components = [red, green, blue]
        if white != -1:
            components.append(white)
            if amber != -1:
                components.append(amber)
        return components

    def on(self):
        """Set the default dim channel to 255 immediately."""
        self.dim(255)

    def off(self):
        """Set the default dim channel to 0 immediately."""
        self.dim(0)

    def locate(self):
        """Set the color to 255 on five components and the default dim channel to 255."""
        self.color([255, 255, 255, 255, 255])
        self.dim(255)
118
119
120
class Fixture(FixtureHelpers):
    """
    Base class for a fixture occupying a run of channels starting at `start_channel`.

    Subclasses register their channels (and optional aliases) through
    `_register_channel` / `_register_channel_aliases`. Channels are then addressed
    either by zero-based index within the fixture or by (alias) name.
    """

    def __init__(self, *args, **kwargs):
        """
        Create the fixture.

        :param args: positional arguments, stored unchanged and exposed as "args" in `json_data`
        :param kwargs: must contain `start_channel` (1 to 512); `name` is optional and defaults to ""
        :raises TypeError: when `start_channel` is not supplied
        :raises ValueError: when `start_channel` is outside 1 to 512
        """
        if "start_channel" not in kwargs:
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")

        if kwargs["start_channel"] < 1 or kwargs["start_channel"] > 512:
            raise ValueError('Start Channel must be between 1 and 512.')

        if "name" not in kwargs:
            kwargs["name"] = ""

        self.__start_channel = kwargs["start_channel"]
        self.__channels = []
        self.__effects = []
        self.__id = None
        self.__controller = None
        self.__name = kwargs["name"]
        self.__channel_aliases = {}
        self.__kwargs = kwargs
        self.__args = args

    def __str__(self):
        return self.title

    # Internal

    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
        """
        Append a new channel to this fixture.

        The name is stored lower-cased and stripped.

        :param name: channel name; must not clash with an existing channel name or alias
        :param parked: initial parked state handed to `Channel`
        :return: zero-based index of the new channel within this fixture
        :raises FixtureCreationException: when the channel would fall past 512 or the name is taken
        """
        if self.__start_channel + len(self.__channels) > 512:
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))

        clean_name = name.lower().strip()
        used_names = [f.name for f in self.__channels]
        used_names.extend(self.__channel_aliases)
        if clean_name in used_names:
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))

        self.__channels.append(Channel(clean_name, parked))
        return len(self.__channels) - 1

    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
        """
        Register alternative names for an already registered channel.

        Aliases are stored lower-cased and stripped, which is the form
        `get_channel_id` uses when it looks a name up.

        :param channel: name of the registered channel the aliases point to
        :param aliases: alias names; ones already in use are skipped with a warning
        :return: False when no aliases were given or the channel is unknown, True otherwise
        """
        if not aliases:
            return False

        channel = channel.lower().strip()

        used_names = [f.name for f in self.__channels]
        if channel not in used_names:
            warn('Channel name `{}` is not registered.'.format(channel))
            return False

        for alias in aliases:
            # Normalise so the stored key matches what get_channel_id searches for
            alias = alias.lower().strip()
            if alias in self.__channel_aliases:
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[alias]))
                continue
            self.__channel_aliases[alias] = channel
        return True

    def set_id(self, fixture_id: int):
        """Assign the fixture id; later calls are ignored once an id is set."""
        # Only ever set once
        if self.__id is None:
            self.__id = fixture_id

    def set_controller(self, controller: 'Controller'):
        """Attach the controller; later calls are ignored once a controller is set."""
        # Only ever set once
        if self.__controller is None:
            self.__controller = controller

    def _set_name(self, name: str):
        """Replace the fixture name."""
        self.__name = name

    # Properties

    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
        """
        Check that `value` lies within 0 to 255.

        An out-of-range value yields False and, unless the controller has
        `dmx_value_warnings` switched off, a warning naming the fixture and channel.
        """
        if value < 0 or value > 255:
            # The controller may not be attached yet (it starts as None); warn in that
            # case instead of failing on the attribute lookup
            if getattr(self.__controller, 'dmx_value_warnings', True):
                warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
                    self.title, value, channel))
            return False
        return True

    @property
    def id(self) -> int:
        """Fixture id, or 0 while none has been assigned."""
        return self.__id if self.__id is not None else 0

    @property
    def name(self) -> str:
        """Current fixture name."""
        return self.__name

    @property
    def start_channel(self) -> int:
        """First channel occupied by this fixture."""
        return self.__start_channel

    @property
    def next_channel(self) -> int:
        """Number of registered channels plus one."""
        return len(self.__channels) + 1

    @property
    def controller(self) -> 'Controller':
        """Attached controller, or None before `set_controller` has been called."""
        return self.__controller

    @property
    def channels(self) -> dict:
        """Map of `start_channel + index` to `{'name', 'value'}`, value being the `get_channel_value` result."""
        channels = {}
        for i, chan in enumerate(self.__channels):
            channels[self.start_channel + i] = {'name': chan.name, 'value': self.get_channel_value(i)}
        return channels

    @property
    def channel_usage(self) -> str:
        """Readable summary in the form `first->last (count)`."""
        return "{}->{} ({})".format(
            self.start_channel,
            (self.start_channel + len(self.__channels) - 1),
            len(self.__channels)
        )

    @property
    def title(self) -> str:
        """Readable description of the fixture: id, optional name, class name and channel usage."""
        return "Fixture #{} {} of type {} using channels {}.".format(
            self.id,
            "('{}')".format(self.name) if self.name else "",
            self.__class__.__name__,
            self.channel_usage
        )

    @property
    def json_data(self) -> dict:
        """
        Build a dict describing how this fixture was constructed.

        "type" is derived from the module path of the class, which must match
        `PyDMXControl.profiles.<...>._<module>`; "args" holds the positional arguments.
        Constructor keyword arguments are copied in unless they would overwrite
        "type" or "args", with "name" replaced by the current name.

        :raises JSONConfigSaveException: when the class module does not match the expected path
        """
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
        match = pattern.match(self.__class__.__module__)
        if not match:
            raise JSONConfigSaveException("Failed to generate JSON data for fixture #{}".format(self.id))
        base = {
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
            "args": self.__args
        }
        for kwarg, val in self.__kwargs.items():
            if kwarg not in base:
                if kwarg == "name":
                    # The name may have changed since construction
                    val = self.name
                base[kwarg] = val
        return base

    # Channels

    def get_channel_id(self, channel: Union[str, int]) -> int:
        """
        Resolve a channel reference to its zero-based index within this fixture.

        A digit-only reference below the channel count is used as the index directly.
        Anything else is lower-cased, stripped, mapped through the aliases and then
        matched against the channel names.

        :return: the index, or -1 when nothing matches
        """
        channel = str(channel)

        if channel.isdigit():
            channel = int(channel)
            if channel < len(self.__channels):
                return channel

        channel = str(channel).lower().strip()
        if channel in self.__channel_aliases:
            channel = self.__channel_aliases[channel]

        for i, chan in enumerate(self.__channels):
            if chan.name == channel:
                return i

        return -1

    def get_channel_value(self, channel: Union[str, int]) -> Tuple[int, datetime]:
        """
        Return `(value, updated)` for a channel given by index or name.

        An unknown channel yields `(-1, <current UTC time>)`.
        """
        channel = self.get_channel_id(channel)
        if channel >= len(self.__channels) or channel < 0:
            return -1, datetime.utcnow()
        return self.__channels[channel].get_value()

    def set_channel(self, channel: Union[str, int], value: int) -> 'Fixture':
        """
        Set one channel, given by index or name, to `value`.

        Out-of-range values and unknown channels are ignored.

        :return: this fixture, to allow chaining
        """
        if not self._valid_channel_value(value, channel):
            return self

        channel = self.get_channel_id(channel)
        if channel == -1:
            return self

        self.__channels[channel].set_value(value)
        return self

    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
        """
        Set consecutive channels from the given values.

        Values are written to successive channel indexes, beginning at index 0 or at
        the positive integer passed as the `start` keyword. Lists and tuples are
        flattened in place. Entries that are not digit-only when converted to a
        string are skipped.

        :return: this fixture, to allow chaining
        """
        channel = 0
        if 'start' in kwargs and str(kwargs['start']).isdigit() and int(kwargs['start']) > 0:
            channel = int(kwargs['start'])

        def apply_values(fixture, values, chan):
            # Writes `values` from index `chan` onwards and returns the next free index
            for value in values:
                # NOTE(review): None entries are skipped without advancing the index;
                # confirm whether None was meant to leave a channel untouched instead
                if value is None:
                    continue
                if isinstance(value, (list, tuple)):
                    chan = apply_values(fixture, value, chan)
                elif str(value).isdigit():
                    fixture.set_channel(chan, int(value))
                    chan += 1
            # Return the next index, not the last one written, so the value following
            # a nested group does not overwrite the group's final channel
            return chan

        apply_values(self, args, channel)

        return self

    # Effects

    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
        """
        Instantiate `effect` for this fixture, start it and keep track of it.

        :param effect: effect class, called as `effect(self, speed, *args, **kwargs)`
        :return: this fixture, to allow chaining
        """
        # Instantiate
        effect = effect(self, speed, *args, **kwargs)
        # Start
        effect.start()
        # Save
        self.__effects.append(effect)

        return self

    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
        """Return every tracked effect that is an instance of the given effect class."""
        return [this_effect for this_effect in self.__effects if isinstance(this_effect, effect)]

    def remove_effect(self, effect: Effect) -> 'Fixture':
        """Stop and forget `effect` if it is tracked by this fixture; returns this fixture."""
        if effect in self.__effects:
            effect.stop()
            self.__effects.remove(effect)

        return self

    def clear_effects(self) -> 'Fixture':
        """Stop every tracked effect and empty the list; returns this fixture."""
        # Stop
        for effect in self.__effects:
            effect.stop()
        # Clear
        self.__effects = []

        return self
357