Completed
Pull Request — master (#490)
by Olivier
04:31
created

nodeid_from_binary()   F

Complexity

Conditions 9

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 9.1399

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 9
c 1
b 0
f 0
dl 0
loc 29
ccs 22
cts 25
cp 0.88
crap 9.1399
rs 3
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
import uuid
9 1
from enum import IntEnum, Enum
10
11 1
from opcua.ua.uaerrors import UaError
12 1
from opcua.common.utils import Buffer
13 1
from opcua import ua
14
15 1
if sys.version_info.major > 2:
    # Python 3 has no ``unicode`` builtin; alias it to ``str`` so the
    # ``isinstance(..., unicode)`` checks in this module work on both
    # major versions.
    unicode = str

# Pass the module's real name, not the literal string '__name__', so that
# log records are attributed to this module and can be configured through
# the normal logger hierarchy.
logger = logging.getLogger(__name__)
19
20
21 1
def test_bit(data, offset):
    """
    Return ``data`` masked down to the single bit at position ``offset``.

    The result is the raw masked integer (``0`` when the bit is clear,
    ``1 << offset`` when it is set), so it is meant to be used for its
    truthiness rather than compared against ``True``.
    """
    return data & (1 << offset)


# The name matches pytest's ``test_*`` pattern; without this marker any test
# module that imports the helper gets it collected as a test case and fails
# on the missing ``data``/``offset`` fixtures.
test_bit.__test__ = False
24
25
26 1
def set_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` switched on.
    """
    return data | (1 << offset)
29
30
31 1
def unset_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` switched off.
    """
    return data & ~(1 << offset)
34
35
36 1
def build_array_format_py2(prefix, length, fmtchar):
    """
    Build an array format string: ``prefix``, then the element count, then
    the format character, which is already a string here.
    """
    return "".join((prefix, str(length), fmtchar))
38
39
40 1
def build_array_format_py3(prefix, length, fmtchar):
    """
    Build an array format string: ``prefix``, then the element count, then
    the format character, which is given as an integer code point and
    converted with ``chr``.
    """
    return "".join((prefix, str(length), chr(fmtchar)))
42
43
44 1
# Bind the public name to the variant matching the running interpreter.
build_array_format = (
    build_array_format_py2
    if sys.version_info.major < 3
    else build_array_format_py3
)
48
49
50 1
class _Primitive(object):
    """
    Base class adding array support on top of a codec's own ``pack`` and
    ``unpack`` methods, which subclasses are expected to provide.
    """

    def pack_array(self, array):
        """
        Pack ``array`` as an Int32 element count followed by each packed
        element. ``None`` is packed as the four bytes ``ff ff ff ff``.
        """
        if array is None:
            return b'\xff\xff\xff\xff'
        count = len(array)
        chunks = [self.pack(item) for item in array]
        return b"".join([Primitives.Int32.pack(count)] + chunks)

    def unpack_array(self, data):
        """
        Read an Int32 element count from ``data`` and then that many
        elements. A count of -1 yields ``None``; a count of 0 yields ``[]``.
        """
        count = Primitives.Int32.unpack(data)
        if count == -1:
            return None
        return [self.unpack(data) for _ in range(count)]
67
68
69 1
class _DateTime(_Primitive):
    """
    Codec that stores a datetime as an Int64, converting through
    ``ua.datetime_to_win_epoch`` / ``ua.win_epoch_to_datetime``.
    """

    @staticmethod
    def pack(dt):
        """Convert ``dt`` to its epoch integer and pack it as Int64."""
        return Primitives.Int64.pack(ua.datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        """Read an Int64 from ``data`` and convert it back to a datetime."""
        return ua.win_epoch_to_datetime(Primitives.Int64.unpack(data))
79
80
81 1
class _String(_Primitive):
    """
    Codec for length-prefixed strings: an Int32 byte count followed by the
    payload, with a count of -1 standing for ``None``.
    """

    @staticmethod
    def pack(string):
        """
        Pack ``string``; text is encoded as UTF-8 first, anything else is
        written as given.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        raw = string.encode('utf-8') if isinstance(string, unicode) else string
        return Primitives.Int32.pack(len(raw)) + raw

    @staticmethod
    def unpack(data):
        """
        Read a length-prefixed payload. On Python 2 the raw bytes are
        returned; on Python 3 they are decoded as UTF-8 (``None`` passes
        through unchanged).
        """
        raw = _Bytes.unpack(data)
        if sys.version_info.major < 3 or raw is None:
            return raw
        return raw.decode("utf-8")
100
101
102 1
class _Bytes(_Primitive):
    """
    Codec for length-prefixed byte strings, sharing the wire layout used
    by ``_String``.
    """

    @staticmethod
    def pack(data):
        """Pack ``data`` exactly the way ``_String.pack`` does."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """
        Read an Int32 length and then that many bytes from ``data``;
        a length of -1 yields ``None``.
        """
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
113
114
115 1
class _Null(_Primitive):
    """
    Codec for the Null type: it writes nothing and reads nothing.
    """

    @staticmethod
    def pack(data):
        """Ignore ``data`` and return an empty byte string."""
        return bytes()

    @staticmethod
    def unpack(data):
        """Leave ``data`` untouched and return ``None``."""
        return None
123
124
125 1
class _Guid(_Primitive):
    """
    Codec translating between Python's six-field ``uuid.UUID`` and the
    OPC UA four-field Guid layout.
    """

    @staticmethod
    def pack(guid):
        """Pack ``guid`` (a ``uuid.UUID``) into the four-field layout."""
        return b"".join((
            # fields 1-3 go through the integer primitives
            Primitives.UInt32.pack(guid.time_low),
            Primitives.UInt16.pack(guid.time_mid),
            Primitives.UInt16.pack(guid.time_hi_version),
            # field 4: two clock-sequence bytes followed by the node bytes
            Primitives.Byte.pack(guid.clock_seq_hi_variant),
            Primitives.Byte.pack(guid.clock_seq_low),
            # there is no 6-byte primitive, so pack the node as a big-endian
            # 8-byte integer and keep only the low six bytes
            struct.pack('>Q', guid.node)[2:8],
        ))

    @staticmethod
    def unpack(data):
        """Read the four-field layout from ``data`` and return a ``uuid.UUID``."""
        # the first three fields are re-packed big-endian, which is the
        # order ``uuid.UUID(bytes=...)`` expects
        fields = [
            struct.pack('>I', Primitives.UInt32.unpack(data)),
            struct.pack('>H', Primitives.UInt16.unpack(data)),
            struct.pack('>H', Primitives.UInt16.unpack(data)),
        ]
        # the fourth field is taken over verbatim
        fields.append(data.read(8))
        return uuid.UUID(bytes=b"".join(fields))
152
153
154 1
class _Primitive1(_Primitive):
    """
    Codec for a single fixed-size value described by a ``struct`` format.

    The format is compiled once into a ``struct.Struct``; ``size`` and
    ``format`` are exposed as attributes for callers that need them.
    """

    def __init__(self, fmt):
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` packed according to this codec's format."""
        # use the compiled Struct rather than re-resolving the format
        # through the module-level struct.pack on every call
        return self.struct.pack(data)

    def unpack(self, data):
        """
        Read ``self.size`` bytes from ``data`` (an object with a ``read``
        method) and return the single decoded value.
        """
        return self.struct.unpack(data.read(self.size))[0]
165
166
167 1
class Primitives1(object):
    """
    Namespace of fixed-size little-endian codecs, one attribute per type.
    """
    # signed integers
    SByte = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")

    # unsigned integers
    Byte = _Primitive1("<B")
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")

    # Char is the very same codec object as Byte
    Char = Byte

    # boolean and floating point
    Boolean = _Primitive1("<?")
    Float = _Primitive1("<f")
    Double = _Primitive1("<d")
180
181
182 1
class Primitives(Primitives1):
    """
    Full codec namespace: the fixed-size codecs inherited from
    ``Primitives1`` plus the variable-size and special ones below.
    """
    Null = _Null()

    # text codecs (two independent instances)
    String = _String()
    CharArray = _String()

    # byte-string codecs (two independent instances)
    Bytes = _Bytes()
    ByteString = _Bytes()

    DateTime = _DateTime()
    Guid = _Guid()
190
191
192 1
def pack_uatype_array(vtype, array):
    """
    Pack ``array`` as an Int32 element count followed by every element
    packed with ``pack_uatype``. ``None`` is packed as ``ff ff ff ff``.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    chunks = [pack_uatype(vtype, item) for item in array]
    return b"".join([Primitives.Int32.pack(count)] + chunks)
199
200
201 1
def pack_uatype(vtype, value):
    """
    Pack ``value`` according to the variant type ``vtype``.

    The checks run in this order: a codec on ``Primitives`` named like the
    type, raw bytes for type values above 25, ExtensionObject,
    NodeId/ExpandedNodeId, Variant, and finally a generic struct.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if vtype == ua.VariantType.ExtensionObject:
        return extensionobject_to_binary(value)
    if vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        return nodeid_to_binary(value)
    if vtype == ua.VariantType.Variant:
        return variant_to_binary(value)
    return struct_to_binary(value)
214
215
216 1
def unpack_uatype(vtype, data):
    """
    Read one value of variant type ``vtype`` from ``data``.

    Mirrors ``pack_uatype``: primitive codec by name, raw bytes for type
    values above 25, ExtensionObject, NodeId/ExpandedNodeId, Variant, and
    finally a struct class looked up by name on ``ua``.

    Raises UaError when no class named like the type exists on ``ua``.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        codec = getattr(Primitives, type_name)
        return codec.unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if vtype == ua.VariantType.ExtensionObject:
        return extensionobject_from_binary(data)
    if vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        return nodeid_from_binary(data)
    if vtype == ua.VariantType.Variant:
        return variant_from_binary(data)
    if not hasattr(ua, vtype.name):
        raise UaError("Cannot unpack unknown variant type {0!s}".format(vtype))
    return struct_from_binary(getattr(ua, vtype.name), data)
234
235
236 1
def unpack_uatype_array(vtype, data):
    """
    Read an array of ``vtype`` values from ``data``.

    Types with a codec on ``Primitives`` use that codec's ``unpack_array``;
    all others read an Int32 count (-1 meaning ``None``) followed by that
    many values read with ``unpack_uatype``.
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).unpack_array(data)
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
246
247
248 1
def struct_to_binary(obj):
    """
    Pack an object to binary by walking its ``ua_types`` list of
    ``(attribute name, type name)`` pairs.

    When the object has ``ua_switches``, the flag bit of every member that
    is not ``None`` is first set on its container attribute (this modifies
    ``obj``); switched members that are ``None`` are then left out of the
    output.
    """
    switched = hasattr(obj, "ua_switches")
    if switched:
        for member_name, (flag_field, bit) in obj.ua_switches.items():
            if getattr(obj, member_name) is None:
                continue
            flags = getattr(obj, flag_field)
            setattr(obj, flag_field, flags | (1 << bit))

    chunks = []
    for field, uatype in obj.ua_types:
        value = getattr(obj, field)
        if uatype.startswith("ListOf"):
            # strip the "ListOf" prefix to get the element type
            chunks.append(list_to_binary(uatype[6:], value))
        elif switched and value is None and field in obj.ua_switches:
            # optional member left as None: nothing is written for it
            continue
        else:
            chunks.append(to_binary(uatype, value))
    return b''.join(chunks)
