Test Failed
Pull Request — master (#490)
by Olivier
02:39
created

variant_to_binary()   B

Complexity

Conditions 5

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 17
ccs 0
cts 0
cp 0
crap 30
rs 8.5454
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 1
from calendar import timegm
10 1
import uuid
11
12 1
from opcua.ua.uaerrors import UaError
13
from opcua.ua.uatypes import UaError
14
15 1
16
if sys.version_info.major > 2:
    # Python 3 has no separate ``unicode`` type; alias it so the
    # ``isinstance(..., unicode)`` check used when packing strings works
    # on both major versions.
    unicode = str

# Use the module's real name. The previous code passed the literal string
# '__name__', which registered a logger literally called "__name__".
logger = logging.getLogger(__name__)

EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000  # 100 ns ticks in one second
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)  # zero point used for FILETIME values
24
25 1
26 1
def test_bit(data, offset):
    """Return ``data`` masked with bit number ``offset``.

    The result is ``data & (1 << offset)``: non-zero when the bit is set and
    0 when it is clear. It is not normalised to a ``bool``.
    """
    return data & (1 << offset)


# This is a bit-testing helper, not a test case. ``__test__ = False`` is the
# conventional opt-out marker so test collectors skip it when the name is
# imported into a test module.
test_bit.__test__ = False
29
30 1
31 1
def set_bit(data, offset):
    """Return ``data`` with bit number ``offset`` forced to 1."""
    return data | (1 << offset)
34
35 1
36 1
def unset_bit(data, offset):
    """Return ``data`` with bit number ``offset`` forced to 0."""
    return data & ~(1 << offset)
39
40 1
41
class UTC(tzinfo):
    """
    ``tzinfo`` implementation for UTC: zero offset, zero DST, name "UTC".
    """

    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)
54
55
56 1
# method copied from David Buxton <[email protected]> sample code
57 1
def datetime_to_win_epoch(dt):
    """Convert a ``datetime`` to a Windows FILETIME integer (100 ns ticks).

    A naive datetime, or one whose tzinfo reports no offset, is taken to be
    UTC already. An aware datetime is converted to UTC first.

    :param dt: ``datetime.datetime`` to convert
    :returns: integer count of 100 ns intervals
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() applies the tzinfo offset. timetuple() would pass the
    # local wall-clock fields to timegm() and shift aware, non-UTC values.
    seconds = timegm(dt.utctimetuple())
    ft = EPOCH_AS_FILETIME + (seconds * HUNDREDS_OF_NANOSECONDS)
    # One microsecond is ten 100 ns ticks.
    return ft + (dt.microsecond * 10)
62
63 1
64 1
def win_epoch_to_datetime(epch):
    """Convert a FILETIME integer (100 ns ticks) to a naive ``datetime``.

    Sub-microsecond precision is dropped by the integer division. If the
    addition overflows the ``datetime`` range, a warning is logged and the
    largest representable datetime is returned instead of raising.

    :param epch: integer count of 100 ns intervals
    :returns: ``datetime.datetime`` without tzinfo
    """
    try:
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
71
72 1
73
def build_array_format_py2(prefix, length, fmtchar):
    """Build a ``struct`` format string ``<prefix><length><fmtchar>``.

    Python 2 variant: ``fmtchar`` is already a one-character string.
    """
    return prefix + str(length) + fmtchar
75
76 1
77
def build_array_format_py3(prefix, length, fmtchar):
    """Build a ``struct`` format string ``<prefix><length><fmtchar>``.

    Python 3 variant: ``fmtchar`` is an integer code point (what indexing a
    ``bytes`` format yields) and is converted back with ``chr``.
    """
    return prefix + str(length) + chr(fmtchar)
79
80 1
81 1
# Bind the format builder that matches the running interpreter's major
# version once, at import time.
if sys.version_info.major < 3:
    build_array_format = build_array_format_py2
else:
    build_array_format = build_array_format_py3
85
86 1
87
class _Primitive(object):
    """Base class adding length-prefixed array encoding to a primitive.

    Subclasses supply ``pack(value)`` and ``unpack(data)``. Arrays are
    written as an Int32 element count followed by each packed element.
    """

    def pack_array(self, array):
        """Return the binary form of ``array``; ``None`` encodes as length -1."""
        if array is None:
            # Int32 -1 in little-endian form.
            return b'\xff\xff\xff\xff'
        parts = [Primitives.Int32.pack(len(array))]
        parts.extend(self.pack(val) for val in array)
        return b"".join(parts)

    def unpack_array(self, data):
        """Read an array from ``data``; a length of -1 yields ``None``."""
        length = Primitives.Int32.unpack(data)
        if length == -1:
            return None
        # A length of 0 produces an empty list.
        return [self.unpack(data) for _ in range(length)]
105
106 1
107
class _DateTime(_Primitive):
    """Codec for datetimes, stored as an Int64 FILETIME value."""

    @staticmethod
    def pack(dt):
        """Return ``dt`` converted to FILETIME and packed as Int64."""
        epch = datetime_to_win_epoch(dt)
        return Primitives.Int64.pack(epch)

    @staticmethod
    def unpack(data):
        """Read an Int64 from ``data`` and convert it to a ``datetime``."""
        epch = Primitives.Int64.unpack(data)
        return win_epoch_to_datetime(epch)
118
119 1
120
class _String(_Primitive):
    """Codec for strings: Int32 byte length followed by UTF-8 bytes."""

    @staticmethod
    def pack(string):
        """Return the binary form of ``string``; ``None`` encodes as length -1.

        ``unicode`` text is encoded as UTF-8. Anything else is written as
        given, so byte strings pass through unchanged.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        if isinstance(string, unicode):
            string = string.encode('utf-8')
        return Primitives.Int32.pack(len(string)) + string

    @staticmethod
    def unpack(data):
        """Read a string from ``data``.

        Returns ``None`` for a null string. On Python 2 the raw bytes are
        returned; on Python 3 they are decoded as UTF-8.
        """
        raw = _Bytes.unpack(data)
        if raw is None or sys.version_info.major < 3:
            return raw
        return raw.decode("utf-8")
