Passed
Push — master ( e6b4d4...53fd8d )
by Olivier
04:10
created

_Primitive1.pack_array()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 5
ccs 5
cts 5
cp 1
crap 2
rs 9.4285
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
import uuid
9 1
from enum import IntEnum, Enum
10
11 1
from opcua.ua.uaerrors import UaError
12 1
from opcua.common.utils import Buffer
13 1
from opcua import ua
14
15 1
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so that the
# isinstance(..., unicode) checks in this module work on both major versions.
if sys.version_info.major > 2:
    unicode = str
17
18 1
# Name the logger after this module (the variable __name__, not the literal
# string '__name__') so it takes part in the package's logging hierarchy.
logger = logging.getLogger(__name__)
19
20
21 1
def test_bit(data, offset):
22 1
    mask = 1 << offset
23 1
    return data & mask
24
25
26 1
def set_bit(data, offset):
    """
    Return *data* with bit number *offset* (0 = least significant) set to 1.

    All other bits are left unchanged; *data* itself is not modified.
    """
    mask = 1 << offset
    return data | mask
29
30
31 1
def unset_bit(data, offset):
    """
    Return *data* with bit number *offset* (0 = least significant) cleared.

    All other bits are left unchanged; *data* itself is not modified.
    """
    mask = 1 << offset
    return data & ~mask
34
35
36 1
class _DateTime(object):
    """Codec for DateTime values, which are carried on the wire as an Int64."""

    @staticmethod
    def pack(dt):
        """
        Convert *dt* with ``ua.datetime_to_win_epoch`` and return the result
        packed as an Int64.
        """
        epch = ua.datetime_to_win_epoch(dt)
        return Primitives.Int64.pack(epch)

    @staticmethod
    def unpack(data):
        """
        Read an Int64 from the buffer *data* and return it converted with
        ``ua.win_epoch_to_datetime``.
        """
        epch = Primitives.Int64.unpack(data)
        return ua.win_epoch_to_datetime(epch)