269
270
271 1
def to_binary(uatype, val):
    """
    Pack a python object to binary given a string defining its type.

    Lists and tuples are written as an Int32 count followed by each
    element packed with the same ``uatype``. Raises UaError when no
    packing rule matches.
    """
    if isinstance(val, (list, tuple)):
        count = len(val)
        chunks = [to_binary(uatype, item) for item in val]
        return b"".join([Primitives.Int32.pack(count)] + chunks)
    if isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
        return pack_uatype(getattr(ua.VariantType, uatype), val)
    if isinstance(val, (IntEnum, Enum)):
        # enum members are written as their UInt32 value
        return Primitives.UInt32.pack(val.value)
    if isinstance(val, ua.NodeId):
        return nodeid_to_binary(val)
    if isinstance(val, ua.Variant):
        return variant_to_binary(val)
    if hasattr(val, "ua_types"):
        return struct_to_binary(val)
    raise UaError("No known way to pack {} of type {} to ua binary".format(val, uatype))
293
294
295 1
def list_to_binary(uatype, val):
    """
    Pack a list as an Int32 element count followed by each element packed
    with ``to_binary``. ``None`` is written as a count of -1.
    """
    if val is None:
        return Primitives.Int32.pack(-1)
    chunks = [Primitives.Int32.pack(len(val))]
    chunks.extend(to_binary(uatype, item) for item in val)
    return b''.join(chunks)
