Test Failed
Pull Request — master (#490)
by Olivier
02:56
created

to_binary()   B

Complexity

Conditions 5

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 10
ccs 0
cts 0
cp 0
crap 30
rs 8.5454
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 1
from calendar import timegm
10 1
import uuid
11
12 1
from opcua.ua.uaerrors import UaError
13
from opcua.ua.uatypes import UaError
14
15 1
16
if sys.version_info.major > 2:
17
    unicode = str
18 1
19
# Name the logger after this module so it takes part in the package's logging
# hierarchy; the quoted '__name__' literal created a logger that was literally
# called "__name__".
logger = logging.getLogger(__name__)
20 1
21 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
22 1
HUNDREDS_OF_NANOSECONDS = 10000000
23
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
24
25 1
26 1
def test_bit(data, offset):
    """Return ``data`` masked down to the single bit at position ``offset``.

    The result is ``1 << offset`` when that bit is set and ``0`` when it is
    clear; it is not normalised to a ``bool``.
    """
    mask = 1 << offset
    return data & mask


# The name matches pytest's ``test_*`` discovery pattern; tell pytest this is
# a helper so it is not collected as a test wherever it gets imported.
test_bit.__test__ = False
29
30 1
31 1
def set_bit(data, offset):
    """Return ``data`` with the bit at position ``offset`` switched on."""
    return data | (1 << offset)
34
35 1
36 1
def unset_bit(data, offset):
    """Return ``data`` with the bit at position ``offset`` cleared."""
    return data & ~(1 << offset)
39
40 1
41
class UTC(tzinfo):
    """
    ``tzinfo`` implementation for UTC: zero offset, no DST, named "UTC".
    """

    # Shared zero offset; timedelta objects are immutable so reuse is safe.
    _ZERO = timedelta(0)

    def utcoffset(self, dt):
        """Return the (always zero) offset from UTC."""
        return self._ZERO

    def tzname(self, dt):
        """Return the fixed zone name."""
        return "UTC"

    def dst(self, dt):
        """Return the (always zero) daylight-saving adjustment."""
        return self._ZERO
54
55
56 1
# method copied from David Buxton <[email protected]> sample code
57 1
def datetime_to_win_epoch(dt):
    """Convert a ``datetime`` to a Windows FILETIME integer.

    The result counts 100-nanosecond intervals, built from
    ``EPOCH_AS_FILETIME`` plus the seconds since the Unix epoch and the
    microsecond part of ``dt``.

    :param dt: naive datetime (taken as UTC) or timezone-aware datetime
    :return: FILETIME value as an int
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        # Naive values are treated as already being expressed in UTC.
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() applies the UTC offset of aware datetimes. timetuple()
    # would hand the local wall-clock fields to timegm(), which assumes UTC,
    # silently shifting every non-UTC aware datetime by its offset.
    ft = EPOCH_AS_FILETIME + (timegm(dt.utctimetuple()) * HUNDREDS_OF_NANOSECONDS)
    return ft + (dt.microsecond * 10)
62
63 1
64 1
def win_epoch_to_datetime(epch):
    """Convert a FILETIME integer (100 ns ticks) to a naive ``datetime``.

    Values too large for ``datetime`` are logged and clamped to the last
    representable microsecond of ``MAXYEAR``.
    """
    microseconds = epch // 10
    try:
        elapsed = timedelta(microseconds=microseconds)
        return FILETIME_EPOCH_AS_DATETIME + elapsed
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
71
72 1
73
def build_array_format_py2(prefix, length, fmtchar):
    """Build a struct format string: ``prefix``, repeat count, format char.

    ``fmtchar`` is expected to already be a one-character string.
    """
    return "".join((prefix, str(length), fmtchar))
75
76 1
77
def build_array_format_py3(prefix, length, fmtchar):
    """Build a struct format string: ``prefix``, repeat count, format char.

    ``fmtchar`` is an integer code point and is converted with ``chr``.
    """
    return "".join((prefix, str(length), chr(fmtchar)))
79
80 1
81 1
# Pick the array-format builder that matches the running interpreter.
build_array_format = (
    build_array_format_py2
    if sys.version_info.major < 3
    else build_array_format_py3
)
85
86 1
87
class _Primitive(object):
88 1
89 1
    def pack_array(self, array):
90
        if array is None:
91 1
            return b'\xff\xff\xff\xff'
92 1
        length = len(array)
93 1
        b = [self.pack(val) for val in array]
94 1
        b.insert(0, Primitives.Int32.pack(length))
95
        return b"".join(b)
96 1
97 1
    def unpack_array(self, data):
98 1
        length = Primitives.Int32.unpack(data)
99 1
        if length == -1:
100 1
            return None
101 1
        elif length == 0:
102
            return []
103 1
        else:
104
            return [self.unpack(data) for _ in range(length)]
105
106 1
107
class _DateTime(_Primitive):
    """Codec storing datetimes as Int64 FILETIME values."""

    @staticmethod
    def pack(dt):
        """Convert ``dt`` to a FILETIME integer and pack it as Int64."""
        return Primitives.Int64.pack(datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        """Read an Int64 from ``data`` and convert it to a datetime."""
        return win_epoch_to_datetime(Primitives.Int64.unpack(data))
118
119 1
120
class _String(_Primitive):
    """Codec for strings: Int32 byte length followed by the bytes."""

    @staticmethod
    def pack(string):
        """Pack ``string``; ``None`` becomes a bare length of -1.

        Unicode text is encoded as UTF-8 first; anything else is written
        as given.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        payload = string.encode('utf-8') if isinstance(string, unicode) else string
        return Primitives.Int32.pack(len(payload)) + payload

    @staticmethod
    def unpack(data):
        """Read a length-prefixed string from ``data``.

        On Python 2 the raw bytes are returned; on Python 3 they are decoded
        as UTF-8. ``None`` is passed through unchanged.
        """
        raw = _Bytes.unpack(data)
        if raw is None or sys.version_info.major < 3:
            return raw
        return raw.decode("utf-8")
140
141 1
142
class _Bytes(_Primitive):
    """Codec for byte strings: Int32 length followed by the raw bytes."""

    @staticmethod
    def pack(data):
        """Pack ``data`` using the same wire layout as ``_String``."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """Read the length prefix, then that many bytes; -1 yields ``None``."""
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
154
155 1
156
class _Null(_Primitive):
    """Codec for the null type: occupies no bytes on the wire."""

    @staticmethod
    def pack(data):
        """Ignore ``data`` and emit an empty byte string."""
        return b""

    @staticmethod
    def unpack(data):
        """Consume nothing from ``data`` and return ``None``."""
        return None
165
166 1
167
class _Guid(_Primitive):
    """Codec translating between ``uuid.UUID`` and the OPC UA Guid layout."""

    @staticmethod
    def pack(guid):
        """Convert the python UUID 6 field format to OPC UA 4 field format."""
        fields = (
            Primitives.UInt32.pack(guid.time_low),
            Primitives.UInt16.pack(guid.time_mid),
            Primitives.UInt16.pack(guid.time_hi_version),
            # Fourth OPC UA field: two clock-sequence bytes plus the node.
            Primitives.Byte.pack(guid.clock_seq_hi_variant),
            Primitives.Byte.pack(guid.clock_seq_low),
            # no primitive .pack available for 6 byte int: pack as 8 bytes
            # big-endian and keep the low 6
            struct.pack('>Q', guid.node)[2:8],
        )
        return b"".join(fields)

    @staticmethod
    def unpack(data):
        """Convert the OPC UA 4 field format to python UUID bytes."""
        time_low = Primitives.UInt32.unpack(data)
        time_mid = Primitives.UInt16.unpack(data)
        time_hi_version = Primitives.UInt16.unpack(data)
        tail = data.read(8)
        # The first three fields are re-packed big-endian for uuid.UUID;
        # the trailing 8 bytes are used as read.
        head = struct.pack('>IHH', time_low, time_mid, time_hi_version)
        return uuid.UUID(bytes=head + tail)
195
196 1
197 1
class _Primitive1(_Primitive):
    """Codec for a single fixed-size value described by a struct format.

    :param fmt: ``struct`` format string for exactly one value, e.g. ``"<i"``
    """

    def __init__(self, fmt):
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        # Kept as a public attribute for code that inspects the format.
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` packed according to the format."""
        # Use the precompiled Struct rather than re-parsing the format
        # string on every call.
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` and return the value."""
        return self.struct.unpack(data.read(self.size))[0]
208
    
209
    #def pack_array(self, array):
210
        #"""
211
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
212
        #"""
213
        #if array is None:
214
            #return b'\xff\xff\xff\xff'
215
        #length = len(array)
216
        #if length == 0:
217
            #return b'\x00\x00\x00\x00'
218
        #if length == 1:
219
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
220
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
221
222 1
223 1
class Primitives1(object):
    """Namespace of fixed-size little-endian codecs, looked up by name."""

    # Signed integers
    Int8 = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")

    # Unsigned integers
    UInt8 = _Primitive1("<B")
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")

    # Boolean and floating point
    Boolean = _Primitive1("<?")
    Float = _Primitive1("<f")
    Double = _Primitive1("<d")

    # Aliases sharing the codec instances above
    SByte = Int8
    Char = UInt8
    Byte = UInt8
238
239 1
240 1
class Primitives(Primitives1):
    """Full codec namespace: fixed-size codecs plus variable-size ones."""

    Null = _Null()

    # Text
    String = _String()
    CharArray = _String()

    # Raw bytes
    Bytes = _Bytes()
    ByteString = _Bytes()

    # Structured built-ins
    DateTime = _DateTime()
    Guid = _Guid()
248
249 1
250 1
def pack_uatype_array(vtype, array):
    """Serialise ``array`` of ``vtype`` values as a length-prefixed sequence.

    ``None`` is written as a bare Int32 length of -1.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    chunks = [pack_uatype(vtype, item) for item in array]
    return b"".join([Primitives.Int32.pack(count)] + chunks)
257
258 1
259 1
def pack_uatype(vtype, value):
    """Serialise ``value`` according to the variant type ``vtype``.

    Dispatch order: a codec in ``Primitives`` named like ``vtype``, then
    raw bytes for type values above 25, then ExtensionObject, and finally
    the value's own ``to_binary()``.

    :raises UaError: when the fallback hits an ``AttributeError``
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if type_name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua import extensionobject_to_binary
        return extensionobject_to_binary(value)
    try:
        return value.to_binary()
    except AttributeError:
        raise UaError("{0} could not be packed with value {1}".format(vtype, value))
275
276 1
277 1
def unpack_uatype(vtype, data):
    """Read one value of variant type ``vtype`` from ``data``.

    Dispatch order: a codec in ``Primitives`` named like ``vtype``, then
    raw bytes for type values above 25, then ExtensionObject, and finally
    a class of that name in ``opcua.ua.uatypes``.

    :raises UaError: when no decoder is found for ``vtype``
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if type_name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua import extensionobject_from_binary
        return extensionobject_from_binary(data)
    from opcua.ua import uatypes
    if not hasattr(uatypes, type_name):
        raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
    return getattr(uatypes, type_name).from_binary(data)
296
297 1
298 1
def unpack_uatype_array(vtype, data):
    """Read a length-prefixed array of ``vtype`` values from ``data``.

    Types with a codec in ``Primitives`` use that codec's ``unpack_array``;
    all others are decoded element by element. A length of -1 gives ``None``.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).unpack_array(data)
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
308
309
310
"""
311
def to_binary(obj):
312
    if isintance(obj, NodeId):
313
        return nodeid_to_binary(obj)
314
    elif hasattr(val, "ua_types"):
315
        return to_binary(val)
316
    else:
317
        raise ValueError("Do not know how to convert {} to binary: {}".format(type(obj), obj))
318
"""
319
320
321
def struct_to_binary(obj):
    """Serialise a structure described by its ``ua_types`` attribute.

    ``obj.ua_types`` is iterated as ``(attribute name, type name)`` pairs and
    each attribute is encoded in that order. When ``obj`` has
    ``ua_switches`` (attribute name -> ``(container name, bit index)``), the
    container's bit is set for every switched member that is not ``None``,
    and switched members that are ``None`` are left out of the output.

    Note: the switch containers are updated on ``obj`` itself.

    :return: the concatenated binary encoding
    """
    packet = []
    has_switch = hasattr(obj, "ua_switches")
    if has_switch:
        for name, switch in obj.ua_switches.items():
            member = getattr(obj, name)
            container_name, idx = switch
            if member is not None:
                container_val = getattr(obj, container_name)
                container_val = container_val | 1 << idx
                setattr(obj, container_name, container_val)
    for name, uatype in obj.ua_types:
        val = getattr(obj, name)
        if uatype.startswith("ListOf"):
            # Hand the element type (without the "ListOf" prefix) to the
            # list encoder; with the prefix left on, no element encoder
            # matches for lists of primitives.
            packet.append(list_to_binary(val, uatype[len("ListOf"):]))
        elif has_switch and val is None and name in obj.ua_switches:
            # Optional member that is absent: nothing is written.
            continue
        else:
            packet.append(to_binary(val, uatype))
    return b''.join(packet)
342
343
344
def to_binary(val, uatype):
    """Serialise ``val`` as the type named by the string ``uatype``.

    Dispatch order: a codec in ``Primitives`` with that name, a structure
    exposing ``ua_types``, then the special cases "NodeId" and "Variant".

    :raises UaError: when no encoder matches, instead of silently returning
        ``None`` and failing later when the pieces are joined
    """
    if hasattr(Primitives, uatype):
        return getattr(Primitives, uatype).pack(val)
    if hasattr(val, "ua_types"):
        return struct_to_binary(val)
    if uatype == "NodeId":
        return nodeid_to_binary(val)
    if uatype == "Variant":
        return variant_to_binary(val)
    raise UaError(
        "Do not know how to convert {0} to binary as {1}: {2}".format(type(val), uatype, val)
    )
354
355
356
def list_to_binary(val, uatype):
    """Serialise the list ``val`` as an Int32 count plus encoded elements.

    ``None`` is written as a bare count of -1. Each element is encoded with
    ``to_binary`` using ``uatype``.
    """
    if val is None:
        return Primitives.Int32.pack(-1)
    header = Primitives.Int32.pack(len(val))
    return b''.join([header] + [to_binary(item, uatype) for item in val])
365
366
367
def variant_to_binary(var):
    """Serialise a variant: one encoding byte followed by the value.

    The low six bits of the encoding byte carry the variant type value.
    Bit 7 marks an array and bit 6 marks that dimensions follow the
    (flattened) array data.

    Note: ``var.is_array`` is set to True when the value is a list or tuple.
    """
    encoding = var.VariantType.value & 0b111111
    treat_as_array = var.is_array or isinstance(var.Value, (list, tuple))
    if not treat_as_array:
        return b"".join([
            Primitives.UInt8.pack(encoding),
            pack_uatype(var.VariantType, var.Value),
        ])

    var.is_array = True
    encoding = set_bit(encoding, 7)
    has_dimensions = var.Dimensions is not None
    if has_dimensions:
        encoding = set_bit(encoding, 6)
    parts = [
        Primitives.UInt8.pack(encoding),
        pack_uatype_array(var.VariantType, flatten(var.Value)),
    ]
    if has_dimensions:
        parts.append(pack_uatype_array(VariantType.Int32, var.Dimensions))
    return b"".join(parts)
384
385
386
def variant_from_binary(data):
    """Read a variant from ``data``.

    The first byte is the encoding: its low six bits select the variant
    type, bit 7 marks an array and bit 6 marks trailing dimensions, in
    which case the decoded value is reshaped to those dimensions.
    """
    encoding = ord(data.read(1))
    vtype = ua.datatype_to_varianttype(encoding & 0b00111111)

    is_array = bool(test_bit(encoding, 7))
    if is_array:
        value = unpack_uatype_array(vtype, data)
    else:
        value = unpack_uatype(vtype, data)

    dimensions = None
    if test_bit(encoding, 6):
        dimensions = unpack_uatype_array(VariantType.Int32, data)
        value = reshape(value, dimensions)
    return ua.Variant(value, vtype, dimensions, is_array=is_array)
401
402
403
def reshape(flat, dims):
    """Nest the flat list ``flat`` according to the dimensions ``dims``.

    ``flat`` is padded in place with empty lists when it holds fewer items
    than the dimensions call for. Dimensions of 0 after the first count as
    1 when computing chunk sizes. With a single dimension (or a remaining
    ``[0]``) the padded ``flat`` list itself is returned.
    """
    inner_dims = dims[1:]
    chunk = 1
    for extent in inner_dims:
        chunk *= extent or 1

    shortfall = dims[0] * chunk - len(flat)
    if shortfall > 0:
        flat.extend([] for _ in range(shortfall))

    if not inner_dims or inner_dims == [0]:
        return flat
    return [
        reshape(flat[start:start + chunk], inner_dims)
        for start in range(0, len(flat), chunk)
    ]
415
416
417
def extensionobject_from_binary(data):
    """
    Decode a binary-coded ExtensionObject into a Python object.

    Returns None when the TypeId identifier is zero, an instance of the
    registered class when the TypeId is known, and otherwise a generic
    ExtensionObject carrying the raw body bytes.
    """
    typeid = NodeId.from_binary(data)
    encoding_byte = ord(data.read(1))

    body = None
    if encoding_byte & 0x01:
        # Bit 0 set: a length-prefixed body follows.
        length = uabin.Primitives.Int32.unpack(data)
        if length >= 1:
            body = data.copy(length)
            data.skip(length)
        else:
            body = Buffer(b"")

    if typeid.Identifier == 0:
        return None

    if typeid not in extension_object_classes:
        # Unknown type: keep the body undecoded.
        ext = ExtensionObject()
        ext.TypeId = typeid
        ext.Encoding = encoding_byte
        if body is not None:
            ext.Body = body.read(len(body))
        return ext

    klass = extension_object_classes[typeid]
    if body is None:
        raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
    return klass.from_binary(body)
446
447
448
def extensionobject_to_binary(obj):
    """
    Encode a Python object as a binary-coded ExtensionObject.

    An ExtensionObject instance encodes itself. None becomes an empty
    ExtensionObject (default NodeId, encoding 0, no body). Any other object
    is looked up by class name in ``extension_object_ids`` and its
    ``to_binary()`` output is written as the body.
    Returns a binary string
    """
    if isinstance(obj, ExtensionObject):
        return obj.to_binary()

    if obj is None:
        type_id, encoding, body = NodeId(), 0, None
    else:
        type_id = extension_object_ids[obj.__class__.__name__]
        encoding = 0x01
        body = obj.to_binary()

    packet = [type_id.to_binary(), uabin.Primitives.UInt8.pack(encoding)]
    if body:
        # An empty body is skipped as well, not just a missing one.
        packet.append(uabin.Primitives.Bytes.pack(body))
    return b''.join(packet)
470