Passed
Pull Request — master (#25)
by Matt
09:18 queued 07:55
created

Fixture.manufacturer()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2018 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
8
import re
9
from datetime import datetime
10
from threading import Thread
11
from time import sleep, time
12
from typing import Union, List, Tuple, Type
13
from warnings import warn
14
15
from ... import Colors
16
from ...effects.defaults import Effect
17
from ...utils.exceptions import FixtureCreationException, JSONConfigSaveException
18
19
20
class Channel:
21
22
    def __init__(self, name: str, parked: Union[bool, int]):
23
        self.name = name
24
        self.value = 0
25
        self.updated = datetime.utcnow()
26
        self.parked = parked if parked is False else (0 if parked is True else parked)
27
28
    def __updated(self):
29
        self.updated = datetime.utcnow()
30
31
    def set_value(self, value: int):
32
        if self.parked is False:
33
            self.value = value
34
            self.__updated()
35
36
    def get_value(self) -> Tuple[int, datetime]:
37
        return (self.value if self.parked is False else self.parked), self.updated
38
39
    def park(self, value: int = 0):
40
        self.parked = value
41
        self.__updated()
42
43
    def unpark(self):
44
        self.parked = False
45
        self.__updated()
46
47
48
class FixtureHelpers:
49
50
    def __dim(self, current, target, millis, channel):
51
        start = time() * 1000.0
52
        gap = target - current
53
54
        if millis > 0:
55
            while (time() * 1000.0) - start <= millis:
56
                diff = gap * (((time() * 1000.0) - start) / millis)
57
                self.set_channel(channel, int(current + diff))
58
                sleep(0.000001)
59
        self.set_channel(channel, int(target))
60
61
    def dim(self, target_value: int, milliseconds: int = 0, channel: Union[str, int] = 'dimmer') -> 'Fixture':
62
63
        # Calculate what we need
64
        current = self.get_channel_value(self.get_channel_id(channel))[0]
65
66
        # Create the thread and run loop
67
        thread = Thread(target=self.__dim, args=(current, target_value, milliseconds, channel))
68
        thread.daemon = True
69
        thread.start()
70
71
        return self
72
73
    def anim(self, milliseconds: int, *channels_values: Tuple[Union[str, int], int]):
74
        for channel_value in channels_values:
75
            self.dim(channel_value[1], milliseconds, channel_value[0])
76
77
    def color(self, color: Union[Colors, List[int], Tuple[int], str], milliseconds: int = 0):
78
        # Handle string color names
79
        if isinstance(color, str):
80
            if color in Colors:
81
                color = Colors[color]
82
            else:
83
                raise ValueError("Color '" + color + "' not defined in Colors enum."
84
                                                     " Supply valid Colors enum or List/Tuple of integers.")
85
86
        # Get a tuple
87
        color = Colors.to_tuples(color)
88
89
        # Apply
90
        self.anim(milliseconds, *color)
91
92
    def get_color(self) -> Union[None, List[int]]:
93
        red = self.get_channel_value(self.get_channel_id("r"))
94
        green = self.get_channel_value(self.get_channel_id("g"))
95
        blue = self.get_channel_value(self.get_channel_id("b"))
96
        white = self.get_channel_value(self.get_channel_id("w"))
97
        amber = self.get_channel_value(self.get_channel_id("a"))
98
        if red[0] == -1 or green[0] == -1 or blue[0] == -1:
99
            return None
100
        color = [red[0], green[0], blue[0]]
101
        if white[0] != -1:
102
            color.append(white[0])
103
            if amber[0] != -1:
104
                color.append(amber[0])
105
        return color
106
107
    def on(self):
108
        self.dim(255)
109
110
    def off(self):
111
        self.dim(0)
112
113
    def locate(self):
114
        self.color([255, 255, 255, 255, 255])
115
        self.dim(255)
116
117
118
class Fixture(FixtureHelpers):
119
120
    def __init__(self, *args, **kwargs):
121
        if "start_channel" not in kwargs:
122
            raise TypeError("__init__() missing 1 required keyword-only argument: 'start_channel'")
123
124
        if kwargs["start_channel"] < 1 or kwargs["start_channel"] > 512:
125
            raise ValueError('Start Channel must be between 1 and 512.')
126
127
        if "name" not in kwargs:
128
            kwargs["name"] = ""
129
130
        self.__start_channel = kwargs["start_channel"]
131
        self.__channels = []
132
        self.__effects = []
133
        self.__id = None
134
        self.__controller = None
135
        self.__name = kwargs["name"]
136
        self.__channel_aliases = {}
137
        self.__kwargs = kwargs
138
        self.__args = args
139
140
    def __str__(self):
141
        return self.title
142
143
    # Internal
144
145
    def _register_channel(self, name: str, *, parked: Union[bool, int] = False) -> int:
146
        if self.__start_channel + len(self.__channels) > 512:
147
            raise FixtureCreationException(self, 'Not enough space in universe for channel `{}`.'.format(name))
148
149
        used_names = [f.name for f in self.__channels]
150
        used_names.extend([f for f in self.__channel_aliases])
151
        if name.lower().strip() in used_names:
152
            raise FixtureCreationException(self, 'Name `{}` already in use for channel (or alias).'.format(name))
153
154
        self.__channels.append(Channel(name.lower().strip(), parked))
155
        return len(self.__channels) - 1
156
157
    def _register_channel_aliases(self, channel: str, *aliases: str) -> bool:
158
        if not aliases:
159
            return False
160
161
        channel = channel.lower().strip()
162
163
        used_names = [f.name for f in self.__channels]
164
        if channel not in used_names:
165
            warn('Channel name `{}` is not registered.'.format(channel))
166
            return False
167
168
        for alias in aliases:
169
            if alias in self.__channel_aliases.keys():
170
                warn('Channel alias `{}` already in use for channel `{}`.'.format(alias, self.__channel_aliases[alias]))
171
                continue
172
            self.__channel_aliases[alias] = channel
173
        return True
174
175
    def set_id(self, fixture_id: int):
176
        # Only ever set once
177
        if self.__id is None:
178
            self.__id = fixture_id
179
180
    def set_controller(self, controller: 'Controller'):
181
        # Only ever set once
182
        if self.__controller is None:
183
            self.__controller = controller
184
185
    def _set_name(self, name: str):
186
        self.__name = name
187
188
    # Properties
189
190
    def _valid_channel_value(self, value: int, channel: Union[str, int]) -> bool:
191
        if value < 0 or value > 255:
192
            if self.__controller.dmx_value_warnings:
193
                warn('{} DMX value must be between 0 and 255. Received value {} for channel {}'.format(
194
                    self.title, value, channel))
195
            return False
196
        return True
197
198
    @property
199
    def id(self) -> int:
200
        return self.__id if self.__id is not None else 0
201
202
    @property
203
    def name(self) -> str:
204
        return self.__name
205
206
    @property
207
    def start_channel(self) -> int:
208
        return self.__start_channel
209
210
    @property
211
    def next_channel(self) -> int:
212
        return len(self.__channels) + 1
213
214
    @property
215
    def channels(self) -> dict:
216
        channels = {}
217
        for i, chan in enumerate(self.__channels):
218
            channels[self.start_channel + i] = {'name': chan.name, 'value': self.get_channel_value(i)}
219
        return channels
220
221
    @property
222
    def channel_usage(self) -> str:
223
        return "{}->{} ({})".format(
224
            self.start_channel,
225
            (self.start_channel + len(self.__channels) - 1),
226
            len(self.__channels)
227
        )
228
229
    @property
230
    def title(self) -> str:
231
        return "Fixture #{} {} of type {} using channels {}.".format(
232
            self.id,
233
            "('{}')".format(self.name) if self.name else "",
234
            self.__class__.__name__,
235
            self.channel_usage
236
        )
237
238
    @property
239
    def json_data(self) -> dict:
240
        pattern = re.compile(r"^PyDMXControl\.profiles\.(([\w\d.]+)\.)*_[\w\d]+$", re.IGNORECASE)
241
        match = pattern.match(self.__class__.__module__)
242
        if not match:
243
            raise JSONConfigSaveException("Failed to generate JSON data for fixture #{}".format(self.id))
244
        base = {
245
            "type": "{}.{}".format(match.group(2), self.__class__.__name__),
246
            "args": self.__args
247
        }
248
        for kwarg, val in self.__kwargs.items():
249
            if kwarg not in base:
250
                if kwarg == "name":
251
                    val = self.name
252
                base[kwarg] = val
253
        return base
254
255
    @property
256
    def manufacturer(self) -> str:
257
        return self.__class__.__module__.split(".")[-2]
258
259
    @property
260
    def model(self) -> str:
261
        return self.__class__.__name__.replace("_", " ")
262
263
    # Channels
264
265
    def get_channel_id(self, channel: Union[str, int]) -> int:
266
        channel = str(channel)
267
268
        if channel.isdigit():
269
            channel = int(channel)
270
            if channel < len(self.__channels):
271
                return channel
272
273
        channel = str(channel).lower().strip()
274
        if channel in self.__channel_aliases.keys():
275
            channel = self.__channel_aliases[channel]
276
277
        for i, chan in enumerate(self.__channels):
278
            if chan.name == channel:
279
                return i
280
281
        return -1
282
283
    def get_channel_value(self, channel: int) -> Tuple[int, datetime]:
284
        channel = self.get_channel_id(channel)
285
        if channel >= len(self.__channels) or channel < 0:
286
            return -1, datetime.utcnow()
287
        return self.__channels[channel].get_value()
288
289
    def set_channel(self, channel: [str, int], value: int) -> 'Fixture':
290
        if not self._valid_channel_value(value, channel):
291
            return self
292
293
        channel = self.get_channel_id(channel)
294
        if channel == -1:
295
            return self
296
297
        self.__channels[channel].set_value(value)
298
        return self
299
300
    def set_channels(self, *args: Union[int, List[int], None], **kwargs) -> 'Fixture':
301
        channel = 0
302
        if 'start' in kwargs and str(kwargs['start']).isdigit() and int(kwargs['start']) > 0:
303
            channel = int(kwargs['start'])
304
305
        def apply_values(fixture, values, chan=1):
306
            for value in values:
307
                if value is not None:
308
                    if isinstance(value, list):
309
                        chan = apply_values(fixture, value, chan)
310
                    elif str(value).isdigit():
311
                        fixture.set_channel(chan, int(value))
312
                        chan += 1
313
            return chan - 1
314
315
        apply_values(self, args, channel)
316
317
        return self
318
319
    # Effects
320
321
    def add_effect(self, effect: Type[Effect], speed: float, *args, **kwargs) -> 'Fixture':
322
        # Instantiate
323
        effect = effect(self, speed, *args, **kwargs)
324
        # Start
325
        effect.start()
326
        # Save
327
        self.__effects.append(effect)
328
329
        return self
330
331
    def get_effect_by_effect(self, effect: Type[Effect]) -> List[Effect]:
332
        matches = []
333
334
        # Iterate over each effect
335
        for this_effect in self.__effects:
336
            # If it matches the given effect
337
            if isinstance(this_effect, effect):
338
                # Store
339
                matches.append(this_effect)
340
341
        # Return any matches
342
        return matches
343
344
    def remove_effect(self, effect: Effect) -> 'Fixture':
345
        if effect in self.__effects:
346
            effect.stop()
347
            self.__effects.remove(effect)
348
349
        return self
350
351
    def clear_effects(self) -> 'Fixture':
352
        # Stop
353
        for effect in self.__effects:
354
            effect.stop()
355
        # Clear
356
        self.__effects = []
357
358
        return self
359