Passed
Pull Request — master (#54)
by
unknown
07:42 queued 01:22
created

Fixture.set_channel()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 3
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2023 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from datetime import datetime
10
from time import sleep, time
11
from typing import Union, List, Tuple, Type
12
from warnings import warn
13
14
from ... import Colors
15
from ...effects.defaults import Effect
16
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException, ChannelNotFoundException
17
18
19
class Channel:
    """One named channel slot of a fixture.

    Stores the live value, the time of the last effective change and an
    optional parked value that, while set, is reported instead of the live
    value.
    """

    def __init__(self, name: str, parked: Union[bool, int]):
        """Create the channel with a live value of 0.

        :param name: channel name, stored as given.
        :param parked: ``False`` for not parked, ``True`` for parked at 0,
            anything else is kept as the parked value.
        """
        self.name = name
        self.__value = 0
        self.__timestamp = datetime.utcnow()

        # Collapse the bool/int argument into "False or a parked value".
        if parked is False:
            self.__parked = False
        elif parked is True:
            self.__parked = 0
        else:
            self.__parked = parked

    def __updated(self):
        """Stamp the channel with the current (naive UTC) time."""
        self.__timestamp = datetime.utcnow()

    def set(self, value: int):
        """Store a new live value.

        The timestamp only moves when the channel is not parked, because a
        parked channel keeps reporting its parked value.
        """
        self.__value = value
        if not self.parked:
            self.__updated()

    @property
    def value(self) -> Tuple[int, datetime]:
        """Return ``(reported value, timestamp)``; parking takes precedence."""
        if self.__parked is False:
            reported = self.__value
        else:
            reported = self.__parked
        return reported, self.__timestamp

    @property
    def value_unparked(self) -> Tuple[int, datetime]:
        """Return ``(live value, timestamp)``, ignoring any parking."""
        return self.__value, self.__timestamp

    @property
    def parked(self) -> bool:
        """Whether a parked value is currently set."""
        return self.__parked is not False

    def park(self, value: int = 0):
        """Park the channel at ``value`` and refresh the timestamp."""
        self.__parked = value
        self.__updated()

    def unpark(self):
        """Drop the parked value; a no-op when the channel is not parked."""
        if self.__parked is False:
            return
        self.__parked = False
        self.__updated()
55
56
57
class FixtureHelpers:
    """Convenience operations (fades, colors, on/off) layered on a fixture.

    The methods here call ``set_channel``, ``get_channel_id``,
    ``get_channel_value``, ``has_channel`` and ``controller`` on ``self``;
    this class does not define them, so it must be combined with a class
    that does.
    """

    # Fade callbacks registered by this instance. The class-level value is
    # deliberately None (not a list): a list here would be shared by every
    # fixture, so one fixture's clear_callbacks() would cancel the fades of
    # all others. The real list is created per instance on first use.
    __callbacks = None

    def __active_callbacks(self) -> list:
        """Return this instance's callback list, creating it on first use."""
        if self.__callbacks is None:
            self.__callbacks = []
        return self.__callbacks

    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
        """Move ``channel`` to ``target_value``, optionally fading over time.

        :param target_value: value to reach.
        :param milliseconds: fade duration; zero or negative applies the
            value immediately.
        :param channel: channel name or index, ``'dimmer'`` by default.
        :return: ``self``.
        """
        # Handle instant edge-case
        millis = max(milliseconds, 0)
        if millis == 0:
            self.set_channel(channel, int(target_value))
            return self

        # Calculate what we need
        current = self.get_channel_value(self.get_channel_id(channel))[0]
        start = time() * 1000.0
        gap = target_value - current

        # Create the callback for ticker
        def callback():
            elapsed = (time() * 1000.0) - start
            if elapsed <= millis:
                # Linear interpolation between the starting and target value
                self.set_channel(channel, int(current + gap * (elapsed / millis)))
            else:
                # Fade finished: pin the exact target and unregister
                self.set_channel(channel, int(target_value))
                self.controller.ticker.remove_callback(callback)
                tracked = self.__active_callbacks()
                # clear_callbacks() may already have dropped this entry
                if callback in tracked:
                    tracked.remove(callback)

        # Append to the tracking list
        self.__active_callbacks().append(callback)

        # Start the callback
        self.controller.ticker.add_callback(callback, 0)

        return self

    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
        """Fade several channels at once.

        :param milliseconds: fade duration passed to :meth:`dim`.
        :param channels_values: ``(channel, value)`` pairs.
        """
        for channel_value in channels_values:
            self.dim(channel_value[1], milliseconds, channel_value[0])

    def color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """Apply a color, optionally fading to it.

        :param color: a ``Colors`` member, the name of one, or a list/tuple
            of integers.
        :param milliseconds: fade duration passed to :meth:`anim`.
        :raises ValueError: if ``color`` is a string that is not a member
            name of ``Colors``.
        """
        # Handle string color names
        if isinstance(color, str):
            if color in Colors.__members__:
                color = Colors[color]
            else:
                raise ValueError("Color '" + color + "' not defined in Colors enum."
                                                     " Supply valid Colors enum or List/Tuple of integers.")

        # Keep only the (channel, value) pairs this fixture has a channel for
        color = [f for f in Colors.to_tuples(color) if self.has_channel(f[0])]

        # Apply
        self.anim(milliseconds, *color)

    def get_color(self) -> Union[None, List[int]]:
        """Return the unparked ``[r, g, b]`` values, or ``None``.

        ``None`` is returned unless the fixture has all of ``r``, ``g`` and
        ``b``. ``w`` is appended when present, and ``a`` only when ``w`` is
        also present.
        """
        if not self.has_channel('r') or not self.has_channel('g') or not self.has_channel('b'):
            return None

        color = [
            self.get_channel_value('r', False)[0],
            self.get_channel_value('g', False)[0],
            self.get_channel_value('b', False)[0]
        ]

        if self.has_channel('w'):
            color.append(self.get_channel_value('w', False)[0])

            if self.has_channel('a'):
                color.append(self.get_channel_value('a', False)[0])

        return color

    def on(self):
        """Set the dimmer channel to 255 immediately."""
        self.dim(255)

    def off(self):
        """Set the dimmer channel to 0 immediately."""
        self.dim(0)

    def locate(self):
        """Drive every color channel and the dimmer to 255."""
        self.color([255, 255, 255, 255, 255])
        self.dim(255)

    def clear_callbacks(self):
        """Unregister every fade callback this instance still tracks."""
        for callback in self.__active_callbacks():
            self.controller.ticker.remove_callback(callback)
        # Rebind rather than clear in place so a list being iterated
        # elsewhere is left untouched.
        self.__callbacks = []
143
            
144
145
146
class Fixture(FixtureHelpers):
    """Base fixture: an ordered set of named channels from a start channel.

    Holds the registered channels, their aliases, running effects, and the
    constructor arguments (kept so :attr:`json_data` can emit them again).
    """

    def __init__(self, *args, **kwargs):
        """Create the fixture.

        :param args: kept verbatim for :attr:`json_data`.
        :param kwargs: must contain ``start_channel`` (1 to 512); ``name``
            is optional and defaults to an empty string.
        :raises TypeError: if ``start_channel`` is missing.
        :raises ValueError: if ``start_channel`` is outside 1 to 512.
        """
        if "start_channel" not in kwargs:
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")

        if kwargs["start_channel"] < 1 or kwargs["start_channel"] > 512:
            raise ValueError('Start Channel must be between 1 and 512.')

        if "name" not in kwargs:
            kwargs["name"] = ""

        self.__start_channel = kwargs["start_channel"]
        self.__channels = []
        self.__effects = []
        self.__id = None
        self.__controller = None
        self.__name = kwargs["name"]
        self.__channel_aliases = {}
        self.__kwargs = kwargs
        self.__args = args

    def __str__(self):
        return self.title

    # Internal

    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
        """Append a channel and return its zero-based index in the fixture.

        :param name: channel name; stored lower-cased and stripped.
        :param parked: forwarded to ``Channel``.
        :raises FixtureCreationException: if the channel would fall beyond
            512, or the name is already a channel name or alias.
        """
        if self.__start_channel + len(self.__channels) > 512:
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))

        clean_name = name.lower().strip()
        used_names = [f.name for f in self.__channels]
        used_names.extend(self.__channel_aliases)
        if clean_name in used_names:
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))

        self.__channels.append(Channel(clean_name, parked))
        return len(self.__channels) - 1

    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
        """Register alternative names for an existing channel.

        :param channel: name of an already-registered channel.
        :param aliases: alias names; each is lower-cased and stripped, the
            same normalisation :meth:`get_channel_id` applies on lookup.
        :return: ``False`` when no aliases are given or the channel is not
            registered, ``True`` otherwise. An alias already in use is
            skipped with a warning.
        """
        if not aliases:
            return False

        channel = channel.lower().strip()

        used_names = [f.name for f in self.__channels]
        if channel not in used_names:
            warn('Channel name `{}` is not registered.'.format(channel))
            return False

        for alias in aliases:
            # Lookups normalise the requested name, so the stored key must
            # be normalised too or the alias can never be resolved.
            alias = alias.lower().strip()
            if alias in self.__channel_aliases:
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[alias]))
                continue
            self.__channel_aliases[alias] = channel
        return True

    def set_id(self, fixture_id: int):
        """Assign the fixture id; later calls are ignored once one is set."""
        # Only ever set once
        if self.__id is None:
            self.__id = fixture_id

    def set_controller(self, controller: 'Controller'):
        """Attach the controller; later calls are ignored once one is set."""
        # Only ever set once
        if self.__controller is None:
            self.__controller = controller

    def _set_name(self, name: str):
        """Replace the fixture name."""
        self.__name = name

    # Properties

    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
        """Return whether ``value`` lies within 0 to 255.

        When it does not, a warning is issued if the attached controller has
        a truthy ``dmx_value_warnings``. With no controller attached, no
        warning is issued and ``False`` is still returned.
        """
        if value < 0 or value > 255:
            # getattr tolerates the controller not being attached yet
            if getattr(self.__controller, 'dmx_value_warnings', False):
                warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
                    self.title, value, channel))
            return False
        return True

    @property
    def id(self) -> int:
        """Fixture id, or 0 while none has been assigned."""
        return self.__id if self.__id is not None else 0

    @property
    def name(self) -> str:
        """Current fixture name."""
        return self.__name

    @property
    def start_channel(self) -> int:
        """Start channel given at construction."""
        return self.__start_channel

    @property
    def next_channel(self) -> int:
        """Number of registered channels plus one."""
        return len(self.__channels) + 1

    @property
    def parked(self) -> bool:
        """``True`` if at least one channel is parked."""
        return any(chan.parked for chan in self.__channels)

    @property
    def controller(self) -> 'Controller':
        """Attached controller, or ``None`` if none has been set."""
        return self.__controller

    @property
    def channels(self) -> dict:
        """Map ``start_channel + index`` to each channel's name and value.

        ``'value'`` is whatever :meth:`get_channel_value` returns for that
        index (parking applied).
        """
        channels = {}
        for i, chan in enumerate(self.__channels):
            channels[self.start_channel + i] = {'name': chan.name, 'value': self.get_channel_value(i)}
        return channels

    @property
    def channel_usage(self) -> str:
        """Human-readable ``first->last (count)`` summary of used channels."""
        return "{}->{} ({})".format(
            self.start_channel,
            (self.start_channel + len(self.__channels) - 1),
            len(self.__channels)
        )

    @property
    def title(self) -> str:
        """Descriptive one-line title of the fixture."""
        return "Fixture #{} {} of type {} using channels {}.".format(
            self.id,
            "('{}')".format(self.name) if self.name else "",
            self.__class__.__name__,
            self.channel_usage
        )

    @property
    def json_data(self) -> dict:
        """Return the constructor data as a dict.

        ``type`` is built from the class's module path and class name,
        ``args`` holds the positional arguments, and every constructor
        keyword not already a key is copied (``name`` uses the current
        name).

        :raises JSONConfigSaveException: if the class's module does not
            match the expected ``PyDMXControl.profiles`` pattern.
        """
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
        match = pattern.match(self.__class__.__module__)
        if not match:
            raise JSONConfigSaveException(self.id)
        base = {
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
            "args": self.__args
        }
        for kwarg, val in self.__kwargs.items():
            if kwarg not in base:
                if kwarg == "name":
                    val = self.name
                base[kwarg] = val
        return base

    # Channels

    def get_channel_id(self, channel: Union[str, int]) -> int:
        """Resolve a channel index, name or alias to its zero-based index.

        A digit-only value below the channel count is used as the index
        directly; anything else is lower-cased, stripped, mapped through the
        aliases and matched against channel names.

        :raises ChannelNotFoundException: if nothing matches.
        """
        channel = str(channel)

        if channel.isdigit():
            channel_int = int(channel)
            if channel_int < len(self.__channels):
                return channel_int

        channel = channel.lower().strip()
        if channel in self.__channel_aliases:
            channel = self.__channel_aliases[channel]

        for i, chan in enumerate(self.__channels):
            if chan.name == channel:
                return i

        raise ChannelNotFoundException(channel, self.id)

    def has_channel(self, channel: Union[str, int]) -> bool:
        """Return whether :meth:`get_channel_id` can resolve ``channel``."""
        try:
            self.get_channel_id(channel)
            return True
        except ChannelNotFoundException:
            return False

    def get_channel_value(self, channel: Union[str, int], apply_parking: bool = True) -> Tuple[int, datetime]:
        """Return ``(value, timestamp)`` for a channel.

        :param apply_parking: when ``False`` the live value is returned even
            if the channel is parked.
        :raises ChannelNotFoundException: if the channel cannot be resolved.
        """
        if not apply_parking:
            return self.__channels[self.get_channel_id(channel)].value_unparked
        return self.__channels[self.get_channel_id(channel)].value

    def set_channel(self, channel: Union[str, int], value: int) -> 'Fixture':
        """Set one channel; values outside 0 to 255 are ignored.

        :return: ``self``.
        :raises ChannelNotFoundException: if the channel cannot be resolved.
        """
        if not self._valid_channel_value(value, channel):
            return self

        self.__channels[self.get_channel_id(channel)].set(value)
        return self

    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
        """Set consecutive channels from a flat or nested sequence of values.

        Lists are flattened in order. ``None`` and non-digit values are
        skipped without consuming a channel.

        :param args: values, or (nested) lists of values.
        :param kwargs: ``start`` - channel to begin at; only used when it is
            a positive digit-only value, otherwise 0 is used.
        :return: ``self``.
        """
        channel = 0
        if 'start' in kwargs and str(kwargs['start']).isdigit() and int(kwargs['start']) > 0:
            channel = int(kwargs['start'])

        def apply_values(fixture, values, chan=1):
            for value in values:
                if value is not None:
                    if isinstance(value, list):
                        chan = apply_values(fixture, value, chan)
                    elif str(value).isdigit():
                        fixture.set_channel(chan, int(value))
                        chan += 1
            # Return the next free channel so values following a nested
            # list continue after it instead of overwriting its last entry.
            return chan

        apply_values(self, args, channel)

        return self

    # Parking

    def park(self) -> 'Fixture':
        """Park every channel at the value it currently reports."""
        for chan in self.__channels:
            chan.park(chan.value[0])

        return self

    def unpark(self) -> 'Fixture':
        """Unpark every channel."""
        for chan in self.__channels:
            chan.unpark()

        return self

    # Effects

    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
        """Instantiate ``effect`` for this fixture, start it and track it.

        :param effect: effect class, called as
            ``effect(self, speed, *args, **kwargs)``.
        :return: ``self``.
        """
        # Instantiate
        effect = effect(self, speed, *args, **kwargs)
        # Start
        effect.start()
        # Save
        self.__effects.append(effect)

        return self

    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
        """Return every tracked effect that is an instance of ``effect``."""
        return [this_effect for this_effect in self.__effects if isinstance(this_effect, effect)]

    def remove_effect(self, effect: Effect) -> 'Fixture':
        """Stop and forget ``effect`` if it is tracked; otherwise do nothing."""
        if effect in self.__effects:
            effect.stop()
            self.__effects.remove(effect)

        return self

    def clear_effects(self) -> 'Fixture':
        """Stop every tracked effect and forget them all."""
        # Stop
        for effect in self.__effects:
            effect.stop()
        # Clear
        self.__effects = []

        return self
406