Passed
Push — master ( 72629f...de3f23 )
by Olivier
03:10
created

_String.pack()   A

Complexity

Conditions 4

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 4.3731

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 4
c 3
b 1
f 0
dl 0
loc 10
ccs 5
cts 7
cp 0.7143
crap 4.3731
rs 9.2
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
import uuid
9 1
from enum import IntEnum, Enum
10
11 1
from opcua.ua.uaerrors import UaError
12 1
from opcua.common.utils import Buffer
13 1
from opcua import ua
14
15 1
if sys.version_info.major > 2:
16 1
    unicode = str
17
18 1
logger = logging.getLogger('__name__')
19
20
21 1
def test_bit(data, offset):
22 1
    mask = 1 << offset
23 1
    return data & mask
24
25
26 1
def set_bit(data, offset):
27 1
    mask = 1 << offset
28 1
    return data | mask
29
30
31 1
def unset_bit(data, offset):
32 1
    mask = 1 << offset
33 1
    return data & ~mask
34
35
36 1
class _DateTime(object):
37 1
    @staticmethod
38
    def pack(dt):
39 1
        epch = ua.datetime_to_win_epoch(dt)
40 1
        return Primitives.Int64.pack(epch)
41
42 1
    @staticmethod
43
    def unpack(data):
44 1
        epch = Primitives.Int64.unpack(data)
45 1
        return ua.win_epoch_to_datetime(epch)
46
47
48 1
class _Bytes(object):
49 1
    @staticmethod
50
    def pack(data):
51 1
        if data is None:
52 1
            return Primitives.Int32.pack(-1)
53 1
        length = len(data)
54 1
        return Primitives.Int32.pack(length) + data
55
56 1
    @staticmethod
57
    def unpack(data):
58 1
        length = Primitives.Int32.unpack(data)
59 1
        if length == -1:
60 1
            return None
61 1
        return data.read(length)
62
63
64 1
class _String(object):
65 1
    @staticmethod
66
    def pack(string):
67 1
        if string is not None:
68 1
            if sys.version_info.major > 2:
69 1
                string = string.encode('utf-8')
70
            else:
71
                # we do not want this test to happen with python3
72
                if isinstance(string, unicode):
73
                    string = string.encode('utf-8')
74 1
        return _Bytes.pack(string)
75
76 1
    @staticmethod
77
    def unpack(data):
78 1
        b = _Bytes.unpack(data)
79 1
        if sys.version_info.major < 3:
80
            # return unicode(b)  #might be correct for python2 but would complicate tests for python3
81
            return b
82
        else:
83 1
            if b is None:
84 1
                return b
85 1
            return b.decode("utf-8")
86
87
88 1
class _Null(object):
89 1
    @staticmethod
90
    def pack(data):
91 1
        return b""
92
93 1
    @staticmethod
94
    def unpack(data):
95 1
        return None
96
97
98 1
class _Guid(object):
99 1
    @staticmethod
100
    def pack(guid):
101
        # convert python UUID 6 field format to OPC UA 4 field format
102 1
        f1 = Primitives.UInt32.pack(guid.time_low)
103 1
        f2 = Primitives.UInt16.pack(guid.time_mid)
104 1
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
105 1
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
106 1
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
107 1
        f4c = struct.pack('>Q', guid.node)[2:8]  # no primitive .pack available for 6 byte int
108 1
        f4 = f4a + f4b + f4c
109
        # concat byte fields
110 1
        b = f1 + f2 + f3 + f4
111
112 1
        return b
113
114 1
    @staticmethod
115
    def unpack(data):
116
        # convert OPC UA 4 field format to python UUID bytes
117 1
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
118 1
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
119 1
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
120 1
        f4 = data.read(8)
121
        # concat byte fields
122 1
        b = f1 + f2 + f3 + f4
123
124 1
        return uuid.UUID(bytes=b)
125
126
127 1
class _Primitive1(object):
128 1
    def __init__(self, fmt):
129 1
        self._fmt = fmt
130 1
        st = struct.Struct(fmt.format(1))
131 1
        self.size = st.size
132 1
        self.format = st.format
133
134 1
    def pack(self, data):
135 1
        return struct.pack(self.format, data)
136
137 1
    def unpack(self, data):
