asyncua.ua.ua_binary.extensionobject_to_binary()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 23
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 16
nop 1
dl 0
loc 23
rs 9.6
c 0
b 0
f 0
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5
import struct
6
import logging
7
import uuid
8
from enum import IntEnum, Enum
9
10
from .uaerrors import UaError
11
from ..common.utils import Buffer
12
from asyncua import ua
13
14
logger = logging.getLogger('__name__')
15
16
17
def test_bit(data, offset):
18
    mask = 1 << offset
19
    return data & mask
20
21
22
def set_bit(data, offset):
23
    mask = 1 << offset
24
    return data | mask
25
26
27
def unset_bit(data, offset):
28
    mask = 1 << offset
29
    return data & ~mask
30
31
32
class _DateTime(object):
33
    @staticmethod
34
    def pack(dt):
35
        epch = ua.datetime_to_win_epoch(dt)
36
        return Primitives.Int64.pack(epch)
37
38
    @staticmethod
39
    def unpack(data):
40
        epch = Primitives.Int64.unpack(data)
41
        return ua.win_epoch_to_datetime(epch)
42
43
44
class _Bytes(object):
45
    @staticmethod
46
    def pack(data):
47
        if data is None:
48
            return Primitives.Int32.pack(-1)
49
        length = len(data)
50
        return Primitives.Int32.pack(length) + data
51
52
    @staticmethod
53
    def unpack(data):
54
        length = Primitives.Int32.unpack(data)
55
        if length == -1:
56
            return None
57
        return data.read(length)
58
59
60
class _String(object):
61
    @staticmethod
62
    def pack(string):
63
        if string is not None:
64
            string = string.encode('utf-8')
65
        return _Bytes.pack(string)
66
67
    @staticmethod
68
    def unpack(data):
69
        b = _Bytes.unpack(data)
70
        if b is None:
71
            return b
72
        return b.decode('utf-8')
73
74
75
class _Null(object):
76
    @staticmethod
77
    def pack(data):
78
        return b''
79
80
    @staticmethod
81
    def unpack(data):
82
        return None
83
84
85
class _Guid(object):
86
    @staticmethod
87
    def pack(guid):
88
        # convert python UUID 6 field format to OPC UA 4 field format
89
        f1 = Primitives.UInt32.pack(guid.time_low)
90
        f2 = Primitives.UInt16.pack(guid.time_mid)
91
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
92
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
93
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
94
        f4c = struct.pack('>Q', guid.node)[2:8]  # no primitive .pack available for 6 byte int
95
        f4 = f4a + f4b + f4c
96
        # concat byte fields
97
        b = f1 + f2 + f3 + f4
98
99
        return b
100
101
    @staticmethod
102
    def unpack(data):
103
        # convert OPC UA 4 field format to python UUID bytes
104
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
105
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
106
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
107
        f4 = data.read(8)
108
        # concat byte fields
109
        b = f1 + f2 + f3 + f4
110
111
        return uuid.UUID(bytes=b)
112
113
114
class _Primitive1(object):
115
    def __init__(self, fmt):
116
        self._fmt = fmt
117
        st = struct.Struct(fmt.format(1))
118
        self.size = st.size
119
        self.format = st.format
120
121
    def pack(self, data):
122
        return struct.pack(self.format, data)
123
124
    def unpack(self, data):
125
        return struct.unpack(self.format, data.read(self.size))[0]
126
127
    def pack_array(self, data):
128
        if data is None:
129
            return Primitives.Int32.pack(-1)
130
        if not isinstance(data, list):
131
            logger.warning(f'ua_binary.py > _Primitive1 > pack_array > data: {data} is not a instance of "list"!')
132
            return Primitives.Int32.pack(-1) # to prevent crashing while runtime
133
        size_data = Primitives.Int32.pack(len(data))
134
        return size_data + struct.pack(self._fmt.format(len(data)), *data)
