Completed
Pull Request — master (#490)
by Olivier
03:32
created

struct_to_binary()   C

Complexity

Conditions 9

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 90

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 9
c 2
b 0
f 0
dl 0
loc 21
ccs 0
cts 19
cp 0
crap 90
rs 5.4999
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 1
from calendar import timegm
10 1
import uuid
11 1
from enum import IntEnum, Enum
12
13 1
from opcua.ua.uaerrors import UaError
14 1
from opcua.common.utils import Buffer
15 1
from opcua import ua
16
17
if sys.version_info.major > 2:
18
    unicode = str
19
20
logger = logging.getLogger('__name__')
21
22
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
23
HUNDREDS_OF_NANOSECONDS = 10000000
24
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
25
26
27
def test_bit(data, offset):
28
    mask = 1 << offset
29
    return data & mask
30
31
32
def set_bit(data, offset):
33
    mask = 1 << offset
34
    return data | mask
35
36
37
def unset_bit(data, offset):
38
    mask = 1 << offset
39
    return data & ~mask
40
41
42
class UTC(tzinfo):
43
    """
44
    UTC
45
    """
46
47
    def utcoffset(self, dt):
48
        return timedelta(0)
49
50
    def tzname(self, dt):
51
        return "UTC"
52
53
    def dst(self, dt):
54
        return timedelta(0)
55
56
57
# method copied from David Buxton <[email protected]> sample code
58
def datetime_to_win_epoch(dt):
59
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
60
        dt = dt.replace(tzinfo=UTC())
61
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
62
    return ft + (dt.microsecond * 10)
63
64
65
def win_epoch_to_datetime(epch):
66
    try:
67
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
68
    except OverflowError:
69
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
70
        logger.warning("datetime overflow: %s", epch)
71
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
72
73
74
def build_array_format_py2(prefix, length, fmtchar):
75
    return prefix + str(length) + fmtchar
76
77
78
def build_array_format_py3(prefix, length, fmtchar):
79
    return prefix + str(length) + chr(fmtchar)
80
81
82
if sys.version_info.major < 3:
83
    build_array_format = build_array_format_py2
84
else:
85
    build_array_format = build_array_format_py3
86
87
88
class _Primitive(object):
89
    def pack_array(self, array):
90
        if array is None:
91
            return b'\xff\xff\xff\xff'
92
        length = len(array)
93
        b = [self.pack(val) for val in array]
94
        b.insert(0, Primitives.Int32.pack(length))
95
        return b"".join(b)
96
97
    def unpack_array(self, data):
98
        length = Primitives.Int32.unpack(data)
99
        if length == -1:
100
            return None
101
        elif length == 0:
102
            return []
103
        else:
104
            return [self.unpack(data) for _ in range(length)]
105
106
107
class _DateTime(_Primitive):
108
    @staticmethod
109
    def pack(dt):
110
        epch = datetime_to_win_epoch(dt)
111
        return Primitives.Int64.pack(epch)
112
113
    @staticmethod
114
    def unpack(data):
115
        epch = Primitives.Int64.unpack(data)
116
        return win_epoch_to_datetime(epch)
117
118
119
class _String(_Primitive):
120
    @staticmethod
121
    def pack(string):
122
        if string is None:
123
            return Primitives.Int32.pack(-1)
124
        if isinstance(string, unicode):
125
            string = string.encode('utf-8')
126
        length = len(string)
127
        return Primitives.Int32.pack(length) + string
128
129
    @staticmethod
130
    def unpack(data):
131
        b = _Bytes.unpack(data)
132
        if sys.version_info.major < 3:
133
            return b
134
        else:
135
            if b is None:
136
                return b
137
            return b.decode("utf-8")
138
139
140
class _Bytes(_Primitive):
141
    @staticmethod
142
    def pack(data):
143
        return _String.pack(data)
144
145
    @staticmethod
146
    def unpack(data):
147
        length = Primitives.Int32.unpack(data)
148
        if length == -1:
149
            return None
150
        return data.read(length)
151
152
153
class _Null(_Primitive):
154
    @staticmethod
155
    def pack(data):
156
        return b""
157
158
    @staticmethod
159
    def unpack(data):
160
        return None
161
162
163
class _Guid(_Primitive):
164
    @staticmethod
165
    def pack(guid):
166
        # convert python UUID 6 field format to OPC UA 4 field format
167
        f1 = Primitives.UInt32.pack(guid.time_low)
168
        f2 = Primitives.UInt16.pack(guid.time_mid)
169
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
170
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
171
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
172
        f4c = struct.pack('>Q', guid.node)[2:8]  # no primitive .pack available for 6 byte int
173
        f4 = f4a + f4b + f4c
174
        # concat byte fields
175
        b = f1 + f2 + f3 + f4
176
177
        return b
178
179
    @staticmethod
180
    def unpack(data):
181
        # convert OPC UA 4 field format to python UUID bytes
182
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
183
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
184
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
185
        f4 = data.read(8)
186
        # concat byte fields
187
        b = f1 + f2 + f3 + f4
188
189
        return uuid.UUID(bytes=b)
190
191
192
class _Primitive1(_Primitive):
193
    def __init__(self, fmt):
194
        self.struct = struct.Struct(fmt)
195
        self.size = self.struct.size
196
        self.format = self.struct.format
197
198
    def pack(self, data):
199
        return struct.pack(self.format, data)
200
201
    def unpack(self, data):
202
        return struct.unpack(self.format, data.read(self.size))[0]
203
204
205
class Primitives1(object):
206
    SByte = _Primitive1("<b")
207
    Int16 = _Primitive1("<h")
208
    Int32 = _Primitive1("<i")
209
    Int64 = _Primitive1("<q")
210
    Byte = _Primitive1("<B")
211
    Char = Byte
212
    UInt16 = _Primitive1("<H")
213
    UInt32 = _Primitive1("<I")
214
    UInt64 = _Primitive1("<Q")
215
    Boolean = _Primitive1("<?")
216
    Double = _Primitive1("<d")
217
    Float = _Primitive1("<f")
218
219
220
class Primitives(Primitives1):
221
    Null = _Null()
222
    String = _String()
223
    Bytes = _Bytes()
224
    ByteString = _Bytes()
225
    CharArray = _String()
226
    DateTime = _DateTime()
227
    Guid = _Guid()
228
229
230
def pack_uatype_array(vtype, array):
231
    if array is None:
232
        return b'\xff\xff\xff\xff'
233
    length = len(array)
234
    b = [pack_uatype(vtype, val) for val in array]
235
    b.insert(0, Primitives.Int32.pack(length))
236
    return b"".join(b)
237
238
239
def pack_uatype(vtype, value):
240
    if hasattr(Primitives, vtype.name):
241
        return getattr(Primitives, vtype.name).pack(value)
242
    elif vtype.value > 25:
243
        return Primitives.Bytes.pack(value)
244
    elif vtype == ua.VariantType.ExtensionObject:
245
        return extensionobject_to_binary(value)
246
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
247
        return nodeid_to_binary(value)
248
    elif vtype == ua.VariantType.Variant:
249
        return variant_to_binary(value)
250
    else:
251
        return struct_to_binary(value)
252
253
254
def unpack_uatype(vtype, data):
255
    if hasattr(Primitives, vtype.name):
256
        st = getattr(Primitives, vtype.name)
257
        return st.unpack(data)
258
    elif vtype.value > 25:
259
        return Primitives.Bytes.unpack(data)
260
    elif vtype == ua.VariantType.ExtensionObject:
261
        return extensionobject_from_binary(data)
262
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
263
        return nodeid_from_binary(data)
264
    elif vtype == ua.VariantType.Variant:
265
        return variant_from_binary(data)
266
    else:
267
        if hasattr(ua, vtype.name):
268
            klass = getattr(ua, vtype.name)
269
            return struct_from_binary(klass, data)
270
        else:
271
            raise UaError("Cannot unpack unknown variant type {0!s}".format(vtype))
272
273
274
def unpack_uatype_array(vtype, data):
275
    if hasattr(Primitives, vtype.name):
276
        st = getattr(Primitives, vtype.name)
277
        return st.unpack_array(data)
278
    else:
279
        length = Primitives.Int32.unpack(data)
280
        if length == -1:
281
            return None
282
        else:
283
            return [unpack_uatype(vtype, data) for _ in range(length)]
284
285
286
def struct_to_binary(obj):
287
    packet = []
288
    has_switch = hasattr(obj, "ua_switches")
289
    if has_switch:
290
        for name, switch in obj.ua_switches.items():
291
            member = getattr(obj, name)
292
            container_name, idx = switch
293
            if member is not None:
294
                container_val = getattr(obj, container_name)
295
                container_val = container_val | 1 << idx
296
                setattr(obj, container_name, container_val)
297
    for name, uatype in obj.ua_types:
298
        val = getattr(obj, name)
299
        if uatype.startswith("ListOf"):
300
            packet.append(list_to_binary(val, uatype[6:]))
301
        else:
302
            if has_switch and val is None and name in obj.ua_switches:
303
                pass
304
            else:
305
                packet.append(to_binary(val, uatype))
306
    return b''.join(packet)
307
308
309
def to_binary(val, uatype):
310
    """
311
    Pack a python object to binary given a string defining its type
312
    """
313
    if isinstance(val, (list, tuple)):
314
        length = len(val)
315
        b = [to_binary(el, uatype) for el in val]
316
        b.insert(0, Primitives.Int32.pack(length))
317
        return b"".join(b)
318
    elif isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
319
        vtype = getattr(ua.VariantType, uatype)
320
        return pack_uatype(vtype, val)
321
    elif isinstance(val, (IntEnum, Enum)):
322
        return Primitives.UInt32.pack(val.value)
323
    elif isinstance(val, ua.NodeId):
324
        return nodeid_to_binary(val)
325
    elif isinstance(val, ua.Variant):
326
        return variant_to_binary(val)
327
    elif hasattr(val, "ua_types"):
328
        return struct_to_binary(val)
329
    else:
330
        raise UaError("No known way to pack {} of type {} to ua binary".format(val, uatype))
331
332
333
def list_to_binary(val, uatype):
334
    if val is None:
335
        return Primitives.Int32.pack(-1)
336
    else:
337
        pack = []
338
        pack.append(Primitives.Int32.pack(len(val)))
339
        for el in val:
340
            pack.append(to_binary(el, uatype))
341
        return b''.join(pack)
342
343
344
def nodeid_to_binary(nodeid):
345
    if nodeid.NodeIdType == ua.NodeIdType.TwoByte:
346
        return struct.pack("<BB", nodeid.NodeIdType.value, nodeid.Identifier)
347
    elif nodeid.NodeIdType == ua.NodeIdType.FourByte:
348
        return struct.pack("<BBH", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
349
    elif nodeid.NodeIdType == ua.NodeIdType.Numeric:
350
        return struct.pack("<BHI", nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
351
    elif nodeid.NodeIdType == ua.NodeIdType.String:
352
        return struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
353
            Primitives.String.pack(nodeid.Identifier)
354
    elif nodeid.NodeIdType == ua.NodeIdType.ByteString:
355
        return struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
356
            Primitives.Bytes.pack(nodeid.Identifier)
357
    elif nodeid.NodeIdType == ua.NodeIdType.Guid:
358
        return struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
359
               Primitives.Guid.pack(nodeid.Identifier)
360
    else:
361
        return struct.pack("<BH", nodeid.NodeIdType.value, nodeid.NamespaceIndex) + \
362
            nodeid.Identifier.to_binary()
363
    # FIXME: Missing NNamespaceURI and ServerIndex
364
365
366
def nodeid_from_binary(data):
367
    nid = ua.NodeId()
368
    encoding = ord(data.read(1))
369
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)
370
371
    if nid.NodeIdType == ua.NodeIdType.TwoByte:
372
        nid.Identifier = ord(data.read(1))
373
    elif nid.NodeIdType == ua.NodeIdType.FourByte:
374
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
375
    elif nid.NodeIdType == ua.NodeIdType.Numeric:
376
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
377
    elif nid.NodeIdType == ua.NodeIdType.String:
378
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
379
        nid.Identifier = Primitives.String.unpack(data)
380
    elif nid.NodeIdType == ua.NodeIdType.ByteString:
381
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
382
        nid.Identifier = Primitives.Bytes.unpack(data)
383
    elif nid.NodeIdType == ua.NodeIdType.Guid:
384
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
385
        nid.Identifier = Primitives.Guid.unpack(data)
386
    else:
387
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
388
389
    if test_bit(encoding, 7):
390
        nid.NamespaceUri = Primitives.String.unpack(data)
391
    if test_bit(encoding, 6):
392
        nid.ServerIndex = Primitives.UInt32.unpack(data)
393
394
    return nid
395
396
397
def variant_to_binary(var):
398
    b = []
399
    encoding = var.VariantType.value & 0b111111
400
    if var.is_array or isinstance(var.Value, (list, tuple)):
401
        var.is_array = True
402
        encoding = set_bit(encoding, 7)
403
        if var.Dimensions is not None:
404
            encoding = set_bit(encoding, 6)
405
        b.append(Primitives.Byte.pack(encoding))
406
        b.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
407
        if var.Dimensions is not None:
408
            b.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
409
    else:
410
        b.append(Primitives.Byte.pack(encoding))
411
        b.append(pack_uatype(var.VariantType, var.Value))
412
413
    return b"".join(b)
414
415
416
def variant_from_binary(data):
417
    dimensions = None
418
    array = False
419
    encoding = ord(data.read(1))
420
    int_type = encoding & 0b00111111
421
    vtype = ua.datatype_to_varianttype(int_type)
422
    if test_bit(encoding, 7):
423
        value = unpack_uatype_array(vtype, data)
424
        array = True
425
    else:
426
        value = unpack_uatype(vtype, data)
427
    if test_bit(encoding, 6):
428
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
429
        value = reshape(value, dimensions)
430
    return ua.Variant(value, vtype, dimensions, is_array=array)
431
432
433
def reshape(flat, dims):
434
    subdims = dims[1:]
435
    subsize = 1
436
    for i in subdims:
437
        if i == 0:
438
            i = 1
439
        subsize *= i
440
    while dims[0] * subsize > len(flat):
441
        flat.append([])
442
    if not subdims or subdims == [0]:
443
        return flat
444
    return [reshape(flat[i:i + subsize], subdims) for i in range(0, len(flat), subsize)]
445
446
447
def extensionobject_from_binary(data):
448
    """
449
    Convert binary-coded ExtensionObject to a Python object.
450
    Returns an object, or None if TypeId is zero
451
    """
452
    typeid = nodeid_from_binary(data)
453
    Encoding = ord(data.read(1))
454
    body = None
455
    if Encoding & (1 << 0):
456
        length = Primitives.Int32.unpack(data)
457
        if length < 1:
458
            body = Buffer(b"")
459
        else:
460
            body = data.copy(length)
461
            data.skip(length)
462
    if typeid.Identifier == 0:
463
        return None
464
    elif typeid in ua.extension_object_classes:
465
        klass = ua.extension_object_classes[typeid]
466
        if body is None:
467
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
468
        return from_binary(klass, body)
469
    else:
470
        e = ua.ExtensionObject()
471
        e.TypeId = typeid
472
        e.Encoding = Encoding
473
        if body is not None:
474
            e.Body = body.read(len(body))
475
        return e
476
477
478
def extensionobject_to_binary(obj):
479
    """
480
    Convert Python object to binary-coded ExtensionObject.
481
    If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body).
482
    Returns a binary string
483
    """
484
    if isinstance(obj, ua.ExtensionObject):
485
        return struct_to_binary(obj)
486
    if obj is None:
487
        TypeId = ua.NodeId()
488
        Encoding = 0
489
        Body = None
490
    else:
491
        TypeId = ua.extension_object_ids[obj.__class__.__name__]
492
        Encoding = 0x01
493
        Body = struct_to_binary(obj)
494
    packet = []
495
    packet.append(nodeid_to_binary(TypeId))
496
    packet.append(Primitives.Byte.pack(Encoding))
497
    if Body:
498
        packet.append(Primitives.Bytes.pack(Body))
499
    return b''.join(packet)
500
501
502
def from_binary(uatype, data):
503
    """
504
    unpack data given an uatype as a string or a python class having a ua_types memeber
505
    """
506
    if isinstance(uatype, (str, unicode)) and uatype.startswith("ListOf"):
507
        size = Primitives.Int32.unpack(data)
508
        if size == -1:
509
            return None
510
        res = []
511
        for _ in range(size):
512
            res.append(from_binary(uatype[6:], data))
513
        return res
514
    elif isinstance(uatype, (str, unicode)) and hasattr(ua.VariantType, uatype):
515
        vtype = getattr(ua.VariantType, uatype)
516
        return unpack_uatype(vtype, data)
517
    else:
518
        return struct_from_binary(uatype, data)
519
520
521
def struct_from_binary(objtype, data):
522
    """
523
    unpack an ua struct. Arguments are an objtype as Python class or string
524
    """
525
    if isinstance(objtype, (unicode, str)):
526
        objtype = getattr(ua, objtype)
527
    if issubclass(objtype, Enum):
528
        return objtype(Primitives.UInt32.unpack(data))
529
    obj = objtype()
530
    for name, uatype in obj.ua_types:
531
        # if our member has a swtich and it is not set we skip it
532
        if hasattr(obj, "ua_switches") and name in obj.ua_switches:
533
            container_name, idx = obj.ua_switches[name]
534
            val = getattr(obj, container_name)
535
            if not test_bit(val, idx):
536
                continue
537
        val = from_binary(uatype, data)
538
        setattr(obj, name, val)
539
    return obj
540
541
542
def header_to_binary(hdr):
543
    b = []
544
    b.append(struct.pack("<3ss", hdr.MessageType, hdr.ChunkType))
545
    size = hdr.body_size + 8
546
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
547
        size += 4
548
    b.append(Primitives.UInt32.pack(size))
549
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
550
        b.append(Primitives.UInt32.pack(hdr.ChannelId))
551
    return b"".join(b)
552
553
554
def header_from_binary(data):
555
    hdr = ua.Header()
556
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = struct.unpack("<3scI", data.read(8))
557
    hdr.body_size = hdr.packet_size - 8
558
    if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
559
        hdr.body_size -= 4
560
        hdr.ChannelId = Primitives.UInt32.unpack(data)
561
    return hdr
562