Passed
Pull Request — master (#18)
by Matt
118:47 queued 117:45
created

FixtureHelpers.on()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX via Python. Featuring fixture profiles and working with uDMX.
3
 *  <https://github.com/MattIPv4/PyDMXControl/>
4
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
5
"""
6
7
import re
8
from datetime import datetime
9
from threading import Thread
10
from time import sleep, time
11
from typing import Union, List, Tuple, Type
12
from warnings import warn
13
14
from ... import Colors
15
from ...effects.defaults import Effect
16
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException
17
18
19
class Channel:
20
21
    def __init__(self, name: str, parked: Union[bool, int]):
22
        self.name = name
23
        self.value = 0
24
        self.updated = datetime.utcnow()
25
        self.parked = parked if parked is False else (0 if parked is True else parked)
26
27
    def __updated(self):
28
        self.updated = datetime.utcnow()
29
30
    def set_value(self, value: int):
31
        if self.parked is False:
32
            self.value = value
33
            self.__updated()
34
35
    def get_value(self) -> Tuple[int, datetime]:
36
        return (self.value if self.parked is False else self.parked), self.updated
37
38
    def park(self, value: int = 0):
39
        self.parked = value
40
        self.__updated()
41
42
    def unpark(self):
43
        self.parked = False
44
        self.__updated()
45
46
47
class FixtureHelpers:
48
49
    def __dim(self, current, target, millis, channel):
50
        start = time() * 1000.0
51
        gap = target - current
52
53
        if millis > 0:
54
            while (time() * 1000.0) - start <= millis:
55
                diff = gap * (((time() * 1000.0) - start) / millis)
56
                self.set_channel(channel, int(current + diff))
57
                sleep(0.000001)
58
        self.set_channel(channel, int(target))
59
60
    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
61
62
        # Calculate what we need
63
        current = self.get_channel_value(self.get_channel_id(channel))[0]
64
65
        # Create the thread and run loop
66
        thread = Thread(target=self.__dim, args=(current, target_value, milliseconds, channel))
67
        thread.daemon = True
68
        thread.start()
69
70
        return self
71
72
    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
73
        for channel_value in channels_values:
74
            self.dim(channel_value[1], milliseconds, channel_value[0])
75
76
    def color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
77
        # Handle string color names
78
        if isinstance(color, str):
79
            if color in Colors:
80
                color = Colors[color]
81
            else:
82
                raise ValueError("Color '" + color + "' not defined in Colors enum."
83
                                                     " Supply valid Colors enum or List/Tuple of integers.")
84
85
        # Get a tuple
86
        color = Colors.to_tuples(color)
87
88
        # Apply
89
        self.anim(milliseconds, *color)
90
91
    def get_color(self) -> Union[None, List[int]]:
92
        red = self.get_channel_value(self.get_channel_id("r"))
93
        green = self.get_channel_value(self.get_channel_id("g"))
94
        blue = self.get_channel_value(self.get_channel_id("b"))
95
        white = self.get_channel_value(self.get_channel_id("w"))
96
        amber = self.get_channel_value(self.get_channel_id("a"))
97
        if red[0] == -1 or green[0] == -1 or blue[0] == -1:
98
            return None
99
        color = [red[0], green[0], blue[0]]
100
        if white[0] != -1:
101
            color.append(white[0])
102
            if amber[0] != -1:
103
                color.append(amber[0])
104
        return color
105
106
    def on(self):
107
        self.dim(255)
108
109
    def off(self):
110
        self.dim(0)
111
112
    def locate(self):
113
        self.color([255, 255, 255, 255, 255])
114
        self.dim(255)
115
116
117
class Fixture(FixtureHelpers):
118
119
    def __init__(self, *args, **kwargs):
120
        if "start_channel" not in kwargs:
121
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")
122
123
        if kwargs["start_channel"] < 1 or kwargs["start_channel"] > 512:
124
            raise ValueError('Start Channel must be between 1 and 512.')
125
126
        if "name" not in kwargs:
127
            kwargs["name"] = ""
128
129
        self.__start_channel = kwargs["start_channel"]
130
        self.__channels = []
131
        self.__effects = []
132
        self.__id = None
133
        self.__controller = None
134
        self.__name = kwargs["name"]
135
        self.__channel_aliases = {}
136
        self.__kwargs = kwargs
137
        self.__args = args
138
139
    def __str__(self):
140
        return self.title
141
142
    # Internal
143
144
    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
145
        if self.__start_channel + len(self.__channels) > 512:
146
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))
147
148
        used_names = [f.name for f in self.__channels]
149
        used_names.extend([f for f in self.__channel_aliases])
150
        if name.lower().strip() in used_names:
151
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))
152
153
        self.__channels.append(Channel(name.lower().strip(), parked))
154
        return len(self.__channels) - 1
155
156
    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
157
        if not aliases:
158
            return False
159
160
        channel = channel.lower().strip()
161
162
        used_names = [f.name for f in self.__channels]
163
        if channel not in used_names:
164
            warn('Channel name `{}` is not registered.'.format(channel))
165
            return False
166
167
        for alias in aliases:
168
            if alias in self.__channel_aliases.keys():
169
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[alias]))
170
                continue
171
            self.__channel_aliases[alias] = channel
172
        return True
173
174
    def set_id(self, fixture_id: int):
175
        # Only ever set once
176
        if self.__id is None:
177
            self.__id = fixture_id
178
179
    def set_controller(self, controller: 'Controller'):
180
        # Only ever set once
181
        if self.__controller is None:
182
            self.__controller = controller
183
184
    def _set_name(self, name: str):
185
        self.__name = name
186
187
    # Properties
188
189
    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
190
        if value < 0 or value > 255:
191
            if self.__controller.dmx_value_warnings:
192
                warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
193
                    self.title, value, channel))
194
            return False
195
        return True
196
197
    @property
198
    def id(self) -> int:
199
        return self.__id if self.__id is not None else 0
200
201
    @property
202
    def name(self) -> str:
203
        return self.__name
204
205
    @property
206
    def start_channel(self) -> int:
207
        return self.__start_channel
208
209
    @property
210
    def next_channel(self) -> int:
211
        return len(self.__channels) + 1
212
213
    @property
214
    def channels(self) -> dict:
215
        channels = {}
216
        for i, chan in enumerate(self.__channels):
217
            channels[self.start_channel + i] = {'name': chan.name, 'value': self.get_channel_value(i)}
218
        return channels
219
220
    @property
221
    def channel_usage(self) -> str:
222
        return "{}->{} ({})".format(
223
            self.start_channel,
224
            (self.start_channel + len(self.__channels) - 1),
225
            len(self.__channels)
226
        )
227
228
    @property
229
    def title(self) -> str:
230
        return "Fixture #{} {} of type {} using channels {}.".format(
231
            self.id,
232
            "('{}')".format(self.name) if self.name else "",
233
            self.__class__.__name__,
234
            self.channel_usage
235
        )
236
237
    @property
238
    def json_data(self) -> dict:
239
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
240
        match = pattern.match(self.__class__.__module__)
241
        if not match:
242
            raise JSONConfigSaveException("Failed to generate JSON data for fixture #{}".format(self.id))
243
        base = {
244
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
245
            "args": self.__args
246
        }
247
        for kwarg, val in self.__kwargs.items():
248
            if kwarg not in base:
249
                if kwarg == "name":
250
                    val = self.name
251
                base[kwarg] = val
252
        return base
253
254
    # Channels
255
256
    def get_channel_id(self, channel: Union[str, int]) -> int:
257
        channel = str(channel)
258
259
        if channel.isdigit():
260
            channel = int(channel)
261
            if channel < len(self.__channels):
262
                return channel
263
264
        channel = str(channel).lower().strip()
265
        if channel in self.__channel_aliases.keys():
266
            channel = self.__channel_aliases[channel]
267
268
        for i, chan in enumerate(self.__channels):
269
            if chan.name == channel:
270
                return i
271
272
        return -1
273
274
    def get_channel_value(self, channel: int) -> Tuple[int, datetime]:
275
        channel = self.get_channel_id(channel)
276
        if channel >= len(self.__channels) or channel < 0:
277
            return -1, datetime.utcnow()
278
        return self.__channels[channel].get_value()
279
280
    def set_channel(self, channel: [str, int], value: int) -> 'Fixture':
281
        if not self._valid_channel_value(value, channel):
282
            return self
283
284
        channel = self.get_channel_id(channel)
285
        if channel == -1:
286
            return self
287
288
        self.__channels[channel].set_value(value)
289
        return self
290
291
    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
292
        channel = 0
293
        if 'start' in kwargs and str(kwargs['start']).isdigit() and int(kwargs['start']) > 0:
294
            channel = int(kwargs['start'])
295
296
        def apply_values(fixture, values, chan=1):
297
            for value in values:
298
                if value is not None:
299
                    if isinstance(value, list):
300
                        chan = apply_values(fixture, value, chan)
301
                    elif str(value).isdigit():
302
                        fixture.set_channel(chan, int(value))
303
                        chan += 1
304
            return chan - 1
305
306
        apply_values(self, args, channel)
307
308
        return self
309
310
    # Effects
311
312
    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
313
        # Instantiate
314
        effect = effect(self, speed, *args, **kwargs)
315
        # Start
316
        effect.start()
317
        # Save
318
        self.__effects.append(effect)
319
320
        return self
321
322
    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
323
        matches = []
324
325
        # Iterate over each effect
326
        for this_effect in self.__effects:
327
            # If it matches the given effect
328
            if isinstance(this_effect, effect):
329
                # Store
330
                matches.append(this_effect)
331
332
        # Return any matches
333
        return matches
334
335
    def remove_effect(self, effect: Effect) -> 'Fixture':
336
        if effect in self.__effects:
337
            effect.stop()
338
            self.__effects.remove(effect)
339
340
        return self
341
342
    def clear_effects(self) -> 'Fixture':
343
        # Stop
344
        for effect in self.__effects:
345
            effect.stop()
346
        # Clear
347
        self.__effects = []
348
349
        return self
350