Completed
Push — master ( 5318bd...777e81 )
by Olivier
04:55
created

_Guid.pack()   A

Complexity

Conditions 1

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 14
rs 9.4285
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5
import sys
6
import struct
7
import logging
8
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9
from calendar import timegm
10
import uuid
11
12
from opcua.ua.uaerrors import UaError
13
14
15
if sys.version_info.major > 2:
16
    unicode = str
17
18
logger = logging.getLogger('__name__')
19
20
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
21
HUNDREDS_OF_NANOSECONDS = 10000000
22
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
23
24
25
def test_bit(data, offset):
26
    mask = 1 << offset
27
    return data & mask
28
29
30
def set_bit(data, offset):
31
    mask = 1 << offset
32
    return data | mask
33
34
35
def unset_bit(data, offset):
36
    mask = 1 << offset
37
    return data & ~mask
38
39
40
class UTC(tzinfo):
41
    """
42
    UTC
43
    """
44
45
    def utcoffset(self, dt):
46
        return timedelta(0)
47
48
    def tzname(self, dt):
49
        return "UTC"
50
51
    def dst(self, dt):
52
        return timedelta(0)
53
54
55
# method copied from David Buxton <[email protected]> sample code
56
def datetime_to_win_epoch(dt):
57
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
58
        dt = dt.replace(tzinfo=UTC())
59
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
60
    return ft + (dt.microsecond * 10)
61
62
63
def win_epoch_to_datetime(epch):
64
    try:
65
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
66
    except OverflowError:
67
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
68
        logger.warning("datetime overflow: %s", epch)
69
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
70
71
72
def build_array_format_py2(prefix, length, fmtchar):
73
    return prefix + str(length) + fmtchar
74
75
76
def build_array_format_py3(prefix, length, fmtchar):
77
    return prefix + str(length) + chr(fmtchar)
78
79
80
if sys.version_info.major < 3:
81
    build_array_format = build_array_format_py2
82
else:
83
    build_array_format = build_array_format_py3
84
85
86
class _Primitive(object):
87
88
    def pack_array(self, array):
89
        if array is None:
90
            return b'\xff\xff\xff\xff'
91
        length = len(array)
92
        b = [self.pack(val) for val in array]
93
        b.insert(0, Primitives.Int32.pack(length))
94
95
    def unpack_array(self, data):
96
        length = Primitives.Int32.unpack(data)
97
        if length == -1:
98
            return None
99
        elif length == 0:
100
            return []
101
        else:
102
            return [self.unpack(data) for _ in range(length)]
103
104
105
class _DateTime(_Primitive):
106
107
    @staticmethod
108
    def pack(dt):
109
        epch = datetime_to_win_epoch(dt)
110
        return Primitives.Int64.pack(epch)
111
112
    @staticmethod
113
    def unpack(data):
114
        epch = Primitives.Int64.unpack(data)
115
        return win_epoch_to_datetime(epch)
116
117
118
class _String(_Primitive):
119
120
    @staticmethod
121
    def pack(string):
122
        if string is None:
123
            return Primitives.Int32.pack(-1)
124
        if isinstance(string, unicode):
125
            string = string.encode('utf-8')
126
        length = len(string)
127
        return Primitives.Int32.pack(length) + string
128
129
    @staticmethod
130
    def unpack(data):
131
        b = _Bytes.unpack(data)
132
        if sys.version_info.major < 3:
133
            return b
134
        else:
135
            if b is None:
136
                return b
137
            return b.decode("utf-8")
138
139
140
class _Bytes(_Primitive):
141
142
    @staticmethod
143
    def pack(data):
144
        return _String.pack(data)
145
146
    @staticmethod
147
    def unpack(data):
148
        length = Primitives.Int32.unpack(data)
149
        if length == -1:
150
            return None
151
        return data.read(length)
152
153
154
class _Null(_Primitive):
155
156
    @staticmethod
157
    def pack(data):
158
        return b""
159
160
    @staticmethod
161
    def unpack(data):
162
        return None
163
164
165
class _Guid(_Primitive):
166
167
    @staticmethod
168
    def pack(guid):
169
        # convert python UUID 6 field format to OPC UA 4 field format
170
        f1 = Primitives.UInt32.pack(guid.time_low)
171
        f2 = Primitives.UInt16.pack(guid.time_mid)
