Completed
Push — master ( 9e5a26...7e090e )
by Olivier
06:12
created

_Primitive1.unpack_many()   A

Complexity

Conditions 2

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 2
c 1
b 1
f 0
dl 0
loc 4
ccs 4
cts 4
cp 1
crap 2
rs 10
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
import uuid
9 1
from enum import IntEnum, Enum
10
11 1
from opcua.ua.uaerrors import UaError
12 1
from opcua.common.utils import Buffer
13 1
from opcua import ua
14
15 1
if sys.version_info.major > 2:
16
    unicode = str
17
18 1
logger = logging.getLogger('__name__')
19
20
21 1
def test_bit(data, offset):
22 1
    mask = 1 << offset
23 1
    return data & mask
24
25
26 1
def set_bit(data, offset):
27 1
    mask = 1 << offset
28 1
    return data | mask
29
30
31 1
def unset_bit(data, offset):
32 1
    mask = 1 << offset
33 1
    return data & ~mask
34
35
36 1
class _DateTime(object):
37 1
    @staticmethod
38
    def pack(dt):
39 1
        epch = ua.datetime_to_win_epoch(dt)
40 1
        return Primitives.Int64.pack(epch)
41
42 1
    @staticmethod
43
    def unpack(data):
44 1
        epch = Primitives.Int64.unpack(data)
45 1
        return ua.win_epoch_to_datetime(epch)
46
47
48 1
class _String(object):
49 1
    @staticmethod
50
    def pack(string):
51 1
        if string is None:
52 1
            return Primitives.Int32.pack(-1)
53 1
        if isinstance(string, unicode):
54
            string = string.encode('utf-8')
55 1
        length = len(string)
56 1
        return Primitives.Int32.pack(length) + string
57
58 1
    @staticmethod
59
    def unpack(data):
60 1
        b = _Bytes.unpack(data)
61 1
        if sys.version_info.major < 3:
62 1
            return b
63
        else:
64
            if b is None:
65
                return b
66
            return b.decode("utf-8")
67
68
69 1
class _Bytes(object):
70 1
    @staticmethod
71
    def pack(data):
72 1
        return _String.pack(data)
73
74 1
    @staticmethod
75
    def unpack(data):
76 1
        length = Primitives.Int32.unpack(data)
77 1
        if length == -1:
78 1
            return None
79 1
        return data.read(length)
80
81
82 1
class _Null(object):
83 1
    @staticmethod
84
    def pack(data):
85 1
        return b""
86
87 1
    @staticmethod
88
    def unpack(data):
89 1
        return None
90
91
92 1
class _Guid(object):
93 1
    @staticmethod
94
    def pack(guid):
95
        # convert python UUID 6 field format to OPC UA 4 field format
96 1
        f1 = Primitives.UInt32.pack(guid.time_low)
97 1
        f2 = Primitives.UInt16.pack(guid.time_mid)
98 1
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
99 1
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
100 1
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
101 1
        f4c = struct.pack('>Q', guid.node)[2:8]  # no primitive .pack available for 6 byte int
102 1
        f4 = f4a + f4b + f4c
103
        # concat byte fields
104 1
        b = f1 + f2 + f3 + f4
105
106 1
        return b
107
108 1
    @staticmethod
109
    def unpack(data):
110
        # convert OPC UA 4 field format to python UUID bytes
111 1
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
112 1
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
113 1
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
114 1
        f4 = data.read(8)
115
        # concat byte fields
116 1
        b = f1 + f2 + f3 + f4
117
118 1
        return uuid.UUID(bytes=b)
119
120
121 1
class _Primitive1(object):
122 1
    def __init__(self, fmt):
123 1
        self._fmt = fmt
124 1
        st = struct.Struct(fmt.format(1))
125 1
        self.size = st.size
126 1
        self.format = st.format
127
128 1
    def pack(self, data):
129 1
        return struct.pack(self.format, data)
130
131 1
    def unpack(self, data):
132 1
        return struct.unpack(self.format, data.read(self.size))[0]
