Completed
Pull Request — master (#490)
by Olivier
10:23
created

header_to_binary()   A

Complexity

Conditions 3

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
dl 0
loc 10
ccs 0
cts 0
cp 0
crap 12
rs 9.4285
c 1
b 0
f 0
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
import uuid
9 1
from enum import IntEnum, Enum
10 1
11
from opcua.ua.uaerrors import UaError
12 1
from opcua.common.utils import Buffer
13
from opcua import ua
14
15 1
if sys.version_info.major > 2:
    # Python 3 has no separate ``unicode`` type; alias it so the
    # isinstance checks below work on both major versions.
    unicode = str

# Use the module's real name: the quoted string '__name__' would create a
# logger literally called "__name__", detached from the package hierarchy.
logger = logging.getLogger(__name__)
19
20 1
21 1
def test_bit(data, offset):
22 1
    mask = 1 << offset
23
    return data & mask
24
25 1
26 1
def set_bit(data, offset):
    """Return ``data`` with bit number ``offset`` switched on."""
    return data | (1 << offset)
29
30 1
31 1
def unset_bit(data, offset):
    """Return ``data`` with bit number ``offset`` switched off."""
    return data & ~(1 << offset)
34
35 1
36 1
class _DateTime(object):
    """Codec for DateTime values, carried on the wire as an Int64."""

    @staticmethod
    def pack(dt):
        """Convert ``dt`` with ``ua.datetime_to_win_epoch`` and pack it as Int64."""
        return Primitives.Int64.pack(ua.datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        """Read an Int64 from ``data`` and convert it with ``ua.win_epoch_to_datetime``."""
        return ua.win_epoch_to_datetime(Primitives.Int64.unpack(data))
46
47
48 1
class _String(object):
    """Codec for strings: an Int32 length prefix followed by the raw bytes."""

    @staticmethod
    def pack(string):
        """
        Pack ``string`` as length-prefixed bytes.

        ``None`` is encoded as a length of -1 with no payload. Text
        (``unicode``) is encoded to UTF-8 first; anything else is
        written as-is.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        if isinstance(string, unicode):
            string = string.encode('utf-8')
        return Primitives.Int32.pack(len(string)) + string

    @staticmethod
    def unpack(data):
        """
        Read a length-prefixed string from ``data``.

        Returns ``None`` for a null string. On Python 2 the raw bytes are
        returned unchanged; on Python 3 they are decoded as UTF-8.
        """
        raw = _Bytes.unpack(data)
        if raw is None or sys.version_info.major < 3:
            return raw
        return raw.decode("utf-8")
67
68
69
class _Bytes(object):
    """Codec for byte strings: an Int32 length prefix followed by the payload."""

    @staticmethod
    def pack(data):
        """Pack ``data`` using the same length-prefixed layout as ``_String.pack``."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """
        Read an Int32 length from ``data`` and then that many bytes.

        A length of -1 denotes a null value and yields ``None``.
        """
        length = Primitives.Int32.unpack(data)
        if length == -1:
            return None
        return data.read(length)
80 1
81 1
82
class _Null(object):
    """Codec for the Null type: occupies no bytes on the wire."""

    @staticmethod
    def pack(data):
        """Return an empty byte string regardless of ``data``."""
        return b""

    @staticmethod
    def unpack(data):
        """Return ``None`` without consuming anything from ``data``."""
        return None
90
91 1
92 1
class _Guid(object):
    """
    Codec for Guid values.

    The layout written here is a little-endian UInt32, two little-endian
    UInt16 and eight raw bytes, which is exactly what ``uuid.UUID.bytes_le``
    produces and ``uuid.UUID(bytes_le=...)`` consumes.
    """

    @staticmethod
    def pack(guid):
        """Return the 16-byte encoding of the ``uuid.UUID`` ``guid``."""
        return guid.bytes_le

    @staticmethod
    def unpack(data):
        """Read 16 bytes from ``data`` and return them as a ``uuid.UUID``."""
        return uuid.UUID(bytes_le=data.read(16))
119 1
120
121 1
class _Primitive1(object):
    """
    Codec for a single fixed-size value described by a ``struct`` format.

    The format is compiled once in the constructor and the compiled
    ``struct.Struct`` is reused for every pack/unpack call.
    """

    def __init__(self, fmt):
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` encoded according to this primitive's format."""
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` (via ``data.read``) and decode one value."""
        return self.struct.unpack(data.read(self.size))[0]
132 1
133 1
134 1
class Primitives1(object):
    """Fixed-size little-endian primitive codecs, one ``_Primitive1`` per type."""

    # Signed integers
    SByte = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")
    # Unsigned integers
    Byte = _Primitive1("<B")
    Char = Byte  # same codec object as Byte
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")
    # Boolean and floating point
    Boolean = _Primitive1("<?")
    Double = _Primitive1("<d")
    Float = _Primitive1("<f")
147 1
148
149 1
class Primitives(Primitives1):
    """All primitive codecs: the fixed-size ones plus the variable-size ones below."""

    Null = _Null()
    String = _String()
    Bytes = _Bytes()
    ByteString = _Bytes()
    CharArray = _String()
    DateTime = _DateTime()
    Guid = _Guid()
157 1
158
159 1
def pack_uatype(vtype, value):
    """
    Pack ``value`` according to the variant type ``vtype``.

    Dispatch order: a codec on ``Primitives`` with the same name as the
    variant type; then raw bytes for type values above 25; then the
    dedicated encoders for ExtensionObject, NodeId/ExpandedNodeId and
    Variant; anything else is packed as a struct.
    """
    codec = getattr(Primitives, vtype.name, None)
    if hasattr(Primitives, vtype.name):
        return codec.pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if vtype == ua.VariantType.ExtensionObject:
        return extensionobject_to_binary(value)
    if vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        return nodeid_to_binary(value)
    if vtype == ua.VariantType.Variant:
        return variant_to_binary(value)
    return struct_to_binary(value)
172 1
173 1
174 1
def unpack_uatype(vtype, data):
    """
    Read one value of variant type ``vtype`` from ``data``.

    Mirrors ``pack_uatype``: primitives first, raw bytes for type values
    above 25, then ExtensionObject, NodeId/ExpandedNodeId and Variant.
    Any other type is looked up by name on the ``ua`` module and decoded
    as a struct.

    Raises:
        UaError: if no class named after ``vtype`` exists on ``ua``.
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if vtype == ua.VariantType.ExtensionObject:
        return extensionobject_from_binary(data)
    if vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        return nodeid_from_binary(data)
    if vtype == ua.VariantType.Variant:
        return variant_from_binary(data)
    if not hasattr(ua, vtype.name):
        raise UaError("Cannot unpack unknown variant type {0!s}".format(vtype))
    return struct_from_binary(getattr(ua, vtype.name), data)
192
193 1
194
def pack_uatype_array(vtype, array):
    """
    Pack ``array`` as an Int32 element count followed by each element.

    ``None`` is encoded as a count of -1 (``ff ff ff ff``) with no elements.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    # Pack the elements first (as the original did), then emit count + elements.
    elements = [pack_uatype(vtype, val) for val in array]
    return b"".join([Primitives.Int32.pack(len(array))] + elements)
201
202 1
203 1
def unpack_uatype_array(vtype, data):
    """
    Read an Int32 element count from ``data`` followed by that many values.

    Returns ``None`` when the count is -1, otherwise a list of values
    decoded with ``unpack_uatype``.
    """
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
208
209
210
def struct_to_binary(obj):
    """
    Pack a struct-like object field by field, following ``obj.ua_types``.

    If ``obj`` has ``ua_switches`` (field name -> (container name, bit
    index)), the container attribute on ``obj`` is updated in place so the
    bit is set for every switched field that is not ``None``; switched
    fields that are ``None`` are then left out of the output.
    """
    has_switch = hasattr(obj, "ua_switches")
    if has_switch:
        for name, (container_name, idx) in obj.ua_switches.items():
            if getattr(obj, name) is not None:
                flags = getattr(obj, container_name) | (1 << idx)
                setattr(obj, container_name, flags)

    packet = []
    for name, uatype in obj.ua_types:
        val = getattr(obj, name)
        if uatype.startswith("ListOf"):
            packet.append(list_to_binary(uatype[6:], val))
        elif has_switch and val is None and name in obj.ua_switches:
            # Optional field that is absent: nothing goes on the wire.
            continue
        else:
            packet.append(to_binary(uatype, val))
    return b''.join(packet)
231 1
232 1
233 1
def to_binary(uatype, val):
    """
    Pack a python object to binary given a string defining its type.

    ``ListOf<X>`` names are packed as lists of ``X``; names found on
    ``ua.VariantType`` go through ``pack_uatype``. Otherwise the encoder
    is chosen from the value itself: enums as UInt32, NodeId, Variant,
    and any object exposing ``ua_types`` as a struct.

    Raises:
        UaError: if none of the above applies.
    """
    if uatype.startswith("ListOf"):
        return list_to_binary(uatype[6:], val)
    if isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
        return pack_uatype(getattr(ua.VariantType, uatype), val)
    if isinstance(val, (IntEnum, Enum)):
        return Primitives.UInt32.pack(val.value)
    if isinstance(val, ua.NodeId):
        return nodeid_to_binary(val)
    if isinstance(val, ua.Variant):
        return variant_to_binary(val)
    if hasattr(val, "ua_types"):
        return struct_to_binary(val)
    raise UaError("No known way to pack {} of type {} to ua binary".format(val, uatype))
253 1
254 1
255 1
def list_to_binary(uatype, val):
    """
    Pack ``val`` as an Int32 element count followed by each element packed
    with ``to_binary(uatype, element)``.

    ``None`` is encoded as a count of -1 with no elements.
    """
    if val is None:
        return Primitives.Int32.pack(-1)
    parts = [Primitives.Int32.pack(len(val))]
    parts.extend(to_binary(uatype, el) for el in val)
    return b''.join(parts)
263 1
264
265
def nodeid_to_binary(nodeid):
    """
    Pack a NodeId.

    The first byte holds the NodeIdType value; the rest depends on that
    type. When ``nodeid.NamespaceUri`` or ``nodeid.ServerIndex`` is truthy,
    bit 7 / bit 6 of the first byte is set and the value is appended as a
    String / UInt32 respectively.

    Returns ``bytes`` for a plain NodeId and a ``bytearray`` when either
    of the two optional fields was appended.

    Raises:
        UaError: if ``nodeid.NodeIdType`` is not one of the handled types.
    """
    kind = nodeid.NodeIdType
    if kind == ua.NodeIdType.TwoByte:
        data = struct.pack("<BB", kind.value, nodeid.Identifier)
    elif kind == ua.NodeIdType.FourByte:
        data = struct.pack("<BBH", kind.value, nodeid.NamespaceIndex, nodeid.Identifier)
    elif kind == ua.NodeIdType.Numeric:
        data = struct.pack("<BHI", kind.value, nodeid.NamespaceIndex, nodeid.Identifier)
    elif kind == ua.NodeIdType.String:
        data = struct.pack("<BH", kind.value, nodeid.NamespaceIndex) + \
            Primitives.String.pack(nodeid.Identifier)
    elif kind == ua.NodeIdType.ByteString:
        data = struct.pack("<BH", kind.value, nodeid.NamespaceIndex) + \
            Primitives.Bytes.pack(nodeid.Identifier)
    elif kind == ua.NodeIdType.Guid:
        data = struct.pack("<BH", kind.value, nodeid.NamespaceIndex) + \
            Primitives.Guid.pack(nodeid.Identifier)
    else:
        raise UaError("Unknown NodeIdType: {} for NodeId: {}".format(nodeid.NodeIdType, nodeid))

    # Add NamespaceUri and ServerIndex in case we have an ExpandedNodeId.
    # The first byte must be patched, so switch to a mutable bytearray.
    if nodeid.NamespaceUri:
        data = bytearray(data)
        data[0] = set_bit(data[0], 7)
        data.extend(Primitives.String.pack(nodeid.NamespaceUri))
    if nodeid.ServerIndex:
        if not isinstance(data, bytearray):
            data = bytearray(data)
        data[0] = set_bit(data[0], 6)
        data.extend(Primitives.UInt32.pack(nodeid.ServerIndex))
    return data
295
296
297 1
def nodeid_from_binary(data):
    """
    Read a NodeId from ``data``.

    The low six bits of the first byte select the NodeIdType; bit 7 and
    bit 6 indicate that a NamespaceUri (String) and a ServerIndex (UInt32)
    follow the identifier.

    Raises:
        UaError: if the decoded NodeIdType is not one of the handled types.
    """
    nid = ua.NodeId()
    encoding = ord(data.read(1))
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)
    kind = nid.NodeIdType

    if kind == ua.NodeIdType.TwoByte:
        nid.Identifier = ord(data.read(1))
    elif kind == ua.NodeIdType.FourByte:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
    elif kind == ua.NodeIdType.Numeric:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
    elif kind == ua.NodeIdType.String:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.String.unpack(data)
    elif kind == ua.NodeIdType.ByteString:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.Bytes.unpack(data)
    elif kind == ua.NodeIdType.Guid:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.Guid.unpack(data)
    else:
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

    if test_bit(encoding, 7):
        nid.NamespaceUri = Primitives.String.unpack(data)
    if test_bit(encoding, 6):
        nid.ServerIndex = Primitives.UInt32.unpack(data)

    return nid
326
327
328
def variant_to_binary(var):
    """
    Pack a Variant: one encoding byte followed by the value.

    The low six bits of the encoding byte carry the variant type value.
    For arrays (``var.is_array`` or a list/tuple value) bit 7 is set and
    the flattened value is written as an array; if ``var.Dimensions`` is
    not ``None``, bit 6 is set as well and the dimensions follow as an
    Int32 array.

    Side effect: ``var.is_array`` is set to ``True`` when the array path
    is taken.
    """
    encoding = var.VariantType.value & 0b111111
    parts = []
    if var.is_array or isinstance(var.Value, (list, tuple)):
        var.is_array = True
        encoding = set_bit(encoding, 7)
        has_dims = var.Dimensions is not None
        if has_dims:
            encoding = set_bit(encoding, 6)
        parts.append(Primitives.Byte.pack(encoding))
        parts.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
        if has_dims:
            parts.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
    else:
        parts.append(Primitives.Byte.pack(encoding))
        parts.append(pack_uatype(var.VariantType, var.Value))

    return b"".join(parts)
345
346
347
def variant_from_binary(data):
    """
    Read a Variant from ``data``.

    The first byte is the encoding: its low six bits are mapped to a
    variant type with ``ua.datatype_to_varianttype``, bit 7 marks an
    array value and bit 6 marks trailing dimensions (an Int32 array),
    which are used to reshape the flat value into nested lists.
    """
    encoding = ord(data.read(1))
    vtype = ua.datatype_to_varianttype(encoding & 0b00111111)

    is_array = bool(test_bit(encoding, 7))
    if is_array:
        value = unpack_uatype_array(vtype, data)
    else:
        value = unpack_uatype(vtype, data)

    dimensions = None
    if test_bit(encoding, 6):
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
        value = _reshape(value, dimensions)
    return ua.Variant(value, vtype, dimensions, is_array=is_array)
362
363
364
def _reshape(flat, dims):
    """
    Turn the flat list ``flat`` into nested lists shaped by ``dims``.

    A dimension of 0 after the first one counts as 1 when computing the
    chunk size. If ``flat`` holds fewer than ``dims[0] * chunk size``
    items it is padded, in place, with empty lists. When no sub-dimensions
    remain (or the only one left is 0) ``flat`` itself is returned.
    """
    subdims = dims[1:]
    subsize = 1
    for dim in subdims:
        subsize *= dim if dim != 0 else 1
    # NOTE: pads the caller's list in place, as the original code did.
    while dims[0] * subsize > len(flat):
        flat.append([])
    if not subdims or subdims == [0]:
        return flat
    return [_reshape(flat[i:i + subsize], subdims) for i in range(0, len(flat), subsize)]
376
377
378
def extensionobject_from_binary(data):
    """
    Convert binary-coded ExtensionObject to a Python object.

    Reads the TypeId, the encoding byte and, when bit 0 of the encoding
    is set, a length-prefixed body (a length below 1 yields an empty
    buffer).

    Returns:
        ``None`` if the TypeId identifier is zero; an instance of the
        class registered in ``ua.extension_object_classes`` when the
        TypeId is known; otherwise a ``ua.ExtensionObject`` holding the
        TypeId, the encoding byte and the raw body bytes.

    Raises:
        UaError: if the TypeId is known but no body was present.
    """
    typeid = nodeid_from_binary(data)
    encoding = ord(data.read(1))
    body = None
    if encoding & 0x01:
        length = Primitives.Int32.unpack(data)
        if length < 1:
            body = Buffer(b"")
        else:
            # Hand the decoder its own view of the body, then step past it.
            body = data.copy(length)
            data.skip(length)

    if typeid.Identifier == 0:
        return None

    if typeid in ua.extension_object_classes:
        klass = ua.extension_object_classes[typeid]
        if body is None:
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
        return from_binary(klass, body)

    ext = ua.ExtensionObject()
    ext.TypeId = typeid
    ext.Encoding = encoding
    if body is not None:
        ext.Body = body.read(len(body))
    return ext
407
408
409
def extensionobject_to_binary(obj):
    """
    Convert Python object to binary-coded ExtensionObject.

    If obj is None, convert to empty ExtensionObject (TypeId=0, no Body).
    A ``ua.ExtensionObject`` instance is packed directly as a struct.
    Any other object is packed as a struct and wrapped with the TypeId
    looked up by class name in ``ua.extension_object_ids`` and an
    encoding byte of 0x01.

    Returns a binary string
    """
    if isinstance(obj, ua.ExtensionObject):
        return struct_to_binary(obj)
    if obj is None:
        type_id = ua.NodeId()
        encoding = 0
        body = None
    else:
        type_id = ua.extension_object_ids[obj.__class__.__name__]
        encoding = 0x01
        body = struct_to_binary(obj)
    packet = [nodeid_to_binary(type_id), Primitives.Byte.pack(encoding)]
    # Test against None, not truthiness: a struct that serialises to b""
    # still announces a body through encoding 0x01, and the decoder in
    # this file reads a length whenever that bit is set.
    if body is not None:
        packet.append(Primitives.Bytes.pack(body))
    return b''.join(packet)
431
432
433
def from_binary(uatype, data):
    """
    Unpack data given a uatype as a string or a python class having a
    ua_types member.

    ``ListOf<X>`` names are read as an Int32 count (-1 yields ``None``)
    followed by that many ``X`` values; names found on ``ua.VariantType``
    go through ``unpack_uatype``; anything else is decoded as a struct.
    """
    is_name = isinstance(uatype, (str, unicode))
    if is_name and uatype.startswith("ListOf"):
        size = Primitives.Int32.unpack(data)
        if size == -1:
            return None
        element_type = uatype[6:]
        return [from_binary(element_type, data) for _ in range(size)]
    if is_name and hasattr(ua.VariantType, uatype):
        return unpack_uatype(getattr(ua.VariantType, uatype), data)
    return struct_from_binary(uatype, data)
450
451
452
def struct_from_binary(objtype, data):
    """
    Unpack a ua struct. ``objtype`` is a Python class, or the name of one
    to look up on the ``ua`` module.

    Enum subclasses are read as a single UInt32. Otherwise a fresh
    instance is created and every field listed in its ``ua_types`` is
    read in order and assigned.
    """
    if isinstance(objtype, (unicode, str)):
        objtype = getattr(ua, objtype)
    if issubclass(objtype, Enum):
        return objtype(Primitives.UInt32.unpack(data))

    obj = objtype()
    has_switch = hasattr(obj, "ua_switches")
    for name, uatype in obj.ua_types:
        # If our member has a switch and it is not set we skip it. The
        # container field was decoded earlier in this same loop.
        if has_switch and name in obj.ua_switches:
            container_name, idx = obj.ua_switches[name]
            if not test_bit(getattr(obj, container_name), idx):
                continue
        setattr(obj, name, from_binary(uatype, data))
    return obj
471
472
473
def header_to_binary(hdr):
    """
    Pack a message header.

    Layout: 3-byte message type, 1-byte chunk type, UInt32 total size.
    The size is ``hdr.body_size`` plus 8 header bytes. For SecureOpen,
    SecureClose and SecureMessage a UInt32 ``hdr.ChannelId`` is appended
    and its 4 bytes are counted in the size as well.
    """
    is_secure = hdr.MessageType in (
        ua.MessageType.SecureOpen,
        ua.MessageType.SecureClose,
        ua.MessageType.SecureMessage,
    )
    size = hdr.body_size + 8
    if is_secure:
        size += 4

    parts = [
        struct.pack("<3ss", hdr.MessageType, hdr.ChunkType),
        Primitives.UInt32.pack(size),
    ]
    if is_secure:
        parts.append(Primitives.UInt32.pack(hdr.ChannelId))
    return b"".join(parts)
483
484
485
def header_from_binary(data):
    """
    Read a message header from ``data``.

    Consumes 8 bytes (message type, chunk type, packet size) and derives
    ``body_size`` as the packet size minus those 8 bytes. For SecureOpen,
    SecureClose and SecureMessage a further UInt32 ChannelId is read and
    its 4 bytes are subtracted from ``body_size``.
    """
    hdr = ua.Header()
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
    hdr.body_size = hdr.packet_size - 8
    secure_types = (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage)
    if hdr.MessageType in secure_types:
        hdr.body_size -= 4
        hdr.ChannelId = Primitives.UInt32.unpack(data)
    return hdr
493
494
495
def uatcp_to_binary(message_type, message):
    """
    Convert OPC UA TCP message (see OPC UA specs Part 6, 7.1) to binary.
    The only supported types are Hello, Acknowledge and ErrorMessage

    The message is packed as a struct and prefixed with a single-chunk
    header whose ``body_size`` is the length of the packed message.
    """
    body = struct_to_binary(message)
    header = ua.Header(message_type, ua.ChunkType.Single)
    header.body_size = len(body)
    return header_to_binary(header) + body
504