Test Failed
Pull Request — master (#490)
by Olivier
02:59
created

from_binary()   A

Complexity

Conditions 4

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
dl 0
loc 10
ccs 0
cts 0
cp 0
crap 20
rs 9.2
c 1
b 0
f 0
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 1
from calendar import timegm
10 1
import uuid
11
12 1
from opcua.ua.uaerrors import UaError
13
from opcua.common.utils import Buffer
14
from opcua import ua
15 1
16
17
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so that
# ``isinstance(x, unicode)`` checks in this module work on both versions.
if sys.version_info.major > 2:
    unicode = str

# Name the logger after this module (the original passed the literal string
# '__name__', which created a logger actually called "__name__").
logger = logging.getLogger(__name__)

EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000  # number of 100 ns ticks per second
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)  # datetime matching tick 0
25 1
26 1
27 1
def test_bit(data, offset):
28
    mask = 1 << offset
29
    return data & mask
30 1
31 1
32 1
def set_bit(data, offset):
    """
    Return ``data`` with bit number ``offset`` (0 = least significant) set.
    """
    return data | (1 << offset)
35 1
36 1
37 1
def unset_bit(data, offset):
    """
    Return ``data`` with bit number ``offset`` (0 = least significant) cleared.
    """
    return data & ~(1 << offset)
40 1
41
42
class UTC(tzinfo):
    """
    Fixed ``tzinfo`` implementation for UTC: zero offset, no DST
    """

    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)
55
56 1
57 1
# method copied from David Buxton <[email protected]> sample code
58 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to a Windows FILETIME value, i.e. the number of
    100 ns ticks counted from ``EPOCH_AS_FILETIME``'s origin.

    Naive datetimes (and aware ones whose tzinfo reports no offset) are
    taken to be UTC. Aware datetimes are converted to UTC before encoding.
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() applies the UTC offset; timetuple() would silently drop
    # it and encode the local wall-clock time of a non-UTC aware datetime.
    seconds = timegm(dt.utctimetuple())
    ticks = EPOCH_AS_FILETIME + seconds * HUNDREDS_OF_NANOSECONDS
    # timegm() works on whole seconds, so add the sub-second part separately
    return ticks + dt.microsecond * 10
63 1
64 1
65 1
def win_epoch_to_datetime(epch):
    """
    Convert a FILETIME tick count (100 ns units) to a naive datetime by
    adding it to ``FILETIME_EPOCH_AS_DATETIME``.

    Values that overflow the datetime range are logged and replaced by the
    last representable instant of year ``MAXYEAR``.
    """
    try:
        # 10 ticks of 100 ns make one microsecond; finer resolution is dropped
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
72 1
73
74
def build_array_format_py2(prefix, length, fmtchar):
    """
    Build a struct format string ``prefix + length + fmtchar``
    where ``fmtchar`` is already a one-character string.
    """
    return prefix + str(length) + fmtchar


def build_array_format_py3(prefix, length, fmtchar):
    """
    Build a struct format string ``prefix + length + fmtchar``.

    ``fmtchar`` may be an int (what indexing a ``bytes`` format yields),
    a one-byte ``bytes`` object or a one-character ``str`` (what indexing
    ``Struct.format`` yields on Python 3.7+, where calling ``chr()`` on it
    would raise TypeError).
    """
    if isinstance(fmtchar, int):
        fmtchar = chr(fmtchar)
    elif isinstance(fmtchar, bytes):
        fmtchar = fmtchar.decode("ascii")
    return prefix + str(length) + fmtchar


# pick the implementation matching the running interpreter
if sys.version_info.major < 3:
    build_array_format = build_array_format_py2
else:
    build_array_format = build_array_format_py3
86 1
87
88 1
class _Primitive(object):
89 1
90
    def pack_array(self, array):
91 1
        if array is None:
92 1
            return b'\xff\xff\xff\xff'
93 1
        length = len(array)
94 1
        b = [self.pack(val) for val in array]
95
        b.insert(0, Primitives.Int32.pack(length))
96 1
        return b"".join(b)
97 1
98 1
    def unpack_array(self, data):
99 1
        length = Primitives.Int32.unpack(data)
100 1
        if length == -1:
101 1
            return None
102
        elif length == 0:
103 1
            return []
104
        else:
105
            return [self.unpack(data) for _ in range(length)]