172
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
173
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
174
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
175
        f4c = struct.pack('>Q', guid.node)[2:8]  # no primitive .pack available for 6 byte int
176
        f4 = f4a+f4b+f4c
177
        # concat byte fields
178
        b = f1+f2+f3+f4
179
180
        return b
181
182
    @staticmethod
183
    def unpack(data):
184
        # convert OPC UA 4 field format to python UUID bytes
185
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
186
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
187
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
188
        f4 = data.read(8)
189
        # concat byte fields
190
        b = f1 + f2 + f3 + f4
191
192
        return uuid.UUID(bytes=b)
193
194
195
class _Primitive1(_Primitive):
196
    def __init__(self, fmt):
197
        self.struct = struct.Struct(fmt)
198
        self.size = self.struct.size
199
        self.format = self.struct.format
200
201
    def pack(self, data):
202
        return struct.pack(self.format, data)
203
204
    def unpack(self, data):
205
        return struct.unpack(self.format, data.read(self.size))[0]
206
    
207
    #def pack_array(self, array):
208
        #"""
209
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
210
        #"""
211
        #if array is None:
212
            #return b'\xff\xff\xff\xff'
213
        #length = len(array)
214
        #if length == 0:
215
            #return b'\x00\x00\x00\x00'
216
        #if length == 1:
217
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
218
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
219
220
221
class Primitives1(object):
222
    Int8 = _Primitive1("<b")
223
    SByte = Int8
224
    Int16 = _Primitive1("<h")
225
    Int32 = _Primitive1("<i")
226
    Int64 = _Primitive1("<q")
227
    UInt8 = _Primitive1("<B")
228
    Char = UInt8
229
    Byte = UInt8
230
    UInt16 = _Primitive1("<H")
231
    UInt32 = _Primitive1("<I")
232
    UInt64 = _Primitive1("<Q")
233
    Boolean = _Primitive1("<?")
234
    Double = _Primitive1("<d")
235
    Float = _Primitive1("<f")
236
237
238
class Primitives(Primitives1):
239
    Null = _Null()
240
    String = _String()
241
    Bytes = _Bytes()
242
    ByteString = _Bytes()
243
    CharArray = _Bytes()
244
    DateTime = _DateTime()
245
    Guid = _Guid()
246
247
248
def pack_uatype_array(vtype, array):
249
    if array is None:
250
        return b'\xff\xff\xff\xff'
251
    length = len(array)
252
    b = [pack_uatype(vtype, val) for val in array]
253
    b.insert(0, Primitives.Int32.pack(length))
254
    return b"".join(b)
255
256
257
def pack_uatype(vtype, value):
258
    if hasattr(Primitives, vtype.name):
259
        return getattr(Primitives, vtype.name).pack(value)
260
    elif vtype.value > 25:
261
        return Primitives.Bytes.pack(value)
262
    elif vtype.name == "ExtensionObject":
263
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
264
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
265
        # Using local import to avoid import loop
266
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
267
        return extensionobject_to_binary(value)
268
    else:
269
        try:
270
            return value.to_binary()
271
        except AttributeError:
272
            raise UaError("{} could not be packed with value {}".format(vtype, value))
273
274
275
def unpack_uatype(vtype, data):
276
    if hasattr(Primitives, vtype.name):
277
        st = getattr(Primitives, vtype.name)
278
        return st.unpack(data)
279
    elif vtype.value > 25:
280
        return Primitives.Bytes.unpack(data)
281
    elif vtype.name == "ExtensionObject":
282
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
283
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
284
        # Using local import to avoid import loop
285
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
286
        return extensionobject_from_binary(data)
287
    else:
288
        from opcua.ua import uatypes
289
        if hasattr(uatypes, vtype.name):
290
            klass = getattr(uatypes, vtype.name)
291
            return klass.from_binary(data)
292
        else:
293
            raise UaError("can not unpack unknown vtype %s" % vtype)
294
295
296
def unpack_uatype_array(vtype, data):
297
    if hasattr(Primitives, vtype.name):
298
        st = getattr(Primitives, vtype.name)
299
        return st.unpack_array(data)
300
    else:
301
        length = Primitives.Int32.unpack(data)
302
        if length == -1:
303
            return None
304
        else:
305
            return [unpack_uatype(vtype, data) for _ in range(length)]
306
307
308