Passed
Pull Request — master (#54)
by
unknown
01:52
created

set_even()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 12
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
import socket
8
from typing import List
9
10
from ._TransmittingController import TransmittingController
11
12
13
class ArtnetDMXController(TransmittingController):
    """Transmitting controller that sends DMX frames as Art-Net ArtDmx packets over UDP.

    Keyword arguments consumed by this class (everything else is forwarded
    to the parent controller):
    target_ip - address the packets are sent to (default "127.0.0.1")
    universe - universe number written into the packet header (default 0)
    even_packet_size - round the packet size up to an even number (default True)
    packet_size - number of DMX data bytes per packet, 2 - 512 (default 512)
    broadcast - enable SO_BROADCAST on the UDP socket (default False)
    """

    # UDP port every packet is sent to (0x1936, the Art-Net port)
    UDP_PORT = 6454

    def __init__(self, *args, **kwargs):
        # Device information
        self.__target_ip = kwargs.pop("target_ip", "127.0.0.1")
        self.__universe = kwargs.pop("universe", 0)
        self.__subnet = 0
        self.__net = 0
        self.__sequence = 0  # written as-is into the header; never incremented here
        self.__make_even = kwargs.pop("even_packet_size", True)
        self.__packet_size = put_in_range(kwargs.pop("packet_size", 512), 2, 512, self.__make_even)
        self.__packet_header = bytearray()
        self.__buffer = bytearray(self.__packet_size)
        self.__broadcast = kwargs.pop("broadcast", False)

        # Simplify use of universe, net and subnet (single 16 bit universe value)
        self.__is_simplified = True

        # UDP socket, created in _connect()
        self.__socket_client = None

        # Create the parent controller
        super().__init__(*args, **kwargs)

    def make_header(self):
        """Rebuild the cached ArtDmx packet header from the current settings."""
        header = bytearray()
        # 0 - id (7 x bytes + Null)
        header.extend(bytearray('Art-Net', 'utf8'))
        header.append(0x0)
        # 8 - opcode (2 x 8 low byte first)
        header.append(0x00)
        header.append(0x50)  # ArtDmx data packet
        # 10 - protocol version (2 x 8 high byte first)
        header.append(0x0)
        header.append(14)
        # 12 - sequence (int 8), NULL for not implemented
        header.append(self.__sequence)
        # 13 - physical port (int 8)
        header.append(0x00)
        # 14 - universe, (2 x 8 low byte first)
        if self.__is_simplified:
            # Not quite correct but good enough for most cases:
            # the whole net / subnet / universe address is simplified into a
            # single uint16 that is split into its two 8 bit parts.
            msb, lsb = shift_this(self.__universe)  # convert to MSB / LSB
            header.append(lsb)
            header.append(msb)
        # 14 - universe, subnet (2 x 4 bits each)
        # 15 - net (7 bit value)
        else:
            # As specified in Art-Net 4:
            # Bit 3  - 0 = Universe (1-16)
            # Bit 7  - 4 = Subnet (1-16)
            # Bit 14 - 8 = Net (1-128)
            # Bit 15     = 0
            # The masks keep each field inside its own bits even when a value
            # was supplied through the constructor without range checking.
            header.append((self.__subnet & 0x0F) << 4 | (self.__universe & 0x0F))
            header.append(self.__net & 0x7F)
        # 16 - packet size (2 x 8 high byte first)
        msb, lsb = shift_this(self.__packet_size)  # convert to MSB / LSB
        header.append(msb)
        header.append(lsb)
        self.__packet_header = header

    def _connect(self):
        """Open a fresh UDP socket (closing any previous one) and build the header."""
        # Try to close if exists; a failure to close the old socket must not
        # prevent opening a new one.
        if self.__socket_client is not None:
            try:
                self._close()
            except OSError:
                pass

        # Get new device
        self.__socket_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        if self.__broadcast:
            self.__socket_client.setsockopt(
                socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        self.make_header()

    def _close(self):
        """Close the UDP socket if one is open; does nothing otherwise."""
        # Drop the reference first so a failing close() never leaves a
        # half-closed socket behind.
        sock, self.__socket_client = self.__socket_client, None
        if sock is None:
            return
        sock.close()
        print("CLOSE: ArtnetDMX closed")

    def _transmit(self, frame: List[int], first: int):
        """Send one DMX frame as a single ArtDmx packet.

        Args:
        frame - channel values, starting at channel `first`
        first - 1-based channel number of the first value in `frame`
        """
        sock = self.__socket_client
        if sock is None:
            # Not connected (or already closed): nothing to send to.
            return

        # We're transmitting direct DMX data here, so the data must start at
        # channel 1: pad the start of the frame with zeros.
        data = bytearray(max(first - 1, 0)) + bytearray(frame)

        # The header announces exactly `packet_size` data bytes, so the
        # payload is truncated / zero-padded to that length.
        size = self.__packet_size
        del data[size:]
        if len(data) < size:
            data.extend(bytes(size - len(data)))
        self.__buffer = data

        # Write
        packet = bytearray()
        packet.extend(self.__packet_header)
        packet.extend(self.__buffer)
        try:
            sock.sendto(packet, (self.__target_ip, self.UDP_PORT))
        except socket.error as error:
            print(f"ERROR: Socket error with exception: {error}")

    def set_simplified(self, simplify):
        """Switch between simplified (single universe value) and full net/subnet/universe addressing.

        Args:
        simplify - True to use the simplified 16 bit universe, False to use
        the separate universe, subnet and net values
        """
        self.__is_simplified = bool(simplify)
        self.make_header()

    def set_universe(self, universe):
        """Setter for universe (0 - 15, or 0 - 255 in simplified mode).

        In simplified mode the value is split into two header bytes, which
        covers universe and subnet together; net stays 0.
        """
        upper = 255 if self.__is_simplified else 15
        self.__universe = put_in_range(universe, 0, upper, False)
        self.make_header()

    def set_subnet(self, sub):
        """Setter for subnet address (0 - 15).

        Only written into the header when simplified mode is disabled.
        """
        self.__subnet = put_in_range(sub, 0, 15, False)
        self.make_header()

    def set_net(self, net):
        """Setter for net address (0 - 127).

        Only written into the header when simplified mode is disabled.
        """
        self.__net = put_in_range(net, 0, 127, False)
        self.make_header()

    def set_packet_size(self, packet_size):
        """Setter for packet size (2 - 512, rounded up to even when even_packet_size is enabled)."""
        self.__packet_size = put_in_range(packet_size, 2, 512, self.__make_even)
        self.make_header()
154
            
155
    
156
    
157
"""Provides common functions byte objects."""
158
159
160
def shift_this(number, high_first=True):
    """Utility method: extracts MSB and LSB from number.

    Only the lowest 16 bits of the number are considered.

    Args:
    number - number to split into bytes
    high_first - return the MSB first (True) or the LSB first (False)

    Returns:
    (high, low) when high_first is true, otherwise (low, high)

    """
    low = number & 0xFF
    high = (number >> 8) & 0xFF
    return (high, low) if high_first else (low, high)
176
177
178
def clamp(number, min_val, max_val):
    """Utility method: sets number in defined range.

    Args:
    number - number to use
    min_val - lowest possible number
    max_val - highest possible number

    Returns:
    number - min_val if number is below it, max_val if number is above it,
    otherwise number unchanged
    """
    return max(min_val, min(number, max_val))
190
191
192
def set_even(number):
    """Utility method: ensures number is even by adding.

    Args:
    number - number to make even

    Returns:
    number - the same number when it is already even, otherwise number + 1
    """
    if number % 2 != 0:
        return number + 1
    return number
204
205
206
def put_in_range(number, range_min, range_max, make_even=True):
    """Utility method: sets number in defined range.
    DEPRECATED: this will be removed from the library

    Args:
    number - number to use
    range_min - lowest possible number
    range_max - highest possible number
    make_even - should number be made even

    Returns:
    number - number in correct range; when make_even is set the result is
    even whenever the range contains an even value

    """
    number = clamp(number, range_min, range_max)
    if make_even:
        evened = set_even(number)
        if evened > range_max:
            # Rounding up left the range (odd range_max): use the even value below.
            evened -= 2
        if evened >= range_min:
            number = evened
        # Otherwise the range holds no even value; keep the clamped number.
    return number
224
225
226
def make_address_mask(universe, sub=0, net=0, is_simplified=True):
    """Returns the address bytes for a given universe, subnet and net.

    Args:
    universe - Universe to listen
    sub - Subnet to listen
    net - Net to listen
    is_simplified - Whether to use nets and subnet or universe only,
    see User Guide page 5 (Universe Addressing)

    Returns:
    bytes - two byte mask for given address (bytearray)

    """
    address_mask = bytearray()

    if is_simplified:
        # Ensure data is in right range (15 bit value)
        universe = clamp(universe, 0, 32767)

        # Make mask: low byte first, then high byte
        msb, lsb = shift_this(universe)  # convert to MSB / LSB
        address_mask.append(lsb)
        address_mask.append(msb)
    else:
        # Ensure data is in right range
        universe = clamp(universe, 0, 15)
        sub = clamp(sub, 0, 15)
        net = clamp(net, 0, 127)

        # Make mask: subnet in the high nibble, universe in the low nibble,
        # followed by the net byte
        address_mask.append(sub << 4 | universe)
        address_mask.append(net & 0xFF)

    return address_mask
261