Completed
Push — master ( 4d7a0f...e1067b )
by Olivier
05:06 queued 53s
created

list_to_binary()   A

Complexity

Conditions 3

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 8
ccs 7
cts 8
cp 0.875
crap 3.0175
rs 9.4285
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
import uuid
9 1
from enum import IntEnum, Enum
10
11 1
from opcua.ua.uaerrors import UaError
12 1
from opcua.common.utils import Buffer
13 1
from opcua import ua
14
15 1
if sys.version_info.major > 2:
16
    unicode = str
17
18 1
logger = logging.getLogger('__name__')
19
20
21 1
def test_bit(data, offset):
22 1
    mask = 1 << offset
23 1
    return data & mask
24
25
26 1
def set_bit(data, offset):
27 1
    mask = 1 << offset
28 1
    return data | mask
29
30
31 1
def unset_bit(data, offset):
32 1
    mask = 1 << offset
33 1
    return data & ~mask
34
35
36 1
class _DateTime(object):
37 1
    @staticmethod
38
    def pack(dt):
39 1
        epch = ua.datetime_to_win_epoch(dt)
40 1
        return Primitives.Int64.pack(epch)
41
42 1
    @staticmethod
43
    def unpack(data):
44 1
        epch = Primitives.Int64.unpack(data)
45 1
        return ua.win_epoch_to_datetime(epch)
46
47
48 1
class _String(object):
49 1
    @staticmethod
50
    def pack(string):
51 1
        if string is None:
52 1
            return Primitives.Int32.pack(-1)
53 1
        if isinstance(string, unicode):
54
            string = string.encode('utf-8')
55 1
        length = len(string)
56 1
        return Primitives.Int32.pack(length) + string
57
58 1
    @staticmethod
59
    def unpack(data):
60 1
        b = _Bytes.unpack(data)
61 1
        if sys.version_info.major < 3:
62 1
            return b
63
        else:
64
            if b is None:
65
                return b
66
            return b.decode("utf-8")
67
68
69 1
class _Bytes(object):
70 1
    @staticmethod
71
    def pack(data):
72 1
        return _String.pack(data)
73
74 1
    @staticmethod
75
    def unpack(data):
76 1
        length = Primitives.Int32.unpack(data)
77 1
        if length == -1:
78 1
            return None
79 1
        return data.read(length)
80
81
82 1
class _Null(object):
83 1
    @staticmethod
84
    def pack(data):
85 1
        return b""
86
87 1
    @staticmethod
88
    def unpack(data):
89 1
        return None
90
91
92 1
class _Guid(object):
93 1
    @staticmethod
94
    def pack(guid):
95
        # convert python UUID 6 field format to OPC UA 4 field format
96 1
        f1 = Primitives.UInt32.pack(guid.time_low)
97 1
        f2 = Primitives.UInt16.pack(guid.time_mid)
98 1
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
99 1
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
100 1
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
101 1
        f4c = struct.pack('>Q', guid.node)[2:8]  # no primitive .pack available for 6 byte int
102 1
        f4 = f4a + f4b + f4c
103
        # concat byte fields
104 1
        b = f1 + f2 + f3 + f4
105
106 1
        return b
107
108 1
    @staticmethod
109
    def unpack(data):
110
        # convert OPC UA 4 field format to python UUID bytes
111 1
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
112 1
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
113 1
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
114 1
        f4 = data.read(8)
115
        # concat byte fields
116 1
        b = f1 + f2 + f3 + f4
117
118 1
        return uuid.UUID(bytes=b)
119
120
121 1
class _Primitive1(object):
122 1
    def __init__(self, fmt):
123 1
        self.struct = struct.Struct(fmt)
124 1
        self.size = self.struct.size
125 1
        self.format = self.struct.format
126
127 1
    def pack(self, data):
128 1
        return struct.pack(self.format, data)
129
130 1
    def unpack(self, data):
131 1
        return struct.unpack(self.format, data.read(self.size))[0]
132
133
134 1
class Primitives1(object):
135 1
    SByte = _Primitive1("<b")
136 1
    Int16 = _Primitive1("<h")
137 1
    Int32 = _Primitive1("<i")
138 1
    Int64 = _Primitive1("<q")
139 1
    Byte = _Primitive1("<B")
140 1
    Char = Byte
141 1
    UInt16 = _Primitive1("<H")
142 1
    UInt32 = _Primitive1("<I")
143 1
    UInt64 = _Primitive1("<Q")
144 1
    Boolean = _Primitive1("<?")
145 1
    Double = _Primitive1("<d")
146 1
    Float = _Primitive1("<f")
147
148
149 1
class Primitives(Primitives1):
150 1
    Null = _Null()
151 1
    String = _String()
152 1
    Bytes = _Bytes()
153 1
    ByteString = _Bytes()
