Passed
Pull Request — master (#54)
by
unknown
07:42 queued 01:22
created

ArtnetDMXController.set_packet_size()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
"""
2
 *  PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX.
3
 *                Featuring fixture profiles, built-in effects and a web control panel.
4
 *  <https://github.com/MattIPv4/PyDMXControl/>
5
 *  Copyright (C) 2022 Matt Cowley (MattIPv4) ([email protected])
6
"""
7
import socket
8
from typing import List
9
10
from ._TransmittingController import TransmittingController
11
12
13
class ArtnetDMXController(TransmittingController):
14
15
    UDP_PORT = 6454
16
17
    def __init__(self, *args, **kwargs):
18
        # Device information
19
        self.__target_ip = kwargs.pop("target_ip", "127.0.0.1")
20
        self.__universe = kwargs.pop("universe", 0)
21
        self.__subnet = 0
22
        self.__net = 0
23
        self.__sequence = 0
24
        self.__make_even = kwargs.pop("even_packet_size", True)
25
        self.__packet_size = self.put_in_range(kwargs.pop("packet_size", 512), 2, 512, self.__make_even)
26
        self.__packet_header = bytearray()
27
        self.__buffer = bytearray(self.__packet_size)
28
        self.__broadcast=kwargs.pop("broadcast", False)
29
30
        
31
32
        self.__is_simplified = True		# simplify use of universe, net and subnet
33
34
        # UDP SOCKET
35
        self.__socket_client = None
36
37
        # Create the parent controller
38
        super().__init__(*args, **kwargs)
39
40
    def make_header(self):
41
        """Make packet header."""
42
        # 0 - id (7 x bytes + Null)
43
        self.__packet_header = bytearray()
44
        self.__packet_header.extend(bytearray('Art-Net', 'utf8'))
45
        self.__packet_header.append(0x0)
46
        # 8 - opcode (2 x 8 low byte first)
47
        self.__packet_header.append(0x00)
48
        self.__packet_header.append(0x50)  # ArtDmx data packet
49
        # 10 - prototocol version (2 x 8 high byte first)
50
        self.__packet_header.append(0x0)
51
        self.__packet_header.append(14)
52
        # 12 - sequence (int 8), NULL for not implemented
53
        self.__packet_header.append(self.__sequence)
54
        # 13 - physical port (int 8)
55
        self.__packet_header.append(0x00)
56
        # 14 - universe, (2 x 8 low byte first)
57
        if self.__is_simplified:
58
            # not quite correct but good enough for most cases:
59
            # the whole net subnet is simplified
60
            # by transforming a single uint16 into its 8 bit parts
61
            # you will most likely not see any differences in small networks
62
            msb, lsb = self.shift_this(self.__universe)   # convert to MSB / LSB
63
            self.__packet_header.append(lsb)
64
            self.__packet_header.append(msb)
65
        # 14 - universe, subnet (2 x 4 bits each)
66
        # 15 - net (7 bit value)
67
        else:
68
            # as specified in Artnet 4 (remember to set the value manually after):
69
            # Bit 3  - 0 = Universe (1-16)
70
            # Bit 7  - 4 = Subnet (1-16)
71
            # Bit 14 - 8 = Net (1-128)
72
            # Bit 15     = 0
73
            # this means 16 * 16 * 128 = 32768 universes per port
74
            # a subnet is a group of 16 Universes
75
            # 16 subnets will make a net, there are 128 of them
76
            self.__packet_header.append(self.__subnet << 4 | self.__universe)
77
            self.__packet_header.append(self.__net & 0xFF)
78
        # 16 - packet size (2 x 8 high byte first)
79
        msb, lsb = self.shift_this(self.__packet_size)		# convert to MSB / LSB
80
        self.__packet_header.append(msb)
81
        self.__packet_header.append(lsb)    
82
83
    def _connect(self):
84
         # Try to close if exists
85
        if self.__socket_client is not None:
86
            try:
87
                self._close()
88
            except Exception:
89
                pass
90
91
        # Get new device
92
        self.__socket_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
93
94
        if self.__broadcast:
95
            self.__socket_client.setsockopt(
96
                socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
97
        
98
        self.make_header()
99
100
    def _close(self):         
101
        self.__socket_client.close()
102
        print("CLOSE: ArtnetDMX closed")
103
104
    def _transmit(self, frame: List[int], first: int):
105
        # Convert to a bytearray and pad the start of the frame
106
        # We're transmitting direct DMX data here, so a frame must start at channel 1, but can end early
107
        
108
        self.__buffer=bytearray(([0] * (first - 1)) + frame)
109
        # Write
110
        packet = bytearray()
111
        packet.extend(self.__packet_header)
112
        packet.extend(self.__buffer)
113
        try:
114
            self.__socket_client.sendto(packet, (self.__target_ip, self.UDP_PORT))
115
        except socket.error as error:
116
            print(f"ERROR: Socket error with exception: {error}")
117
118
    """Provides common functions byte objects."""
119
120
    def set_universe(self, universe):
121
        """Setter for universe (0 - 15 / 256).
122
123
        Mind if protocol has been simplified
124
        """
125
        # This is ugly, trying to keep interface easy
126
        # With simplified mode the universe will be split into two
127
        # values, (uni and sub) which is correct anyway. Net will always be 0
128
        if self.is_simplified:
129
            self.universe = self.put_in_range(universe, 0, 255, False)
130
        else:
131
            self.universe = self.put_in_range(universe, 0, 15, False)
132
        self.make_header()
133
134
    def set_subnet(self, sub):
135
        """Setter for subnet address (0 - 15).
136
137
        Set simplify to false to use
138
        """
139
        self.subnet = self.put_in_range(sub, 0, 15, False)
140
        self.make_header()
141
142
    def set_net(self, net):
143
        """Setter for net address (0 - 127).
144
145
        Set simplify to false to use
146
        """
147
        self.net = self.put_in_range(net, 0, 127, False)
148
        self.make_header()
149
150
    def set_packet_size(self, packet_size):
151
        """Setter for packet size (2 - 512, even only)."""
152
        self.packet_size = self.put_in_range(packet_size, 2, 512, self.make_even)
153
        self.make_header()
154
            
155
    def shift_this(number, high_first=True):
156
        """Utility method: extracts MSB and LSB from number.
157
158
        Args:
159
        number - number to shift
160
        high_first - MSB or LSB first (true / false)
161
162
        Returns:
163
        (high, low) - tuple with shifted values
164
165
        """
166
        low = (number & 0xFF)
167
        high = ((number >> 8) & 0xFF)
168
        if high_first:
169
            return((high, low))
170
        return((low, high))
171
172
173
    def clamp(number, min_val, max_val):
174
        """Utility method: sets number in defined range.
175
176
        Args:
177
        number - number to use
178
        range_min - lowest possible number
179
        range_max - highest possible number
180
181
        Returns:
182
        number - number in correct range
183
        """
184
        return max(min_val, min(number, max_val))
185
186
187
    def set_even(number):
188
        """Utility method: ensures number is even by adding.
189
190
        Args:
191
        number - number to make even
192
193
        Returns:
194
        number - even number
195
        """
196
        if number % 2 != 0:
197
            number += 1
198
        return number
199
200
201
    def put_in_range(self,number, range_min, range_max, make_even=True):
202
        """Utility method: sets number in defined range.
203
        DEPRECATED: this will be removed from the library
204
205
        Args:
206
        number - number to use
207
        range_min - lowest possible number
208
        range_max - highest possible number
209
        make_even - should number be made even
210
211
        Returns:
212
        number - number in correct range
213
214
        """
215
        number = self.clamp(number, range_min, range_max)
216
        if make_even:
217
            number = self.set_even(number)
218
        return number
219
220
221
    def make_address_mask(self,universe, sub=0, net=0, is_simplified=True):
222
        """Returns the address bytes for a given universe, subnet and net.
223
224
        Args:
225
        universe - Universe to listen
226
        sub - Subnet to listen
227
        net - Net to listen
228
        is_simplified - Whether to use nets and subnet or universe only,
229
        see User Guide page 5 (Universe Addressing)
230
231
        Returns:
232
        bytes - byte mask for given address
233
234
        """
235
        address_mask = bytearray()
236
237
        if is_simplified:
238
            # Ensure data is in right range
239
            universe = self.clamp(universe, 0, 32767)
240
241
            # Make mask
242
            msb, lsb = self.shift_this(universe)  # convert to MSB / LSB
243
            address_mask.append(lsb)
244
            address_mask.append(msb)
245
        else:
246
            # Ensure data is in right range
247
            universe = self.clamp(universe, 0, 15)
248
            sub = self.clamp(sub, 0, 15)
249
            net = self.clamp(net, 0, 127)
250
251
            # Make mask
252
            address_mask.append(sub << 4 | universe)
253
            address_mask.append(net & 0xFF)
254
255
        return address_mask
256