106 1
107
108 1
class _DateTime(_Primitive):
    """
    DateTime primitive: an Int64 holding the value produced by
    ``datetime_to_win_epoch``.
    """

    @staticmethod
    def pack(dt):
        """Return the Int64 encoding of datetime ``dt``."""
        return Primitives.Int64.pack(datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        """Read an Int64 from ``data`` and convert it to a datetime."""
        return win_epoch_to_datetime(Primitives.Int64.unpack(data))
119 1
120
121 1
class _String(_Primitive):
    """
    String primitive: Int32 byte length followed by the UTF-8 bytes.
    ``None`` is encoded as length -1.
    """

    @staticmethod
    def pack(string):
        """
        Encode ``string`` (text or bytes, or ``None``). Text is converted
        to UTF-8 first; bytes are written as they are.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        if isinstance(string, unicode):
            string = string.encode('utf-8')
        return Primitives.Int32.pack(len(string)) + string

    @staticmethod
    def unpack(data):
        """
        Read a length-prefixed string from ``data``. On Python 3 the bytes
        are decoded as UTF-8; on Python 2 they are returned undecoded.
        """
        raw = _Bytes.unpack(data)
        if raw is None or sys.version_info.major < 3:
            return raw
        return raw.decode("utf-8")
141 1
142
143 1
class _Bytes(_Primitive):
    """
    Byte-string primitive: Int32 length followed by the raw bytes.
    ``None`` is encoded as length -1.
    """

    @staticmethod
    def pack(data):
        """Encode ``data``; the wire layout is shared with ``_String``."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """Read the Int32 length and return that many bytes, or ``None``."""
        length = Primitives.Int32.unpack(data)
        if length == -1:
            return None
        return data.read(length)
155 1
156
157 1
class _Null(_Primitive):
    """
    Null primitive: occupies no bytes and always decodes to ``None``
    """

    @staticmethod
    def pack(data):
        """Return an empty byte string; ``data`` is ignored."""
        return b""

    @staticmethod
    def unpack(data):
        """Return ``None`` without reading anything from ``data``."""
        return None
166 1
167
168 1
class _Guid(_Primitive):
    """
    Guid primitive: 16 bytes in the OPC UA 4 field format, i.e. the first
    three UUID fields (4, 2 and 2 bytes) little-endian, followed by the
    remaining 8 bytes in their natural order.
    """

    @staticmethod
    def pack(guid):
        """Return the 16-byte encoding of the ``uuid.UUID`` ``guid``."""
        # UUID.bytes_le is exactly this layout: time_low, time_mid and
        # time_hi_version little-endian, then clock_seq and node unchanged
        return guid.bytes_le

    @staticmethod
    def unpack(data):
        """Read 16 bytes from ``data`` and return them as a ``uuid.UUID``."""
        return uuid.UUID(bytes_le=data.read(16))
196 1
197 1
198 1
class _Primitive1(_Primitive):
    """
    Primitive holding exactly one value described by a ``struct`` format
    """

    def __init__(self, fmt):
        # compile the format once; ``size`` and ``format`` are kept as
        # attributes for code that inspects the primitive
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` packed with this primitive's format."""
        # use the precompiled Struct instead of re-parsing the format string
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` and return the value."""
        return self.struct.unpack(data.read(self.size))[0]
209
    
210
    #def pack_array(self, array):
211
        #"""
212
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
213
        #"""
214
        #if array is None:
215
            #return b'\xff\xff\xff\xff'
216
        #length = len(array)
217
        #if length == 0:
218
            #return b'\x00\x00\x00\x00'
219
        #if length == 1:
220
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
221
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
222 1
223 1
224 1
class Primitives1(object):
    """
    Fixed-size primitives, all little-endian (``<`` struct formats)
    """
    # signed integers
    Int8 = _Primitive1("<b")
    SByte = Int8
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")
    # unsigned integers; Char and Byte are aliases of UInt8
    UInt8 = _Primitive1("<B")
    Char = UInt8
    Byte = UInt8
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")
    # boolean and floating point
    Boolean = _Primitive1("<?")
    Double = _Primitive1("<d")
    Float = _Primitive1("<f")
239 1
240 1
241 1
class Primitives(Primitives1):
    """
    All primitives: the fixed-size ones inherited from ``Primitives1``
    plus the variable-size and special ones defined in this module.
    Other code looks entries up by type name with ``getattr``.
    """
    Null = _Null()
    String = _String()
    Bytes = _Bytes()
    ByteString = _Bytes()  # same implementation as Bytes
    CharArray = _String()  # same implementation as String
    DateTime = _DateTime()
    Guid = _Guid()
249 1
250 1
251 1
def pack_uatype_array(vtype, array):
    """
    Return ``array`` as an Int32 element count followed by every element
    packed with ``pack_uatype(vtype, ...)``.

    ``None`` is encoded as the four bytes ``ff ff ff ff``.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    chunks = [Primitives.Int32.pack(len(array))]
    chunks.extend(pack_uatype(vtype, val) for val in array)
    return b"".join(chunks)
258 1
259 1
260 1
def pack_uatype(vtype, value):
    """
    Pack ``value`` according to the variant type ``vtype``.

    Tried in order: a primitive named ``vtype.name``; raw Bytes when
    ``vtype.value`` is above 25; the ExtensionObject encoder; finally the
    value's own ``to_binary()`` method.

    Raises UaError when the value has no usable ``to_binary()``.
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if vtype.name == "ExtensionObject":
        return extensionobject_to_binary(value)
    try:
        return value.to_binary()
    except AttributeError:
        raise UaError("{0} could not be packed with value {1}".format(vtype, value))
272 1
273 1
274
def unpack_uatype(vtype, data):
    """
    Read one value of variant type ``vtype`` from ``data``.

    Tried in order: a primitive named ``vtype.name``; raw Bytes when
    ``vtype.value`` is above 25; the ExtensionObject decoder; finally the
    ``from_binary`` of the class named ``vtype.name`` in ``opcua.ua.uatypes``.

    Raises UaError when no such class exists.
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if vtype.name == "ExtensionObject":
        return extensionobject_from_binary(data)
    # imported here rather than at module level, as in the original code
    from opcua.ua import uatypes
    if not hasattr(uatypes, vtype.name):
        raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
    klass = getattr(uatypes, vtype.name)
    return klass.from_binary(data)
289 1
290 1
291 1
def unpack_uatype_array(vtype, data):
    """
    Read an array of variant type ``vtype`` from ``data``.

    Primitive types use their own ``unpack_array``. Every other type is
    read as an Int32 count (-1 meaning ``None``) followed by that many
    values decoded with ``unpack_uatype``.
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).unpack_array(data)
    length = Primitives.Int32.unpack(data)
    if length == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(length)]
301
302 1
303 1
"""
304
def to_binary(obj):
305
    if isintance(obj, NodeId):
306 1
        return nodeid_to_binary(obj)
307
    elif hasattr(val, "ua_types"):
308
        return to_binary(val)
309
    else:
310
        raise ValueError("Do not know how to convert {} to binary: {}".format(type(obj), obj))
311
"""
312
313
314
def struct_to_binary(obj):
    """
    Serialise ``obj`` by packing every member listed in ``obj.ua_types``
    (pairs of member name and type name) in order.

    If ``obj`` has ``ua_switches`` (member name -> (container name, bit
    index)), the bit of every member that is not None is first set on the
    container attribute of ``obj``, and switched members that are None
    are left out of the output.
    """
    packet = []
    has_switch = hasattr(obj, "ua_switches")
    if has_switch:
        # update the containers first so they are packed with the right bits
        for name, (container_name, idx) in obj.ua_switches.items():
            if getattr(obj, name) is not None:
                container_val = getattr(obj, container_name)
                setattr(obj, container_name, container_val | 1 << idx)
    for name, uatype in obj.ua_types:
        val = getattr(obj, name)
        if uatype.startswith("ListOf"):
            # hand over the element type: with the "ListOf" prefix left on,
            # primitive, NodeId and Variant elements are not recognised
            packet.append(list_to_binary(val, uatype[len("ListOf"):]))
        elif has_switch and val is None and name in obj.ua_switches:
            # optional member that is absent: nothing is written
            continue
        else:
            packet.append(to_binary(val, uatype))
    return b''.join(packet)
335
336
337
def to_binary(val, uatype):
    """
    Pack ``val`` whose type name is the string ``uatype``.

    Tried in order: a primitive named ``uatype``; a structure (anything
    with a ``ua_types`` attribute); ``"NodeId"``; ``"Variant"``.

    Raises ValueError for anything else.
    """
    if hasattr(Primitives, uatype):
        return getattr(Primitives, uatype).pack(val)
    if hasattr(val, "ua_types"):
        return struct_to_binary(val)
    if uatype == "NodeId":
        return nodeid_to_binary(val)
    if uatype == "Variant":
        return variant_to_binary(val)
    raise ValueError("Cannot pack {} of type {} to ua binary".format(val, uatype))
349
350
351
def list_to_binary(val, uatype):
    """
    Pack the list ``val`` as an Int32 element count followed by every
    element packed with ``to_binary(el, uatype)``.

    ``None`` is encoded as a count of -1.
    """
    if val is None:
        return Primitives.Int32.pack(-1)
    chunks = [Primitives.Int32.pack(len(val))]
    chunks.extend(to_binary(el, uatype) for el in val)
    return b''.join(chunks)
360
361
362
def nodeid_to_binary(nodeid):
    """
    Serialise ``nodeid`` according to its ``NodeIdType``.

    TwoByte, FourByte and Numeric ids are written with one fixed struct
    format each. All other types are a one byte type plus UInt16 namespace
    index header followed by the encoded identifier.
    """
    nidtype = nodeid.NodeIdType
    if nidtype == ua.NodeIdType.TwoByte:
        return struct.pack("<BB", nidtype.value, nodeid.Identifier)
    if nidtype == ua.NodeIdType.FourByte:
        return struct.pack("<BBH", nidtype.value, nodeid.NamespaceIndex, nodeid.Identifier)
    if nidtype == ua.NodeIdType.Numeric:
        return struct.pack("<BHI", nidtype.value, nodeid.NamespaceIndex, nodeid.Identifier)

    # every remaining encoding shares the same three byte header
    header = struct.pack("<BH", nidtype.value, nodeid.NamespaceIndex)
    if nidtype == ua.NodeIdType.String:
        return header + Primitives.String.pack(nodeid.Identifier)
    if nidtype == ua.NodeIdType.ByteString:
        return header + Primitives.Bytes.pack(nodeid.Identifier)
    if nidtype == ua.NodeIdType.Guid:
        return header + Primitives.Guid.pack(nodeid.Identifier)
    return header + nodeid.Identifier.to_binary()
    # FIXME: Missing NamespaceUri and ServerIndex
382
383
384
def nodeid_from_binary(data):
    """
    Read a NodeId from ``data``.

    The low 6 bits of the first byte select the NodeIdType, which decides
    how namespace index and identifier are read. Bit 7 of that byte
    announces a trailing NamespaceUri string, bit 6 a trailing UInt32
    ServerIndex.

    Raises UaError for a NodeIdType this function has no decoder for.
    """
    nid = ua.NodeId()
    encoding = ord(data.read(1))
    nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)

    if nid.NodeIdType == ua.NodeIdType.TwoByte:
        nid.Identifier = ord(data.read(1))
    elif nid.NodeIdType == ua.NodeIdType.FourByte:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
    elif nid.NodeIdType == ua.NodeIdType.Numeric:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
    elif nid.NodeIdType == ua.NodeIdType.String:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.String.unpack(data)
    elif nid.NodeIdType == ua.NodeIdType.ByteString:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.Bytes.unpack(data)
    elif nid.NodeIdType == ua.NodeIdType.Guid:
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        nid.Identifier = Primitives.Guid.unpack(data)
    else:
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

    # optional trailing fields, flagged in the two top bits of the first byte
    if test_bit(encoding, 7):
        nid.NamespaceUri = Primitives.String.unpack(data)
    if test_bit(encoding, 6):
        nid.ServerIndex = Primitives.UInt32.unpack(data)

    return nid
413
414
415
def variant_to_binary(var):
    """
    Serialise the variant ``var``.

    The first byte carries the variant type in its low 6 bits; bit 7 is
    set for arrays and bit 6 when ``var.Dimensions`` is given. Arrays are
    written flattened, followed by the dimensions as an Int32 array.

    Side effect: ``var.is_array`` is set to True when the value is a list
    or tuple.
    """
    encoding = var.VariantType.value & 0b111111
    if not (var.is_array or isinstance(var.Value, (list, tuple))):
        return Primitives.UInt8.pack(encoding) + pack_uatype(var.VariantType, var.Value)

    var.is_array = True
    has_dimensions = var.Dimensions is not None
    encoding = set_bit(encoding, 7)
    if has_dimensions:
        encoding = set_bit(encoding, 6)
    parts = [
        Primitives.UInt8.pack(encoding),
        pack_uatype_array(var.VariantType, ua.flatten(var.Value)),
    ]
    if has_dimensions:
        parts.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
    return b"".join(parts)
432
433
434
def variant_from_binary(data):
    """
    Read a Variant from ``data``.

    The low 6 bits of the first byte give the data type. Bit 7 marks an
    array value. Bit 6 announces an Int32 array of dimensions after the
    value, which is then used to reshape the (flat) value.
    """
    encoding = ord(data.read(1))
    vtype = ua.datatype_to_varianttype(encoding & 0b00111111)
    is_array = bool(test_bit(encoding, 7))
    if is_array:
        value = unpack_uatype_array(vtype, data)
    else:
        value = unpack_uatype(vtype, data)
    dimensions = None
    if test_bit(encoding, 6):
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
        value = reshape(value, dimensions)
    return ua.Variant(value, vtype, dimensions, is_array=is_array)
449
450
451
def reshape(flat, dims):
    """
    Rebuild a nested list of shape ``dims`` from the flat list ``flat``.

    A dimension of 0 counts as 1 when computing how many elements each
    sub-list takes. When ``flat`` holds fewer than ``dims[0]`` times that
    many elements, the missing ones are filled in with empty lists.
    ``flat`` itself is never modified. With empty or missing ``dims``
    there is nothing to reshape and ``flat`` is returned as it is.
    """
    if dims is None or len(dims) == 0:
        return flat
    subdims = list(dims[1:])
    subsize = 1
    for dim in subdims:
        subsize *= dim or 1
    # work on a copy so that padding does not leak into the caller's list
    flat = list(flat)
    missing = dims[0] * subsize - len(flat)
    if missing > 0:
        flat.extend([] for _ in range(missing))
    if not subdims or subdims == [0]:
        return flat
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
463
464
465
def extensionobject_from_binary(data):
    """
    Convert binary-coded ExtensionObject to a Python object.
    Returns an object, or None if TypeId is zero

    When the TypeId is registered in ``ua.extension_object_classes`` the
    body is decoded with that class; otherwise a generic
    ``ua.ExtensionObject`` carrying the undecoded body bytes is returned.

    Raises UaError if a registered type arrives without a body.
    """
    typeid = nodeid_from_binary(data)
    Encoding = ord(data.read(1))
    body = None
    if Encoding & (1 << 0):
        # bit 0 set: an Int32 length and that many body bytes follow
        length = Primitives.Int32.unpack(data)
        if length < 1:
            body = Buffer(b"")
        else:
            # take the body out of the stream, then step past it
            body = data.copy(length)
            data.skip(length)

    if typeid.Identifier == 0:
        return None
    if typeid in ua.extension_object_classes:
        klass = ua.extension_object_classes[typeid]
        if body is None:
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
        return klass.from_binary(body)

    # unknown type: hand the body back undecoded
    e = ua.ExtensionObject()
    e.TypeId = typeid
    e.Encoding = Encoding
    if body is not None:
        e.Body = body.read(len(body))
    return e
494
495
496
def extensionobject_to_binary(obj):
    """
    Convert Python object to binary-coded ExtensionObject.
    If obj is None, convert to empty ExtensionObject (default-constructed
    NodeId as TypeId, no Body).
    A ``ua.ExtensionObject`` instance is serialised with its own
    ``to_binary()``. For any other object the TypeId is looked up by class
    name in ``ua.extension_object_ids``.
    Returns a binary string
    """
    if isinstance(obj, ua.ExtensionObject):
        return obj.to_binary()
    if obj is None:
        type_id = ua.NodeId()
        encoding = 0
        body = None
    else:
        type_id = ua.extension_object_ids[obj.__class__.__name__]
        encoding = 0x01
        body = obj.to_binary()
    packet = [type_id.to_binary(), Primitives.UInt8.pack(encoding)]
    if body is not None:
        # Encoding 0x01 tells the reader that a length-prefixed body
        # follows, so an empty body must still be written (as length 0).
        # Testing plain truthiness would drop it and corrupt the stream.
        packet.append(Primitives.Bytes.pack(body))
    return b''.join(packet)
518
519
520
def from_binary(uatype, data):
    """
    Read one value whose type name is the string ``uatype`` from ``data``.

    Tried in order: a primitive named ``uatype``; ``"NodeId"``;
    ``"Variant"``; anything else is decoded as a structure of that name.
    """
    if hasattr(Primitives, uatype):
        return getattr(Primitives, uatype).unpack(data)
    if uatype == "NodeId":
        return nodeid_from_binary(data)
    if uatype == "Variant":
        return variant_from_binary(data)
    return struct_from_binary(uatype, data)
530
531
532
def struct_from_binary(objtype, data):
    """
    Create an instance of the class named ``objtype`` in ``ua`` and fill
    its members, in ``ua_types`` order, with values read from ``data``.

    A member listed in ``ua_switches`` is only read when its bit is set in
    the container member named there; otherwise it keeps its default.
    """
    # debug trace (was a print() to stdout)
    logger.debug("struct_from_binary: %s %s", objtype, data)
    obj = getattr(ua, objtype)()
    for name, uatype in obj.ua_types:
        # if our member has a switch and it is not set we skip it
        if hasattr(obj, "ua_switches") and name in obj.ua_switches:
            container_name, idx = obj.ua_switches[name]
            if not test_bit(getattr(obj, container_name), idx):
                continue
        setattr(obj, name, from_binary(uatype, data))
    return obj
546