46
47
48 1
class _String(object):
    """Codec for strings: an Int32 byte length followed by UTF-8 data."""

    @staticmethod
    def pack(string):
        """
        Return the binary form of *string*.

        ``None`` is encoded as length -1 with no payload. Text (``unicode``)
        is encoded to UTF-8 first; anything else is written as-is, so the
        length prefix always counts bytes of the payload actually emitted.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        if isinstance(string, unicode):
            string = string.encode('utf-8')
        length = len(string)
        return Primitives.Int32.pack(length) + string

    @staticmethod
    def unpack(data):
        """
        Read a length-prefixed string from the buffer *data*.

        Returns ``None`` for a null string (length -1). On Python 2 the raw
        byte string is returned unchanged; on Python 3 it is decoded as UTF-8.
        """
        b = _Bytes.unpack(data)
        # Python 2 keeps native byte strings; a null string needs no decoding
        if sys.version_info.major < 3 or b is None:
            return b
        return b.decode("utf-8")
67
68
69 1
class _Bytes(object):
    """Codec for byte strings: an Int32 length followed by the raw bytes."""

    @staticmethod
    def pack(data):
        """Return the binary form of *data*; delegates to ``_String.pack``."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """
        Read a length-prefixed byte string from the buffer *data*.

        Returns ``None`` when the length prefix is -1, otherwise the result
        of ``data.read(length)``.
        """
        length = Primitives.Int32.unpack(data)
        if length == -1:
            return None
        return data.read(length)
80
81
82 1
class _Null(object):
83 1
    @staticmethod
84
    def pack(data):
85 1
        return b""
86
87 1
    @staticmethod
88
    def unpack(data):
89 1
        return None
90
91
92 1
class _Guid(object):
93 1
    @staticmethod
94
    def pack(guid):
95
        # convert python UUID 6 field format to OPC UA 4 field format
96 1
        f1 = Primitives.UInt32.pack(guid.time_low)
97 1
        f2 = Primitives.UInt16.pack(guid.time_mid)
98 1
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
99 1
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
100 1
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
101 1
        f4c = struct.pack('>Q', guid.node)[2:8]  # no primitive .pack available for 6 byte int
102 1
        f4 = f4a + f4b + f4c
103
        # concat byte fields
104 1
        b = f1 + f2 + f3 + f4
105
106 1
        return b
107
108 1
    @staticmethod
109
    def unpack(data):
110
        # convert OPC UA 4 field format to python UUID bytes
111 1
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
112 1
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
113 1
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
114 1
        f4 = data.read(8)
115
        # concat byte fields
116 1
        b = f1 + f2 + f3 + f4
117
118 1
        return uuid.UUID(bytes=b)
119
120
121 1
class _Primitive1(object):
122 1
    def __init__(self, fmt):
123 1
        self._fmt = fmt
124 1
        st = struct.Struct(fmt.format(1))
125 1
        self.size = st.size
126 1
        self.format = st.format
127
128 1
    def pack(self, data):
129 1
        return struct.pack(self.format, data)
130
131 1
    def unpack(self, data):
132 1
        return struct.unpack(self.format, data.read(self.size))[0]
133
134 1
    def pack_array(self, data):
135 1
        if data is None:
136 1
            return Primitives.Int32.pack(-1)
137 1
        sizedata = Primitives.Int32.pack(len(data))
138 1
        return sizedata + struct.pack(self._fmt.format(len(data)), *data)
139
140 1
    def unpack_array(self, data, length):
141 1
        if length == -1:
142
            return None
143 1
        if length == 0:
144 1
            return ()
145 1
        return struct.unpack(self._fmt.format(length), data.read(self.size*length))
146
147
148 1
class Primitives1(object):
    """
    Fixed-size primitive codecs, all little-endian.

    Each attribute is a ``_Primitive1`` built from a ``struct`` format
    template; the attribute names are looked up by type name elsewhere in
    this module (``hasattr``/``getattr`` on this class).
    """
    SByte = _Primitive1("<{:d}b")
    Int16 = _Primitive1("<{:d}h")
    Int32 = _Primitive1("<{:d}i")
    Int64 = _Primitive1("<{:d}q")
    Byte = _Primitive1("<{:d}B")
    Char = Byte  # same codec object as Byte
    UInt16 = _Primitive1("<{:d}H")
    UInt32 = _Primitive1("<{:d}I")
    UInt64 = _Primitive1("<{:d}Q")
    Boolean = _Primitive1("<{:d}?")
    Double = _Primitive1("<{:d}d")
    Float = _Primitive1("<{:d}f")
161
162
163 1
class Primitives(Primitives1):
    """
    All primitive codecs: the fixed-size ones inherited from ``Primitives1``
    plus the variable-length and special-case ones defined here.
    """
    Null = _Null()
    String = _String()
    Bytes = _Bytes()
    ByteString = _Bytes()
    CharArray = _String()
    DateTime = _DateTime()
    Guid = _Guid()
171
172
173 1
def pack_uatype(vtype, value):
    """
    Pack the single *value* according to the variant type *vtype*.

    Dispatch order: a codec on ``Primitives`` named like the type; any type
    whose numeric value is above 25 is written as a byte string; then
    ExtensionObject, NodeId/ExpandedNodeId and Variant; everything else is
    packed as a structure via ``struct_to_binary``.
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if vtype == ua.VariantType.ExtensionObject:
        return extensionobject_to_binary(value)
    if vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        return nodeid_to_binary(value)
    if vtype == ua.VariantType.Variant:
        return variant_to_binary(value)
    return struct_to_binary(value)
186
187
188 1
def unpack_uatype(vtype, data):
    """
    Read one value of variant type *vtype* from the buffer *data*.

    Mirrors ``pack_uatype``: primitive codec by name, byte string for types
    with a numeric value above 25, then ExtensionObject, NodeId/ExpandedNodeId
    and Variant. Any other type is looked up by name in the ``ua`` module and
    decoded as a structure.

    Raises ``UaError`` if no class with that name exists in ``ua``.
    """
    if hasattr(Primitives, vtype.name):
        st = getattr(Primitives, vtype.name)
        return st.unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if vtype == ua.VariantType.ExtensionObject:
        return extensionobject_from_binary(data)
    if vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        return nodeid_from_binary(data)
    if vtype == ua.VariantType.Variant:
        return variant_from_binary(data)
    if hasattr(ua, vtype.name):
        klass = getattr(ua, vtype.name)
        return struct_from_binary(klass, data)
    raise UaError("Cannot unpack unknown variant type {0!s}".format(vtype))
