Test Failed
Pull Request — master (#490)
by Olivier
03:12
created

nodeid_to_binary()   B

Complexity

Conditions 7

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
dl 0
loc 19
ccs 0
cts 0
cp 0
crap 56
rs 7.3333
c 1
b 0
f 0
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 1
from calendar import timegm
10 1
import uuid
11
from enum import IntEnum, Enum
12 1
13
from opcua.ua.uaerrors import UaError
14
from opcua.common.utils import Buffer
15 1
from opcua import ua
16
17
18 1
if sys.version_info.major > 2:
    # Python 3 has no separate ``unicode`` type; alias it so the
    # isinstance() checks in this module work on both major versions.
    unicode = str

# Use the real module name (the original passed the literal string
# '__name__', which put every record under a logger called "__name__").
logger = logging.getLogger(__name__)

EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000  # FILETIME ticks (100 ns) per second
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)  # FILETIME tick zero
26 1
27 1
28
def test_bit(data, offset):
29
    mask = 1 << offset
30 1
    return data & mask
31 1
32 1
33
def set_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` switched on.
    """
    return data | (1 << offset)
36 1
37 1
38
def unset_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` switched off.
    """
    return data & ~(1 << offset)
41
42
43
class UTC(tzinfo):
    """
    tzinfo implementation for UTC: zero offset, no daylight saving.
    """

    def tzname(self, dt):
        return "UTC"

    def utcoffset(self, dt):
        # UTC is the reference, so the offset is always zero
        return timedelta()

    def dst(self, dt):
        # no daylight-saving adjustment in UTC
        return timedelta()
56 1
57 1
58 1
# method copied from David Buxton <[email protected]> sample code
59 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to a Windows FILETIME tick count.

    The result is the number of 100-nanosecond intervals counted from
    FILETIME tick zero. A naive datetime, or one whose tzinfo reports no
    offset, is treated as already being in UTC.
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() shifts an aware datetime to UTC before splitting it
    # into fields. timetuple() would pass timegm() the local wall-clock
    # fields, silently dropping any non-zero UTC offset.
    seconds = timegm(dt.utctimetuple())
    ticks = EPOCH_AS_FILETIME + seconds * HUNDREDS_OF_NANOSECONDS
    # one microsecond is ten 100 ns ticks
    return ticks + dt.microsecond * 10
64 1
65 1
66
def win_epoch_to_datetime(epch):
    """
    Convert a FILETIME tick count (100 ns units) to a naive datetime.

    When the addition overflows, a warning is logged and the last
    representable instant of MAXYEAR is returned instead.
    """
    elapsed = epch // 10  # ticks -> whole microseconds
    try:
        result = FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=elapsed)
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        result = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
    return result
73
74
75
def build_array_format_py2(prefix, length, fmtchar):
    """
    Build a struct format string ``<prefix><length><fmtchar>``.

    ``fmtchar`` is used as given (a one-character string).
    """
    pieces = (prefix, str(length), fmtchar)
    return "".join(pieces)
77
78
79
def build_array_format_py3(prefix, length, fmtchar):
    """
    Build a struct format string ``<prefix><length><char>``.

    ``fmtchar`` is an integer code point and is converted with chr().
    """
    pieces = (prefix, str(length), chr(fmtchar))
    return "".join(pieces)
81 1
82
83
# Bind the format builder that matches the running interpreter.
build_array_format = (
    build_array_format_py2 if sys.version_info.major < 3
    else build_array_format_py3
)
87
88 1
89 1
class _Primitive(object):
    """
    Base class for the binary codecs.

    Subclasses supply ``pack`` and ``unpack``; this class builds the
    array forms on top of them: an Int32 element count followed by the
    packed elements, with a count of -1 standing for a None array.
    """

    def pack_array(self, array):
        """
        Pack ``array`` as an Int32 length followed by each packed element.
        None is packed as the length -1.
        """
        if array is None:
            return b'\xff\xff\xff\xff'
        count = len(array)
        body = [self.pack(item) for item in array]
        return Primitives.Int32.pack(count) + b"".join(body)

    def unpack_array(self, data):
        """
        Read an Int32 length from ``data`` and then that many elements.
        Returns None for a length of -1.
        """
        count = Primitives.Int32.unpack(data)
        if count == -1:
            return None
        # a count of 0 gives an empty range, hence an empty list
        return [self.unpack(data) for _ in range(count)]
107
108 1
109
class _DateTime(_Primitive):
    """
    Codec for datetimes, carried as an Int64 FILETIME tick count.
    """

    @staticmethod
    def pack(dt):
        return Primitives.Int64.pack(datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        return win_epoch_to_datetime(Primitives.Int64.unpack(data))
120
121 1
122
class _String(_Primitive):
    """
    Codec for strings: an Int32 byte length followed by UTF-8 bytes.
    None is represented by the length -1.
    """

    @staticmethod
    def pack(string):
        if string is None:
            return Primitives.Int32.pack(-1)
        # text is encoded to UTF-8; byte strings are written unchanged
        raw = string.encode('utf-8') if isinstance(string, unicode) else string
        return Primitives.Int32.pack(len(raw)) + raw

    @staticmethod
    def unpack(data):
        raw = _Bytes.unpack(data)
        # Python 2 returns the raw byte string; None passes through as-is
        if raw is None or sys.version_info.major < 3:
            return raw
        return raw.decode("utf-8")
142
143 1
144
class _Bytes(_Primitive):
    """
    Codec for byte strings: an Int32 length followed by the raw bytes.
    None is represented by the length -1.
    """

    @staticmethod
    def pack(data):
        # same wire layout as a string, so the string packer is reused
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
156
157 1
158
class _Null(_Primitive):
    """
    Codec for the Null type: writes no bytes and reads none.
    """

    @staticmethod
    def pack(data):
        # the argument is ignored; Null has no encoded form
        return b""

    @staticmethod
    def unpack(data):
        # nothing is consumed from ``data``
        return None
167
168 1
169
class _Guid(_Primitive):
    """
    Codec converting between python UUIDs and the 16-byte OPC UA Guid.

    The first three fields are written little-endian; the remaining
    eight bytes are copied in UUID byte order.
    """

    @staticmethod
    def pack(guid):
        # convert python UUID 6 field format to OPC UA 4 field format
        head = struct.pack(
            '<IHHBB',
            guid.time_low,
            guid.time_mid,
            guid.time_hi_version,
            guid.clock_seq_hi_variant,
            guid.clock_seq_low,
        )
        # node is a 48-bit value: pack as 64-bit big-endian, drop 2 pad bytes
        node = struct.pack('>Q', guid.node)[2:]
        return head + node

    @staticmethod
    def unpack(data):
        # convert OPC UA 4 field format to python UUID bytes
        time_low = Primitives.UInt32.unpack(data)
        time_mid = Primitives.UInt16.unpack(data)
        time_hi = Primitives.UInt16.unpack(data)
        tail = data.read(8)
        # UUID(bytes=...) expects the leading fields big-endian
        raw = struct.pack('>IHH', time_low, time_mid, time_hi) + tail
        return uuid.UUID(bytes=raw)
197 1
198 1
199 1
class _Primitive1(_Primitive):
    """
    Codec for a single fixed-size value described by a struct format.
    """

    def __init__(self, fmt):
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Pack one value using this codec's format."""
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` and return the value."""
        raw = data.read(self.size)
        (value,) = self.struct.unpack(raw)
        return value

    # NOTE: a specialised pack_array() that packed the length and all
    # elements with one struct.pack() call (format built through
    # build_array_format) was tried here and left disabled; the generic
    # _Primitive.pack_array() is the one in use.
223 1
224 1
225 1
class Primitives1(object):
    """
    Little-endian codecs for the fixed-size numeric and boolean types.
    """
    # signed integers
    Int8 = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")
    # unsigned integers
    UInt8 = _Primitive1("<B")
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")
    # boolean and floating point
    Boolean = _Primitive1("<?")
    Float = _Primitive1("<f")
    Double = _Primitive1("<d")
    # aliases sharing the codec instances above
    SByte = Int8
    Char = UInt8
    Byte = UInt8
240 1
241 1
242 1
class Primitives(Primitives1):
    """
    All primitive codecs: the fixed-size ones inherited from Primitives1
    plus the variable-length and special types declared here.
    """
    Null = _Null()
    # text
    String = _String()
    CharArray = _String()
    # raw bytes
    Bytes = _Bytes()
    ByteString = _Bytes()
    # structured primitives
    DateTime = _DateTime()
    Guid = _Guid()
250 1
251 1
252 1
def pack_uatype_array(vtype, array):
    """
    Pack ``array`` as an Int32 length followed by each element packed
    with pack_uatype() for ``vtype``. None is packed as the length -1.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    encoded = [pack_uatype(vtype, item) for item in array]
    return Primitives.Int32.pack(count) + b"".join(encoded)
259 1
260 1
261 1
def pack_uatype(vtype, value):
    """
    Pack a single ``value`` according to the variant type ``vtype``.

    Tried in order: a codec on Primitives named after ``vtype``, a raw
    byte string for type values above 25, the ExtensionObject encoder,
    and finally the value's own to_binary() method.

    Raises UaError when the value has no to_binary() method.
    """
    if hasattr(Primitives, vtype.name):
        codec = getattr(Primitives, vtype.name)
        return codec.pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if vtype.name == "ExtensionObject":
        return extensionobject_to_binary(value)
    try:
        return value.to_binary()
    except AttributeError:
        raise UaError("{0} could not be packed with value {1}".format(vtype, value))
273 1
274
275
def unpack_uatype(vtype, data):
    """
    Read one value of variant type ``vtype`` from ``data``.

    Tried in order: a codec on Primitives named after ``vtype``, a raw
    byte string for type values above 25, the ExtensionObject decoder,
    and finally a class of the same name in opcua.ua.uatypes.

    Raises UaError when none of these applies.
    """
    if hasattr(Primitives, vtype.name):
        codec = getattr(Primitives, vtype.name)
        return codec.unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if vtype.name == "ExtensionObject":
        return extensionobject_from_binary(data)
    # imported here rather than at module level, as in the original code
    from opcua.ua import uatypes
    if not hasattr(uatypes, vtype.name):
        raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
    klass = getattr(uatypes, vtype.name)
    return klass.from_binary(data)
290 1
291 1
292 1
def unpack_uatype_array(vtype, data):
    """
    Read an array of ``vtype`` values from ``data``.

    Types with a codec on Primitives use that codec's unpack_array();
    other types are read as an Int32 length followed by that many
    elements. A length of -1 yields None.
    """
    if hasattr(Primitives, vtype.name):
        codec = getattr(Primitives, vtype.name)
        return codec.unpack_array(data)
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
302 1
303 1
304
"""
305
def to_binary(obj):
306 1
    if isintance(obj, NodeId):
307
        return nodeid_to_binary(obj)
308
    elif hasattr(val, "ua_types"):
309
        return to_binary(val)
310
    else:
311
        raise ValueError("Do not know how to convert {} to binary: {}".format(type(obj), obj))
312
"""
313
314
315
def struct_to_binary(obj):
    """
    Serialize a structure described by ``ua_types`` to binary.

    ``obj.ua_types`` is iterated as (member name, type name) pairs and
    each member is packed in that order. Type names starting with
    "ListOf" are packed as lists of the remaining type name.

    If ``obj`` has ``ua_switches`` (member name -> (container member,
    bit index)), the container bit is set on ``obj`` for every switched
    member that is not None, and switched members that are None are
    left out of the output. Note that this updates ``obj`` in place.
    """
    has_switch = hasattr(obj, "ua_switches")
    if has_switch:
        # flag the optional members that are present before packing,
        # so the container field is written with the right bits
        for name, (container_name, idx) in obj.ua_switches.items():
            if getattr(obj, name) is not None:
                container_val = getattr(obj, container_name)
                setattr(obj, container_name, container_val | (1 << idx))
    packet = []
    for name, uatype in obj.ua_types:
        val = getattr(obj, name)
        if uatype.startswith("ListOf"):
            packet.append(list_to_binary(val, uatype[6:]))
        elif has_switch and val is None and name in obj.ua_switches:
            # optional member that is absent: nothing is written
            continue
        else:
            packet.append(to_binary(val, uatype))
    return b''.join(packet)
339
340
341
def to_binary(val, uatype=None):
    """
    Pack ``val`` to binary, choosing the encoder from ``uatype`` and
    the type of ``val``.

    Checked in order: a Primitives codec named ``uatype``, enum values
    (packed as UInt32), NodeId, Header, Variant, the "ExtensionObject"
    type name, and finally any object exposing ``ua_types``.

    Raises ValueError when no encoder matches.
    """
    if isinstance(uatype, (str, unicode)) and hasattr(Primitives, uatype):
        st = getattr(Primitives, uatype)
        return st.pack(val)
    elif isinstance(val, (IntEnum, Enum)):
        return Primitives.UInt32.pack(val.value)
    elif isinstance(val, ua.NodeId):
        return nodeid_to_binary(val)
    elif isinstance(val, ua.Header):
        return header_to_binary(val)
    elif isinstance(val, ua.Variant):
        return variant_to_binary(val)
    elif uatype == "ExtensionObject":
        # pack val into an extensionobject
        return extensionobject_to_binary(val)
    elif hasattr(val, "ua_types"):
        return struct_to_binary(val)
    else:
        raise ValueError("No known way to pack {} of type {} to ua binary".format(val, uatype))
361
362
363
def list_to_binary(val, uatype):
    """
    Pack a list as an Int32 length followed by each element packed with
    to_binary() for ``uatype``. None is packed as the length -1.
    """
    if val is None:
        return Primitives.Int32.pack(-1)
    chunks = [Primitives.Int32.pack(len(val))]
    chunks.extend(to_binary(item, uatype) for item in val)
    return b''.join(chunks)
372
373
374
def nodeid_to_binary(nodeid):
    """
    Pack a NodeId to binary.

    The first byte holds the NodeIdType value; the layout of the rest
    depends on that type (TwoByte, FourByte, Numeric, String,
    ByteString, Guid; anything else falls back to the identifier's own
    to_binary()).

    If the node id carries a non-empty ``NamespaceUri`` and/or a
    non-zero ``ServerIndex``, bit 7 and/or bit 6 of the first byte is
    set and the value is appended (String, then UInt32). This mirrors
    what nodeid_from_binary() reads back. Both attributes are treated
    as optional, so objects without them are packed as before.
    """
    namespace_uri = getattr(nodeid, "NamespaceUri", None)
    server_index = getattr(nodeid, "ServerIndex", None)

    kind = nodeid.NodeIdType
    encoding = kind.value
    trailer = []
    if namespace_uri:
        encoding = set_bit(encoding, 7)
        trailer.append(Primitives.String.pack(namespace_uri))
    if server_index:
        encoding = set_bit(encoding, 6)
        trailer.append(Primitives.UInt32.pack(server_index))

    if kind == ua.NodeIdType.TwoByte:
        body = struct.pack("<BB", encoding, nodeid.Identifier)
    elif kind == ua.NodeIdType.FourByte:
        body = struct.pack("<BBH", encoding, nodeid.NamespaceIndex, nodeid.Identifier)
    elif kind == ua.NodeIdType.Numeric:
        body = struct.pack("<BHI", encoding, nodeid.NamespaceIndex, nodeid.Identifier)
    else:
        # the remaining encodings share a header: type byte + UInt16 namespace
        header = struct.pack("<BH", encoding, nodeid.NamespaceIndex)
        if kind == ua.NodeIdType.String:
            identifier = Primitives.String.pack(nodeid.Identifier)
        elif kind == ua.NodeIdType.ByteString:
            identifier = Primitives.Bytes.pack(nodeid.Identifier)
        elif kind == ua.NodeIdType.Guid:
            identifier = Primitives.Guid.pack(nodeid.Identifier)
        else:
            identifier = nodeid.Identifier.to_binary()
        body = header + identifier

    return body + b"".join(trailer)
394
395
396
def nodeid_from_binary(data):
    """
    Read a NodeId from ``data``.

    The low six bits of the first byte select the NodeIdType; bit 7
    announces a trailing NamespaceUri string and bit 6 a trailing
    UInt32 ServerIndex.

    Raises UaError for a NodeIdType that has no decoding branch here.
    """
    nid = ua.NodeId()
    encoding = ord(data.read(1))
    kind = ua.NodeIdType(encoding & 0b00111111)
    nid.NodeIdType = kind

    if kind == ua.NodeIdType.TwoByte:
        nid.Identifier = ord(data.read(1))
    elif kind == ua.NodeIdType.FourByte:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
    elif kind == ua.NodeIdType.Numeric:
        nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
    elif kind in (ua.NodeIdType.String, ua.NodeIdType.ByteString, ua.NodeIdType.Guid):
        # these three share a UInt16 namespace index before the identifier
        nid.NamespaceIndex = Primitives.UInt16.unpack(data)
        if kind == ua.NodeIdType.String:
            nid.Identifier = Primitives.String.unpack(data)
        elif kind == ua.NodeIdType.ByteString:
            nid.Identifier = Primitives.Bytes.unpack(data)
        else:
            nid.Identifier = Primitives.Guid.unpack(data)
    else:
        raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

    # optional trailing fields, flagged in the two high bits
    if test_bit(encoding, 7):
        nid.NamespaceUri = Primitives.String.unpack(data)
    if test_bit(encoding, 6):
        nid.ServerIndex = Primitives.UInt32.unpack(data)

    return nid
425
426
427
def variant_to_binary(var):
    """
    Pack a Variant: an encoding byte followed by the value.

    The low six bits of the encoding byte hold the variant type. For
    arrays bit 7 is set and the flattened value is packed as an array;
    if Dimensions is not None, bit 6 is set as well and the dimensions
    follow as an Int32 array. A list or tuple value marks the variant
    as an array, and ``var.is_array`` is updated to True in that case.
    """
    encoding = var.VariantType.value & 0b111111
    if not (var.is_array or isinstance(var.Value, (list, tuple))):
        # scalar: encoding byte followed directly by the value
        return Primitives.UInt8.pack(encoding) + pack_uatype(var.VariantType, var.Value)

    var.is_array = True
    has_dims = var.Dimensions is not None
    encoding = set_bit(encoding, 7)
    if has_dims:
        encoding = set_bit(encoding, 6)
    parts = [
        Primitives.UInt8.pack(encoding),
        pack_uatype_array(var.VariantType, ua.flatten(var.Value)),
    ]
    if has_dims:
        parts.append(pack_uatype_array(ua.VariantType.Int32, var.Dimensions))
    return b"".join(parts)
444
445
446
def variant_from_binary(data):
    """
    Read a Variant from ``data``.

    The low six bits of the encoding byte give the type, bit 7 marks an
    array value and bit 6 marks a trailing Int32 dimensions array, in
    which case the value is reshaped to those dimensions.
    """
    encoding = ord(data.read(1))
    vtype = ua.datatype_to_varianttype(encoding & 0b00111111)

    is_array = bool(test_bit(encoding, 7))
    if is_array:
        value = unpack_uatype_array(vtype, data)
    else:
        value = unpack_uatype(vtype, data)

    dimensions = None
    if test_bit(encoding, 6):
        dimensions = unpack_uatype_array(ua.VariantType.Int32, data)
        value = reshape(value, dimensions)

    return ua.Variant(value, vtype, dimensions, is_array=is_array)
461
462
463
def reshape(flat, dims):
    """
    Fold the flat list ``flat`` into nested lists shaped by ``dims``.

    ``flat`` is padded in place with empty lists when it is shorter
    than the requested shape. Zero-sized inner dimensions count as 1
    when computing the chunk size.
    """
    head, inner = dims[0], dims[1:]
    # number of flat elements covered by one entry of the outer dimension
    stride = 1
    for extent in inner:
        stride *= extent if extent != 0 else 1
    # pad (mutating the caller's list) until the shape can be filled
    for _ in range(head * stride - len(flat)):
        flat.append([])
    if not inner or inner == [0]:
        return flat
    return [
        reshape(flat[start: start + stride], inner)
        for start in range(0, len(flat), stride)
    ]
475
476
477
def extensionobject_from_binary(data):
    """
    Convert binary-coded ExtensionObject to a Python object.
    Returns an object, or None if TypeId is zero.

    A type id registered in ua.extension_object_classes is decoded into
    that class; any other type id yields a ua.ExtensionObject holding
    the raw body bytes.

    Raises UaError when a registered class arrives without a body.
    """
    typeid = nodeid_from_binary(data)
    Encoding = ord(data.read(1))

    body = None
    if Encoding & 1:
        # bit 0 set: an Int32 length and that many body bytes follow
        length = Primitives.Int32.unpack(data)
        if length >= 1:
            body = data.copy(length)
            data.skip(length)
        else:
            body = Buffer(b"")

    if typeid.Identifier == 0:
        return None

    if typeid not in ua.extension_object_classes:
        # unknown type: hand back the undecoded payload
        ext = ua.ExtensionObject()
        ext.TypeId = typeid
        ext.Encoding = Encoding
        if body is not None:
            ext.Body = body.read(len(body))
        return ext

    klass = ua.extension_object_classes[typeid]
    if body is None:
        raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
    return from_binary(klass, body)
506
507
508
def extensionobject_to_binary(obj):
    """
    Convert Python object to binary-coded ExtensionObject.
    If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body).
    A ua.ExtensionObject instance is packed with its own to_binary().
    Returns a binary string
    """
    if isinstance(obj, ua.ExtensionObject):
        return obj.to_binary()
    if obj is None:
        TypeId = ua.NodeId()
        Encoding = 0
        Body = None
    else:
        TypeId = ua.extension_object_ids[obj.__class__.__name__]
        Encoding = 0x01
        Body = to_binary(obj)
    packet = [
        to_binary(TypeId),
        Primitives.UInt8.pack(Encoding),
    ]
    # Test for None, not truthiness: when Encoding is 0x01 the reader
    # expects a length-prefixed body even if the object packed to zero
    # bytes, so an empty body must still be written (as length 0).
    if Body is not None:
        packet.append(Primitives.Bytes.pack(Body))
    return b''.join(packet)
530
531
532
def from_binary(uatype, data):
    """
    Read a value of type ``uatype`` from ``data``.

    ``uatype`` is either a type name or a class. Names starting with
    "ListOf" are read as an Int32 length followed by that many elements
    of the remaining type name; names matching a Primitives codec use
    that codec; NodeId/ExpandedNodeId and Variant have dedicated
    decoders; everything else is decoded as a structure.
    """
    if isinstance(uatype, (str, unicode)):
        if uatype.startswith("ListOf"):
            size = Primitives.Int32.unpack(data)
            element_type = uatype[6:]  # strip the "ListOf" prefix
            return [from_binary(element_type, data) for _ in range(size)]
        if hasattr(Primitives, uatype):
            return getattr(Primitives, uatype).unpack(data)
    if uatype == ua.NodeId or uatype in ("NodeId", "ExpandedNodeId"):
        return nodeid_from_binary(data)
    if uatype == ua.Variant or uatype == "Variant":
        return variant_from_binary(data)
    return struct_from_binary(uatype, data)
552
553
554
def struct_from_binary(objtype, data):
    """
    Decode a structure (or enum) of type ``objtype`` from ``data``.

    ``objtype`` may be a class or the name of an attribute of ``ua``.
    Enum subclasses are read as a UInt32. Otherwise an instance is
    created and each (name, type) pair of its ``ua_types`` is read in
    order and stored on the instance.
    """
    if isinstance(objtype, (unicode, str)):
        objtype = getattr(ua, objtype)
    if issubclass(objtype, Enum):
        return objtype(Primitives.UInt32.unpack(data))
    obj = objtype()
    has_switch = hasattr(obj, "ua_switches")
    for name, uatype in obj.ua_types:
        # if our member has a switch and it is not set we skip it
        if has_switch and name in obj.ua_switches:
            container_name, idx = obj.ua_switches[name]
            # the container member precedes the switched member in
            # ua_types, so its value has already been decoded
            if not test_bit(getattr(obj, container_name), idx):
                continue
        setattr(obj, name, from_binary(uatype, data))
    return obj
576
577
578
def header_to_binary(hdr):
    """
    Pack a message header: 3-byte message type, 1-byte chunk type and a
    UInt32 total size, followed by a UInt32 channel id for the secure
    message types.

    The size is ``hdr.body_size`` plus the 8 header bytes, plus 4 more
    when the channel id is present.
    """
    secure_types = (
        ua.MessageType.SecureOpen,
        ua.MessageType.SecureClose,
        ua.MessageType.SecureMessage,
    )
    parts = [struct.pack("<3ss", hdr.MessageType, hdr.ChunkType)]
    is_secure = hdr.MessageType in secure_types
    size = hdr.body_size + 8
    if is_secure:
        size += 4
    parts.append(Primitives.UInt32.pack(size))
    if is_secure:
        parts.append(Primitives.UInt32.pack(hdr.ChannelId))
    return b"".join(parts)
588
589
590
def header_from_binary(data):
    """
    Read a message header from ``data``.

    Eight bytes give the message type, chunk type and packet size;
    ``body_size`` is the packet size minus those 8 bytes. For the
    secure message types a UInt32 channel id follows and its 4 bytes
    are subtracted from ``body_size`` as well.
    """
    secure_types = (
        ua.MessageType.SecureOpen,
        ua.MessageType.SecureClose,
        ua.MessageType.SecureMessage,
    )
    hdr = ua.Header()
    fields = struct.unpack("<3scI", data.read(8))
    hdr.MessageType, hdr.ChunkType, hdr.packet_size = fields
    hdr.body_size = hdr.packet_size - 8
    if hdr.MessageType in secure_types:
        hdr.body_size -= 4
        hdr.ChannelId = Primitives.UInt32.unpack(data)
    return hdr
598
599