304
305
306 1
def nodeid_to_binary(nodeid):
    """
    Pack a NodeId to binary.

    The three numeric forms are written with one fixed ``struct`` layout
    each. Every other form is written as a type byte and 16-bit namespace
    index followed by the identifier, packed according to its type.
    """
    # FIXME: Missing NamespaceUri and ServerIndex
    kind = nodeid.NodeIdType
    if kind == ua.NodeIdType.TwoByte:
        return struct.pack("<BB", kind.value, nodeid.Identifier)
    if kind == ua.NodeIdType.FourByte:
        return struct.pack("<BBH", kind.value, nodeid.NamespaceIndex, nodeid.Identifier)
    if kind == ua.NodeIdType.Numeric:
        return struct.pack("<BHI", kind.value, nodeid.NamespaceIndex, nodeid.Identifier)

    # all remaining forms share the same three-byte header
    header = struct.pack("<BH", kind.value, nodeid.NamespaceIndex)
    if kind == ua.NodeIdType.String:
        identifier = Primitives.String.pack(nodeid.Identifier)
    elif kind == ua.NodeIdType.ByteString:
        identifier = Primitives.Bytes.pack(nodeid.Identifier)
    elif kind == ua.NodeIdType.Guid:
        identifier = Primitives.Guid.pack(nodeid.Identifier)
    else:
        # any other type: let the identifier serialise itself
        identifier = nodeid.Identifier.to_binary()
    return header + identifier
