Completed
Pull Request — master (#509)
by
unknown
03:15
created

struct_from_binary()   B

Complexity

Conditions 7

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 7.116

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 19
ccs 13
cts 15
cp 0.8667
crap 7.116
rs 7.3333
c 0
b 0
f 0
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
import uuid
9 1
from enum import IntEnum, Enum
10
11 1
from opcua.ua.uaerrors import UaError
12 1
from opcua.common.utils import Buffer
13 1
from opcua import ua
14
15 1
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so the
# isinstance checks in this module work unchanged on both major versions.
if sys.version_info.major > 2:
    unicode = str

# Name the logger after this module. The previous quoted literal
# '__name__' created a logger literally called "__name__", detached from
# the package's logger hierarchy.
logger = logging.getLogger(__name__)
19
20
21 1
def test_bit(data, offset):
22 1
    mask = 1 << offset
23 1
    return data & mask
24
25
26 1
def set_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` forced to 1.
    """
    return data | (1 << offset)
29
30
31 1
def unset_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` cleared to 0.
    """
    return data & ~(1 << offset)
34
35
36 1
class _DateTime(object):
    """
    Codec for datetime values, carried on the wire as an Int64.
    """

    @staticmethod
    def pack(dt):
        """
        Convert ``dt`` with ``ua.datetime_to_win_epoch`` and pack the result as Int64.
        """
        return Primitives.Int64.pack(ua.datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        """
        Read one Int64 from ``data`` and convert it with ``ua.win_epoch_to_datetime``.
        """
        return ua.win_epoch_to_datetime(Primitives.Int64.unpack(data))
46
47
48 1
class _String(object):
    """
    Codec for length-prefixed strings: an Int32 length followed by the raw bytes.
    """

    @staticmethod
    def pack(string):
        """
        Pack ``string`` as Int32 length + payload.

        ``None`` is packed as a bare length of -1. ``unicode`` text is encoded
        as UTF-8 first; any other value is used as the payload as-is.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        payload = string.encode('utf-8') if isinstance(string, unicode) else string
        size = len(payload)
        return Primitives.Int32.pack(size) + payload

    @staticmethod
    def unpack(data):
        """
        Read a length-prefixed string from ``data``.

        On Python 2 the raw bytes are returned untouched; on Python 3 they
        are decoded as UTF-8. A null string (length -1) yields ``None``.
        """
        raw = _Bytes.unpack(data)
        if sys.version_info.major < 3 or raw is None:
            return raw
        return raw.decode("utf-8")
67
68
69 1
class _Bytes(object):
    """
    Codec for length-prefixed byte strings.
    """

    @staticmethod
    def pack(data):
        """
        Pack ``data`` exactly as ``_String.pack`` does (Int32 length + payload).
        """
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """
        Read an Int32 length from ``data``, then that many bytes.

        A length of -1 marks a null value and yields ``None``.
        """
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
80
81
82 1
class _Null(object):
    """
    Codec for the Null type: occupies no bytes and always decodes to ``None``.
    """

    @staticmethod
    def pack(data):
        """
        Ignore ``data`` and return an empty byte string.
        """
        return b""

    @staticmethod
    def unpack(data):
        """
        Consume nothing from ``data`` and return ``None``.
        """
        return None
90
91
92 1
class _Guid(object):
    """
    Codec translating between python ``uuid.UUID`` objects and the 16-byte
    OPC UA Guid layout.
    """

    @staticmethod
    def pack(guid):
        """
        Pack a ``uuid.UUID`` into 16 bytes.

        The python UUID exposes six fields; they are regrouped into the four
        OPC UA fields (UInt32, UInt16, UInt16, 8 raw bytes).
        """
        head = (
            Primitives.UInt32.pack(guid.time_low)
            + Primitives.UInt16.pack(guid.time_mid)
            + Primitives.UInt16.pack(guid.time_hi_version)
        )
        clock = Primitives.Byte.pack(guid.clock_seq_hi_variant) + Primitives.Byte.pack(guid.clock_seq_low)
        # There is no 6-byte primitive: pack the node as 8 big-endian bytes
        # and drop the two leading bytes.
        node = struct.pack('>Q', guid.node)[2:8]
        return head + clock + node

    @staticmethod
    def unpack(data):
        """
        Read 16 bytes from ``data`` and return the corresponding ``uuid.UUID``.

        The three leading integer fields are re-packed big-endian, which is
        the byte order ``uuid.UUID(bytes=...)`` expects; the trailing 8
        bytes are passed through unchanged.
        """
        fields = [
            struct.pack('>I', Primitives.UInt32.unpack(data)),
            struct.pack('>H', Primitives.UInt16.unpack(data)),
            struct.pack('>H', Primitives.UInt16.unpack(data)),
            data.read(8),
        ]
        return uuid.UUID(bytes=b"".join(fields))
119
120
121 1
class _Primitive1(object):
    """
    Codec for a single fixed-size value described by a ``struct`` format.

    Attributes:
        struct: the precompiled ``struct.Struct`` for the format
        size: number of bytes one value occupies
        format: the format as reported by the compiled struct
    """

    def __init__(self, fmt):
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """
        Return ``data`` packed according to this primitive's format.
        """
        # Use the precompiled Struct instead of re-resolving the format
        # through the module-level struct.pack on every call.
        return self.struct.pack(data)

    def unpack(self, data):
        """
        Read ``self.size`` bytes from ``data`` (an object with ``read(n)``)
        and return the single decoded value.
        """
        return self.struct.unpack(data.read(self.size))[0]
132
133
134 1
class Primitives1(object):
    """
    Namespace of fixed-size little-endian codecs, one attribute per type name.
    """

    # signed integers
    SByte = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")

    # unsigned integers
    Byte = _Primitive1("<B")
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")

    # Char shares the very same codec object as Byte
    Char = Byte

    # boolean and floating point
    Boolean = _Primitive1("<?")
    Float = _Primitive1("<f")
    Double = _Primitive1("<d")
147
148
149 1
class Primitives(Primitives1):
    """
    Full codec namespace: the fixed-size codecs inherited from
    ``Primitives1`` plus the variable-size and special ones defined here.
    """

    Null = _Null()

    # text-like types
    String = _String()
    CharArray = _String()

    # raw byte types
    Bytes = _Bytes()
    ByteString = _Bytes()

    # structured built-ins
    DateTime = _DateTime()
    Guid = _Guid()
157
158
159 1
def pack_uatype(vtype, value):
    """
    Pack ``value`` to binary according to the variant type ``vtype``.

    Dispatch order: a codec on ``Primitives`` with the same name as the
    type, then raw bytes for type values above 25, then the dedicated
    encoders for ExtensionObject, NodeId/ExpandedNodeId and Variant, and
    finally the generic struct encoder.
    """
    if hasattr(Primitives, vtype.name):
        codec = getattr(Primitives, vtype.name)
        return codec.pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if vtype == ua.VariantType.ExtensionObject:
        return extensionobject_to_binary(value)
    if vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        return nodeid_to_binary(value)
    if vtype == ua.VariantType.Variant:
        return variant_to_binary(value)
    return struct_to_binary(value)
172
173
174 1
def unpack_uatype(vtype, data):
    """
    Read one value of variant type ``vtype`` from ``data``.

    Mirrors ``pack_uatype``: a same-named codec on ``Primitives`` wins,
    type values above 25 are read as raw bytes, then ExtensionObject,
    NodeId/ExpandedNodeId and Variant use their dedicated decoders. Any
    other type is decoded as the ``ua`` class of the same name.

    Raises UaError when ``ua`` has no attribute named after the type.
    """
    if hasattr(Primitives, vtype.name):
        codec = getattr(Primitives, vtype.name)
        return codec.unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if vtype == ua.VariantType.ExtensionObject:
        return extensionobject_from_binary(data)
    if vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        return nodeid_from_binary(data)
    if vtype == ua.VariantType.Variant:
        return variant_from_binary(data)
    if not hasattr(ua, vtype.name):
        raise UaError("Cannot unpack unknown variant type {0!s}".format(vtype))
    return struct_from_binary(getattr(ua, vtype.name), data)
192
193
194 1
def pack_uatype_array(vtype, array):
    """
    Pack ``array`` as an Int32 element count followed by each element
    packed with ``pack_uatype``.

    ``None`` is packed as the four bytes ff ff ff ff (a count of -1).
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    chunks = [pack_uatype(vtype, item) for item in array]
    # the count goes in front of the already-packed elements
    chunks[0:0] = [Primitives.Int32.pack(count)]
    return b"".join(chunks)
201
202
203 1
def unpack_uatype_array(vtype, data):
    """
    Read an Int32 element count from ``data`` and then that many values
    of type ``vtype``. A count of -1 yields ``None``.
    """
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    items = []
    for _ in range(count):
        items.append(unpack_uatype(vtype, data))
    return items
208
209
210 1
def struct_to_binary(obj):
    """
    Pack a ua structure by walking its ``ua_types`` list of (name, type) pairs.

    If the object has ``ua_switches`` (member name -> (container attribute,
    bit index)), the container bit of every member that is not ``None`` is
    set on ``obj`` first; the object is modified in place. Switched members
    that are ``None`` are then left out of the output. Members whose type
    starts with "ListOf" are always packed as lists.
    """
    switched = hasattr(obj, "ua_switches")
    if switched:
        for member_name, (flag_attr, bit) in obj.ua_switches.items():
            if getattr(obj, member_name) is None:
                continue
            setattr(obj, flag_attr, getattr(obj, flag_attr) | 1 << bit)

    chunks = []
    for name, uatype in obj.ua_types:
        val = getattr(obj, name)
        if uatype.startswith("ListOf"):
            chunks.append(list_to_binary(uatype[6:], val))
        elif switched and val is None and name in obj.ua_switches:
            # optional member that is absent: nothing goes on the wire
            continue
        else:
            chunks.append(to_binary(uatype, val))
    return b''.join(chunks)
231
232
233 1
def to_binary(uatype, val):
    """
    Pack a python object to binary given a string defining its type.

    A type name starting with "ListOf" packs ``val`` as a list of the
    remaining type name; a name that is an attribute of ``ua.VariantType``
    is packed through ``pack_uatype``. Otherwise the value itself decides:
    enums are packed as UInt32, then NodeId, Variant and any object with
    ``ua_types`` use their dedicated encoders.

    Raises UaError when none of these apply.
    """
    # Only string type names can be list or VariantType names. Checking
    # this first keeps a non-string uatype from failing on .startswith and
    # lets it fall through to the value-based dispatch below.
    is_name = isinstance(uatype, (str, unicode))
    if is_name and uatype.startswith("ListOf"):
        return list_to_binary(uatype[6:], val)
    elif is_name and hasattr(ua.VariantType, uatype):
        vtype = getattr(ua.VariantType, uatype)
        return pack_uatype(vtype, val)
    elif isinstance(val, (IntEnum, Enum)):
        return Primitives.UInt32.pack(val.value)
    elif isinstance(val, ua.NodeId):
        return nodeid_to_binary(val)
    elif isinstance(val, ua.Variant):
        return variant_to_binary(val)
    elif hasattr(val, "ua_types"):
        return struct_to_binary(val)
    else:
        raise UaError("No known way to pack {} of type {} to ua binary".format(val, uatype))
253
254
255 1
def list_to_binary(uatype, val):
    """
    Pack ``val`` as an Int32 element count followed by each element packed
    with ``to_binary(uatype, element)``. ``None`` is packed as a count of -1.
    """
    if val is None:
        return Primitives.Int32.pack(-1)
    chunks = [Primitives.Int32.pack(len(val))]
    chunks.extend(to_binary(uatype, item) for item in val)
    return b''.join(chunks)
263
264
265 1
def nodeid_to_binary(nodeid):
    """
    Pack a NodeId (or ExpandedNodeId) to binary.

    The first byte carries the NodeIdType value; the layout of the rest
    depends on that type. When ``NamespaceUri`` and/or ``ServerIndex`` are
    set, bit 7 and/or bit 6 of the first byte are raised and the
    corresponding field is appended (String, then UInt32).

    Always returns ``bytes``. Previously a ``bytearray`` was returned
    whenever one of the expanded fields was present.

    Raises UaError for an unknown NodeIdType.
    """
    kind = nodeid.NodeIdType
    if kind == ua.NodeIdType.TwoByte:
        data = struct.pack("<BB", kind.value, nodeid.Identifier)
    elif kind == ua.NodeIdType.FourByte:
        data = struct.pack("<BBH", kind.value, nodeid.NamespaceIndex, nodeid.Identifier)
    elif kind == ua.NodeIdType.Numeric:
        data = struct.pack("<BHI", kind.value, nodeid.NamespaceIndex, nodeid.Identifier)
    elif kind == ua.NodeIdType.String:
        data = struct.pack("<BH", kind.value, nodeid.NamespaceIndex) + \
            Primitives.String.pack(nodeid.Identifier)
    elif kind == ua.NodeIdType.ByteString:
        data = struct.pack("<BH", kind.value, nodeid.NamespaceIndex) + \
            Primitives.Bytes.pack(nodeid.Identifier)
    elif kind == ua.NodeIdType.Guid:
        data = struct.pack("<BH", kind.value, nodeid.NamespaceIndex) + \
            Primitives.Guid.pack(nodeid.Identifier)
    else:
        raise UaError("Unknown NodeIdType: {} for NodeId: {}".format(nodeid.NodeIdType, nodeid))

    if not (nodeid.NamespaceUri or nodeid.ServerIndex):
        return data

    # Add NamespaceUri and ServerIndex in case we have an ExpandedNodeId.
    # A mutable copy is needed to flip flag bits in the first byte.
    buf = bytearray(data)
    if nodeid.NamespaceUri:
        buf[0] = set_bit(buf[0], 7)
        buf.extend(Primitives.String.pack(nodeid.NamespaceUri))
    if nodeid.ServerIndex:
        buf[0] = set_bit(buf[0], 6)
        buf.extend(Primitives.UInt32.pack(nodeid.ServerIndex))
    # Convert back so callers get the same type on every path.
    return bytes(buf)
295
296
297 1
def nodeid_from_binary(data):
    """
    Read a NodeId (or ExpandedNodeId) from ``data``.

    The low six bits of the first byte select the NodeIdType and thereby
    the layout that follows. Bit 7 announces a trailing NamespaceUri
    string, bit 6 a trailing UInt32 ServerIndex.

    Raises UaError when the type has no known layout.
    """
    nid = ua.NodeId()
    encoding = ord(data.read(1))
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)
    kind = nid.NodeIdType

    if kind == ua.NodeIdType.TwoByte:
        nid.Identifier = ord(data.read(1))
    elif kind == ua.NodeIdType.FourByte:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
    elif kind == ua.NodeIdType.Numeric:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
    elif kind == ua.NodeIdType.String:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.String.unpack(data)
    elif kind == ua.NodeIdType.ByteString:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.Bytes.unpack(data)
    elif kind == ua.NodeIdType.Guid:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.Guid.unpack(data)
    else:
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

    # optional ExpandedNodeId fields, in wire order
    if test_bit(encoding, 7):
        nid.NamespaceUri = Primitives.String.unpack(data)
    if test_bit(encoding, 6):
        nid.ServerIndex = Primitives.UInt32.unpack(data)

    return nid
326
327
328 1
def variant_to_binary(var):
    """
    Pack a Variant: one encoding byte followed by the value.

    The low six bits of the encoding byte hold the VariantType value. If
    the variant is flagged as an array, or its Value is a list or tuple,
    bit 7 is set, ``var.is_array`` is forced to True on the passed object,
    and the flattened value is packed as an array. Bit 6 plus a trailing
    Int32 array are added when ``var.Dimensions`` is not None.
    """
    encoding = var.VariantType.value & 0b111111
    parts = []
    if not (var.is_array or isinstance(var.Value, (list, tuple))):
        # scalar: encoding byte + single value
        parts.append(Primitives.Byte.pack(encoding))
        parts.append(pack_uatype(var.VariantType, var.Value))
        return b"".join(parts)

    var.is_array = True
    encoding = set_bit(encoding, 7)
    if var.Dimensions is not None:
        encoding = set_bit(encoding, 6)
    parts.append(Primitives.Byte.pack(encoding))
    parts.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
    if var.Dimensions is not None:
        parts.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
    return b"".join(parts)
345
346
347 1
def variant_from_binary(data):
    """
    Read a Variant from ``data``.

    The low six bits of the encoding byte give the data type, bit 7 marks
    an array value, and bit 6 marks a trailing Int32 dimensions array, in
    which case the value is reshaped to those dimensions.
    """
    encoding = ord(data.read(1))
    vtype = ua.datatype_to_varianttype(encoding & 0b00111111)

    is_array = False
    if test_bit(encoding, 7):
        value = unpack_uatype_array(vtype, data)
        is_array = True
    else:
        value = unpack_uatype(vtype, data)

    dims = None
    if test_bit(encoding, 6):
        dims = unpack_uatype_array(ua.VariantType.Int32, data)
        value = _reshape(value, dims)

    return ua.Variant(value, vtype, dims, is_array=is_array)
362
363
364 1
def _reshape(flat, dims):
    """
    Rebuild a nested list of shape ``dims`` from the flat list ``flat``.

    A dimension of 0 in any position after the first counts as 1 when
    computing how many flat elements make up one first-level entry. If
    ``flat`` is shorter than the shape requires, it is padded IN PLACE
    with empty lists. For a single dimension (or remaining dimensions
    equal to ``[0]``) the padded ``flat`` list itself is returned.

    An empty ``dims`` returns ``flat`` unchanged instead of raising
    IndexError.
    """
    if not dims:
        return flat

    subdims = dims[1:]
    subsize = 1
    for extent in subdims:
        # a zero extent must not collapse the slice size to zero
        subsize *= extent or 1

    missing = dims[0] * subsize - len(flat)
    if missing > 0:
        flat.extend([] for _ in range(missing))

    if not subdims or subdims == [0]:
        return flat
    return [_reshape(flat[start:start + subsize], subdims)
            for start in range(0, len(flat), subsize)]
376
377
378 1
def extensionobject_from_binary(data):
    """
    Convert binary-coded ExtensionObject to a Python object.

    Returns None if the TypeId identifier is zero, an instance of the
    class registered in ``ua.extension_object_classes`` when the TypeId is
    known, or otherwise a generic ``ua.ExtensionObject`` carrying the raw
    body bytes.

    Raises UaError when a registered type arrives without a body.
    """
    typeid = nodeid_from_binary(data)
    encoding = ord(data.read(1))

    body = None
    if encoding & (1 << 0):
        # bit 0 set: an Int32 length and the body follow
        length = Primitives.Int32.unpack(data)
        if length >= 1:
            body = data.copy(length)
            data.skip(length)
        else:
            body = Buffer(b"")

    if typeid.Identifier == 0:
        return None

    if typeid not in ua.extension_object_classes:
        ext = ua.ExtensionObject()
        ext.TypeId = typeid
        ext.Encoding = encoding
        if body is not None:
            ext.Body = body.read(len(body))
        return ext

    klass = ua.extension_object_classes[typeid]
    if body is None:
        raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
    return from_binary(klass, body)
407
408
409 1
def extensionobject_to_binary(obj):
    """
    Convert Python object to binary-coded ExtensionObject.

    A ``ua.ExtensionObject`` instance is packed as a plain struct.
    If obj is None, convert to empty ExtensionObject (TypeId=0, no Body).
    Any other object is looked up by class name in
    ``ua.extension_object_ids`` and packed as TypeId, encoding byte 0x01
    and a length-prefixed body.

    Returns a binary string
    """
    if isinstance(obj, ua.ExtensionObject):
        return struct_to_binary(obj)
    if obj is None:
        TypeId = ua.NodeId()
        Encoding = 0
        Body = None
    else:
        TypeId = ua.extension_object_ids[obj.__class__.__name__]
        Encoding = 0x01
        Body = struct_to_binary(obj)
    packet = []
    packet.append(nodeid_to_binary(TypeId))
    packet.append(Primitives.Byte.pack(Encoding))
    # Test against None, not truthiness: a struct that encodes to b'' still
    # has Encoding 0x01, so the reader expects a length prefix (here 0).
    if Body is not None:
        packet.append(Primitives.Bytes.pack(Body))
    return b''.join(packet)
431
432
433 1
def from_binary(uatype, data):
    """
    Unpack data given a uatype as a string or a python class having a ua_types member.

    A string starting with "ListOf" reads an Int32 count (-1 yields None)
    and then that many elements of the remaining type name. A string that
    names an attribute of ``ua.VariantType`` is read with
    ``unpack_uatype``. Anything else is handed to ``struct_from_binary``.
    """
    if isinstance(uatype, (str, unicode)):
        if uatype.startswith("ListOf"):
            size = Primitives.Int32.unpack(data)
            if size == -1:
                return None
            return [from_binary(uatype[6:], data) for _ in range(size)]
        if hasattr(ua.VariantType, uatype):
            return unpack_uatype(getattr(ua.VariantType, uatype), data)
    return struct_from_binary(uatype, data)
450
451
452 1
def struct_from_binary(objtype, data):
    """
    Unpack a ua struct. ``objtype`` is a Python class, or a string naming
    an attribute of the ``ua`` module.

    Enum subclasses are read as a single UInt32. Any other class is
    instantiated without arguments and filled member by member following
    its ``ua_types`` list.
    """
    if isinstance(objtype, (unicode, str)):
        objtype = getattr(ua, objtype)
    if issubclass(objtype, Enum):
        return objtype(Primitives.UInt32.unpack(data))

    obj = objtype()
    for name, uatype in obj.ua_types:
        # a switched member whose container bit is clear is not on the wire
        if hasattr(obj, "ua_switches") and name in obj.ua_switches:
            flag_attr, bit = obj.ua_switches[name]
            if not test_bit(getattr(obj, flag_attr), bit):
                continue
        setattr(obj, name, from_binary(uatype, data))
    return obj
471
472
473 1
def header_to_binary(hdr):
    """
    Pack a message header: 3-byte MessageType, 1-byte ChunkType and a
    UInt32 total size (body size plus the 8 header bytes).

    For SecureOpen, SecureClose and SecureMessage a UInt32 ChannelId
    follows, and its 4 bytes are included in the size.
    """
    parts = [struct.pack("<3ss", hdr.MessageType, hdr.ChunkType)]
    secure_types = (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage)
    secure = hdr.MessageType in secure_types

    size = hdr.body_size + 8
    if secure:
        size += 4
    parts.append(Primitives.UInt32.pack(size))

    if secure:
        parts.append(Primitives.UInt32.pack(hdr.ChannelId))
    return b"".join(parts)
483
484
485 1
def header_from_binary(data):
    """
    Read a message header from ``data`` and return a ``ua.Header``.

    The first 8 bytes give MessageType, ChunkType and packet_size;
    body_size is the packet size minus those 8 bytes. For SecureOpen,
    SecureClose and SecureMessage a UInt32 ChannelId is read as well and
    its 4 bytes are also subtracted from body_size.
    """
    hdr = ua.Header()
    msg_type, chunk_type, packet_size = struct.unpack("<3scI", data.read(8))
    hdr.MessageType = msg_type
    hdr.ChunkType = chunk_type
    hdr.packet_size = packet_size
    hdr.body_size = hdr.packet_size - 8

    secure_types = (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage)
    if hdr.MessageType in secure_types:
        hdr.body_size -= 4
        hdr.ChannelId = Primitives.UInt32.unpack(data)
    return hdr
493
494
495 1
def uatcp_to_binary(message_type, message):
    """
    Convert OPC UA TCP message (see OPC UA specs Part 6, 7.1) to binary.
    The only supported types are Hello, Acknowledge and ErrorMessage.

    The message is packed as a struct and prefixed with a single-chunk
    header whose body size is the length of the packed message.
    """
    payload = struct_to_binary(message)
    hdr = ua.Header(message_type, ua.ChunkType.Single)
    hdr.body_size = len(payload)
    return header_to_binary(hdr) + payload
504