Passed
Push — master ( 91ee3f...f145d6 )
by Matt
01:40
created

Fixture.has_channel()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from datetime import datetime
10
from time import sleep, time
11
from typing import Union, List, Tuple, Type
12
from warnings import warn
13
14
from ... import Colors
15
from ...effects.defaults import Effect
16
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException, ChannelNotFoundException
17
18
19
class Channel:
20
21
    def __init__(self, name: str, parked: Union[bool, int]):
22
        self.name = name
23
        self.value = 0
24
        self.updated = datetime.utcnow()
25
        self.parked = parked if parked is False else (0 if parked is True else parked)
26
27
    def __updated(self):
28
        self.updated = datetime.utcnow()
29
30
    def set_value(self, value: int):
31
        if self.parked is False:
32
            self.value = value
33
            self.__updated()
34
35
    def get_value(self) -> Tuple[int, datetime]:
36
        return (self.value if self.parked is False else self.parked), self.updated
37
38
    def park(self, value: int = 0):
39
        self.parked = value
40
        self.__updated()
41
42
    def unpark(self):
43
        self.parked = False
44
        self.__updated()
45
46
47
class FixtureHelpers:
48
49
    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
50
        # Handle instant edge-case
51
        millis = max(milliseconds, 0)
52
        if millis == 0:
53
            self.set_channel(channel, int(target_value))
54
            return self
55
56
        # Calculate what we need
57
        current = self.get_channel_value(self.get_channel_id(channel))[0]
58
        start = time() * 1000.0
59
        gap = target_value - current
60
61
        # Create the callback for ticker
62
        def callback():
63
            if (time() * 1000.0) - start <= millis:
0 ignored issues
show
introduced by
The variable start does not seem to be defined for all execution paths.
Loading history...
64
                diff = gap * (((time() * 1000.0) - start) / millis)
0 ignored issues
show
introduced by
The variable gap does not seem to be defined for all execution paths.
Loading history...
65
                self.set_channel(channel, int(current + diff))
0 ignored issues
show
introduced by
The variable current does not seem to be defined for all execution paths.
Loading history...
66
            else:
67
                self.set_channel(channel, int(target_value))
68
                self.controller.ticker.remove_callback(callback)
0 ignored issues
show
introduced by
The variable callback does not seem to be defined for all execution paths.
Loading history...
69
70
        # Start the callback
71
        self.controller.ticker.add_callback(callback, 0)
72
73
        return self
74
75
    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
76
        for channel_value in channels_values:
77
            self.dim(channel_value[1], milliseconds, channel_value[0])
78
79
    def color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
80
        # Handle string color names
81
        if isinstance(color, str):
82
            if color in Colors.__members__:
83
                color = Colors[color]
84
            else:
85
                raise ValueError("Color '" + color + "' not defined in Colors enum."
86
                                                     " Supply valid Colors enum or List/Tuple of integers.")
87
88
        # Get a tuple
89
        color = [f for f in Colors.to_tuples(color) if self.has_channel(f[0])]
90
91
        # Apply
92
        self.anim(milliseconds, *color)
93
94
    def get_color(self) -> Union[None, List[int]]:
95
        if not self.has_channel('r') or not self.has_channel('g') or not self.has_channel('b'):
96
            return None
97
98
        color = [self.get_channel_value('r')[0], self.get_channel_value('g')[0], self.get_channel_value('b')[0]]
99
100
        if self.has_channel('w'):
101
            color.append(self.get_channel_value('w')[0])
102
103
            if self.has_channel('a'):
104
                color.append(self.get_channel_value('a')[0])
105
106
        return color
107
108
    def on(self):
109
        self.dim(255)
110
111
    def off(self):
112
        self.dim(0)
113
114
    def locate(self):
115
        self.color([255, 255, 255, 255, 255])
116
        self.dim(255)
117
118
119
class Fixture(FixtureHelpers):
120
121
    def __init__(self, *args, **kwargs):
122
        if "start_channel" not in kwargs:
123
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")
124
125
        if kwargs["start_channel"] < 1 or kwargs["start_channel"] > 512:
126
            raise ValueError('Start Channel must be between 1 and 512.')
127
128
        if "name" not in kwargs:
129
            kwargs["name"] = ""
130
131
        self.__start_channel = kwargs["start_channel"]
132
        self.__channels = []
133
        self.__effects = []
134
        self.__id = None
135
        self.__controller = None
136
        self.__name = kwargs["name"]
137
        self.__channel_aliases = {}
138
        self.__kwargs = kwargs
139
        self.__args = args
140
141
    def __str__(self):
142
        return self.title
143
144
    # Internal
145
146
    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
147
        if self.__start_channel + len(self.__channels) > 512:
148
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))
149
150
        used_names = [f.name for f in self.__channels]
151
        used_names.extend([f for f in self.__channel_aliases])
152
        if name.lower().strip() in used_names:
153
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))
154
155
        self.__channels.append(Channel(name.lower().strip(), parked))
156
        return len(self.__channels) - 1
157
158
    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
159
        if not aliases:
160
            return False
161
162
        channel = channel.lower().strip()
163
164
        used_names = [f.name for f in self.__channels]
165
        if channel not in used_names:
166
            warn('Channel name `{}` is not registered.'.format(channel))