326
327
328 1
def nodeid_from_binary(data):
    """
    Read a NodeId from ``data``.

    The low six bits of the first byte select the NodeId type; bit 7
    announces a trailing NamespaceUri string and bit 6 a trailing UInt32
    ServerIndex. Raises UaError for a type this function has no layout for.
    """
    nid = ua.NodeId()
    encoding = ord(data.read(1))
    kind = ua.NodeIdType(encoding & 0x3F)
    nid.NodeIdType = kind

    if kind == ua.NodeIdType.TwoByte:
        nid.Identifier = ord(data.read(1))
    elif kind == ua.NodeIdType.FourByte:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
    elif kind == ua.NodeIdType.Numeric:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
    else:
        # remaining forms: 16-bit namespace index, then a typed identifier
        if kind == ua.NodeIdType.String:
            codec = Primitives.String
        elif kind == ua.NodeIdType.ByteString:
            codec = Primitives.Bytes
        elif kind == ua.NodeIdType.Guid:
            codec = Primitives.Guid
        else:
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = codec.unpack(data)

    if encoding & 0x80:
        nid.NamespaceUri = Primitives.String.unpack(data)
    if encoding & 0x40:
        nid.ServerIndex = Primitives.UInt32.unpack(data)

    return nid
357
358
359 1
def variant_to_binary(var):
    """
    Pack a Variant: one encoding byte followed by the value.

    The low six bits of the encoding byte carry the variant type. For
    arrays (``var.is_array`` set, or the value is a list/tuple) bit 7 is
    set, ``var.is_array`` is forced to True, and the flattened value is
    written as an array; when ``var.Dimensions`` is not None bit 6 is set
    as well and the dimensions follow as an Int32 array.
    """
    encoding = var.VariantType.value & 0b111111

    if not (var.is_array or isinstance(var.Value, (list, tuple))):
        scalar = [
            Primitives.Byte.pack(encoding),
            pack_uatype(var.VariantType, var.Value),
        ]
        return b"".join(scalar)

    var.is_array = True
    encoding |= 1 << 7
    has_dimensions = var.Dimensions is not None
    if has_dimensions:
        encoding |= 1 << 6
    parts = [
        Primitives.Byte.pack(encoding),
        pack_uatype_array(var.VariantType, ua.flatten(var.Value)),
    ]
    if has_dimensions:
        parts.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
    return b"".join(parts)