206
207
208 1
def pack_uatype_array(vtype, array):
    """
    Pack *array* as an Int32 element count followed by its elements.

    Fixed-size primitives are packed in one ``struct`` call through
    ``Primitives1``; other types are packed element by element with
    ``pack_uatype``. ``None`` is encoded as count -1.
    """
    if hasattr(Primitives1, vtype.name):
        dataType = getattr(Primitives1, vtype.name)
        return dataType.pack_array(array)
    if array is None:
        return Primitives.Int32.pack(-1)
    parts = [Primitives.Int32.pack(len(array))]
    parts.extend(pack_uatype(vtype, val) for val in array)
    return b"".join(parts)
218
219
220 1
def unpack_uatype_array(vtype, data):
    """
    Read an array of variant type *vtype* from the buffer *data*.

    The Int32 element count is read first. Returns ``None`` for count -1,
    otherwise a list of the decoded elements.
    """
    length = Primitives.Int32.unpack(data)
    if length == -1:
        return None
    if hasattr(Primitives1, vtype.name):
        dataType = getattr(Primitives1, vtype.name)
        # Remark: works without tuple conversion to list.
        return list(dataType.unpack_array(data, length))
    # Revert to slow serial unpacking.
    return [unpack_uatype(vtype, data) for _ in range(length)]
231
232
233 1
def struct_to_binary(obj):
    """
    Pack the structure *obj* by packing each member listed in ``obj.ua_types``
    in order.

    If *obj* has ``ua_switches`` (member name -> (container name, bit index)),
    the corresponding bit is first set in the container attribute for every
    switched member that is not ``None``. Note that this updates *obj* in
    place. Switched members that are ``None`` are then left out of the output.
    """
    packet = []
    has_switch = hasattr(obj, "ua_switches")
    if has_switch:
        for name, switch in obj.ua_switches.items():
            member = getattr(obj, name)
            container_name, idx = switch
            if member is not None:
                container_val = getattr(obj, container_name)
                setattr(obj, container_name, container_val | (1 << idx))
    for name, uatype in obj.ua_types:
        val = getattr(obj, name)
        if uatype.startswith("ListOf"):
            packet.append(list_to_binary(uatype[6:], val))
        elif has_switch and val is None and name in obj.ua_switches:
            # optional member that is absent: nothing is written for it
            continue
        else:
            packet.append(to_binary(uatype, val))
    return b''.join(packet)
254
255
256 1
def to_binary(uatype, val):
    """
    Pack a python object to binary given a string defining its type.

    A *uatype* starting with "ListOf" is packed as a list of the remaining
    type name; a name found on ``ua.VariantType`` goes through
    ``pack_uatype``. Otherwise the python type of *val* decides: enums are
    written as UInt32, then NodeId, Variant and objects with ``ua_types``.

    Raises ``UaError`` if none of these applies.
    """
    # A non-string uatype skips the name-based branches instead of failing on startswith
    is_name = isinstance(uatype, (str, unicode))
    if is_name and uatype.startswith("ListOf"):
        return list_to_binary(uatype[6:], val)
    elif is_name and hasattr(ua.VariantType, uatype):
        vtype = getattr(ua.VariantType, uatype)
        return pack_uatype(vtype, val)
    elif isinstance(val, (IntEnum, Enum)):
        return Primitives.UInt32.pack(val.value)
    elif isinstance(val, ua.NodeId):
        return nodeid_to_binary(val)
    elif isinstance(val, ua.Variant):
        return variant_to_binary(val)
    elif hasattr(val, "ua_types"):
        return struct_to_binary(val)
    else:
        raise UaError("No known way to pack {} of type {} to ua binary".format(val, uatype))
