Passed
Push — master ( 11ac00...9b4b57 )
by Matt
03:49
created

Fixture.parked()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 3
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2023 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from datetime import datetime
10
from time import sleep, time
11
from typing import Union, List, Tuple, Type
12
from warnings import warn
13
14
from ... import Colors
15
from ...effects.defaults import Effect
16
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException, ChannelNotFoundException
17
18
19
class Channel:
    """State holder for one named channel of a fixture.

    Attributes:
        name: The channel name as given to the constructor.
        value: The last value accepted by ``set_value`` (starts at 0).
        updated: Naive UTC timestamp of the last accepted change.
        parked: ``False`` when the channel is not parked, otherwise the
            integer value that ``get_value`` reports while parked.
    """

    def __init__(self, name: str, parked: Union[bool, int]):
        """Create the channel.

        Args:
            name: Name to store on the channel.
            parked: ``False`` for an unparked channel, ``True`` to park at 0,
                or an integer to park at that value.
        """
        self.name = name
        self.value = 0
        self.updated = datetime.utcnow()
        # Normalise the flag: True collapses to a parked value of 0, anything
        # else (False or an explicit integer) is stored untouched.
        if parked is True:
            self.parked = 0
        else:
            self.parked = parked

    def __updated(self):
        """Stamp the channel with the current UTC time."""
        self.updated = datetime.utcnow()

    def set_value(self, value: int):
        """Store ``value`` unless the channel is parked (then it is ignored)."""
        if self.parked is not False:
            return
        self.value = value
        self.__updated()

    def get_value(self) -> Tuple[int, datetime]:
        """Return ``(effective value, last update time)``.

        The effective value is the parked value while parked, otherwise the
        stored value.
        """
        if self.parked is False:
            effective = self.value
        else:
            effective = self.parked
        return effective, self.updated

    def park(self, value: int = 0):
        """Park the channel so that it reports ``value`` until unparked."""
        self.parked = value
        self.__updated()

    def unpark(self):
        """Release the channel so the stored value is reported again."""
        self.parked = False
        self.__updated()
45
46
47
class FixtureHelpers:
    """Convenience operations layered on top of a fixture's channel API.

    The methods here call ``set_channel``, ``get_channel_value``,
    ``get_channel_id``, ``has_channel`` and the ``controller`` property on
    ``self``; those are expected to be supplied by the inheriting class.
    """

    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
        """Move ``channel`` to ``target_value``, optionally fading over time.

        Args:
            target_value: Value the channel should end on.
            milliseconds: Fade duration; zero or negative applies the value
                immediately.
            channel: Channel name or index to change.

        Returns:
            ``self``, to allow chaining.
        """
        duration = max(milliseconds, 0)

        # Nothing to animate: write the final value straight away
        if duration == 0:
            self.set_channel(channel, int(target_value))
            return self

        # Snapshot the starting point of the fade
        origin = self.get_channel_value(self.get_channel_id(channel))[0]
        began = time() * 1000.0
        delta = target_value - origin

        def tick():
            # While inside the fade window, write the linearly interpolated value
            if (time() * 1000.0) - began <= duration:
                progress = ((time() * 1000.0) - began) / duration
                self.set_channel(channel, int(origin + delta * progress))
                return

            # Window elapsed: land exactly on the target and stop ticking
            self.set_channel(channel, int(target_value))
            self.controller.ticker.remove_callback(tick)

        # Hand the fade over to the controller's ticker
        self.controller.ticker.add_callback(tick, 0)

        return self

    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
        """Fade several channels at once.

        Args:
            milliseconds: Fade duration passed on to ``dim`` for every pair.
            *channels_values: ``(channel, value)`` pairs to fade.
        """
        for pair in channels_values:
            target, chan = pair[1], pair[0]
            self.dim(target, milliseconds, chan)

    def color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
        """Fade the fixture to ``color``.

        Args:
            color: A ``Colors`` member, the name of one, or a list/tuple of
                integers.
            milliseconds: Fade duration passed on to ``anim``.

        Raises:
            ValueError: If ``color`` is a string that is not a ``Colors`` member name.
        """
        # Resolve a color given by name
        if isinstance(color, str):
            if color not in Colors.__members__:
                raise ValueError(
                    "Color '{}' not defined in Colors enum."
                    " Supply valid Colors enum or List/Tuple of integers.".format(color)
                )
            color = Colors[color]

        # Keep only the (channel, value) pairs this fixture has a channel for
        applicable = []
        for pair in Colors.to_tuples(color):
            if self.has_channel(pair[0]):
                applicable.append(pair)

        # Apply
        self.anim(milliseconds, *applicable)

    def get_color(self) -> Union[None, List[int]]:
        """Return the current color as ``[r, g, b]``, plus ``w`` and ``a`` when present.

        ``a`` is only included when ``w`` is also present. Returns ``None``
        if any of the ``r``, ``g`` or ``b`` channels is missing.
        """
        if not all(self.has_channel(name) for name in ('r', 'g', 'b')):
            return None

        result = [self.get_channel_value(name)[0] for name in ('r', 'g', 'b')]

        if not self.has_channel('w'):
            return result
        result.append(self.get_channel_value('w')[0])

        if self.has_channel('a'):
            result.append(self.get_channel_value('a')[0])

        return result

    def on(self):
        """Set the default dimmer channel to 255 immediately."""
        self.dim(255)

    def off(self):
        """Set the default dimmer channel to 0 immediately."""
        self.dim(0)

    def locate(self):
        """Drive every color component and the dimmer to 255."""
        self.color([255, 255, 255, 255, 255])
        self.dim(255)