376
377
378 1
def variant_from_binary(data):
    """
    Read a Variant from ``data``.

    The low six bits of the first byte give the data type, bit 7 marks an
    array value, and bit 6 marks a trailing Int32 dimensions array that is
    used to reshape the value.
    """
    encoding = ord(data.read(1))
    vtype = ua.datatype_to_varianttype(encoding & 0x3F)

    is_array = bool(encoding & 0x80)
    if is_array:
        value = unpack_uatype_array(vtype, data)
    else:
        value = unpack_uatype(vtype, data)

    dimensions = None
    if encoding & 0x40:
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
        value = _reshape(value, dimensions)

    return ua.Variant(value, vtype, dimensions, is_array=is_array)
393
394
395 1
def _reshape(flat, dims):
    """
    Turn the flat list ``flat`` into nested lists shaped by ``dims``.

    A dimension of 0 after the first counts as 1 when sizing the chunks.
    If ``flat`` is shorter than the shape requires it is padded in place
    with empty lists before being split.
    """
    inner = dims[1:]
    chunk = 1
    for extent in inner:
        chunk *= extent or 1

    shortfall = dims[0] * chunk - len(flat)
    if shortfall > 0:
        flat.extend([] for _ in range(shortfall))

    if not inner or inner == [0]:
        # innermost level: the (possibly padded) list is the result
        return flat
    return [
        _reshape(flat[start:start + chunk], inner)
        for start in range(0, len(flat), chunk)
    ]
407
408
409 1
def extensionobject_from_binary(data):
    """
    Convert binary-coded ExtensionObject to a Python object.
    Returns None if the TypeId identifier is zero, an instance of the
    registered class when the TypeId is known, and otherwise a raw
    ua.ExtensionObject carrying the undecoded body.
    """
    typeid = nodeid_from_binary(data)
    encoding = ord(data.read(1))

    body = None
    if encoding & 0x01:
        # bit 0 set: a length-prefixed body follows
        size = Primitives.Int32.unpack(data)
        if size >= 1:
            body = data.copy(size)
            data.skip(size)
        else:
            body = Buffer(b"")

    if typeid.Identifier == 0:
        return None

    if typeid in ua.extension_object_classes:
        klass = ua.extension_object_classes[typeid]
        if body is None:
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
        return from_binary(klass, body)

    # unknown type: hand back the raw pieces
    unknown = ua.ExtensionObject()
    unknown.TypeId = typeid
    unknown.Encoding = encoding
    if body is not None:
        unknown.Body = body.read(len(body))
    return unknown
438
439
440 1
def extensionobject_to_binary(obj):
    """
    Convert Python object to binary-coded ExtensionObject.
    If obj is None, convert to empty ExtensionObject (TypeId=0, no Body).
    A ua.ExtensionObject instance is packed as a plain struct.
    Returns a binary string
    """
    if isinstance(obj, ua.ExtensionObject):
        return struct_to_binary(obj)
    if obj is None:
        TypeId = ua.NodeId()
        Encoding = 0
        Body = None
    else:
        TypeId = ua.extension_object_ids[obj.__class__.__name__]
        Encoding = 0x01
        Body = struct_to_binary(obj)
    packet = [
        nodeid_to_binary(TypeId),
        Primitives.Byte.pack(Encoding),
    ]
    # Encoding 0x01 promises a length-prefixed body (the reader,
    # extensionobject_from_binary, consumes an Int32 length whenever bit 0
    # is set). Test against None rather than truthiness so that an object
    # serialising to b'' still gets its zero length written; otherwise the
    # reader would take the following bytes of the stream as the length.
    if Body is not None:
        packet.append(Primitives.Bytes.pack(Body))
    return b''.join(packet)
