Passed
Push — master ( b801a7...ad3e94 )
by Matt
01:45
created

Fixture.controller()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2021 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from datetime import datetime
10
from threading import Thread
11
from time import sleep, time
12
from typing import Union, List, Tuple, Type
13
from warnings import warn
14
15
from ... import Colors
16
from ...effects.defaults import Effect
17
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException
18
19
20
class Channel:
    """A single named channel holding a value, a timestamp and a park state.

    Attributes are plain and public:

    * ``name``    - the channel name as given to the constructor.
    * ``value``   - the last value accepted by :meth:`set_value` (starts at 0).
    * ``updated`` - naive UTC ``datetime`` of the last accepted change.
    * ``parked``  - ``False`` when the channel is not parked, otherwise the
      value that :meth:`get_value` reports instead of ``value``.
    """

    def __init__(self, name: str, parked: Union[bool, int]):
        """Create the channel.

        :param name: Name stored on the channel.
        :param parked: ``False`` for an unparked channel, ``True`` to park at
            0, or any other value to park at that value.
        """
        self.name = name
        self.value = 0
        self.updated = datetime.utcnow()
        # Identity checks are deliberate: 0 is a legitimate parked value and
        # must not be confused with False ("not parked").
        self.parked = parked if parked is False else (0 if parked is True else parked)

    def __updated(self):
        """Record the current UTC time as the moment of the last change."""
        self.updated = datetime.utcnow()

    def set_value(self, value: int):
        """Store ``value`` unless the channel is parked (then it is ignored)."""
        if self.parked is False:
            self.value = value
            self.__updated()

    def get_value(self) -> Tuple[int, datetime]:
        """Return ``(effective value, last update time)``.

        The effective value is the parked value while parked, otherwise the
        stored value.
        """
        return (self.value if self.parked is False else self.parked), self.updated

    def park(self, value: int = 0):
        """Park the channel at ``value`` and refresh the update time."""
        self.parked = value
        self.__updated()

    def unpark(self):
        """Release the park so the stored value is reported again."""
        self.parked = False
        self.__updated()
46
47
48
class FixtureHelpers:
    """Convenience helpers for dimming, colour and on/off control.

    The methods call ``self.get_channel_id``, ``self.get_channel_value`` and
    ``self.set_channel``; those are not defined here and must be provided by
    the class these helpers are combined with.
    """

    def __dim(self, current, target, millis, channel):
        """Move ``channel`` from ``current`` to ``target`` over ``millis`` ms.

        Runs in the calling thread and returns once the target has been set.
        With ``millis <= 0`` the target is applied immediately.
        """
        start = time() * 1000.0
        gap = target - current

        if millis > 0:
            while True:
                # Sample the clock once per step and use that same reading for
                # both the exit test and the interpolation. Reading it twice
                # lets the second reading land past ``millis`` and push the
                # interpolated value beyond the target.
                elapsed = (time() * 1000.0) - start
                if elapsed > millis:
                    break
                self.set_channel(channel, int(current + gap * (elapsed / millis)))
                sleep(0.000001)

        # Always finish exactly on the requested value.
        self.set_channel(channel, int(target))

    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
        """Start a fade of ``channel`` to ``target_value`` in a daemon thread.

        :param target_value: Value the channel ends on.
        :param milliseconds: Fade duration; 0 applies the value at once.
        :param channel: Channel name or id to fade.
        :return: ``self``, so calls can be chained. The fade itself may still
            be running when this returns.
        """
        # The fade starts from whatever the channel currently reports.
        current = self.get_channel_value(self.get_channel_id(channel))[0]

        worker = Thread(
            target=self.__dim,
            args=(current, target_value, milliseconds, channel),
            daemon=True,
        )
        worker.start()

        return self

    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
        """Fade several channels at once.

        :param milliseconds: Duration shared by every fade.
        :param channels_values: ``(channel, value)`` pairs, one per fade.
        """
        for channel, value in ((pair[0], pair[1]) for pair in channels_values):
            self.dim(value, milliseconds, channel)

    def color(self, color: Union['Colors', List[int], Tuple[int, ...], str], milliseconds: int = 0):
        """Fade the fixture to ``color``.

        :param color: A ``Colors`` member, the name of one, or a list/tuple of
            integers.
        :param milliseconds: Fade duration passed on to :meth:`anim`.
        :raises ValueError: If ``color`` is a string that is not a member name
            of ``Colors``.
        """
        # Resolve colour names to enum members first.
        if isinstance(color, str):
            if color not in Colors.__members__:
                raise ValueError("Color '" + color + "' not defined in Colors enum."
                                 " Supply valid Colors enum or List/Tuple of integers.")
            color = Colors[color]

        # ``Colors.to_tuples`` is defined outside this file; its result is
        # unpacked into ``anim``, which reads each item as (channel, value).
        self.anim(milliseconds, *Colors.to_tuples(color))

    def get_color(self) -> Union[None, List[int]]:
        """Return the current colour as ``[r, g, b]``, ``[r, g, b, w]`` or
        ``[r, g, b, w, a]``.

        A channel counts as missing when its reported value is -1. ``None`` is
        returned if any of r/g/b is missing. Amber is only included when white
        is present as well.
        """
        red, green, blue, white, amber = (
            self.get_channel_value(self.get_channel_id(key))[0]
            for key in ("r", "g", "b", "w", "a")
        )

        if -1 in (red, green, blue):
            return None

        color = [red, green, blue]
        if white != -1:
            color.append(white)
            if amber != -1:
                color.append(amber)
        return color

    def on(self):
        """Fade the default dimmer channel to 255 immediately."""
        self.dim(255)

    def off(self):
        """Fade the default dimmer channel to 0 immediately."""
        self.dim(0)

    def locate(self):
        """Set every colour component and the dimmer to 255."""
        self.color([255, 255, 255, 255, 255])
        self.dim(255)