135
136
    def unpack_array(self, data, length):
137
        if length == -1:
138
            return None
139
        if length == 0:
140
            return ()
141
        return struct.unpack(self._fmt.format(length), data.read(self.size * length))
142
143
144
class Primitives1(object):
145
    SByte = _Primitive1('<{:d}b')
146
    Int16 = _Primitive1('<{:d}h')
147
    Int32 = _Primitive1('<{:d}i')
148
    Int64 = _Primitive1('<{:d}q')
149
    Byte = _Primitive1('<{:d}B')
150
    Char = Byte
151
    UInt16 = _Primitive1('<{:d}H')
152
    UInt32 = _Primitive1('<{:d}I')
153
    UInt64 = _Primitive1('<{:d}Q')
154
    Boolean = _Primitive1('<{:d}?')
155
    Double = _Primitive1('<{:d}d')
156
    Float = _Primitive1('<{:d}f')
157
158
159
class Primitives(Primitives1):
160
    Null = _Null()
161
    String = _String()
162
    Bytes = _Bytes()
163
    ByteString = _Bytes()
164
    CharArray = _String()
165
    DateTime = _DateTime()
166
    Guid = _Guid()
167
168
169
def pack_uatype(vtype, value):
170
    if hasattr(Primitives, vtype.name):
171
        return getattr(Primitives, vtype.name).pack(value)
172
    elif vtype.value > 25:
173
        return Primitives.Bytes.pack(value)
174
    elif vtype == ua.VariantType.ExtensionObject:
175
        return extensionobject_to_binary(value)
176
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
177
        return nodeid_to_binary(value)
178
    elif vtype == ua.VariantType.Variant:
179
        return variant_to_binary(value)
180
    else:
181
        return struct_to_binary(value)
182
183
184
def unpack_uatype(vtype, data):
185
    if hasattr(Primitives, vtype.name):
186
        st = getattr(Primitives, vtype.name)
187
        return st.unpack(data)
188
    elif vtype.value > 25:
189
        return Primitives.Bytes.unpack(data)
190
    elif vtype == ua.VariantType.ExtensionObject:
191
        return extensionobject_from_binary(data)
192
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
193
        return nodeid_from_binary(data)
194
    elif vtype == ua.VariantType.Variant:
195
        return variant_from_binary(data)
196
    else:
197
        if hasattr(ua, vtype.name):
198
            cls = getattr(ua, vtype.name)
199
            return struct_from_binary(cls, data)
200
        else:
201
            raise UaError(f'Cannot unpack unknown variant type {vtype}')
202
203
204
def pack_uatype_array(vtype, array):
205
    if hasattr(Primitives1, vtype.name):
206
        data_type = getattr(Primitives1, vtype.name)
207
        return data_type.pack_array(array)
208
    if array is None:
209
        return b'\xff\xff\xff\xff'
210
    length = len(array)
211
    b = [pack_uatype(vtype, val) for val in array]
212
    b.insert(0, Primitives.Int32.pack(length))
213
    return b"".join(b)
214
215
216
def unpack_uatype_array(vtype, data):
217
    length = Primitives.Int32.unpack(data)
218
    if length == -1:
219
        return None
220
    elif hasattr(Primitives1, vtype.name):
221
        data_type = getattr(Primitives1, vtype.name)
222
        # Remark: works without tuple conversion to list.
223
        return list(data_type.unpack_array(data, length))
224
    else:
225
        # Revert to slow serial unpacking.
226
        return [unpack_uatype(vtype, data) for _ in range(length)]
227
228
229
def struct_to_binary(obj):
230
    packet = []
231
    has_switch = hasattr(obj, 'ua_switches')
232
    if has_switch:
233
        for name, switch in obj.ua_switches.items():
234
            member = getattr(obj, name)
235
            container_name, idx = switch
236
            if member is not None:
237
                container_val = getattr(obj, container_name)
238
                container_val = container_val | 1 << idx