133
134 1
    def unpack_many(self, data, length):
135 1
        if length is 0:
136 1
            return ()
137 1
        return struct.unpack(self._fmt.format(length), data.read(self.size*length))
138
139 1
class Primitives1(object):
140 1
    SByte = _Primitive1("<{:d}b")
141 1
    Int16 = _Primitive1("<{:d}h")
142 1
    Int32 = _Primitive1("<{:d}i")
143 1
    Int64 = _Primitive1("<{:d}q")
144 1
    Byte = _Primitive1("<{:d}B")
145 1
    Char = Byte
146 1
    UInt16 = _Primitive1("<{:d}H")
147 1
    UInt32 = _Primitive1("<{:d}I")
148 1
    UInt64 = _Primitive1("<{:d}Q")
149 1
    Boolean = _Primitive1("<{:d}?")
150 1
    Double = _Primitive1("<{:d}d")
151 1
    Float = _Primitive1("<{:d}f")
152
153
154 1
class Primitives(Primitives1):
155 1
    Null = _Null()
156 1
    String = _String()
157 1
    Bytes = _Bytes()
158 1
    ByteString = _Bytes()
159 1
    CharArray = _String()
160 1
    DateTime = _DateTime()
161 1
    Guid = _Guid()
162
163
164 1
def pack_uatype(vtype, value):
165 1
    if hasattr(Primitives, vtype.name):
166 1
        return getattr(Primitives, vtype.name).pack(value)
167 1
    elif vtype.value > 25:
168 1
        return Primitives.Bytes.pack(value)
169 1
    elif vtype == ua.VariantType.ExtensionObject:
170 1
        return extensionobject_to_binary(value)
171 1
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
172 1
        return nodeid_to_binary(value)
173 1
    elif vtype == ua.VariantType.Variant:
174 1
        return variant_to_binary(value)
175
    else:
176 1
        return struct_to_binary(value)
177
178
179 1
def unpack_uatype(vtype, data):
180 1
    if hasattr(Primitives, vtype.name):
181 1
        st = getattr(Primitives, vtype.name)
182 1
        return st.unpack(data)
183 1
    elif vtype.value > 25:
184 1
        return Primitives.Bytes.unpack(data)
185 1
    elif vtype == ua.VariantType.ExtensionObject:
186 1
        return extensionobject_from_binary(data)
187 1
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
188 1
        return nodeid_from_binary(data)
189 1
    elif vtype == ua.VariantType.Variant:
190 1
        return variant_from_binary(data)
191
    else:
192 1
        if hasattr(ua, vtype.name):
193 1
            klass = getattr(ua, vtype.name)
194 1
            return struct_from_binary(klass, data)
195
        else:
196
            raise UaError("Cannot unpack unknown variant type {0!s}".format(vtype))
197
198
199 1
def pack_uatype_array(vtype, array):
200 1
    if array is None:
201 1
        return b'\xff\xff\xff\xff'
202 1
    length = len(array)
203 1
    b = [pack_uatype(vtype, val) for val in array]
204 1
    b.insert(0, Primitives.Int32.pack(length))
205 1
    return b"".join(b)
206
207
208 1
def unpack_uatype_array(vtype, data):
209 1
    length = Primitives.Int32.unpack(data)
210 1
    if length == -1:
211 1
        return None
212 1
    elif hasattr(Primitives1, vtype.name):
213 1
        dataType = getattr(Primitives1, vtype.name)
214
        # Remark: works without tuple conversion to list.
215 1
        return list(dataType.unpack_many(data, length))
216
    else:
217
        # Revert to slow serial unpacking.
218 1
        return [unpack_uatype(vtype, data) for _ in range(length)]
219
220
221 1
def struct_to_binary(obj):
222 1
    packet = []
223 1
    has_switch = hasattr(obj, "ua_switches")
224 1
    if has_switch:
225 1
        for name, switch in obj.ua_switches.items():
226 1
            member = getattr(obj, name)