140
141 1
142
class _Bytes(_Primitive):
    """Codec for byte strings: Int32 length followed by the raw bytes."""

    @staticmethod
    def pack(data):
        """Return the binary form of ``data`` (same layout as ``_String.pack``)."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """Read a length-prefixed byte string; a length of -1 yields ``None``."""
        length = Primitives.Int32.unpack(data)
        if length == -1:
            return None
        return data.read(length)
154
155 1
156
class _Null(_Primitive):
    """Codec for the null type: occupies no bytes and always decodes to ``None``."""

    @staticmethod
    def pack(data):
        """Return an empty byte string regardless of ``data``."""
        return b""

    @staticmethod
    def unpack(data):
        """Return ``None`` without reading from ``data``."""
        return None
165
166 1
167
class _Guid(_Primitive):
    """Codec translating between ``uuid.UUID`` and the OPC UA Guid layout."""

    @staticmethod
    def pack(guid):
        """Return the 16-byte binary form of ``guid`` (a ``uuid.UUID``)."""
        # convert python UUID 6 field format to OPC UA 4 field format
        f1 = Primitives.UInt32.pack(guid.time_low)
        f2 = Primitives.UInt16.pack(guid.time_mid)
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
        # no primitive .pack available for 6 byte int: pack as an 8-byte
        # big-endian integer and keep the low six bytes
        f4c = struct.pack('>Q', guid.node)[2:8]
        f4 = f4a + f4b + f4c
        # concat byte fields
        return f1 + f2 + f3 + f4

    @staticmethod
    def unpack(data):
        """Read 16 bytes from ``data`` and return them as a ``uuid.UUID``."""
        # convert OPC UA 4 field format to python UUID bytes: the first
        # three fields are read through the primitives and re-packed
        # big-endian; the last 8 bytes are copied as they are
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
        f4 = data.read(8)
        # concat byte fields
        return uuid.UUID(bytes=f1 + f2 + f3 + f4)
195
196 1
197 1
class _Primitive1(_Primitive):
    """Codec for a single fixed-size value described by a ``struct`` format."""

    def __init__(self, fmt):
        # Compile the format once; pack/unpack reuse the compiled object.
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` packed according to this primitive's format."""
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` and return the decoded value."""
        return self.struct.unpack(data.read(self.size))[0]
208
    
209
    #def pack_array(self, array):
210
        #"""
211
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
212
        #"""
213
        #if array is None:
214
            #return b'\xff\xff\xff\xff'
215
        #length = len(array)
216
        #if length == 0:
217
            #return b'\x00\x00\x00\x00'
218
        #if length == 1:
219
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
220
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
221
222 1
223 1
class Primitives1(object):
    """Namespace of fixed-size numeric codecs, all little-endian."""

    Int8 = _Primitive1("<b")
    SByte = Int8
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")
    UInt8 = _Primitive1("<B")
    Char = UInt8
    Byte = UInt8
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")
    Boolean = _Primitive1("<?")
    Double = _Primitive1("<d")
    Float = _Primitive1("<f")
238
239 1
240 1
class Primitives(Primitives1):
    """Namespace of every primitive codec, looked up by type name.

    Extends the numeric codecs of ``Primitives1`` with the variable-length
    and structured ones.
    """

    Null = _Null()
    String = _String()
    Bytes = _Bytes()
    ByteString = _Bytes()
    CharArray = _Bytes()
    DateTime = _DateTime()
    Guid = _Guid()
248
249 1
250 1
def pack_uatype_array(vtype, array):
    """Return the binary form of ``array``, whose elements are of type ``vtype``.

    ``None`` encodes as an Int32 length of -1. Otherwise the Int32 element
    count is followed by each element packed with ``pack_uatype``.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    parts = [Primitives.Int32.pack(len(array))]
    parts.extend(pack_uatype(vtype, val) for val in array)
    return b"".join(parts)
257
258 1
259 1
def pack_uatype(vtype, value):
    """Return the binary form of ``value`` for the variant type ``vtype``.

    Lookup order: a codec named ``vtype.name`` on ``Primitives``; raw bytes
    when ``vtype.value > 25``; the ExtensionObject encoder; finally the
    value's own ``to_binary()``.

    :raises UaError: if the fallback ``to_binary()`` raises AttributeError
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).pack(value)
    elif vtype.value > 25:
        # NOTE(review): presumably ids above the built-in variant types - confirm
        return Primitives.Bytes.pack(value)
    elif vtype.name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua import extensionobject_to_binary
        return extensionobject_to_binary(value)
    else:
        try:
            return value.to_binary()
        except AttributeError:
            raise UaError("{0} could not be packed with value {1}".format(vtype, value))
275
276 1
277 1
def unpack_uatype(vtype, data):
    """Read one value of variant type ``vtype`` from ``data``.

    Lookup order mirrors ``pack_uatype``: a ``Primitives`` codec; raw bytes
    when ``vtype.value > 25``; the ExtensionObject decoder; finally a class
    of the same name in ``opcua.ua.uatypes``.

    :raises UaError: if no decoder is found for ``vtype``
    """
    if hasattr(Primitives, vtype.name):
        st = getattr(Primitives, vtype.name)
        return st.unpack(data)
    elif vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    elif vtype.name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua import extensionobject_from_binary
        return extensionobject_from_binary(data)
    else:
        from opcua.ua import uatypes
        if hasattr(uatypes, vtype.name):
            klass = getattr(uatypes, vtype.name)
            return klass.from_binary(data)
        else:
            raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
296
297 1
298 1
def unpack_uatype_array(vtype, data):
    """Read an array of ``vtype`` elements from ``data``.

    Primitive types delegate to their codec's ``unpack_array``. Other types
    read an Int32 length and then that many elements. A length of -1 yields
    ``None``.
    """
    if hasattr(Primitives, vtype.name):
        st = getattr(Primitives, vtype.name)
        return st.unpack_array(data)
    length = Primitives.Int32.unpack(data)
    if length == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(length)]
308
309
310
"""
311
def to_binary(obj):
312
    if isintance(obj, NodeId):
313
        return nodeid_to_binary(obj)
314
    elif hasattr(val, "ua_types"):
315
        return to_binary(val)
316
    else:
317
        raise ValueError("Do not know how to convert {} to binary: {}".format(type(obj), obj))
318
"""
319
320
321
def struct_to_binary(obj):
    """Serialise a structure by packing each field listed in ``obj.ua_types``.

    ``obj.ua_types`` is iterated as ``(attribute name, type name)`` pairs.
    Fields whose type name starts with "ListOf" go through
    ``list_to_binary``; the rest go through ``to_binary``. When ``obj`` has
    ``ua_switches``, a field that is ``None`` and whose name is found in
    ``obj.ua_switches`` is left out.

    :returns: the concatenated binary string
    """
    packet = []
    has_switch = hasattr(obj, "ua_switches")
    if has_switch:
        for name, switch in obj.ua_switches:
            # getattr takes the object first. The reversed call raised
            # TypeError for every structure with switches.
            member = getattr(obj, name)
            container, idx = switch
            if member is not None:
                # NOTE(review): this only updates the local ``container``;
                # the computed bit is never stored on ``obj``, so switch
                # fields are not yet reflected in the output - confirm the
                # intended target and shift.
                container |= (idx << 0)
    for name, uatype in obj.ua_types:
        val = getattr(obj, name)
        if uatype.startswith("ListOf"):
            packet.append(list_to_binary(val, uatype))
        elif has_switch and val is None and name in obj.ua_switches:
            # Optional field that is absent: emit nothing.
            continue
        else:
            packet.append(to_binary(val, uatype))
    return b''.join(packet)
340
341
342
def to_binary(val, uatype):
    """Return the binary form of ``val`` for the type named ``uatype``.

    Tried in order: a ``Primitives`` codec named ``uatype``; structure
    encoding when ``val`` has ``ua_types``; the NodeId encoder; the Variant
    encoder.

    :raises UaError: if none of the above applies. Previously the function
        fell through and returned ``None``, which only failed later when
        the caller joined the pieces.
    """
    if hasattr(Primitives, uatype):
        st = getattr(Primitives, uatype)
        return st.pack(val)
    elif hasattr(val, "ua_types"):
        return struct_to_binary(val)
    elif uatype == "NodeId":
        return nodeid_to_binary(val)
    elif uatype == "Variant":
        return variant_to_binary(val)
    raise UaError("Do not know how to convert {0} to binary: {1}".format(uatype, val))
352
353
354
def list_to_binary(val, uatype):
    """Return the binary form of the list ``val``.

    ``None`` encodes as an Int32 length of -1. Otherwise the Int32 element
    count is followed by each element packed with ``to_binary``.

    :param uatype: element type name; a leading "ListOf" is accepted and
        removed, since ``struct_to_binary`` passes the list type name
    """
    if val is None:
        return Primitives.Int32.pack(-1)
    prefix = "ListOf"
    if uatype.startswith(prefix):
        # Elements are encoded with the element type, not the list type.
        uatype = uatype[len(prefix):]
    pack = [Primitives.Int32.pack(len(val))]
    pack.extend(to_binary(el, uatype) for el in val)
    return b''.join(pack)
363
364
365
def variant_to_binary(var):
    """Return the binary form of the variant ``var``.

    The first byte carries the type id in its low six bits, bit 7 when the
    value is an array, and bit 6 when dimensions follow. Array values are
    flattened before packing.

    Side effect: ``var.is_array`` is set to ``True`` when the value is a
    list or tuple.

    NOTE(review): ``flatten`` and ``VariantType`` are not defined or
    imported in the visible part of this file - confirm they are in scope.
    """
    b = []
    encoding = var.VariantType.value & 0b111111
    if var.is_array or isinstance(var.Value, (list, tuple)):
        var.is_array = True
        has_dimensions = var.Dimensions is not None
        encoding = set_bit(encoding, 7)
        if has_dimensions:
            encoding = set_bit(encoding, 6)
        b.append(Primitives.UInt8.pack(encoding))
        b.append(pack_uatype_array(var.VariantType, flatten(var.Value)))
        if has_dimensions:
            b.append(pack_uatype_array(VariantType.Int32, var.Dimensions))
    else:
        b.append(Primitives.UInt8.pack(encoding))
        b.append(pack_uatype(var.VariantType, var.Value))

    return b"".join(b)
382
383
384
def variant_from_binary(data):
    """Read a variant from ``data`` and return it as a ``ua.Variant``.

    The first byte is decoded as: low six bits = type id, bit 7 = array,
    bit 6 = dimensions follow the value. When dimensions are present the
    flat value is reshaped into nested lists.

    NOTE(review): ``ua`` and ``VariantType`` are not defined or imported in
    the visible part of this file - confirm they are in scope.
    """
    dimensions = None
    array = False
    encoding = ord(data.read(1))
    int_type = encoding & 0b00111111
    vtype = ua.datatype_to_varianttype(int_type)
    if test_bit(encoding, 7):
        value = unpack_uatype_array(vtype, data)
        array = True
    else:
        value = unpack_uatype(vtype, data)
    if test_bit(encoding, 6):
        dimensions = unpack_uatype_array(VariantType.Int32, data)
        value = reshape(value, dimensions)
    return ua.Variant(value, vtype, dimensions, is_array=array)
399
400
401
def reshape(flat, dims):
    """Nest the flat list ``flat`` according to the dimension sizes ``dims``.

    A dimension of 0 after the first one counts as 1 when computing the
    chunk size. If ``flat`` holds fewer than ``dims[0] * chunk size``
    elements, it is padded with empty lists. The padding is done on a copy,
    so the caller's list is never modified.

    :param flat: list of elements
    :param dims: sequence of dimension sizes (list or tuple), outermost first
    :returns: ``flat`` itself (or its padded copy) for a single dimension,
        otherwise a list of recursively reshaped chunks
    """
    subdims = dims[1:]
    subsize = 1
    for extent in subdims:
        subsize *= extent if extent != 0 else 1
    missing = dims[0] * subsize - len(flat)
    if missing > 0:
        # Copy before padding so the caller's list stays untouched.
        flat = list(flat)
        flat.extend([] for _ in range(missing))
    # list() lets tuple dims such as (n, 0) match, same as list dims.
    if not subdims or list(subdims) == [0]:
        return flat
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
413
414
415
def extensionobject_from_binary(data):
    """
    Convert binary-coded ExtensionObject to a Python object.
    Returns an object, or None if TypeId is zero

    When the TypeId is registered in ``extension_object_classes`` the body
    is decoded with that class; otherwise a generic ``ExtensionObject``
    carrying the raw body bytes is returned.

    Raises UaError if a registered class is found but no body was sent.

    NOTE(review): ``NodeId``, ``Buffer``, ``ExtensionObject`` and
    ``extension_object_classes`` are not defined or imported in the visible
    part of this file - confirm they are in scope.
    """
    typeid = NodeId.from_binary(data)
    Encoding = ord(data.read(1))
    body = None
    if Encoding & (1 << 0):
        # Bit 0 set: an Int32 length and that many body bytes follow.
        # ``Primitives`` is defined in this module; ``uabin`` is not
        # imported here.
        length = Primitives.Int32.unpack(data)
        if length < 1:
            body = Buffer(b"")
        else:
            body = data.copy(length)
            data.skip(length)
    if typeid.Identifier == 0:
        return None
    elif typeid in extension_object_classes:
        klass = extension_object_classes[typeid]
        if body is None:
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
        return klass.from_binary(body)
    else:
        e = ExtensionObject()
        e.TypeId = typeid
        e.Encoding = Encoding
        if body is not None:
            e.Body = body.read(len(body))
        return e
444
445
446
def extensionobject_to_binary(obj):
    """
    Convert Python object to binary-coded ExtensionObject.
    If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body).
    Returns a binary string

    An ``ExtensionObject`` instance is serialised with its own
    ``to_binary()``. Any other object is looked up by class name in
    ``extension_object_ids`` and written with the body-present bit set.

    NOTE(review): ``NodeId``, ``ExtensionObject`` and
    ``extension_object_ids`` are not defined or imported in the visible
    part of this file - confirm they are in scope.
    """
    if isinstance(obj, ExtensionObject):
        return obj.to_binary()
    if obj is None:
        TypeId = NodeId()
        Encoding = 0
        Body = None
    else:
        TypeId = extension_object_ids[obj.__class__.__name__]
        Encoding = 0x01
        Body = obj.to_binary()
    # ``Primitives`` is defined in this module; ``uabin`` is not imported here.
    packet = [TypeId.to_binary(), Primitives.UInt8.pack(Encoding)]
    # Test against None, not truthiness: an empty body still has the
    # body-present bit set, so its zero length must be written or the
    # reader will consume the wrong bytes as the length.
    if Body is not None:
        packet.append(Primitives.Bytes.pack(Body))
    return b''.join(packet)
468