138 1
        return struct.unpack(self.format, data.read(self.size))[0]
139
140 1
    def pack_array(self, data):
141 1
        if data is None:
142 1
            return Primitives.Int32.pack(-1)
143 1
        sizedata = Primitives.Int32.pack(len(data))
144 1
        return sizedata + struct.pack(self._fmt.format(len(data)), *data)
145
146 1
    def unpack_array(self, data, length):
147 1
        if length == -1:
148
            return None
149 1
        if length == 0:
150 1
            return ()
151 1
        return struct.unpack(self._fmt.format(length), data.read(self.size * length))
152
153
154 1
class Primitives1(object):
155 1
    SByte = _Primitive1("<{:d}b")
156 1
    Int16 = _Primitive1("<{:d}h")
157 1
    Int32 = _Primitive1("<{:d}i")
158 1
    Int64 = _Primitive1("<{:d}q")
159 1
    Byte = _Primitive1("<{:d}B")
160 1
    Char = Byte
161 1
    UInt16 = _Primitive1("<{:d}H")
162 1
    UInt32 = _Primitive1("<{:d}I")
163 1
    UInt64 = _Primitive1("<{:d}Q")
164 1
    Boolean = _Primitive1("<{:d}?")
165 1
    Double = _Primitive1("<{:d}d")
166 1
    Float = _Primitive1("<{:d}f")
167
168
169 1
class Primitives(Primitives1):
170 1
    Null = _Null()
171 1
    String = _String()
172 1
    Bytes = _Bytes()
173 1
    ByteString = _Bytes()
174 1
    CharArray = _String()
175 1
    DateTime = _DateTime()
176 1
    Guid = _Guid()
177
178
179 1
def pack_uatype(vtype, value):
180 1
    if hasattr(Primitives, vtype.name):
181 1
        return getattr(Primitives, vtype.name).pack(value)
182 1
    elif vtype.value > 25:
183 1
        return Primitives.Bytes.pack(value)
184 1
    elif vtype == ua.VariantType.ExtensionObject:
185 1
        return extensionobject_to_binary(value)
186 1
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
187 1
        return nodeid_to_binary(value)
188 1
    elif vtype == ua.VariantType.Variant:
189 1
        return variant_to_binary(value)
190
    else:
191 1
        return struct_to_binary(value)
192
193
194 1
def unpack_uatype(vtype, data):
195 1
    if hasattr(Primitives, vtype.name):
196 1
        st = getattr(Primitives, vtype.name)
197 1
        return st.unpack(data)
198 1
    elif vtype.value > 25:
199 1
        return Primitives.Bytes.unpack(data)
200 1
    elif vtype == ua.VariantType.ExtensionObject:
201 1
        return extensionobject_from_binary(data)
202 1
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
203 1
        return nodeid_from_binary(data)
204 1
    elif vtype == ua.VariantType.Variant:
205 1
        return variant_from_binary(data)
206
    else:
207 1
        if hasattr(ua, vtype.name):
208 1
            klass = getattr(ua, vtype.name)
209 1
            return struct_from_binary(klass, data)
210
        else:
211
            raise UaError("Cannot unpack unknown variant type {0!s}".format(vtype))
212
213
214 1
def pack_uatype_array(vtype, array):
215 1
    if hasattr(Primitives1, vtype.name):
216 1
        dataType = getattr(Primitives1, vtype.name)
217 1
        return dataType.pack_array(array)
218 1
    if array is None:
219 1
        return b'\xff\xff\xff\xff'
220 1
    length = len(array)
221 1
    b = [pack_uatype(vtype, val) for val in array]
222 1
    b.insert(0, Primitives.Int32.pack(length))
223 1
    return b"".join(b)
224
225
226 1
def unpack_uatype_array(vtype, data):
227 1
    length = Primitives.Int32.unpack(data)
228 1
    if length == -1:
229 1
        return None
230 1
    elif hasattr(Primitives1, vtype.name):
231 1
        dataType = getattr(Primitives1, vtype.name)
232
        # Remark: works without tuple conversion to list.
233 1
        return list(dataType.unpack_array(data, length))
234
    else:
235
        # Revert to slow serial unpacking.
236 1
        return [unpack_uatype(vtype, data) for _ in range(length)]