227 1
            container_name, idx = switch
228 1
            if member is not None:
229 1
                container_val = getattr(obj, container_name)
230 1
                container_val = container_val | 1 << idx
231 1
                setattr(obj, container_name, container_val)
232 1
    for name, uatype in obj.ua_types:
233 1
        val = getattr(obj, name)
234 1
        if uatype.startswith("ListOf"):
235 1
            packet.append(list_to_binary(uatype[6:], val))
236
        else:
237 1
            if has_switch and val is None and name in obj.ua_switches:
238 1
                pass
239
            else:
240 1
                packet.append(to_binary(uatype, val))
241 1
    return b''.join(packet)
242
243
244 1
def to_binary(uatype, val):
245
    """
246
    Pack a python object to binary given a string defining its type
247
    """
248 1
    if uatype.startswith("ListOf"):
249
    #if isinstance(val, (list, tuple)):
250
        return list_to_binary(uatype[6:], val)
251 1
    elif isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
252 1
        vtype = getattr(ua.VariantType, uatype)
253 1
        return pack_uatype(vtype, val)
254 1
    elif isinstance(val, (IntEnum, Enum)):
255 1
        return Primitives.UInt32.pack(val.value)
256 1
    elif isinstance(val, ua.NodeId):
257
        return nodeid_to_binary(val)
258 1
    elif isinstance(val, ua.Variant):
259
        return variant_to_binary(val)
260 1
    elif hasattr(val, "ua_types"):
261 1
        return struct_to_binary(val)
262
    else:
263
        raise UaError("No known way to pack {} of type {} to ua binary".format(val, uatype))
264
265
266 1
def list_to_binary(uatype, val):
267 1
    if val is None:
268
        return Primitives.Int32.pack(-1)
269 1
    pack = []
270 1
    pack.append(Primitives.Int32.pack(len(val)))
271 1
    for el in val:
272 1
        pack.append(to_binary(uatype, el))
273 1
    return b''.join(pack)
274
275
276 1
def nodeid_to_binary(nodeid):
277 1
    data = None
278 1
    if nodeid.NodeIdType == ua.NodeIdType.TwoByte:
279 1
        data = struct.pack("<BB", nodeid.NodeIdType.value, nodeid.Identifier)
280 1
    elif nodeid.NodeIdType == ua.NodeIdType.FourByte:
281 1
        data = struct.pack("<BBH", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
282 1
    elif nodeid.NodeIdType == ua.NodeIdType.Numeric:
283 1
        data = struct.pack("<BHI", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
284 1
    elif nodeid.NodeIdType == ua.NodeIdType.String:
285 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
286
            Primitives.String.pack(nodeid.Identifier)
287 1
    elif nodeid.NodeIdType == ua.NodeIdType.ByteString:
288 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
289
            Primitives.Bytes.pack(nodeid.Identifier)
290 1
    elif nodeid.NodeIdType == ua.NodeIdType.Guid:
291 1
        data = struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
292
               Primitives.Guid.pack(nodeid.Identifier)
293
    else:
294
        raise UaError("Unknown NodeIdType: {} for NodeId: {}".format(nodeid.NodeIdType, nodeid))
295
    # Add NamespaceUri and ServerIndex in case we have an ExpandedNodeId
296 1
    if nodeid.NamespaceUri:
297 1
        data = bytearray(data)
298 1
        data[0] = set_bit(data[0], 7)
299 1
        data.extend(Primitives.String.pack(nodeid.NamespaceUri))
300 1
    if nodeid.ServerIndex:
301 1
        if not isinstance(data, bytearray):
302
            data = bytearray(data)
303 1
        data[0] = set_bit(data[0], 6)
304 1
        data.extend(Primitives.UInt32.pack(nodeid.ServerIndex))
305 1
    return data
306
307
308 1
def nodeid_from_binary(data):
309 1
    nid = ua.NodeId()
310 1
    encoding = ord(data.read(1))
311 1
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)
312
313 1
    if nid.NodeIdType == ua.NodeIdType.TwoByte:
314 1
        nid.Identifier = ord(data.read(1))
315 1
    elif nid.NodeIdType == ua.NodeIdType.FourByte:
316 1
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
317 1
    elif nid.NodeIdType == ua.NodeIdType.Numeric:
318 1
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
319 1
    elif nid.NodeIdType == ua.NodeIdType.String:
320 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
321 1
        nid.Identifier = Primitives.String.unpack(data)
322 1
    elif nid.NodeIdType == ua.NodeIdType.ByteString:
323 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
324 1
        nid.Identifier = Primitives.Bytes.unpack(data)
325 1
    elif nid.NodeIdType == ua.NodeIdType.Guid:
326 1
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
327 1
        nid.Identifier = Primitives.Guid.unpack(data)
328
    else:
329
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
330
331 1
    if test_bit(encoding, 7):
332 1
        nid.NamespaceUri = Primitives.String.unpack(data)
333 1
    if test_bit(encoding, 6):
334 1
        nid.ServerIndex = Primitives.UInt32.unpack(data)
335
336 1
    return nid
337
338
339 1
def variant_to_binary(var):
340 1
    b = []
341 1
    encoding = var.VariantType.value & 0b111111
342 1
    if var.is_array or isinstance(var.Value, (list, tuple)):
343 1
        var.is_array = True
344 1
        encoding = set_bit(encoding, 7)
345 1
        if var.Dimensions is not None:
346 1
            encoding = set_bit(encoding, 6)
347 1
        b.append(Primitives.Byte.pack(encoding))
348 1
        b.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
349 1
        if var.Dimensions is not None:
350 1
            b.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
351
    else:
352 1
        b.append(Primitives.Byte.pack(encoding))
353 1
        b.append(pack_uatype(var.VariantType, var.Value))
354
355 1
    return b"".join(b)
356
357
358 1
def variant_from_binary(data):
359 1
    dimensions = None
360 1
    array = False
361 1
    encoding = ord(data.read(1))
362 1
    int_type = encoding & 0b00111111
363 1
    vtype = ua.datatype_to_varianttype(int_type)
364 1
    if test_bit(encoding, 7):
365 1
        value = unpack_uatype_array(vtype, data)
366 1
        array = True
367
    else:
368 1
        value = unpack_uatype(vtype, data)
369 1
    if test_bit(encoding, 6):
370 1
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
371 1
        value = _reshape(value, dimensions)
372 1
    return ua.Variant(value, vtype, dimensions, is_array=array)
373
374
375 1
def _reshape(flat, dims):
376 1
    subdims = dims[1:]
377 1
    subsize = 1
378 1
    for i in subdims:
379 1
        if i == 0:
380 1
            i = 1
381 1
        subsize *= i
382 1
    while dims[0] * subsize > len(flat):
383 1
        flat.append([])
384 1
    if not subdims or subdims == [0]:
385 1
        return flat
386 1
    return [_reshape(flat[i:i + subsize], subdims) for i in range(0, len(flat), subsize)]
387
388
389 1
def extensionobject_from_binary(data):
390
    """
391
    Convert binary-coded ExtensionObject to a Python object.
392
    Returns an object, or None if TypeId is zero
393
    """
394 1
    typeid = nodeid_from_binary(data)
395 1
    Encoding = ord(data.read(1))
396 1
    body = None
397 1
    if Encoding & (1 << 0):
398 1
        length = Primitives.Int32.unpack(data)
399 1
        if length < 1:
400
            body = Buffer(b"")
401
        else:
402 1
            body = data.copy(length)
403 1
            data.skip(length)
404 1
    if typeid.Identifier == 0:
405 1
        return None
406 1
    elif typeid in ua.extension_object_classes:
407 1
        klass = ua.extension_object_classes[typeid]
408 1
        if body is None:
409
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
410 1
        return from_binary(klass, body)
411
    else:
412 1
        e = ua.ExtensionObject()
413 1
        e.TypeId = typeid
414 1
        e.Encoding = Encoding
415 1
        if body is not None:
416 1
            e.Body = body.read(len(body))
417 1
        return e
418
419
420 1
def extensionobject_to_binary(obj):
421
    """
422
    Convert Python object to binary-coded ExtensionObject.
423
    If obj is None, convert to empty ExtensionObject (TypeId=0, no Body).
424
    Returns a binary string
425
    """
426 1
    if isinstance(obj, ua.ExtensionObject):
427 1
        return struct_to_binary(obj)
428 1
    if obj is None:
429 1
        TypeId = ua.NodeId()
430 1
        Encoding = 0
431 1
        Body = None
432
    else:
433 1
        TypeId = ua.extension_object_ids[obj.__class__.__name__]
434 1
        Encoding = 0x01
435 1
        Body = struct_to_binary(obj)
436 1
    packet = []
437 1
    packet.append(nodeid_to_binary(TypeId))
438 1
    packet.append(Primitives.Byte.pack(Encoding))
439 1
    if Body:
440 1
        packet.append(Primitives.Bytes.pack(Body))
441 1
    return b''.join(packet)
442
443
444 1
def from_binary(uatype, data):
445
    """
446
    unpack data given an uatype as a string or a python class having a ua_types memeber
447
    """
448 1
    if isinstance(uatype, (str, unicode)) and uatype.startswith("ListOf"):
449 1
        size = Primitives.Int32.unpack(data)
450 1
        if size == -1:
451
            return None
452 1
        res = []
453 1
        for _ in range(size):
454 1
            res.append(from_binary(uatype[6:], data))
455 1
        return res
456 1
    elif isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
457 1
        vtype = getattr(ua.VariantType, uatype)
458 1
        return unpack_uatype(vtype, data)
459
    else:
460 1
        return struct_from_binary(uatype, data)
461
462
463 1
def struct_from_binary(objtype, data):
464
    """
465
    unpack an ua struct. Arguments are an objtype as Python class or string
466
    """
467 1
    if isinstance(objtype, (unicode, str)):
468 1
        objtype = getattr(ua, objtype)
469 1
    if issubclass(objtype, Enum):
470 1
        return objtype(Primitives.UInt32.unpack(data))
471 1
    obj = objtype()
472 1
    for name, uatype in obj.ua_types:
473
        # if our member has a swtich and it is not set we skip it
474 1
        if hasattr(obj, "ua_switches") and name in obj.ua_switches:
475 1
            container_name, idx = obj.ua_switches[name]
476 1
            val = getattr(obj, container_name)
477 1
            if not test_bit(val, idx):
478 1
                continue
479 1
        val = from_binary(uatype, data)
480 1
        setattr(obj, name, val)
481 1
    return obj
482
483
484 1
def header_to_binary(hdr):
485 1
    b = []
486 1
    b.append(struct.pack("<3ss", hdr.MessageType, hdr.ChunkType))
487 1
    size = hdr.body_size + 8
488 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
489 1
        size += 4
490 1
    b.append(Primitives.UInt32.pack(size))
491 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
492 1
        b.append(Primitives.UInt32.pack(hdr.ChannelId))
493 1
    return b"".join(b)
494
495
496 1
def header_from_binary(data):
497 1
    hdr = ua.Header()
498 1
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
499 1
    hdr.body_size = hdr.packet_size - 8
500 1
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
501 1
        hdr.body_size -= 4
502 1
        hdr.ChannelId = Primitives.UInt32.unpack(data)
503 1
    return hdr
504
505
506 1
def uatcp_to_binary(message_type, message):
507
    """
508
    Convert OPC UA TCP message (see OPC UA specs Part 6, 7.1) to binary.
509
    The only supported types are Hello, Acknowledge and ErrorMessage
510
    """
511 1
    header = ua.Header(message_type, ua.ChunkType.Single)
512 1
    binmsg = struct_to_binary(message)
513 1
    header.body_size = len(binmsg)
514
    return header_to_binary(header) + binmsg
515