154 1
    CharArray = _String()
155 1
    DateTime = _DateTime()
156 1
    Guid = _Guid()
157
158
159 1
def pack_uatype(vtype, value):
160 1
    if hasattr(Primitives, vtype.name):
161 1
        return getattr(Primitives, vtype.name).pack(value)
162 1
    elif vtype.value > 25:
163 1
        return Primitives.Bytes.pack(value)
164 1
    elif vtype == ua.VariantType.ExtensionObject:
165 1
        return extensionobject_to_binary(value)
166 1
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
167 1
        return nodeid_to_binary(value)
168 1
    elif vtype == ua.VariantType.Variant:
169 1
        return variant_to_binary(value)
170
    else:
171 1
        return struct_to_binary(value)
172
173
174 1
def unpack_uatype(vtype, data):
175 1
    if hasattr(Primitives, vtype.name):
176 1
        st = getattr(Primitives, vtype.name)
177 1
        return st.unpack(data)
178 1
    elif vtype.value > 25:
179 1
        return Primitives.Bytes.unpack(data)
180 1
    elif vtype == ua.VariantType.ExtensionObject:
181 1
        return extensionobject_from_binary(data)
182 1
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
183 1
        return nodeid_from_binary(data)
184 1
    elif vtype == ua.VariantType.Variant:
185 1
        return variant_from_binary(data)
186
    else:
187 1
        if hasattr(ua, vtype.name):
188 1
            klass = getattr(ua, vtype.name)
189 1
            return struct_from_binary(klass, data)
190
        else:
191
            raise UaError("Cannot unpack unknown variant type {0!s}".format(vtype))
192
193
194 1
def pack_uatype_array(vtype, array):
195 1
    if array is None:
196 1
        return b'\xff\xff\xff\xff'
197 1
    length = len(array)
198 1
    b = [pack_uatype(vtype, val) for val in array]
199 1
    b.insert(0, Primitives.Int32.pack(length))
200 1
    return b"".join(b)
201
202
203 1
def unpack_uatype_array(vtype, data):
204 1
    length = Primitives.Int32.unpack(data)
205 1
    if length == -1:
206 1
        return None
207 1
    return [unpack_uatype(vtype, data) for _ in range(length)]
208
209
210 1
def struct_to_binary(obj):
211 1
    packet = []
212 1
    has_switch = hasattr(obj, "ua_switches")
213 1
    if has_switch:
214 1
        for name, switch in obj.ua_switches.items():
215 1
            member = getattr(obj, name)
216 1
            container_name, idx = switch
217 1
            if member is not None:
218 1
                container_val = getattr(obj, container_name)
219 1
                container_val = container_val | 1 << idx
220 1
                setattr(obj, container_name, container_val)
221 1
    for name, uatype in obj.ua_types:
222 1
        val = getattr(obj, name)
223 1
        if uatype.startswith("ListOf"):
224 1
            packet.append(list_to_binary(uatype[6:], val))
225
        else:
226 1
            if has_switch and val is None and name in obj.ua_switches:
227 1
                pass
228
            else:
229 1
                packet.append(to_binary(uatype, val))
230 1
    return b''.join(packet)
231
232
233 1
def to_binary(uatype, val):
234
    """
235
    Pack a python object to binary given a string defining its type
236
    """
237 1
    if uatype.startswith("ListOf"):
238
    #if isinstance(val, (list, tuple)):
239
        return list_to_binary(uatype[6:], val)
240 1
    elif isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
241 1
        vtype = getattr(ua.VariantType, uatype)
242 1
        return pack_uatype(vtype, val)
243 1
    elif isinstance(val, (IntEnum, Enum)):
244 1
        return Primitives.UInt32.pack(val.value)
245 1
    elif isinstance(val, ua.NodeId):
246
        return nodeid_to_binary(val)
247 1
    elif isinstance(val, ua.Variant):
248
        return variant_to_binary(val)
249 1
    elif hasattr(val, "ua_types"):
250 1
        return struct_to_binary(val)
251
    else:
252
        raise UaError("No known way to pack {} of type {} to ua binary".format(val, uatype))
253
254
255 1
def list_to_binary(uatype, val):
256 1
    if val is None:
257
        return Primitives.Int32.pack(-1)
258 1
    pack = []
259 1
    pack.append(Primitives.Int32.pack(len(val)))
260 1
    for el in val:
261 1
        pack.append(to_binary(uatype, el))
262 1
    return b''.join(pack)
263
264
265 1
def nodeid_to_binary(nodeid):
266 1
    data = None
267 1
    if nodeid.NodeIdType == ua.NodeIdType.TwoByte:
268 1
        data = struct.pack("<BB", nodeid.NodeIdType.value, nodeid.Identifier)
269 1
    elif nodeid.NodeIdType == ua.NodeIdType.FourByte:
270 1
        data = struct.pack("<BBH", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
271 1
    elif nodeid.NodeIdType == ua.NodeIdType.Numeric:
272 1
        data = struct.pack("<BHI", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
273 1
    elif nodeid.NodeIdType == ua.NodeIdType.String:
274 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
275
            Primitives.String.pack(nodeid.Identifier)
276 1
    elif nodeid.NodeIdType == ua.NodeIdType.ByteString:
277 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
278
            Primitives.Bytes.pack(nodeid.Identifier)
279 1
    elif nodeid.NodeIdType == ua.NodeIdType.Guid:
280 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
281
               Primitives.Guid.pack(nodeid.Identifier)
282
    else:
283
        raise UaError("Unknown NodeIdType: {} for NodeId: {}".format(nodeid.NodeIdType, nodeid))
284
    # Add NamespaceUri and ServerIndex in case we have an ExpandedNodeId
285 1
    if nodeid.NamespaceUri:
286 1
        data = bytearray(data)
287 1
        data[0] = set_bit(data[0], 7)
288 1
        data.extend(Primitives.String.pack(nodeid.NamespaceUri))
289 1
    if nodeid.ServerIndex:
290 1
        if not isinstance(data, bytearray):
291
            data = bytearray(data)
292 1
        data[0] = set_bit(data[0], 6)
293 1
        data.extend(Primitives.UInt32.pack(nodeid.ServerIndex))
294 1
    return data
295
296
297 1
def nodeid_from_binary(data):
298 1
    nid = ua.NodeId()
299 1
    encoding = ord(data.read(1))
300 1
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)
301
302 1
    if nid.NodeIdType == ua.NodeIdType.TwoByte:
303 1
        nid.Identifier = ord(data.read(1))
304 1
    elif nid.NodeIdType == ua.NodeIdType.FourByte:
305 1
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
306 1
    elif nid.NodeIdType == ua.NodeIdType.Numeric:
307 1
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
308 1
    elif nid.NodeIdType == ua.NodeIdType.String:
309 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
310 1
        nid.Identifier = Primitives.String.unpack(data)
311 1
    elif nid.NodeIdType == ua.NodeIdType.ByteString:
312 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
313 1
        nid.Identifier = Primitives.Bytes.unpack(data)
314 1
    elif nid.NodeIdType == ua.NodeIdType.Guid:
315 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
316 1
        nid.Identifier = Primitives.Guid.unpack(data)
317
    else:
318
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
319
320 1
    if test_bit(encoding, 7):
321 1
        nid.NamespaceUri = Primitives.String.unpack(data)
322 1
    if test_bit(encoding, 6):
323 1
        nid.ServerIndex = Primitives.UInt32.unpack(data)
324
325 1
    return nid
326
327
328 1
def variant_to_binary(var):
329 1
    b = []
330 1
    encoding = var.VariantType.value & 0b111111
331 1
    if var.is_array or isinstance(var.Value, (list, tuple)):
332 1
        var.is_array = True
333 1
        encoding = set_bit(encoding, 7)
334 1
        if var.Dimensions is not None:
335 1
            encoding = set_bit(encoding, 6)
336 1
        b.append(Primitives.Byte.pack(encoding))
337 1
        b.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
338 1
        if var.Dimensions is not None:
339 1
            b.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
340
    else:
341 1
        b.append(Primitives.Byte.pack(encoding))
342 1
        b.append(pack_uatype(var.VariantType, var.Value))
343
344 1
    return b"".join(b)
345
346
347 1
def variant_from_binary(data):
348 1
    dimensions = None
349 1
    array = False
350 1
    encoding = ord(data.read(1))
351 1
    int_type = encoding & 0b00111111
352 1
    vtype = ua.datatype_to_varianttype(int_type)
353 1
    if test_bit(encoding, 7):
354 1
        value = unpack_uatype_array(vtype, data)
355 1
        array = True
356
    else:
357 1
        value = unpack_uatype(vtype, data)
358 1
    if test_bit(encoding, 6):
359 1
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
360 1
        value = _reshape(value, dimensions)
361 1
    return ua.Variant(value, vtype, dimensions, is_array=array)
362
363
364 1
def _reshape(flat, dims):
365 1
    subdims = dims[1:]
366 1
    subsize = 1
367 1
    for i in subdims:
368 1
        if i == 0:
369 1
            i = 1
370 1
        subsize *= i
371 1
    while dims[0] * subsize > len(flat):
372 1
        flat.append([])
373 1
    if not subdims or subdims == [0]:
374 1
        return flat
375 1
    return [_reshape(flat[i:i + subsize], subdims) for i in range(0, len(flat), subsize)]
376
377
378 1
def extensionobject_from_binary(data):
379
    """
380
    Convert binary-coded ExtensionObject to a Python object.
381
    Returns an object, or None if TypeId is zero
382
    """
383 1
    typeid = nodeid_from_binary(data)
384 1
    Encoding = ord(data.read(1))
385 1
    body = None
386 1
    if Encoding & (1 << 0):
387 1
        length = Primitives.Int32.unpack(data)
388 1
        if length < 1:
389
            body = Buffer(b"")
390
        else:
391 1
            body = data.copy(length)
392 1
            data.skip(length)
393 1
    if typeid.Identifier == 0:
394 1
        return None
395 1
    elif typeid in ua.extension_object_classes:
396 1
        klass = ua.extension_object_classes[typeid]
397 1
        if body is None:
398
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
399 1
        return from_binary(klass, body)
400
    else:
401 1
        e = ua.ExtensionObject()
402 1
        e.TypeId = typeid
403 1
        e.Encoding = Encoding
404 1
        if body is not None:
405 1
            e.Body = body.read(len(body))
406 1
        return e
407
408
409 1
def extensionobject_to_binary(obj):
410
    """
411
    Convert Python object to binary-coded ExtensionObject.
412
    If obj is None, convert to empty ExtensionObject (TypeId=0, no Body).
413
    Returns a binary string
414
    """
415 1
    if isinstance(obj, ua.ExtensionObject):
416 1
        return struct_to_binary(obj)
417 1
    if obj is None:
418 1
        TypeId = ua.NodeId()
419 1
        Encoding = 0
420 1
        Body = None
421
    else:
422 1
        TypeId = ua.extension_object_ids[obj.__class__.__name__]
423 1
        Encoding = 0x01
424 1
        Body = struct_to_binary(obj)
425 1
    packet = []
426 1
    packet.append(nodeid_to_binary(TypeId))
427 1
    packet.append(Primitives.Byte.pack(Encoding))
428 1
    if Body:
429 1
        packet.append(Primitives.Bytes.pack(Body))
430 1
    return b''.join(packet)
431
432
433 1
def from_binary(uatype, data):
434
    """
435
    unpack data given an uatype as a string or a python class having a ua_types memeber
436
    """
437 1
    if isinstance(uatype, (str, unicode)) and uatype.startswith("ListOf"):
438 1
        size = Primitives.Int32.unpack(data)
439 1
        if size == -1:
440
            return None
441 1
        res = []
442 1
        for _ in range(size):
443 1
            res.append(from_binary(uatype[6:], data))
444 1
        return res
445 1
    elif isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
446 1
        vtype = getattr(ua.VariantType, uatype)
447 1
        return unpack_uatype(vtype, data)
448
    else:
449 1
        return struct_from_binary(uatype, data)
450
451
452 1
def struct_from_binary(objtype, data):
453
    """
454
    unpack an ua struct. Arguments are an objtype as Python class or string
455
    """
456 1
    if isinstance(objtype, (unicode, str)):
457 1
        objtype = getattr(ua, objtype)
458 1
    if issubclass(objtype, Enum):
459 1
        return objtype(Primitives.UInt32.unpack(data))
460 1
    obj = objtype()
461 1
    for name, uatype in obj.ua_types:
462
        # if our member has a swtich and it is not set we skip it
463 1
        if hasattr(obj, "ua_switches") and name in obj.ua_switches:
464 1
            container_name, idx = obj.ua_switches[name]
465 1
            val = getattr(obj, container_name)
466 1
            if not test_bit(val, idx):
467 1
                continue
468 1
        val = from_binary(uatype, data)
469 1
        setattr(obj, name, val)
470 1
    return obj
471
472
473 1
def header_to_binary(hdr):
474 1
    b = []
475 1
    b.append(struct.pack("<3ss", hdr.MessageType, hdr.ChunkType))
476 1
    size = hdr.body_size + 8
477 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
478 1
        size += 4
479 1
    b.append(Primitives.UInt32.pack(size))
480 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
481 1
        b.append(Primitives.UInt32.pack(hdr.ChannelId))
482 1
    return b"".join(b)
483
484
485 1
def header_from_binary(data):
486 1
    hdr = ua.Header()
487 1
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
488 1
    hdr.body_size = hdr.packet_size - 8
489 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
490 1
        hdr.body_size -= 4
491 1
        hdr.ChannelId = Primitives.UInt32.unpack(data)
492 1
    return hdr
493
494
495 1
def uatcp_to_binary(message_type, message):
496
    """
497
    Convert OPC UA TCP message (see OPC UA specs Part 6, 7.1) to binary.
498
    The only supported types are Hello, Acknowledge and ErrorMessage
499
    """
500 1
    header = ua.Header(message_type, ua.ChunkType.Single)
501 1
    binmsg = struct_to_binary(message)
502 1
    header.body_size = len(binmsg)
503
    return header_to_binary(header) + binmsg
504