Passed
Push — master ( 9b4b57...27d41b )
by Matt
01:43
created

Channel.value()   A

Complexity

Conditions 2

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
 *                Featuring fixture profiles, built-in effects and a web control panel.
 *  <https://github.com/MattIPv4/PyDMXControl/>
 *  Copyright (C) 2023 Matt Cowley (MattIPv4) ([email protected])
"""
7
8
import re
9
from datetime import datetime
10
from time import sleep, time
11
from typing import Union, List, Tuple, Type
12
from warnings import warn
13
14
from ... import Colors
15
from ...effects.defaults import Effect
16
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException, ChannelNotFoundException
17
18
19
class Channel:
    """State holder for one named channel of a fixture.

    A channel stores a live value, an optional "parked" override value and the
    timestamp of its last effective change. While parked, ``value`` reports the
    parked value; the live value keeps being stored and remains readable via
    ``value_unparked``.
    """

    def __init__(self, name: str, parked: Union[bool, int]):
        """Create a channel with a live value of 0.

        :param name: Name of the channel.
        :param parked: ``False`` for not parked, ``True`` to park at 0, or an
            integer to park at that value.
        """
        self.name = name
        self.__value = 0
        self.__timestamp = datetime.utcnow()
        # Internally the parked state is either exactly ``False`` (not parked)
        # or the integer the channel is parked at, so ``True`` becomes 0.
        self.__parked = parked if parked is False else (0 if parked is True else parked)

    def __updated(self):
        """Record the current UTC time as the moment of the last change."""
        self.__timestamp = datetime.utcnow()

    def set(self, value: int):
        """Store a new live value.

        The timestamp is only refreshed when the channel is not parked, as the
        reported ``value`` does not change while a parked override is active.
        """
        self.__value = value
        if not self.parked:
            self.__updated()

    @property
    def value(self) -> Tuple[int, datetime]:
        """Return ``(value, timestamp)``, using the parked value when parked."""
        effective = self.__value if self.__parked is False else self.__parked
        return effective, self.__timestamp

    @property
    def value_unparked(self) -> Tuple[int, datetime]:
        """Return ``(live value, timestamp)``, ignoring any parked override."""
        return self.__value, self.__timestamp

    @property
    def parked(self) -> bool:
        """Whether a parked override is currently active."""
        # Identity check on purpose: a parked value of 0 still counts as parked.
        return self.__parked is not False

    def park(self, value: int = 0):
        """Park the channel at ``value`` and refresh the timestamp."""
        self.__parked = value
        self.__updated()

    def unpark(self):
        """Remove the parked override, refreshing the timestamp if one was set."""
        if self.__parked is not False:
            self.__parked = False
            self.__updated()
55
56
57
class FixtureHelpers:
    """Convenience helpers layered on top of a fixture's channel API.

    The methods here call ``set_channel``, ``get_channel_id``,
    ``get_channel_value``, ``has_channel`` and ``controller`` on ``self``, so
    the class is meant to be combined with a class providing those members.
    """

    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
        """Move a channel to ``target_value``, optionally fading over time.

        :param target_value: Value the channel should end up at.
        :param milliseconds: Fade duration; zero or negative applies the value
            immediately.
        :param channel: Channel name or index to change (``'dimmer'`` by default).
        :return: This fixture, to allow chaining.
        """
        # Handle instant edge-case
        millis = max(milliseconds, 0)
        if millis == 0:
            self.set_channel(channel, int(target_value))
            return self

        # Calculate what we need
        current = self.get_channel_value(self.get_channel_id(channel))[0]
        start = time() * 1000.0
        gap = target_value - current

        # Create the callback for ticker
        def callback():
            # Sample the clock once so the bounds check and the interpolation
            # agree; this keeps the intermediate value from overshooting.
            elapsed = (time() * 1000.0) - start
            if elapsed <= millis:
                self.set_channel(channel, int(current + gap * (elapsed / millis)))
            else:
                # Fade finished: land exactly on the target and stop ticking
                self.set_channel(channel, int(target_value))
                self.controller.ticker.remove_callback(callback)

        # Start the callback
        self.controller.ticker.add_callback(callback, 0)

        return self

    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
        """Fade several channels at once.

        :param milliseconds: Fade duration passed to ``dim`` for every channel.
        :param channels_values: ``(channel, value)`` pairs to fade.
        """
        for channel, value in ((pair[0], pair[1]) for pair in channels_values):
            self.dim(value, milliseconds, channel)

    def color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """Set the fixture's colour, optionally fading to it.

        :param color: A ``Colors`` member, the name of one, or a list/tuple of
            integers.
        :param milliseconds: Fade duration passed on to ``anim``.
        :raises ValueError: If a string is given that is not a ``Colors`` name.
        """
        # Handle string color names
        if isinstance(color, str):
            if color not in Colors.__members__:
                raise ValueError("Color '" + color + "' not defined in Colors enum."
                                                     " Supply valid Colors enum or List/Tuple of integers.")
            color = Colors[color]

        # Keep only the (channel, value) pairs for channels this fixture has
        pairs = [pair for pair in Colors.to_tuples(color) if self.has_channel(pair[0])]

        # Apply
        self.anim(milliseconds, *pairs)

    def get_color(self) -> Union[None, List[int]]:
        """Return the current unparked colour values, or ``None`` without RGB.

        The result is ``[r, g, b]``, extended with ``w`` when that channel
        exists, and further with ``a`` when both ``w`` and ``a`` exist.
        """
        if not all(self.has_channel(name) for name in ('r', 'g', 'b')):
            return None

        color = [self.get_channel_value(name, False)[0] for name in ('r', 'g', 'b')]

        if self.has_channel('w'):
            color.append(self.get_channel_value('w', False)[0])

            # 'a' is only reported when 'w' is present as well
            if self.has_channel('a'):
                color.append(self.get_channel_value('a', False)[0])

        return color

    def on(self):
        """Set the default dim channel to 255 immediately."""
        self.dim(255)

    def off(self):
        """Set the default dim channel to 0 immediately."""
        self.dim(0)

    def locate(self):
        """Set every colour component and the default dim channel to 255."""
        self.color([255, 255, 255, 255, 255])
        self.dim(255)
131
132
133
class Fixture(FixtureHelpers):
    """A fixture occupying a contiguous run of channels from ``start_channel``.

    Channels are registered in order through ``_register_channel`` and are
    addressed by zero-based index, by name, or by a registered alias.
    """

    def __init__(self, *args, **kwargs):
        """Create the fixture.

        :param args: Extra positional arguments, kept for ``json_data``.
        :param kwargs: Must contain ``start_channel`` (1 to 512 inclusive); may
            contain ``name``. All keyword arguments are kept for ``json_data``.
        :raises TypeError: If ``start_channel`` is missing.
        :raises ValueError: If ``start_channel`` is outside 1..512.
        """
        if "start_channel" not in kwargs:
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")

        if not 1 <= kwargs["start_channel"] <= 512:
            raise ValueError('Start Channel must be between 1 and 512.')

        kwargs.setdefault("name", "")

        self.__start_channel = kwargs["start_channel"]
        self.__channels = []
        self.__effects = []
        self.__id = None
        self.__controller = None
        self.__name = kwargs["name"]
        self.__channel_aliases = {}
        self.__kwargs = kwargs
        self.__args = args

    def __str__(self):
        return self.title

    # Internal

    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
        """Append a new channel and return its zero-based index.

        :param name: Channel name; stored lower-cased and stripped.
        :param parked: Initial parking state handed to ``Channel``.
        :raises FixtureCreationException: If the channel would fall beyond
            channel 512, or the name is already a channel name or alias.
        """
        if self.__start_channel + len(self.__channels) > 512:
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))

        normalised = name.lower().strip()
        used_names = {chan.name for chan in self.__channels}
        used_names.update(self.__channel_aliases)
        if normalised in used_names:
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))

        self.__channels.append(Channel(normalised, parked))
        return len(self.__channels) - 1

    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
        """Register alternative names for an existing channel.

        :param channel: Name of an already registered channel.
        :param aliases: Alias names; stored lower-cased and stripped.
        :return: ``False`` if no aliases were given or the channel is unknown
            (the latter also warns), otherwise ``True``.
        """
        if not aliases:
            return False

        channel = channel.lower().strip()

        if channel not in {chan.name for chan in self.__channels}:
            warn('Channel name `{}` is not registered.'.format(channel))
            return False

        for alias in aliases:
            # Lookups in get_channel_id are lower-cased and stripped, so the
            # stored key must be normalised the same way to ever be matched.
            alias = alias.lower().strip()
            if alias in self.__channel_aliases:
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[alias]))
                continue
            self.__channel_aliases[alias] = channel
        return True

    def set_id(self, fixture_id: int):
        """Assign the fixture id; later calls are ignored once an id is set."""
        # Only ever set once
        if self.__id is None:
            self.__id = fixture_id

    def set_controller(self, controller: 'Controller'):
        """Attach the controller; later calls are ignored once one is set."""
        # Only ever set once
        if self.__controller is None:
            self.__controller = controller

    def _set_name(self, name: str):
        """Replace the fixture's name."""
        self.__name = name

    # Properties

    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
        """Return whether ``value`` lies in 0..255, warning when it does not.

        The warning is emitted when the controller has ``dmx_value_warnings``
        enabled, or when no controller is attached yet.
        """
        if 0 <= value <= 255:
            return True

        # Guard against a fixture that has not been given a controller yet
        if self.__controller is None or self.__controller.dmx_value_warnings:
            warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
                self.title, value, channel))
        return False

    @property
    def id(self) -> int:
        """The fixture id, or 0 while none has been set."""
        return self.__id if self.__id is not None else 0

    @property
    def name(self) -> str:
        """The fixture's name (may be an empty string)."""
        return self.__name

    @property
    def start_channel(self) -> int:
        """The first channel used by this fixture."""
        return self.__start_channel

    @property
    def next_channel(self) -> int:
        """The number of registered channels plus one."""
        return len(self.__channels) + 1

    @property
    def parked(self) -> bool:
        """Whether at least one channel of this fixture is parked."""
        return any(chan.parked for chan in self.__channels)

    @property
    def controller(self) -> 'Controller':
        """The attached controller, or ``None`` if none has been set."""
        return self.__controller

    @property
    def channels(self) -> dict:
        """Map each used channel number to its ``name`` and current ``value``."""
        return {
            self.start_channel + i: {'name': chan.name, 'value': self.get_channel_value(i)}
            for i, chan in enumerate(self.__channels)
        }

    @property
    def channel_usage(self) -> str:
        """Describe the occupied range as ``"first->last (count)"``."""
        count = len(self.__channels)
        return "{}->{} ({})".format(
            self.start_channel,
            (self.start_channel + count - 1),
            count
        )

    @property
    def title(self) -> str:
        """A human-readable one-line description of this fixture."""
        return "Fixture #{} {} of type {} using channels {}.".format(
            self.id,
            "('{}')".format(self.name) if self.name else "",
            self.__class__.__name__,
            self.channel_usage
        )

    @property
    def json_data(self) -> dict:
        """Build a dict describing how this fixture was constructed.

        The dict holds ``type`` (derived from the class's module path and
        name), ``args`` and every constructor keyword argument, with ``name``
        replaced by the current name.

        :raises JSONConfigSaveException: If the class's module does not match
            the expected ``PyDMXControl.profiles...._<module>`` pattern.
        """
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
        match = pattern.match(self.__class__.__module__)
        if not match:
            raise JSONConfigSaveException(self.id)

        base = {
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
            "args": self.__args
        }
        for kwarg, val in self.__kwargs.items():
            # Never let a keyword argument overwrite the "type"/"args" entries
            if kwarg in base:
                continue
            base[kwarg] = self.name if kwarg == "name" else val
        return base

    # Channels

    def get_channel_id(self, channel: Union[str, int]) -> int:
        """Resolve an index, name or alias to a zero-based channel index.

        A numeric value below the number of registered channels is treated as
        an index; anything else is looked up (lower-cased and stripped) as an
        alias and then as a channel name.

        :raises ChannelNotFoundException: If nothing matches.
        """
        channel = str(channel)

        if channel.isdigit():
            channel_int = int(channel)
            if channel_int < len(self.__channels):
                return channel_int

        channel = channel.lower().strip()
        channel = self.__channel_aliases.get(channel, channel)

        for i, chan in enumerate(self.__channels):
            if chan.name == channel:
                return i

        raise ChannelNotFoundException(channel, self.id)

    def has_channel(self, channel: Union[str, int]) -> bool:
        """Return whether ``channel`` resolves to a registered channel."""
        try:
            self.get_channel_id(channel)
        except ChannelNotFoundException:
            return False
        return True

    def get_channel_value(self, channel: Union[str, int], apply_parking: bool = True) -> Tuple[int, datetime]:
        """Return ``(value, timestamp)`` for a channel.

        :param channel: Channel index, name or alias.
        :param apply_parking: When ``False``, return the live value even if the
            channel is parked.
        :raises ChannelNotFoundException: If the channel cannot be resolved.
        """
        chan = self.__channels[self.get_channel_id(channel)]
        return chan.value if apply_parking else chan.value_unparked

    def set_channel(self, channel: Union[str, int], value: int) -> 'Fixture':
        """Set one channel's value; values outside 0..255 are ignored.

        :return: This fixture, to allow chaining.
        :raises ChannelNotFoundException: If the channel cannot be resolved.
        """
        if not self._valid_channel_value(value, channel):
            return self

        self.__channels[self.get_channel_id(channel)].set(value)
        return self

    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
        """Set consecutive channels from a (possibly nested) sequence of values.

        Values are applied to successive channel indexes starting at ``start``.
        Nested lists/tuples are flattened in order. ``None`` entries and
        entries that are not non-negative integers are ignored and do not
        consume a channel index.

        :param args: Values, or nested lists/tuples of values.
        :param kwargs: ``start`` - positive index of the first channel to set;
            defaults to 0.
        :return: This fixture, to allow chaining.
        """
        start = str(kwargs.get('start', ''))
        first = int(start) if start.isdigit() and int(start) > 0 else 0

        def apply_values(values, chan):
            # Returns the next free channel index, so that a value following a
            # nested sequence continues after it rather than overwriting its
            # last channel.
            for value in values:
                if value is None:
                    continue
                if isinstance(value, (list, tuple)):
                    chan = apply_values(value, chan)
                elif str(value).isdigit():
                    self.set_channel(chan, int(value))
                    chan += 1
            return chan

        apply_values(args, first)

        return self

    # Parking

    def park(self) -> 'Fixture':
        """Park every channel at the value it currently reports."""
        for chan in self.__channels:
            chan.park(chan.value[0])

        return self

    def unpark(self) -> 'Fixture':
        """Remove the parked override from every channel."""
        for chan in self.__channels:
            chan.unpark()

        return self

    # Effects

    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
        """Instantiate ``effect`` for this fixture, start it and keep track of it.

        :param effect: Effect class, called as ``effect(self, speed, *args, **kwargs)``.
        :param speed: Speed handed to the effect.
        :return: This fixture, to allow chaining.
        """
        # Instantiate
        instance = effect(self, speed, *args, **kwargs)
        # Start
        instance.start()
        # Save
        self.__effects.append(instance)

        return self

    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
        """Return every tracked effect that is an instance of ``effect``."""
        return [this_effect for this_effect in self.__effects if isinstance(this_effect, effect)]

    def remove_effect(self, effect: Effect) -> 'Fixture':
        """Stop and forget ``effect`` if it is tracked by this fixture."""
        if effect in self.__effects:
            effect.stop()
            self.__effects.remove(effect)

        return self

    def clear_effects(self) -> 'Fixture':
        """Stop every tracked effect and forget them all."""
        # Stop
        for effect in self.__effects:
            effect.stop()
        # Clear
        self.__effects = []

        return self
393