276
277
278 1
def list_to_binary(uatype, val):
    """
    Pack the list *val* whose elements have the type named *uatype*.

    The output is an Int32 element count followed by the elements; ``None``
    is encoded as count -1. Fixed-size primitives are packed in one call
    through ``Primitives1``, everything else element by element with
    ``to_binary``.
    """
    if val is None:
        return Primitives.Int32.pack(-1)
    if hasattr(Primitives1, uatype):
        dataType = getattr(Primitives1, uatype)
        return dataType.pack_array(val)
    parts = [Primitives.Int32.pack(len(val))]
    parts.extend(to_binary(uatype, el) for el in val)
    return b''.join(parts)
288
289
290 1
def nodeid_to_binary(nodeid):
    """
    Pack *nodeid* to binary.

    The first byte is the NodeIdType value; the layout of the rest depends on
    that type. If ``NamespaceUri`` and/or ``ServerIndex`` are set (the
    ExpandedNodeId case), bit 7 and/or bit 6 of the first byte are set and
    the values are appended in that order.

    Returns ``bytes``, or a ``bytearray`` when one of those flags had to be
    patched into the first byte. Raises ``UaError`` for an unknown NodeIdType.
    """
    if nodeid.NodeIdType == ua.NodeIdType.TwoByte:
        data = struct.pack("<BB", nodeid.NodeIdType.value, nodeid.Identifier)
    elif nodeid.NodeIdType == ua.NodeIdType.FourByte:
        data = struct.pack("<BBH", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
    elif nodeid.NodeIdType == ua.NodeIdType.Numeric:
        data = struct.pack("<BHI", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
    elif nodeid.NodeIdType == ua.NodeIdType.String:
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
            Primitives.String.pack(nodeid.Identifier)
    elif nodeid.NodeIdType == ua.NodeIdType.ByteString:
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
            Primitives.Bytes.pack(nodeid.Identifier)
    elif nodeid.NodeIdType == ua.NodeIdType.Guid:
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
            Primitives.Guid.pack(nodeid.Identifier)
    else:
        raise UaError("Unknown NodeIdType: {} for NodeId: {}".format(nodeid.NodeIdType, nodeid))
    # Add NamespaceUri and ServerIndex in case we have an ExpandedNodeId
    if nodeid.NamespaceUri:
        # a mutable copy is needed to patch the flag into the first byte
        data = bytearray(data)
        data[0] = set_bit(data[0], 7)
        data.extend(Primitives.String.pack(nodeid.NamespaceUri))
    if nodeid.ServerIndex:
        if not isinstance(data, bytearray):
            data = bytearray(data)
        data[0] = set_bit(data[0], 6)
        data.extend(Primitives.UInt32.pack(nodeid.ServerIndex))
    return data
320
321
322 1
def nodeid_from_binary(data):
    """
    Read a NodeId from the buffer *data*.

    The low 6 bits of the first byte select the NodeIdType; bit 7 signals a
    trailing NamespaceUri string and bit 6 a trailing UInt32 ServerIndex.

    Raises ``UaError`` for a NodeIdType this function has no decoder for.
    """
    nid = ua.NodeId()
    encoding = ord(data.read(1))
    # the two top bits are flags, not part of the type
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)

    if nid.NodeIdType == ua.NodeIdType.TwoByte:
        nid.Identifier = ord(data.read(1))
    elif nid.NodeIdType == ua.NodeIdType.FourByte:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
    elif nid.NodeIdType == ua.NodeIdType.Numeric:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
    elif nid.NodeIdType == ua.NodeIdType.String:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.String.unpack(data)
    elif nid.NodeIdType == ua.NodeIdType.ByteString:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.Bytes.unpack(data)
    elif nid.NodeIdType == ua.NodeIdType.Guid:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.Guid.unpack(data)
    else:
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

    if test_bit(encoding, 7):
        nid.NamespaceUri = Primitives.String.unpack(data)
    if test_bit(encoding, 6):
        nid.ServerIndex = Primitives.UInt32.unpack(data)

    return nid
351
352
353 1
def variant_to_binary(var):
    """
    Pack the Variant *var* to binary.

    The first byte holds the variant type in its low 6 bits; bit 7 marks an
    array and bit 6 marks that array dimensions follow. A value that is a
    list or tuple is always sent as an array, and ``var.is_array`` is set to
    True in that case. Array values are flattened with ``ua.flatten`` before
    packing; the dimensions, if any, are appended as an Int32 array.
    """
    b = []
    encoding = var.VariantType.value & 0b111111
    if var.is_array or isinstance(var.Value, (list, tuple)):
        var.is_array = True
        encoding = set_bit(encoding, 7)
        if var.Dimensions is not None:
            encoding = set_bit(encoding, 6)
        b.append(Primitives.Byte.pack(encoding))
        b.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
        if var.Dimensions is not None:
            b.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
    else:
        b.append(Primitives.Byte.pack(encoding))
        b.append(pack_uatype(var.VariantType, var.Value))

    return b"".join(b)
370
371
372 1
def variant_from_binary(data):
    """
    Read a Variant from the buffer *data*.

    The low 6 bits of the first byte give the data type, bit 7 marks an array
    value and bit 6 marks that an Int32 array of dimensions follows the
    value. When dimensions are present the flat value is reshaped into
    nested lists with ``_reshape``.
    """
    dimensions = None
    array = False
    encoding = ord(data.read(1))
    int_type = encoding & 0b00111111
    vtype = ua.datatype_to_varianttype(int_type)
    if test_bit(encoding, 7):
        value = unpack_uatype_array(vtype, data)
        array = True
    else:
        value = unpack_uatype(vtype, data)
    if test_bit(encoding, 6):
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
        value = _reshape(value, dimensions)
    return ua.Variant(value, vtype, dimensions, is_array=array)
387
388
389 1
def _reshape(flat, dims):
390 1
    subdims = dims[1:]
391 1
    subsize = 1
392 1
    for i in subdims:
393 1
        if i == 0:
394 1
            i = 1
395 1
        subsize *= i
396 1
    while dims[0] * subsize > len(flat):
397 1
        flat.append([])
398 1
    if not subdims or subdims == [0]:
399 1
        return flat
400 1
    return [_reshape(flat[i:i + subsize], subdims) for i in range(0, len(flat), subsize)]
401
402
403 1
def extensionobject_from_binary(data):
    """
    Convert binary-coded ExtensionObject to a Python object.

    Reads the TypeId NodeId and the Encoding byte; if bit 0 of Encoding is
    set, an Int32 length and that many body bytes follow.

    Returns None if TypeId is zero, an instance of the registered class if
    the TypeId is in ``ua.extension_object_classes``, and otherwise a raw
    ``ua.ExtensionObject`` carrying TypeId, Encoding and the body bytes.

    Raises ``UaError`` if a registered class is found but no body was sent.
    """
    typeid = nodeid_from_binary(data)
    Encoding = ord(data.read(1))
    body = None
    if Encoding & (1 << 0):
        length = Primitives.Int32.unpack(data)
        if length < 1:
            body = Buffer(b"")
        else:
            # take a view of the body and advance past it in the main buffer
            body = data.copy(length)
            data.skip(length)
    if typeid.Identifier == 0:
        return None
    if typeid in ua.extension_object_classes:
        klass = ua.extension_object_classes[typeid]
        if body is None:
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
        return from_binary(klass, body)
    e = ua.ExtensionObject()
    e.TypeId = typeid
    e.Encoding = Encoding
    if body is not None:
        e.Body = body.read(len(body))
    return e
432
433
434 1
def extensionobject_to_binary(obj):
    """
    Convert Python object to binary-coded ExtensionObject.

    If obj is None, convert to empty ExtensionObject (TypeId=0, no Body).
    A raw ``ua.ExtensionObject`` is packed as an ordinary structure. Any
    other object is looked up by class name in ``ua.extension_object_ids``
    and sent with Encoding 0x01 and its packed structure as body.

    Returns a binary string
    """
    if isinstance(obj, ua.ExtensionObject):
        return struct_to_binary(obj)
    if obj is None:
        TypeId = ua.NodeId()
        Encoding = 0
        Body = None
    else:
        TypeId = ua.extension_object_ids[obj.__class__.__name__]
        Encoding = 0x01
        Body = struct_to_binary(obj)
    packet = [nodeid_to_binary(TypeId), Primitives.Byte.pack(Encoding)]
    # Encoding 0x01 announces a length-prefixed body, so the length must be
    # written even when the packed structure is empty (b""); otherwise the
    # reader would take the following bytes for the length.
    if Body is not None:
        packet.append(Primitives.Bytes.pack(Body))
    return b''.join(packet)
456
457
458 1
def from_binary(uatype, data):
    """
    Unpack data given a uatype as a string or a python class having a
    ua_types member.

    "ListOf..." names are read as an Int32 count followed by that many
    elements; names found on ``ua.VariantType`` go through the variant-type
    decoders; anything else is decoded as a structure.
    """
    is_name = isinstance(uatype, (str, unicode))
    if is_name and uatype.startswith("ListOf"):
        utype = uatype[6:]
        if hasattr(ua.VariantType, utype):
            vtype = getattr(ua.VariantType, utype)
            return unpack_uatype_array(vtype, data)
        size = Primitives.Int32.unpack(data)
        return [from_binary(utype, data) for _ in range(size)]
    if is_name and hasattr(ua.VariantType, uatype):
        vtype = getattr(ua.VariantType, uatype)
        return unpack_uatype(vtype, data)
    return struct_from_binary(uatype, data)
474
475
476 1
def struct_from_binary(objtype, data):
    """
    Unpack a ua struct. Arguments are an objtype as Python class or string.

    A string is resolved to the class of that name in the ``ua`` module.
    Enum classes are read as a UInt32. For other classes an instance is
    created and each member listed in ``ua_types`` is read in order.
    """
    if isinstance(objtype, (unicode, str)):
        objtype = getattr(ua, objtype)
    if issubclass(objtype, Enum):
        return objtype(Primitives.UInt32.unpack(data))
    obj = objtype()
    has_switch = hasattr(obj, "ua_switches")
    for name, uatype in obj.ua_types:
        # if our member has a switch and it is not set we skip it
        if has_switch and name in obj.ua_switches:
            container_name, idx = obj.ua_switches[name]
            # the container member precedes the switched one, so it is already decoded
            if not test_bit(getattr(obj, container_name), idx):
                continue
        setattr(obj, name, from_binary(uatype, data))
    return obj
495
496
497 1
def header_to_binary(hdr):
    """
    Pack the message header *hdr* to binary.

    Layout: 3-byte MessageType, 1-byte ChunkType, UInt32 total size and, for
    the secure message types only, a UInt32 ChannelId. The size written is
    ``hdr.body_size`` plus the header length (8 bytes, or 12 with ChannelId).
    """
    is_secure = hdr.MessageType in (
        ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage)
    size = hdr.body_size + 8
    if is_secure:
        size += 4  # the ChannelId field is part of the packet
    b = [
        struct.pack("<3ss", hdr.MessageType, hdr.ChunkType),
        Primitives.UInt32.pack(size),
    ]
    if is_secure:
        b.append(Primitives.UInt32.pack(hdr.ChannelId))
    return b"".join(b)
507
508
509 1
def header_from_binary(data):
    """
    Read a message header from the buffer *data* and return a ``ua.Header``.

    The first 8 bytes hold MessageType (3 bytes), ChunkType (1 byte) and the
    UInt32 packet size. ``body_size`` is the packet size minus the header
    length; for the secure message types a UInt32 ChannelId follows and its
    4 bytes are subtracted as well.
    """
    hdr = ua.Header()
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
    hdr.body_size = hdr.packet_size - 8
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
        hdr.body_size -= 4
        hdr.ChannelId = Primitives.UInt32.unpack(data)
    return hdr
517
518
519 1
def uatcp_to_binary(message_type, message):
    """
    Convert OPC UA TCP message (see OPC UA specs Part 6, 7.1) to binary.
    The only supported types are Hello, Acknowledge and ErrorMessage

    Returns the packed header (single chunk, body size taken from the packed
    message) followed by the packed message.
    """
    binmsg = struct_to_binary(message)
    header = ua.Header(message_type, ua.ChunkType.Single)
    header.body_size = len(binmsg)
    return header_to_binary(header) + binmsg
528