167
            return False
168
169
        for alias in aliases:
170
            if alias in self.__channel_aliases.keys():
171
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[alias]))
172
                continue
173
            self.__channel_aliases[alias] = channel
174
        return True
175
176
    def set_id(self, fixture_id: int):
177
        # Only ever set once
178
        if self.__id is None:
179
            self.__id = fixture_id
180
181
    def set_controller(self, controller: 'Controller'):
182
        # Only ever set once
183
        if self.__controller is None:
184
            self.__controller = controller
185
186
    def _set_name(self, name: str):
187
        self.__name = name
188
189
    # Properties
190
191
    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
192
        if value < 0 or value > 255:
193
            if self.__controller.dmx_value_warnings:
194
                warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
195
                    self.title, value, channel))
196
            return False
197
        return True
198
199
    @property
200
    def id(self) -> int:
201
        return self.__id if self.__id is not None else 0
202
203
    @property
204
    def name(self) -> str:
205
        return self.__name
206
207
    @property
208
    def start_channel(self) -> int:
209
        return self.__start_channel
210
211
    @property
212
    def next_channel(self) -> int:
213
        return len(self.__channels) + 1
214
215
    @property
216
    def controller(self) -> 'Controller':
217
        return self.__controller
218
219
    @property
220
    def channels(self) -> dict:
221
        channels = {}
222
        for i, chan in enumerate(self.__channels):
223
            channels[self.start_channel + i] = {'name': chan.name, 'value': self.get_channel_value(i)}
224
        return channels
225
226
    @property
227
    def channel_usage(self) -> str:
228
        return "{}->{} ({})".format(
229
            self.start_channel,
230
            (self.start_channel + len(self.__channels) - 1),
231
            len(self.__channels)
232
        )
233
234
    @property
235
    def title(self) -> str:
236
        return "Fixture #{} {} of type {} using channels {}.".format(
237
            self.id,
238
            "('{}')".format(self.name) if self.name else "",
239
            self.__class__.__name__,
240
            self.channel_usage
241
        )
242
243
    @property
244
    def json_data(self) -> dict:
245
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
246
        match = pattern.match(self.__class__.__module__)
247
        if not match:
248
            raise JSONConfigSaveException(self.id)
249
        base = {
250
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
251
            "args": self.__args
252
        }
253
        for kwarg, val in self.__kwargs.items():
254
            if kwarg not in base:
255
                if kwarg == "name":
256
                    val = self.name
257
                base[kwarg] = val
258
        return base
259
260
    # Channels
261
262
    def get_channel_id(self, channel: Union[str, int]) -> int:
263
        channel = str(channel)
264
265
        if channel.isdigit():
266
            channel_int = int(channel)
267
            if channel_int < len(self.__channels):
268
                return channel_int
269
270
        channel = channel.lower().strip()
271
        if channel in self.__channel_aliases.keys():
272
            channel = self.__channel_aliases[channel]
273
274
        for i, chan in enumerate(self.__channels):
275
            if chan.name == channel:
276
                return i
277
278
        raise ChannelNotFoundException(channel, self.id)
279
280
    def has_channel(self, channel: Union[str, int]) -> bool:
281
        try:
282
            self.get_channel_id(channel)
283
            return True
284
        except ChannelNotFoundException:
285
            return False
286
287
    def get_channel_value(self, channel: Union[str, int]) -> Tuple[int, datetime]:
288
        return self.__channels[self.get_channel_id(channel)].get_value()
289
290
    def set_channel(self, channel: Union[str, int], value: int) -> 'Fixture':
291
        if not self._valid_channel_value(value, channel):
292
            return self
293
294
        self.__channels[self.get_channel_id(channel)].set_value(value)
295
        return self
296
297
    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
298
        channel = 0
299
        if 'start' in kwargs and str(kwargs['start']).isdigit() and int(kwargs['start']) > 0:
300
            channel = int(kwargs['start'])
301
302
        def apply_values(fixture, values, chan=1):
303
            for value in values:
304
                if value is not None:
305
                    if isinstance(value, list):
306
                        chan = apply_values(fixture, value, chan)
307
                    elif str(value).isdigit():
308
                        fixture.set_channel(chan, int(value))
309
                        chan += 1
310
            return chan - 1
311
312
        apply_values(self, args, channel)
313
314
        return self
315
316
    # Effects
317
318
    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
319
        # Instantiate
320
        effect = effect(self, speed, *args, **kwargs)
321
        # Start
322
        effect.start()
323
        # Save
324
        self.__effects.append(effect)
325
326
        return self
327
328
    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
329
        matches = []
330
331
        # Iterate over each effect
332
        for this_effect in self.__effects:
333
            # If it matches the given effect
334
            if isinstance(this_effect, effect):
335
                # Store
336
                matches.append(this_effect)
337
338
        # Return any matches
339
        return matches
340
341
    def remove_effect(self, effect: Effect) -> 'Fixture':
342
        if effect in self.__effects:
343
            effect.stop()
344
            self.__effects.remove(effect)
345
346
        return self
347
348
    def clear_effects(self) -> 'Fixture':
349
        # Stop
350
        for effect in self.__effects:
351
            effect.stop()
352
        # Clear
353
        self.__effects = []
354
355
        return self
356