Completed
Pull Request — master (#296)
by Olivier
04:13
created

_Null   A

Complexity

Total Complexity 2

Size/Duplication

Total Lines 9
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 9
rs 10
wmc 2

2 Methods

Rating   Name   Duplication   Size   Complexity  
A unpack() 0 3 1
A pack() 0 3 1
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5
import sys
6
import struct
7
import logging
8
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9
from calendar import timegm
10
11
from opcua.common.uaerrors import UaError
12
13
14
# Python 3 has no separate ``unicode`` type; alias it so isinstance checks
# against ``unicode`` work on both major versions.
if sys.version_info.major > 2:
    unicode = str

# Name the logger after this module.  Passing the quoted string '__name__'
# would create a logger literally called "__name__", detached from the
# package's logging hierarchy.
logger = logging.getLogger(__name__)

EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000  # number of 100 ns ticks in one second
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)  # zero point of the file time scale
22
23
24
def test_bit(data, offset):
25
    mask = 1 << offset
26
    return data & mask
27
28
29
def set_bit(data, offset):
    """Return ``data`` with the bit at position ``offset`` switched on."""
    return data | (1 << offset)
32
33
34
def unset_bit(data, offset):
    """Return ``data`` with the bit at position ``offset`` cleared."""
    return data & ~(1 << offset)
37
38
39
class UTC(tzinfo):
    """Time zone info for UTC: a constant zero offset and no DST."""

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return timedelta()

    def tzname(self, dt):
        """Return the name of the zone."""
        return "UTC"

    def dst(self, dt):
        """Return the daylight-saving adjustment, which is always zero."""
        return timedelta()
51
52
53
# method copied from David Buxton <[email protected]> sample code
54
def datetime_to_win_epoch(dt):
    """Convert a datetime to a Windows file time value.

    The result counts 100-nanosecond ticks: the Unix timestamp of ``dt``
    scaled to ticks, shifted by ``EPOCH_AS_FILETIME``.

    :param dt: datetime to convert.  Naive values are taken to be UTC;
        timezone-aware values are converted to UTC first.
    :returns: the file time as an integer number of 100 ns ticks.
    """
    # utctimetuple() subtracts the UTC offset of an aware datetime and
    # returns a naive one unchanged, so timegm() always receives UTC fields.
    # Plain timetuple() keeps the local wall-clock fields and therefore
    # produces a wrong value for aware datetimes with a non-zero offset.
    seconds = timegm(dt.utctimetuple())
    ft = EPOCH_AS_FILETIME + seconds * HUNDREDS_OF_NANOSECONDS
    # The time tuple only has whole-second resolution; add the microseconds
    # back in (1 microsecond == 10 ticks).
    return ft + dt.microsecond * 10