237
238
239 1
def struct_to_binary(obj):
240 1
    packet = []
241 1
    has_switch = hasattr(obj, "ua_switches")
242 1
    if has_switch:
243 1
        for name, switch in obj.ua_switches.items():
244 1
            member = getattr(obj, name)
245 1
            container_name, idx = switch
246 1
            if member is not None:
247 1
                container_val = getattr(obj, container_name)
248 1
                container_val = container_val | 1 << idx
249 1
                setattr(obj, container_name, container_val)
250 1
    for name, uatype in obj.ua_types:
251 1
        val = getattr(obj, name)
252 1
        if uatype.startswith("ListOf"):
253 1
            packet.append(list_to_binary(uatype[6:], val))
254
        else:
255 1
            if has_switch and val is None and name in obj.ua_switches:
256 1
                pass
257
            else:
258 1
                packet.append(to_binary(uatype, val))
259 1
    return b''.join(packet)
260
261
262 1
def to_binary(uatype, val):
263
    """
264
    Pack a python object to binary given a string defining its type
265
    """
266 1
    if uatype.startswith("ListOf"):
267
        #if isinstance(val, (list, tuple)):
268
        return list_to_binary(uatype[6:], val)
269 1
    elif isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
270 1
        vtype = getattr(ua.VariantType, uatype)
271 1
        return pack_uatype(vtype, val)
272 1
    elif isinstance(uatype, (str, unicode)) and hasattr(Primitives, uatype):
273
        return getattr(Primitives, uatype).pack(val)
274 1
    elif isinstance(val, (IntEnum, Enum)):
275 1
        return Primitives.UInt32.pack(val.value)
276 1
    elif isinstance(val, ua.NodeId):
277
        return nodeid_to_binary(val)
278 1
    elif isinstance(val, ua.Variant):
279
        return variant_to_binary(val)
280 1
    elif hasattr(val, "ua_types"):
281 1
        return struct_to_binary(val)
282
    else:
283
        raise UaError("No known way to pack {} of type {} to ua binary".format(val, uatype))
284
285
286 1
def list_to_binary(uatype, val):
287 1
    if val is None:
288
        return Primitives.Int32.pack(-1)
289 1
    if hasattr(Primitives1, uatype):
290 1
        dataType = getattr(Primitives1, uatype)
291 1
        return dataType.pack_array(val)
292 1
    datasize = Primitives.Int32.pack(len(val))
293 1
    pack = [to_binary(uatype, el) for el in val]
294 1
    pack.insert(0, datasize)
295 1
    return b''.join(pack)
296
297
298 1
def nodeid_to_binary(nodeid):
299 1
    data = None
300 1
    if nodeid.NodeIdType == ua.NodeIdType.TwoByte:
301 1
        data = struct.pack("<BB", nodeid.NodeIdType.value, nodeid.Identifier)
302 1
    elif nodeid.NodeIdType == ua.NodeIdType.FourByte:
303 1
        data = struct.pack("<BBH", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
304 1
    elif nodeid.NodeIdType == ua.NodeIdType.Numeric:
305 1
        data = struct.pack("<BHI", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
306 1
    elif nodeid.NodeIdType == ua.NodeIdType.String:
307 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
308
            Primitives.String.pack(nodeid.Identifier)
309 1
    elif nodeid.NodeIdType == ua.NodeIdType.ByteString:
310 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
311
            Primitives.Bytes.pack(nodeid.Identifier)
312 1
    elif nodeid.NodeIdType == ua.NodeIdType.Guid:
313 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
314
               Primitives.Guid.pack(nodeid.Identifier)
315
    else:
316
        raise UaError("Unknown NodeIdType: {} for NodeId: {}".format(nodeid.NodeIdType, nodeid))
317
    # Add NamespaceUri and ServerIndex in case we have an ExpandedNodeId
318 1
    if nodeid.NamespaceUri:
319 1
        data = bytearray(data)
320 1
        data[0] = set_bit(data[0], 7)
321 1
        data.extend(Primitives.String.pack(nodeid.NamespaceUri))
322 1
    if nodeid.ServerIndex:
323 1
        if not isinstance(data, bytearray):
324
            data = bytearray(data)
325 1
        data[0] = set_bit(data[0], 6)
326 1
        data.extend(Primitives.UInt32.pack(nodeid.ServerIndex))
327 1
    return data
328
329
330 1
def nodeid_from_binary(data):
331 1
    nid = ua.NodeId()
332 1
    encoding = ord(data.read(1))
333 1
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)
334
335 1
    if nid.NodeIdType == ua.NodeIdType.TwoByte:
336 1
        nid.Identifier = ord(data.read(1))
337 1
    elif nid.NodeIdType == ua.NodeIdType.FourByte:
338 1
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
339 1
    elif nid.NodeIdType == ua.NodeIdType.Numeric:
340 1
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
341 1
    elif nid.NodeIdType == ua.NodeIdType.String:
342 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
343 1
        nid.Identifier = Primitives.String.unpack(data)
344 1
    elif nid.NodeIdType == ua.NodeIdType.ByteString:
345 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
346 1
        nid.Identifier = Primitives.Bytes.unpack(data)
347 1
    elif nid.NodeIdType == ua.NodeIdType.Guid:
348 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
349 1
        nid.Identifier = Primitives.Guid.unpack(data)
350
    else:
351
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
352
353 1
    if test_bit(encoding, 7):
354 1
        nid.NamespaceUri = Primitives.String.unpack(data)
355 1
    if test_bit(encoding, 6):
356 1
        nid.ServerIndex = Primitives.UInt32.unpack(data)
357
358 1
    return nid
359
360
361 1
def variant_to_binary(var):
362 1
    b = []
363 1
    encoding = var.VariantType.value & 0b111111
364 1
    if var.is_array or isinstance(var.Value, (list, tuple)):
365 1
        var.is_array = True
366 1
        encoding = set_bit(encoding, 7)
367 1
        if var.Dimensions is not None:
368 1
            encoding = set_bit(encoding, 6)
369 1
        b.append(Primitives.Byte.pack(encoding))
370 1
        b.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
371 1
        if var.Dimensions is not None:
372 1
            b.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
373
    else:
374 1
        b.append(Primitives.Byte.pack(encoding))
375 1
        b.append(pack_uatype(var.VariantType, var.Value))
376
377 1
    return b"".join(b)
378
379
380 1
def variant_from_binary(data):
381 1
    dimensions = None
382 1
    array = False
383 1
    encoding = ord(data.read(1))
384 1
    int_type = encoding & 0b00111111
385 1
    vtype = ua.datatype_to_varianttype(int_type)
386 1
    if test_bit(encoding, 7):
387 1
        value = unpack_uatype_array(vtype, data)
388 1
        array = True
389
    else:
390 1
        value = unpack_uatype(vtype, data)
391 1
    if test_bit(encoding, 6):
392 1
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
393 1
        value = _reshape(value, dimensions)
394 1
    return ua.Variant(value, vtype, dimensions, is_array=array)
395
396
397 1
def _reshape(flat, dims):
398 1
    subdims = dims[1:]
399 1
    subsize = 1
400 1
    for i in subdims:
401 1
        if i == 0:
402 1
            i = 1
403 1
        subsize *= i
404 1
    while dims[0] * subsize > len(flat):
405 1
        flat.append([])
406 1
    if not subdims or subdims == [0]:
407 1
        return flat
408 1
    return [_reshape(flat[i:i + subsize], subdims) for i in range(0, len(flat), subsize)]
409
410
411 1
def extensionobject_from_binary(data):
412
    """
413
    Convert binary-coded ExtensionObject to a Python object.
414
    Returns an object, or None if TypeId is zero
415
    """
416 1
    typeid = nodeid_from_binary(data)
417 1
    Encoding = ord(data.read(1))
418 1
    body = None
419 1
    if Encoding & (1 << 0):
420 1
        length = Primitives.Int32.unpack(data)
421 1
        if length < 1:
422
            body = Buffer(b"")
423
        else:
424 1
            body = data.copy(length)
425 1
            data.skip(length)
426 1
    if typeid.Identifier == 0:
427 1
        return None
428 1
    elif typeid in ua.extension_object_classes:
429 1
        klass = ua.extension_object_classes[typeid]
430 1
        if body is None:
431
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
432 1
        return from_binary(klass, body)
433
    else:
434 1
        e = ua.ExtensionObject()
435 1
        e.TypeId = typeid
436 1
        e.Encoding = Encoding
437 1
        if body is not None:
438 1
            e.Body = body.read(len(body))
439 1
        return e
440
441
442 1
def extensionobject_to_binary(obj):
443
    """
444
    Convert Python object to binary-coded ExtensionObject.
445
    If obj is None, convert to empty ExtensionObject (TypeId=0, no Body).
446
    Returns a binary string
447
    """
448 1
    if isinstance(obj, ua.ExtensionObject):
449 1
        return struct_to_binary(obj)
450 1
    if obj is None:
451 1
        TypeId = ua.NodeId()
452 1
        Encoding = 0
453 1
        Body = None
454
    else:
455 1
        TypeId = ua.extension_object_ids[obj.__class__.__name__]
456 1
        Encoding = 0x01
457 1
        Body = struct_to_binary(obj)
458 1
    packet = []
459 1
    packet.append(nodeid_to_binary(TypeId))
460 1
    packet.append(Primitives.Byte.pack(Encoding))
461 1
    if Body:
462 1
        packet.append(Primitives.Bytes.pack(Body))
463 1
    return b''.join(packet)
464
465
466 1
def from_binary(uatype, data):
467
    """
468
    unpack data given an uatype as a string or a python class having a ua_types memeber
469
    """
470 1
    if isinstance(uatype, (str, unicode)) and uatype.startswith("ListOf"):
471 1
        utype = uatype[6:]
472 1
        if hasattr(ua.VariantType, utype):
473 1
            vtype = getattr(ua.VariantType, utype)
474 1
            return unpack_uatype_array(vtype, data)
475 1
        size = Primitives.Int32.unpack(data)
476 1
        return [from_binary(utype, data) for _ in range(size)]
477 1
    elif isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
478 1
        vtype = getattr(ua.VariantType, uatype)
479 1
        return unpack_uatype(vtype, data)
480 1
    elif isinstance(uatype, (str, unicode)) and hasattr(Primitives, uatype):
481
        return  getattr(Primitives, uatype).unpack(data)
482
    else:
483 1
        return struct_from_binary(uatype, data)
484
485
486 1
def struct_from_binary(objtype, data):
487
    """
488
    unpack an ua struct. Arguments are an objtype as Python class or string
489
    """
490 1
    if isinstance(objtype, (unicode, str)):
491 1
        objtype = getattr(ua, objtype)
492 1
    if issubclass(objtype, Enum):
493 1
        return objtype(Primitives.UInt32.unpack(data))
494 1
    obj = objtype()
495 1
    for name, uatype in obj.ua_types:
496
        # if our member has a swtich and it is not set we skip it
497 1
        if hasattr(obj, "ua_switches") and name in obj.ua_switches:
498 1
            container_name, idx = obj.ua_switches[name]
499 1
            val = getattr(obj, container_name)
500 1
            if not test_bit(val, idx):
501 1
                continue
502 1
        val = from_binary(uatype, data)
503 1
        setattr(obj, name, val)
504 1
    return obj
505
506
507 1
def header_to_binary(hdr):
508 1
    b = []
509 1
    b.append(struct.pack("<3ss", hdr.MessageType, hdr.ChunkType))
510 1
    size = hdr.body_size + 8
511 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
512 1
        size += 4
513 1
    b.append(Primitives.UInt32.pack(size))
514 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
515 1
        b.append(Primitives.UInt32.pack(hdr.ChannelId))
516 1
    return b"".join(b)
517
518
519 1
def header_from_binary(data):
520 1
    hdr = ua.Header()
521 1
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
522 1
    hdr.body_size = hdr.packet_size - 8
523 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
524 1
        hdr.body_size -= 4
525 1
        hdr.ChannelId = Primitives.UInt32.unpack(data)
526 1
    return hdr
527
528
529 1
def uatcp_to_binary(message_type, message):
530
    """
531
    Convert OPC UA TCP message (see OPC UA specs Part 6, 7.1) to binary.
532
    The only supported types are Hello, Acknowledge and ErrorMessage
533
    """
534 1
    header = ua.Header(message_type, ua.ChunkType.Single)
535 1
    binmsg = struct_to_binary(message)
536 1
    header.body_size = len(binmsg)
537
    return header_to_binary(header) + binmsg
538