116
117
118
class Fixture(FixtureHelpers):
    """Base fixture: an ordered set of named channels starting at a given
    start channel, plus the effects currently attached to it.

    Channel ids used throughout this class are zero-based indexes into the
    fixture's own channel list; ``start_channel + id`` is the key used for the
    channel in the :attr:`channels` mapping.
    """

    def __init__(self, *args, **kwargs):
        """Create the fixture.

        :param args: Positional arguments, kept only for :attr:`json_data`.
        :param kwargs: Must contain ``start_channel`` (1-512); may contain
            ``name``. All keyword arguments are kept for :attr:`json_data`.
        :raises TypeError: If ``start_channel`` is not supplied.
        :raises ValueError: If ``start_channel`` is outside 1-512.
        """
        if "start_channel" not in kwargs:
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")

        if kwargs["start_channel"] < 1 or kwargs["start_channel"] > 512:
            raise ValueError('Start Channel must be between 1 and 512.')

        kwargs.setdefault("name", "")

        self.__start_channel = kwargs["start_channel"]
        self.__channels = []
        self.__effects = []
        self.__id = None
        self.__controller = None
        self.__name = kwargs["name"]
        self.__channel_aliases = {}
        self.__kwargs = kwargs
        self.__args = args

    def __str__(self):
        return self.title

    # Internal

    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
        """Append a channel and return its zero-based id.

        The name is lower-cased and stripped before being stored.

        :raises FixtureCreationException: If the new channel would fall past
            channel 512, or the name is already a channel name or alias.
        """
        if self.__start_channel + len(self.__channels) > 512:
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))

        clean_name = name.lower().strip()
        taken = {chan.name for chan in self.__channels}
        taken.update(self.__channel_aliases)
        if clean_name in taken:
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))

        self.__channels.append(Channel(clean_name, parked))
        return len(self.__channels) - 1

    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
        """Register alternative names for an existing channel.

        Aliases are lower-cased and stripped, matching the normalisation that
        :meth:`get_channel_id` applies before looking an alias up.

        :return: ``False`` if no aliases were given or ``channel`` is not a
            registered channel name (a warning is issued for the latter),
            otherwise ``True``. Aliases already in use are skipped with a
            warning.
        """
        if not aliases:
            return False

        channel = channel.lower().strip()

        if channel not in {chan.name for chan in self.__channels}:
            warn('Channel name `{}` is not registered.'.format(channel))
            return False

        for alias in aliases:
            # Store the alias in the same form get_channel_id() searches for;
            # otherwise an alias with capitals or padding is unreachable.
            alias = alias.lower().strip()
            if alias in self.__channel_aliases:
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[alias]))
                continue
            self.__channel_aliases[alias] = channel
        return True

    def set_id(self, fixture_id: int):
        """Set the fixture id; only the first call has any effect."""
        if self.__id is None:
            self.__id = fixture_id

    def set_controller(self, controller: 'Controller'):
        """Attach the controller; only the first call has any effect."""
        if self.__controller is None:
            self.__controller = controller

    def _set_name(self, name: str):
        """Replace the fixture's name."""
        self.__name = name

    # Properties

    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
        """Return whether ``value`` lies in 0-255.

        For an out-of-range value a warning is issued when the controller has
        ``dmx_value_warnings`` enabled, or when no controller has been
        attached yet (there is then no setting to consult).
        """
        if 0 <= value <= 255:
            return True

        controller = self.__controller
        if controller is None or controller.dmx_value_warnings:
            warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
                self.title, value, channel))
        return False

    @property
    def id(self) -> int:
        """The fixture id, or 0 while none has been set."""
        return self.__id if self.__id is not None else 0

    @property
    def name(self) -> str:
        """The fixture's name (may be an empty string)."""
        return self.__name

    @property
    def start_channel(self) -> int:
        """The start channel given at construction."""
        return self.__start_channel

    @property
    def next_channel(self) -> int:
        """Number of registered channels plus one."""
        return len(self.__channels) + 1

    @property
    def controller(self) -> 'Controller':
        """The attached controller, or ``None`` if none has been set."""
        return self.__controller

    @property
    def channels(self) -> dict:
        """Mapping of ``start_channel + id`` to ``{'name', 'value'}`` where
        ``value`` is the ``(value, updated)`` tuple of that channel."""
        return {
            self.start_channel + index: {'name': chan.name, 'value': self.get_channel_value(index)}
            for index, chan in enumerate(self.__channels)
        }

    @property
    def channel_usage(self) -> str:
        """Human readable ``first->last (count)`` summary of used channels."""
        count = len(self.__channels)
        return "{}->{} ({})".format(
            self.start_channel,
            (self.start_channel + count - 1),
            count
        )

    @property
    def title(self) -> str:
        """One-line description of the fixture, also used by ``str()``."""
        return "Fixture #{} {} of type {} using channels {}.".format(
            self.id,
            "('{}')".format(self.name) if self.name else "",
            self.__class__.__name__,
            self.channel_usage
        )

    @property
    def json_data(self) -> dict:
        """Dictionary describing how this fixture was constructed.

        Contains ``type``, ``args`` and every constructor keyword argument
        (with ``name`` replaced by the current name).

        :raises JSONConfigSaveException: If the class's module path does not
            match the ``PyDMXControl.profiles...._<module>`` pattern.
        """
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
        match = pattern.match(self.__class__.__module__)
        if not match:
            raise JSONConfigSaveException("Failed to generate JSON data for fixture #{}".format(self.id))

        # NOTE(review): group(2) is None when the module sits directly under
        # ``PyDMXControl.profiles`` (no intermediate package) - confirm the
        # consumer of "type" copes with that.
        base = {
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
            "args": self.__args
        }
        for kwarg, val in self.__kwargs.items():
            # Never let a keyword argument overwrite "type" or "args".
            if kwarg in base:
                continue
            base[kwarg] = self.name if kwarg == "name" else val
        return base

    # Channels

    def get_channel_id(self, channel: Union[str, int]) -> int:
        """Resolve a channel id, name or alias to a zero-based channel id.

        A digit-only value smaller than the channel count is returned as the
        id directly. Anything else is lower-cased, stripped, mapped through
        the aliases and matched against channel names.

        :return: The channel id, or -1 if nothing matches.
        """
        channel = str(channel)

        if channel.isdigit():
            index = int(channel)
            if index < len(self.__channels):
                return index

        wanted = channel.lower().strip()
        wanted = self.__channel_aliases.get(wanted, wanted)

        for index, chan in enumerate(self.__channels):
            if chan.name == wanted:
                return index

        return -1

    def get_channel_value(self, channel: Union[str, int]) -> Tuple[int, datetime]:
        """Return ``(value, updated)`` for a channel id, name or alias.

        For an unknown channel the result is ``(-1, <current UTC time>)``.
        """
        index = self.get_channel_id(channel)
        if index < 0 or index >= len(self.__channels):
            return -1, datetime.utcnow()
        return self.__channels[index].get_value()

    def set_channel(self, channel: Union[str, int], value: int) -> 'Fixture':
        """Set one channel; invalid values and unknown channels are ignored.

        :return: ``self``, so calls can be chained.
        """
        if not self._valid_channel_value(value, channel):
            return self

        index = self.get_channel_id(channel)
        if index != -1:
            self.__channels[index].set_value(value)
        return self

    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
        """Set consecutive channels from a (possibly nested) run of values.

        :param args: Values applied to consecutive channel ids. Lists/tuples
            are flattened in place. ``None`` and anything whose string form is
            not all digits are skipped without consuming a channel.
        :param kwargs: ``start`` - channel id of the first value (default 0;
            only used when it is a positive whole number).
        :return: ``self``, so calls can be chained.
        """
        first = 0
        if 'start' in kwargs and str(kwargs['start']).isdigit() and int(kwargs['start']) > 0:
            first = int(kwargs['start'])

        def apply_values(values, chan):
            """Apply ``values`` starting at ``chan``; return the next free id."""
            for value in values:
                if value is None:
                    continue
                if isinstance(value, (list, tuple)):
                    # The nested call reports where it stopped so the outer
                    # sequence continues after it instead of overwriting the
                    # last nested channel.
                    chan = apply_values(value, chan)
                elif str(value).isdigit():
                    self.set_channel(chan, int(value))
                    chan += 1
            return chan

        apply_values(args, first)

        return self

    # Effects

    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
        """Instantiate ``effect`` for this fixture, start it and keep it.

        :param effect: Effect class; called as
            ``effect(self, speed, *args, **kwargs)``.
        :return: ``self``, so calls can be chained.
        """
        instance = effect(self, speed, *args, **kwargs)
        instance.start()
        self.__effects.append(instance)

        return self

    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
        """Return every attached effect that is an instance of ``effect``."""
        return [attached for attached in self.__effects if isinstance(attached, effect)]

    def remove_effect(self, effect: Effect) -> 'Fixture':
        """Stop and detach ``effect`` if it is attached to this fixture.

        :return: ``self``, so calls can be chained.
        """
        if effect in self.__effects:
            effect.stop()
            self.__effects.remove(effect)

        return self

    def clear_effects(self) -> 'Fixture':
        """Stop every attached effect and forget them all.

        :return: ``self``, so calls can be chained.
        """
        for effect in self.__effects:
            effect.stop()
        self.__effects = []

        return self
355