462
463
464 1
def from_binary(uatype, data):
    """
    unpack data given an uatype as a string or a python class having a ua_types member

    A "ListOf<X>" type name reads an Int32 count (-1 meaning None) followed
    by that many <X> values; a name found on ua.VariantType is read with
    unpack_uatype; anything else is read as a struct.
    """
    named = isinstance(uatype, (str, unicode))
    if named and uatype.startswith("ListOf"):
        count = Primitives.Int32.unpack(data)
        if count == -1:
            return None
        items = []
        for _ in range(count):
            items.append(from_binary(uatype[6:], data))
        return items
    if named and hasattr(ua.VariantType, uatype):
        return unpack_uatype(getattr(ua.VariantType, uatype), data)
    return struct_from_binary(uatype, data)
481
482
483 1
def struct_from_binary(objtype, data):
    """
    unpack an ua struct. Arguments are an objtype as Python class or string

    A string is resolved to the class of that name on ``ua``. Enum classes
    are read as a single UInt32; any other class is instantiated and its
    ``ua_types`` members are read in order.
    """
    if isinstance(objtype, (unicode, str)):
        objtype = getattr(ua, objtype)
    if issubclass(objtype, Enum):
        return objtype(Primitives.UInt32.unpack(data))

    obj = objtype()
    switches = getattr(obj, "ua_switches", ())
    for name, uatype in obj.ua_types:
        # a member guarded by a switch is skipped when its flag bit is not set
        if name in switches:
            flag_field, bit = switches[name]
            if not (getattr(obj, flag_field) & (1 << bit)):
                continue
        setattr(obj, name, from_binary(uatype, data))
    return obj
502
503
504 1
def header_to_binary(hdr):
    """
    Pack a message header: 3-byte message type, 1-byte chunk type and a
    UInt32 total size (body size plus 8 header bytes).

    For the three secure message types the size grows by 4 and the UInt32
    channel id is appended.
    """
    secure = hdr.MessageType in (
        ua.MessageType.SecureOpen,
        ua.MessageType.SecureClose,
        ua.MessageType.SecureMessage,
    )
    parts = [struct.pack("<3ss", hdr.MessageType, hdr.ChunkType)]
    total = hdr.body_size + 8 + (4 if secure else 0)
    parts.append(Primitives.UInt32.pack(total))
    if secure:
        parts.append(Primitives.UInt32.pack(hdr.ChannelId))
    return b"".join(parts)
514
515
516 1
def header_from_binary(data):
    """
    Read a message header from ``data``: message type, chunk type and
    packet size come from the first 8 bytes, and ``body_size`` is derived
    from the packet size.

    For the three secure message types a UInt32 channel id is read as
    well and its 4 bytes are subtracted from ``body_size``.
    """
    hdr = ua.Header()
    fields = struct.unpack("<3scI", data.read(8))
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = fields
    hdr.body_size = hdr.packet_size - 8
    secure_types = (
        ua.MessageType.SecureOpen,
        ua.MessageType.SecureClose,
        ua.MessageType.SecureMessage,
    )
    if hdr.MessageType in secure_types:
        hdr.body_size -= 4
        hdr.ChannelId = Primitives.UInt32.unpack(data)
    return hdr
524
525
526 1
def uatcp_to_binary(message_type, message):
    """
    Convert OPC UA TCP message (see OPC UA specs Part 6, 7.1) to binary.
    The only supported types are Hello, Acknowledge and ErrorMessage

    The result is a single-chunk header followed by the packed message.
    """
    hdr = ua.Header(message_type, ua.ChunkType.Single)
    payload = struct_to_binary(message)
    hdr.body_size = len(payload)
    return b"".join((header_to_binary(hdr), payload))
535
536
537