118
119
class Fixture(FixtureHelpers):
    """A fixture occupying a contiguous run of channels starting at ``start_channel``.

    Channels are registered by name and addressed either by name, by alias or
    by their zero-based index within the fixture.
    """

    def __init__(self, *args, **kwargs):
        """Create the fixture.

        Args:
            *args: Stored untouched and echoed back by ``json_data``.
            **kwargs: Must contain ``start_channel`` (1-512). ``name`` is
                optional and defaults to an empty string. All keyword
                arguments are stored and echoed back by ``json_data``.

        Raises:
            TypeError: If ``start_channel`` is not supplied.
            ValueError: If ``start_channel`` is outside 1-512.
        """
        if "start_channel" not in kwargs:
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")

        if kwargs["start_channel"] < 1 or kwargs["start_channel"] > 512:
            raise ValueError('Start Channel must be between 1 and 512.')

        if "name" not in kwargs:
            kwargs["name"] = ""

        self.__start_channel = kwargs["start_channel"]
        self.__channels = []
        self.__effects = []
        self.__id = None
        self.__controller = None
        self.__name = kwargs["name"]
        self.__channel_aliases = {}
        self.__kwargs = kwargs
        self.__args = args

    def __str__(self):
        return self.title

    # Internal

    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
        """Append a new channel to the fixture.

        Args:
            name: Channel name; stored lower-cased and stripped.
            parked: Initial parked state handed to ``Channel``.

        Returns:
            The zero-based index of the new channel within this fixture.

        Raises:
            FixtureCreationException: If the channel would fall beyond 512 or
                the name is already used by a channel or an alias.
        """
        if self.__start_channel + len(self.__channels) > 512:
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))

        normalised = name.lower().strip()
        used_names = [chan.name for chan in self.__channels]
        used_names.extend(self.__channel_aliases)
        if normalised in used_names:
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))

        self.__channels.append(Channel(normalised, parked))
        return len(self.__channels) - 1

    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
        """Register alternative names for an existing channel.

        Aliases are stored lower-cased and stripped, matching the
        normalisation ``get_channel_id`` applies before looking them up.

        Args:
            channel: Name of an already registered channel.
            *aliases: Alias names to point at ``channel``.

        Returns:
            ``False`` if no aliases were given or ``channel`` is not
            registered (a warning is issued for the latter), ``True``
            otherwise. Aliases already in use are skipped with a warning.
        """
        if not aliases:
            return False

        channel = channel.lower().strip()

        used_names = [chan.name for chan in self.__channels]
        if channel not in used_names:
            warn('Channel name `{}` is not registered.'.format(channel))
            return False

        for alias in aliases:
            # Lookups normalise the requested name, so the stored key must be
            # normalised too or the alias could never be resolved.
            key = alias.lower().strip()
            if key in self.__channel_aliases:
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[key]))
                continue
            self.__channel_aliases[key] = channel
        return True

    def set_id(self, fixture_id: int):
        """Assign the fixture id; later calls are ignored once an id is set."""
        # Only ever set once
        if self.__id is None:
            self.__id = fixture_id

    def set_controller(self, controller: 'Controller'):
        """Attach the controller; later calls are ignored once one is set."""
        # Only ever set once
        if self.__controller is None:
            self.__controller = controller

    def _set_name(self, name: str):
        """Replace the fixture name."""
        self.__name = name

    # Properties

    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
        """Return whether ``value`` is within 0-255.

        For an out-of-range value a warning is issued when the controller's
        ``dmx_value_warnings`` flag is truthy. ``channel`` is only used in
        the warning text.
        """
        if value < 0 or value > 255:
            # NOTE(review): this reads the flag from the attached controller,
            # so an out-of-range value before set_controller() raises AttributeError.
            if self.__controller.dmx_value_warnings:
                warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
                    self.title, value, channel))
            return False
        return True

    @property
    def id(self) -> int:
        """The assigned fixture id, or 0 when none has been set."""
        return self.__id if self.__id is not None else 0

    @property
    def name(self) -> str:
        """The fixture name."""
        return self.__name

    @property
    def start_channel(self) -> int:
        """The first channel number used by this fixture."""
        return self.__start_channel

    @property
    def next_channel(self) -> int:
        """The number of registered channels plus one."""
        return len(self.__channels) + 1

    @property
    def parked(self) -> bool:
        """Whether at least one channel of the fixture is parked."""
        # A channel parked at value 0 stores the integer 0, which is falsy, so
        # the parked state has to be compared against False by identity.
        return any(chan.parked is not False for chan in self.__channels)

    @property
    def controller(self) -> 'Controller':
        """The attached controller, or ``None`` when not yet attached."""
        return self.__controller

    @property
    def channels(self) -> dict:
        """Map of channel number to ``{'name': ..., 'value': (value, updated)}``."""
        channels = {}
        for i, chan in enumerate(self.__channels):
            channels[self.start_channel + i] = {'name': chan.name, 'value': self.get_channel_value(i)}
        return channels

    @property
    def channel_usage(self) -> str:
        """Text of the form ``first->last (count)`` describing the channels used."""
        return "{}->{} ({})".format(
            self.start_channel,
            (self.start_channel + len(self.__channels) - 1),
            len(self.__channels)
        )

    @property
    def title(self) -> str:
        """Human-readable description of the fixture (also used by ``str()``)."""
        return "Fixture #{} {} of type {} using channels {}.".format(
            self.id,
            "('{}')".format(self.name) if self.name else "",
            self.__class__.__name__,
            self.channel_usage
        )

    @property
    def json_data(self) -> dict:
        """Dictionary describing how to re-create this fixture.

        Contains ``type`` (derived from the class's module path and name),
        ``args`` and every constructor keyword argument, with ``name``
        replaced by the current name.

        Raises:
            JSONConfigSaveException: If the class's module does not match the
                expected ``PyDMXControl.profiles`` module pattern.
        """
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
        match = pattern.match(self.__class__.__module__)
        if not match:
            raise JSONConfigSaveException(self.id)
        base = {
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
            "args": self.__args
        }
        for kwarg, val in self.__kwargs.items():
            # Never let a keyword argument overwrite the "type"/"args" entries
            if kwarg not in base:
                if kwarg == "name":
                    val = self.name
                base[kwarg] = val
        return base

    # Channels

    def get_channel_id(self, channel: Union[str, int]) -> int:
        """Resolve a channel index, name or alias to its zero-based index.

        Raises:
            ChannelNotFoundException: If nothing matches.
        """
        channel = str(channel)

        # A numeric reference is an index, provided it is in range
        if channel.isdigit():
            channel_int = int(channel)
            if channel_int < len(self.__channels):
                return channel_int

        channel = channel.lower().strip()
        if channel in self.__channel_aliases:
            channel = self.__channel_aliases[channel]

        for i, chan in enumerate(self.__channels):
            if chan.name == channel:
                return i

        raise ChannelNotFoundException(channel, self.id)

    def has_channel(self, channel: Union[str, int]) -> bool:
        """Return whether ``channel`` resolves to a channel of this fixture."""
        try:
            self.get_channel_id(channel)
        except ChannelNotFoundException:
            return False
        return True

    def get_channel_value(self, channel: Union[str, int]) -> Tuple[int, datetime]:
        """Return ``(value, last updated)`` for ``channel``.

        Raises:
            ChannelNotFoundException: If the channel cannot be resolved.
        """
        return self.__channels[self.get_channel_id(channel)].get_value()

    def set_channel(self, channel: Union[str, int], value: int) -> 'Fixture':
        """Set ``channel`` to ``value``; out-of-range values are ignored.

        Returns:
            ``self``, to allow chaining.

        Raises:
            ChannelNotFoundException: If the channel cannot be resolved.
        """
        if not self._valid_channel_value(value, channel):
            return self

        self.__channels[self.get_channel_id(channel)].set_value(value)
        return self

    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
        """Set consecutive channels from a sequence of values.

        Args:
            *args: Values to apply in order. Lists are flattened in place.
                ``None`` and non-numeric entries are ignored and do not
                consume a channel.
            **kwargs: ``start`` may give the zero-based channel index to
                begin at (positive integers only; defaults to 0).

        Returns:
            ``self``, to allow chaining.
        """
        channel = 0
        if 'start' in kwargs and str(kwargs['start']).isdigit() and int(kwargs['start']) > 0:
            channel = int(kwargs['start'])

        def apply_values(fixture, values, chan):
            # Returns the next free channel index so that values following a
            # nested list continue after it instead of overwriting its last
            # channel.
            for value in values:
                if value is not None:
                    if isinstance(value, list):
                        chan = apply_values(fixture, value, chan)
                    elif str(value).isdigit():
                        fixture.set_channel(chan, int(value))
                        chan += 1
            return chan

        apply_values(self, args, channel)

        return self

    # Parking

    def park(self):
        """Park every channel at its currently stored value."""
        for chan in self.__channels:
            chan.park(chan.value)

    def unpark(self):
        """Unpark every channel."""
        for chan in self.__channels:
            chan.unpark()

    # Effects

    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
        """Instantiate ``effect`` for this fixture, start it and remember it.

        Args:
            effect: Effect class to instantiate.
            speed: Passed to the effect constructor after the fixture.
            *args: Extra positional arguments for the effect constructor.
            **kwargs: Extra keyword arguments for the effect constructor.

        Returns:
            ``self``, to allow chaining.
        """
        # Instantiate
        effect = effect(self, speed, *args, **kwargs)
        # Start
        effect.start()
        # Save
        self.__effects.append(effect)

        return self

    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
        """Return the stored effects that are instances of ``effect``."""
        return [this_effect for this_effect in self.__effects if isinstance(this_effect, effect)]

    def remove_effect(self, effect: Effect) -> 'Fixture':
        """Stop and forget ``effect`` if it belongs to this fixture.

        Returns:
            ``self``, to allow chaining.
        """
        if effect in self.__effects:
            effect.stop()
            self.__effects.remove(effect)

        return self

    def clear_effects(self) -> 'Fixture':
        """Stop and forget every effect on this fixture.

        Returns:
            ``self``, to allow chaining.
        """
        # Stop
        for effect in self.__effects:
            effect.stop()
        # Clear
        self.__effects = []

        return self
373