59
60
61
def win_epoch_to_datetime(epch):
    """Convert a Windows file time value to a naive datetime.

    ``epch`` is a count of 100 ns ticks.  It is floor-divided down to whole
    microseconds and added to ``FILETIME_EPOCH_AS_DATETIME``.  When that
    overflows, a warning is logged and the last microsecond of ``MAXYEAR``
    is returned instead.
    """
    try:
        elapsed = timedelta(microseconds=epch // 10)
        moment = FILETIME_EPOCH_AS_DATETIME + elapsed
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        moment = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
    return moment
68
69
70
def build_array_format_py2(prefix, length, fmtchar):
    """Build a repeated-item format string: ``prefix``, the count, then ``fmtchar``.

    ``fmtchar`` is expected to already be a string.
    """
    count = str(length)
    return "".join((prefix, count, fmtchar))
72
73
74
def build_array_format_py3(prefix, length, fmtchar):
    """Build a repeated-item format string: ``prefix``, the count, then the type character.

    ``fmtchar`` is an integer character code and is turned into its
    one-character string with ``chr``.
    """
    count = str(length)
    type_char = chr(fmtchar)
    return "".join((prefix, count, type_char))
76
77
78
# Bind the variant matching the running interpreter: the py3 variant takes the
# type character as an integer code, the py2 variant takes it as a string.
build_array_format = (
    build_array_format_py3
    if sys.version_info.major >= 3
    else build_array_format_py2
)
82
83
84
class _Primitive(object):
85
86
    def pack_array(self, array):
87
        if array is None:
88
            return b'\xff\xff\xff\xff'
89
        length = len(array)
90
        b = [self.pack(val) for val in array]
91
        b.insert(0, Primitives.Int32.pack(length))
92
93
    def unpack_array(self, data):
94
        length = Primitives.Int32.unpack(data)
95
        if length == -1:
96
            return None
97
        elif length == 0:
98
            return []
99
        else:
100
            return [self.unpack(data) for _ in range(length)]
101
102
103
class _DateTime(_Primitive):
    """Codec for datetimes, carried as an Int64 Windows file time value."""

    @staticmethod
    def pack(dt):
        """Return ``dt`` converted to a file time value and packed as Int64."""
        return Primitives.Int64.pack(datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        """Read an Int64 file time value from ``data`` and return it as a datetime."""
        return win_epoch_to_datetime(Primitives.Int64.unpack(data))
114
115
116
class _String(_Primitive):
    """Codec for strings: an Int32 byte length followed by the raw bytes."""

    @staticmethod
    def pack(string):
        """Return ``string`` encoded with its Int32 length prefix.

        ``None`` is encoded as a length of -1.  Text (``unicode``) is encoded
        as UTF-8 first; anything else is written as given.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        payload = string.encode('utf-8') if isinstance(string, unicode) else string
        return Primitives.Int32.pack(len(payload)) + payload

    @staticmethod
    def unpack(data):
        """Read a length-prefixed string from ``data``.

        Returns ``None`` for a missing string.  On Python 2 the raw bytes are
        returned; on Python 3 they are decoded from UTF-8.
        """
        raw = _Bytes.unpack(data)
        # Nothing to decode on Python 2 or when the string is absent.
        if sys.version_info.major < 3 or raw is None:
            return raw
        return raw.decode("utf-8")
136
137
138
class _Bytes(_Primitive):
    """Codec for byte strings: an Int32 length followed by that many bytes."""

    @staticmethod
    def pack(data):
        """Return ``data`` encoded exactly as ``_String.pack`` encodes it."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """Read a length-prefixed byte string from ``data``.

        A length of -1 means the value is absent and yields ``None``.
        """
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
150
151
152
class _Null(_Primitive):
    """Codec for the null value, which occupies no bytes at all."""

    @staticmethod
    def pack(data):
        """Return an empty byte string, whatever ``data`` is."""
        return bytes()

    @staticmethod
    def unpack(data):
        """Return ``None`` without reading anything from ``data``."""
        return None
161
162
163
class _Primitive1(_Primitive):
    """Codec for one fixed-size value described by a ``struct`` format string."""

    def __init__(self, fmt):
        """Compile ``fmt`` once and expose its size and format.

        :param fmt: ``struct`` format string describing a single value.
        """
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` packed according to this codec's format."""
        # Use the precompiled Struct instead of re-resolving the format
        # through the module-level struct.pack on every call.
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` and return the unpacked value."""
        return self.struct.unpack(data.read(self.size))[0]
174
    
175
    #def pack_array(self, array):
176
        #"""
177
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
178
        #"""
179
        #if array is None:
180
            #return b'\xff\xff\xff\xff'
181
        #length = len(array)
182
        #if length == 0:
183
            #return b'\x00\x00\x00\x00'
184
        #if length == 1:
185
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
186
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
187
188
189
190
class Primitives1(object):
    """Codecs for the fixed-size types, all using little-endian ``struct`` formats."""

    # Signed integers
    Int8 = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")

    # Unsigned integers
    UInt8 = _Primitive1("<B")
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")

    # Boolean and floating point
    Boolean = _Primitive1("<?")
    Float = _Primitive1("<f")
    Double = _Primitive1("<d")

    # Alternative names sharing the codec instances above
    SByte = Int8
    Byte = UInt8
    Char = UInt8
205
206
207
class Primitives(Primitives1):
    """All primitive codecs: the fixed-size ones plus null, date, string and byte types."""

    Null = _Null()
    DateTime = _DateTime()
    String = _String()

    # Three names for byte-string data, each bound to its own _Bytes instance.
    Bytes = _Bytes()
    ByteString = _Bytes()
    CharArray = _Bytes()
214
215
216
def pack_uatype_array(vtype, array):
    """Serialise ``array`` as an Int32 element count followed by each element.

    Every element is encoded with ``pack_uatype(vtype, element)``.  ``None``
    is encoded as the four bytes ``ff ff ff ff``.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    encoded = [pack_uatype(vtype, item) for item in array]
    return b"".join([Primitives.Int32.pack(count)] + encoded)
223
224
225
def pack_uatype(vtype, value):
    """Serialise ``value`` according to the variant type ``vtype``.

    Lookup order: a codec on ``Primitives`` named like ``vtype.name``; the
    byte-string codec for any ``vtype.value`` above 25; the extension object
    serialiser; finally the value's own ``to_binary`` method.

    :raises UaError: when the fallback ``to_binary`` call raises
        ``AttributeError``.
    """
    if hasattr(Primitives, vtype.name):
        codec = getattr(Primitives, vtype.name)
        return codec.pack(value)

    if vtype.value > 25:
        return Primitives.Bytes.pack(value)

    if vtype.name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)

    try:
        return value.to_binary()
    except AttributeError:
        raise UaError("{} could not be packed with value {}".format(vtype, value))
241
242
243
def unpack_uatype(vtype, data):
    """Read one value of variant type ``vtype`` from the ``data`` stream.

    Lookup order: a codec on ``Primitives`` named like ``vtype.name``; the
    byte-string codec for any ``vtype.value`` above 25; the extension object
    parser; finally a class of that name in ``opcua.ua.uatypes``.

    :raises UaError: when no class named ``vtype.name`` exists in ``uatypes``.
    """
    if hasattr(Primitives, vtype.name):
        codec = getattr(Primitives, vtype.name)
        return codec.unpack(data)

    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)

    if vtype.name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)

    from opcua.ua import uatypes
    if not hasattr(uatypes, vtype.name):
        raise UaError("can not unpack unknown vtype %s" % vtype)
    return getattr(uatypes, vtype.name).from_binary(data)
262
263
264
def unpack_uatype_array(vtype, data):
    """Read an array of variant type ``vtype`` from the ``data`` stream.

    Types with a codec on ``Primitives`` delegate to that codec's
    ``unpack_array``.  Otherwise an Int32 count is read, followed by that
    many ``unpack_uatype`` values; a count of -1 yields ``None``.
    """
    if hasattr(Primitives, vtype.name):
        return getattr(Primitives, vtype.name).unpack_array(data)

    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
274
275
276