239
                setattr(obj, container_name, container_val)
240
    for name, uatype in obj.ua_types:
241
        val = getattr(obj, name)
242
        if uatype.startswith('ListOf'):
243
            packet.append(list_to_binary(uatype[6:], val))
244
        else:
245
            if has_switch and val is None and name in obj.ua_switches:
246
                pass
247
            else:
248
                packet.append(to_binary(uatype, val))
249
    return b''.join(packet)
250
251
252
def to_binary(uatype, val):
253
    """
254
    Pack a python object to binary given a string defining its type
255
    """
256
    if uatype.startswith('ListOf'):
257
        # if isinstance(val, (list, tuple)):
258
        return list_to_binary(uatype[6:], val)
259
    elif type(uatype) is str and hasattr(ua.VariantType, uatype):
260
        vtype = getattr(ua.VariantType, uatype)
261
        return pack_uatype(vtype, val)
262
    elif type(uatype) is str and hasattr(Primitives, uatype):
263
        return getattr(Primitives, uatype).pack(val)
264
    elif isinstance(val, (IntEnum, Enum)):
265
        return Primitives.UInt32.pack(val.value)
266
    elif isinstance(val, ua.NodeId):
267
        return nodeid_to_binary(val)
268
    elif isinstance(val, ua.Variant):
269
        return variant_to_binary(val)
270
    elif hasattr(val, 'ua_types'):
271
        return struct_to_binary(val)
272
    else:
273
        raise UaError(f'No known way to pack {val} of type {uatype} to ua binary')
274
275
276
def list_to_binary(uatype, val):
277
    if val is None:
278
        return Primitives.Int32.pack(-1)
279
    if hasattr(Primitives1, uatype):
280
        data_type = getattr(Primitives1, uatype)
281
        return data_type.pack_array(val)
282
    data_size = Primitives.Int32.pack(len(val))
283
    pack = [to_binary(uatype, el) for el in val]
284
    pack.insert(0, data_size)
285
    return b''.join(pack)
286
287
288
def nodeid_to_binary(nodeid):
289
    if nodeid.NodeIdType == ua.NodeIdType.TwoByte:
290
        return struct.pack('<BB', nodeid.NodeIdType.value, nodeid.Identifier)
291
    elif nodeid.NodeIdType == ua.NodeIdType.FourByte:
