Completed
Pull Request — master (#494)
by Olivier
03:37
created

_Guid.pack()   A

Complexity

Conditions 1

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.729

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 14
ccs 1
cts 10
cp 0.1
crap 1.729
rs 9.4285
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 1
from calendar import timegm
10 1
import uuid
11
12 1
from opcua.ua.uaerrors import UaError
13
14
15 1
if sys.version_info.major > 2:
    # Python 3 has no ``unicode`` builtin; alias it so the isinstance checks
    # further down work on both major versions.
    unicode = str

# Name the logger after this module. The previous code passed the literal
# string '__name__', so every record was attributed to a logger called
# "__name__" and could not be configured through the package hierarchy.
logger = logging.getLogger(__name__)

EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
# Number of 100 ns FILETIME ticks in one second (used to scale seconds).
HUNDREDS_OF_NANOSECONDS = 10000000
# Zero point of the FILETIME scale expressed as a naive datetime.
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
23
24
25 1
def test_bit(data, offset):
26
    mask = 1 << offset
27
    return data & mask
28
29
30 1
def set_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` forced to 1.
    """
    return data | (1 << offset)
33
34
35 1
def unset_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` forced to 0.
    """
    return data & ~(1 << offset)
38
39
40 1
class UTC(tzinfo):
    """
    Fixed-offset tzinfo for UTC: zero offset, zero DST, name "UTC".
    """

    # Shared zero offset; timedelta objects are immutable so reuse is safe.
    _ZERO = timedelta(0)

    def utcoffset(self, dt):
        return self._ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return self._ZERO
53
54
55
# method copied from David Buxton <[email protected]> sample code
56 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to a Windows FILETIME integer (100 ns ticks).

    Naive datetimes (and tzinfo objects whose utcoffset is None) are taken
    to be UTC already. Aware datetimes are normalised to UTC first.

    :param dt: datetime.datetime instance, naive or aware
    :returns: integer FILETIME value
    """
    # utctimetuple() subtracts the UTC offset for aware datetimes and is
    # identical to timetuple() for naive ones. The earlier timetuple() call
    # used the local wall-clock fields, so an aware datetime with a non-zero
    # offset was encoded as if its wall-clock time were UTC.
    seconds = timegm(dt.utctimetuple())
    ft = EPOCH_AS_FILETIME + (seconds * HUNDREDS_OF_NANOSECONDS)
    # timetuple has one-second resolution; add the sub-second part back
    # (1 microsecond == 10 ticks of 100 ns).
    return ft + (dt.microsecond * 10)
61
62
63 1
def win_epoch_to_datetime(epch):
    """
    Convert a FILETIME integer (100 ns ticks) to a naive datetime.

    Values too large for datetime are logged as a warning and clamped to
    the last microsecond of 31 Dec MAXYEAR.
    """
    try:
        # Both building the timedelta and the addition can overflow.
        elapsed = timedelta(microseconds=epch // 10)
        result = FILETIME_EPOCH_AS_DATETIME + elapsed
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        result = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
    return result
70
71
72 1
def build_array_format_py2(prefix, length, fmtchar):
    """
    Build a struct format string: ``prefix``, repeat count, format character.

    ``fmtchar`` is used as given (already a one-character string).
    """
    count = str(length)
    return prefix + count + fmtchar
74
75
76 1
def build_array_format_py3(prefix, length, fmtchar):
    """
    Build a struct format string: ``prefix``, repeat count, format character.

    ``fmtchar`` is an integer code point and is converted with ``chr``.
    """
    count = str(length)
    type_char = chr(fmtchar)
    return prefix + count + type_char
78
79
80 1
# Bind the variant that matches the running interpreter's major version.
build_array_format = (
    build_array_format_py2
    if sys.version_info.major < 3
    else build_array_format_py3
)
84
85
86 1
class _Primitive(object):
    """
    Base class adding array (de)serialisation on top of a subclass's
    ``pack``/``unpack`` for single values.

    Arrays are written as an Int32 element count followed by the packed
    elements; a ``None`` array is written as the four bytes ff ff ff ff.
    """

    def pack_array(self, array):
        if array is None:
            return b'\xff\xff\xff\xff'
        count = len(array)
        elements = [self.pack(item) for item in array]
        header = Primitives.Int32.pack(count)
        return header + b"".join(elements)

    def unpack_array(self, data):
        count = Primitives.Int32.unpack(data)
        if count == -1:
            # -1 marks a null array rather than an empty one
            return None
        # range(0) yields nothing, so a zero count gives an empty list
        return [self.unpack(data) for _ in range(count)]
104
105
106 1
class _DateTime(_Primitive):
    """
    datetime values, carried on the wire as an Int64 FILETIME value.
    """

    @staticmethod
    def pack(dt):
        return Primitives.Int64.pack(datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        return win_epoch_to_datetime(Primitives.Int64.unpack(data))
117
118
119 1
class _String(_Primitive):
    """
    Strings: an Int32 byte length followed by the UTF-8 encoded bytes.
    ``None`` is written as length -1 with no payload.
    """

    @staticmethod
    def pack(string):
        if string is None:
            return Primitives.Int32.pack(-1)
        # Text is encoded to UTF-8; anything else is written through as-is.
        raw = string.encode('utf-8') if isinstance(string, unicode) else string
        return Primitives.Int32.pack(len(raw)) + raw

    @staticmethod
    def unpack(data):
        raw = _Bytes.unpack(data)
        # On Python 2 the raw byte string is returned; a null string stays None.
        if raw is None or sys.version_info.major < 3:
            return raw
        return raw.decode("utf-8")
139
140
141 1
class _Bytes(_Primitive):
    """
    Byte strings: an Int32 length followed by that many raw bytes.
    Length -1 stands for ``None``.
    """

    @staticmethod
    def pack(data):
        # Same wire layout as strings, so reuse the string packer.
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
153
154
155 1
class _Null(_Primitive):
    """
    Null value: packs to zero bytes and unpacks to ``None`` without
    reading anything from the input.
    """

    @staticmethod
    def pack(data):
        # The argument is ignored; a null occupies no bytes.
        return b""

    @staticmethod
    def unpack(data):
        # Nothing is consumed from ``data``.
        return None
164
165
166 1
class _Guid(_Primitive):
    """
    GUID values, converted between python's 6 field UUID and the
    OPC UA 4 field layout (UInt32, UInt16, UInt16, 8 raw bytes).
    """

    @staticmethod
    def pack(guid):
        # convert python UUID 6 field format to OPC UA 4 field format
        fields = (
            Primitives.UInt32.pack(guid.time_low),
            Primitives.UInt16.pack(guid.time_mid),
            Primitives.UInt16.pack(guid.time_hi_version),
            Primitives.Byte.pack(guid.clock_seq_hi_variant),
            Primitives.Byte.pack(guid.clock_seq_low),
            # no primitive .pack available for 6 byte int: pack as 8 bytes
            # big-endian and keep the low 6
            struct.pack('>Q', guid.node)[2:8],
        )
        return b"".join(fields)

    @staticmethod
    def unpack(data):
        # convert OPC UA 4 field format to python UUID bytes
        data1 = Primitives.UInt32.unpack(data)
        data2 = Primitives.UInt16.unpack(data)
        data3 = Primitives.UInt16.unpack(data)
        data4 = data.read(8)
        # uuid.UUID(bytes=...) expects the first three fields big-endian
        head = struct.pack('>IHH', data1, data2, data3)
        return uuid.UUID(bytes=head + data4)
194
195
196 1
class _Primitive1(_Primitive):
    """
    Fixed-size primitive described by a single ``struct`` format string.

    Attributes set in ``__init__``:
    struct -- precompiled ``struct.Struct`` for the format
    size   -- number of bytes one value occupies
    format -- the format string as reported by the Struct object
    """

    def __init__(self, fmt):
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """
        Return ``data`` packed to bytes according to the format.
        """
        # Use the precompiled Struct instead of handing the format string
        # to struct.pack on every call.
        return self.struct.pack(data)

    def unpack(self, data):
        """
        Read ``self.size`` bytes from ``data`` (an object with ``read``)
        and return the single decoded value.
        """
        return self.struct.unpack(data.read(self.size))[0]
207
    
208
    #def pack_array(self, array):
209
        #"""
210
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
211
        #"""
212
        #if array is None:
213
            #return b'\xff\xff\xff\xff'
214
        #length = len(array)
215
        #if length == 0:
216
            #return b'\x00\x00\x00\x00'
217
        #if length == 1:
218
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
219
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
220
221
222 1
class Primitives1(object):
    """
    Namespace of fixed-size primitives, all little-endian struct formats.
    """

    # signed integers
    Int8 = _Primitive1("<b")
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")

    # unsigned integers
    UInt8 = _Primitive1("<B")
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")

    # boolean and floating point
    Boolean = _Primitive1("<?")
    Double = _Primitive1("<d")
    Float = _Primitive1("<f")

    # aliases sharing the instances above
    SByte = Int8
    Char = UInt8
    Byte = UInt8
237
238
239 1
class Primitives(Primitives1):
    """
    Full primitive namespace: the fixed-size types from Primitives1 plus
    the variable-length and special-case types below.
    """

    Null = _Null()
    String = _String()
    # Three names for byte strings, each bound to its own _Bytes instance.
    Bytes = _Bytes()
    ByteString = _Bytes()
    CharArray = _Bytes()
    DateTime = _DateTime()
    Guid = _Guid()
247
248
249 1
def pack_uatype_array(vtype, array):
    """
    Pack a list of values of type ``vtype``: an Int32 element count
    followed by each packed element. ``None`` packs to ff ff ff ff.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    elements = [pack_uatype(vtype, item) for item in array]
    header = Primitives.Int32.pack(count)
    return header + b"".join(elements)
256
257
258 1
def pack_uatype(vtype, value):
    """
    Pack a single ``value`` according to ``vtype``.

    Dispatch order: a Primitives attribute named like the type, then
    the byte-string packer for type values above 25, then the
    ExtensionObject helper, and finally the value's own ``to_binary``.
    Raises UaError when the value has no ``to_binary``.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if type_name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua import extensionobject_to_binary
        return extensionobject_to_binary(value)
    try:
        return value.to_binary()
    except AttributeError:
        raise UaError("{0} could not be packed with value {1}".format(vtype, value))
274
275
276 1
def unpack_uatype(vtype, data):
    """
    Read a single value of type ``vtype`` from ``data``.

    Dispatch order mirrors packing: a Primitives attribute named like the
    type, then byte strings for type values above 25, then the
    ExtensionObject helper, and finally a class of that name in
    ``opcua.ua.uatypes``. Raises UaError when no such class exists.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if type_name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua import extensionobject_from_binary
        return extensionobject_from_binary(data)
    from opcua.ua import uatypes
    if not hasattr(uatypes, type_name):
        raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
    return getattr(uatypes, type_name).from_binary(data)
295
296
297 1
def unpack_uatype_array(vtype, data):
    """
    Read an array of ``vtype`` values from ``data``.

    Types known to Primitives use that primitive's ``unpack_array``.
    Otherwise an Int32 count is read and each element is unpacked in
    turn; a count of -1 yields ``None``.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).unpack_array(data)
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
307
308
309