292
        return struct.pack('<BBH', nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
293
    elif nodeid.NodeIdType == ua.NodeIdType.Numeric:
294
        data = struct.pack('<BHI', nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
295
    elif nodeid.NodeIdType == ua.NodeIdType.String:
296
        data = struct.pack('<BH', nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
297
            Primitives.String.pack(nodeid.Identifier)
298
    elif nodeid.NodeIdType == ua.NodeIdType.ByteString:
299
        data = struct.pack('<BH', nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
300
            Primitives.Bytes.pack(nodeid.Identifier)
301
    elif nodeid.NodeIdType == ua.NodeIdType.Guid:
302
        data = struct.pack('<BH', nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
303
               Primitives.Guid.pack(nodeid.Identifier)
304
    else:
305
        raise UaError(f'Unknown NodeIdType: {nodeid.NodeIdType} for NodeId: {nodeid}')
306
    # Add NamespaceUri and ServerIndex in case we have an ExpandedNodeId
307
    if nodeid.NamespaceUri:
308
        data = bytearray(data)
309
        data[0] = set_bit(data[0], 7)
310
        data.extend(Primitives.String.pack(nodeid.NamespaceUri))
311
    if nodeid.ServerIndex:
312
        if not isinstance(data, bytearray):
313
            data = bytearray(data)
314
        data[0] = set_bit(data[0], 6)
315
        data.extend(Primitives.UInt32.pack(nodeid.ServerIndex))
316
    return data
317
318
319
def nodeid_from_binary(data):
320
    nid = ua.NodeId()
321
    encoding = ord(data.read(1))
322
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)
323
324
    if nid.NodeIdType == ua.NodeIdType.TwoByte:
325
        nid.Identifier = ord(data.read(1))
326
    elif nid.NodeIdType == ua.NodeIdType.FourByte:
327
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
328
    elif nid.NodeIdType == ua.NodeIdType.Numeric:
329
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
330
    elif nid.NodeIdType == ua.NodeIdType.String:
331
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
332
        nid.Identifier = Primitives.String.unpack(data)
333
    elif nid.NodeIdType == ua.NodeIdType.ByteString:
334
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
335
        nid.Identifier = Primitives.Bytes.unpack(data)
336
    elif nid.NodeIdType == ua.NodeIdType.Guid:
337
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
338
        nid.Identifier = Primitives.Guid.unpack(data)
339
    else:
340
        raise UaError(f'Unknown NodeId encoding: {nid.NodeIdType}')
341
342
    if test_bit(encoding, 7):
343
        nid.NamespaceUri = Primitives.String.unpack(data)
344
    if test_bit(encoding, 6):
345
        nid.ServerIndex = Primitives.UInt32.unpack(data)
346
347
    return nid
348
349
350
def variant_to_binary(var):
351
    b = []
352
    encoding = var.VariantType.value & 0b111111
353
    if var.is_array or isinstance(var.Value, (list, tuple)):
354
        var.is_array = True
355
        encoding = set_bit(encoding, 7)
356
        if var.Dimensions is not None:
357
            encoding = set_bit(encoding, 6)
358
        b.append(Primitives.Byte.pack(encoding))
359
        b.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
360
        if var.Dimensions is not None:
361
            b.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
362
    else:
363
        b.append(Primitives.Byte.pack(encoding))
364
        b.append(pack_uatype(var.VariantType, var.Value))
365
366
    return b"".join(b)
367
368
369
def variant_from_binary(data):
370
    dimensions = None
371
    array = False
372
    encoding = ord(data.read(1))
373
    int_type = encoding & 0b00111111
374
    vtype = ua.datatype_to_varianttype(int_type)
375
    if test_bit(encoding, 7):
376
        value = unpack_uatype_array(vtype, data)
377
        array = True
378
    else:
379
        value = unpack_uatype(vtype, data)
380
    if test_bit(encoding, 6):
381
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
382
        value = _reshape(value, dimensions)
383
    return ua.Variant(value, vtype, dimensions, is_array=array)
384
385
386
def _reshape(flat, dims):
387
    subdims = dims[1:]
388
    subsize = 1
389
    for i in subdims:
390
        if i == 0:
391
            i = 1
392
        subsize *= i
393
    while dims[0] * subsize > len(flat):
394
        flat.append([])
395
    if not subdims or subdims == [0]:
396
        return flat
397
    return [_reshape(flat[i:i + subsize], subdims) for i in range(0, len(flat), subsize)]
398
399
400
def extensionobject_from_binary(data):
401
    """
402
    Convert binary-coded ExtensionObject to a Python object.
403
    Returns an object, or None if TypeId is zero
404
    """
405
    typeid = nodeid_from_binary(data)
406
    encoding = ord(data.read(1))
407
    body = None
408
    if encoding & (1 << 0):
409
        length = Primitives.Int32.unpack(data)
410
        if length < 1:
411
            body = Buffer(b"")
412
        else:
413
            body = data.copy(length)
414
            data.skip(length)
415
    if typeid.Identifier == 0:
416
        return None
417
    elif typeid in ua.extension_objects_by_typeid:
418
        cls = ua.extension_objects_by_typeid[typeid]
419
        if body is None:
420
            raise UaError(f'parsing ExtensionObject {cls.__name__} without data')
421
        return from_binary(cls, body)
422
    else:
423
        e = ua.ExtensionObject()
424
        e.TypeId = typeid
425
        e.Encoding = encoding
426
        if body is not None:
427
            e.Body = body.read(len(body))
428
        return e
429
430
431
def extensionobject_to_binary(obj):
432
    """
433
    Convert Python object to binary-coded ExtensionObject.
434
    If obj is None, convert to empty ExtensionObject (TypeId=0, no Body).
435
    Returns a binary string
436
    """
437
    if isinstance(obj, ua.ExtensionObject):
438
        return struct_to_binary(obj)
439
    if obj is None:
440
        type_id = ua.NodeId()
441
        encoding = 0
442
        body = None
443
    else:
444
        type_id = ua.extension_object_typeids[obj.__class__.__name__]
445
        encoding = 0x01
446
        body = struct_to_binary(obj)
447
    packet = [
448
        nodeid_to_binary(type_id),
449
        Primitives.Byte.pack(encoding),
450
    ]
451
    if body:
452
        packet.append(Primitives.Bytes.pack(body))
453
    return b''.join(packet)
454
455
456
def from_binary(uatype, data):
457
    """
458
    unpack data given an uatype as a string or a python class having a ua_types member
459
    """
460
    if isinstance(uatype, str) and uatype.startswith("ListOf"):
461
        utype = uatype[6:]
462
        if hasattr(ua.VariantType, utype):
463
            vtype = getattr(ua.VariantType, utype)
464
            return unpack_uatype_array(vtype, data)
465
        size = Primitives.Int32.unpack(data)
466
        return [from_binary(utype, data) for _ in range(size)]
467
    elif isinstance(uatype, str) and hasattr(ua.VariantType, uatype):
468
        vtype = getattr(ua.VariantType, uatype)
469
        return unpack_uatype(vtype, data)
470
    elif isinstance(uatype, str) and hasattr(Primitives, uatype):
471
        return getattr(Primitives, uatype).unpack(data)
472
    else:
473
        return struct_from_binary(uatype, data)
474
475
476
def struct_from_binary(objtype, data):
477
    """
478
    unpack an ua struct. Arguments are an objtype as Python class or string
479
    """
480
    if isinstance(objtype, str):
481
        objtype = getattr(ua, objtype)
482
    if issubclass(objtype, Enum):
483
        return objtype(Primitives.UInt32.unpack(data))
484
    obj = objtype()
485
    for name, uatype in obj.ua_types:
486
        # if our member has a switch and it is not set we skip it
487
        if hasattr(obj, "ua_switches") and name in obj.ua_switches:
488
            container_name, idx = obj.ua_switches[name]
489
            val = getattr(obj, container_name)
490
            if not test_bit(val, idx):
491
                continue
492
        val = from_binary(uatype, data)
493
        setattr(obj, name, val)
494
    return obj
495
496
497
def header_to_binary(hdr):
498
    b = [struct.pack("<3ss", hdr.MessageType, hdr.ChunkType)]
499
    size = hdr.body_size + 8
500
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
501
        size += 4
502
    b.append(Primitives.UInt32.pack(size))
503
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
504
        b.append(Primitives.UInt32.pack(hdr.ChannelId))
505
    return b"".join(b)
506
507
508
def header_from_binary(data):
509
    hdr = ua.Header()
510
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
511
    hdr.body_size = hdr.packet_size - 8
512
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
513
        hdr.body_size -= 4
514
        hdr.ChannelId = Primitives.UInt32.unpack(data)
515
        hdr.header_size = 12
516
    return hdr
517
518
519
def uatcp_to_binary(message_type, message):
520
    """
521
    Convert OPC UA TCP message (see OPC UA specs Part 6, 7.1) to binary.
522
    The only supported types are Hello, Acknowledge and ErrorMessage
523
    """
524
    header = ua.Header(message_type, ua.ChunkType.Single)
525
    binmsg = struct_to_binary(message)
526
    header.body_size = len(binmsg)
527
    return header_to_binary(header) + binmsg
528