Completed
Pull Request — master (#406)
by Denis
03:42
created

_String   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 20
Duplicated Lines 0 %

Test Coverage

Coverage 73.32%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 20
ccs 11
cts 15
cp 0.7332
rs 10
c 1
b 0
f 0
wmc 6

2 Methods

Rating   Name   Duplication   Size   Complexity  
A unpack() 0 9 3
A pack() 0 8 3
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 1
from calendar import timegm
10 1
import uuid
11
12 1
from opcua.ua.uaerrors import UaError
13
14
15 1
if sys.version_info.major > 2:
16
    unicode = str
17
18 1
logger = logging.getLogger('__name__')
19
20 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
21 1
HUNDREDS_OF_NANOSECONDS = 10000000
22 1
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
23
24
25 1
def test_bit(data, offset):
26 1
    mask = 1 << offset
27 1
    return data & mask
28
29
30 1
def set_bit(data, offset):
31 1
    mask = 1 << offset
32 1
    return data | mask
33
34
35 1
def unset_bit(data, offset):
36 1
    mask = 1 << offset
37 1
    return data & ~mask
38
39
40 1
class UTC(tzinfo):
41
    """
42
    UTC
43
    """
44
45 1
    def utcoffset(self, dt):
46
        return timedelta(0)
47
48 1
    def tzname(self, dt):
49
        return "UTC"
50
51 1
    def dst(self, dt):
52 1
        return timedelta(0)
53
54
55
# method copied from David Buxton <[email protected]> sample code
56 1
def datetime_to_win_epoch(dt):
57 1
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
58 1
        dt = dt.replace(tzinfo=UTC())
59 1
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
60 1
    return ft + (dt.microsecond * 10)
61
62
63 1
def win_epoch_to_datetime(epch):
64 1
    try:
65 1
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
66
    except OverflowError:
67
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
68
        logger.warning("datetime overflow: %s", epch)
69
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
70
71
72 1
def build_array_format_py2(prefix, length, fmtchar):
73
    return prefix + str(length) + fmtchar
74
75
76 1
def build_array_format_py3(prefix, length, fmtchar):
77
    return prefix + str(length) + chr(fmtchar)
78
79
80 1
if sys.version_info.major < 3:
81 1
    build_array_format = build_array_format_py2
82
else:
83
    build_array_format = build_array_format_py3
84
85
86 1
class _Primitive(object):
87
88 1
    def pack_array(self, array):
89
        if array is None:
90
            return b'\xff\xff\xff\xff'
91
        length = len(array)
92
        b = [self.pack(val) for val in array]
93
        b.insert(0, Primitives.Int32.pack(length))
94
95 1
    def unpack_array(self, data):
96 1
        length = Primitives.Int32.unpack(data)
97 1
        if length == -1:
98 1
            return None
99 1
        elif length == 0:
100 1
            return []
101
        else:
102 1
            return [self.unpack(data) for _ in range(length)]
103
104
105 1
class _DateTime(_Primitive):
106
107 1
    @staticmethod
108
    def pack(dt):
109 1
        epch = datetime_to_win_epoch(dt)
110 1
        return Primitives.Int64.pack(epch)
111
112 1
    @staticmethod
113
    def unpack(data):
114 1
        epch = Primitives.Int64.unpack(data)
115 1
        return win_epoch_to_datetime(epch)
116
117
118 1
class _String(_Primitive):
119
120 1
    @staticmethod
121
    def pack(string):
122 1
        if string is None:
123 1
            return Primitives.Int32.pack(-1)
124 1
        if isinstance(string, unicode):
125
            string = string.encode('utf-8')
126 1
        length = len(string)
127 1
        return Primitives.Int32.pack(length) + string
128
129 1
    @staticmethod
130
    def unpack(data):
131 1
        b = _Bytes.unpack(data)
132 1
        if sys.version_info.major < 3:
133 1
            return b
134
        else:
135
            if b is None:
136
                return b
137
            return b.decode("utf-8")
138
139
140 1
class _Bytes(_Primitive):
141
142 1
    @staticmethod
143
    def pack(data):
144 1
        return _String.pack(data)
145
146 1
    @staticmethod
147
    def unpack(data):
148 1
        length = Primitives.Int32.unpack(data)
149 1
        if length == -1:
150 1
            return None
151 1
        return data.read(length)
152
153
154 1
class _Null(_Primitive):
155
156 1
    @staticmethod
157
    def pack(data):
158 1
        return b""
159
160 1
    @staticmethod
161
    def unpack(data):
162 1
        return None
163
164
165 1
class _Guid(_Primitive):
166
167 1
    @staticmethod
168
    def pack(guid):
169
        # convert python UUID 6 field format to OPC UA 4 field format
170 1
        f1 = Primitives.UInt32.pack(guid.time_low)
171 1
        f2 = Primitives.UInt16.pack(guid.time_mid)
172 1
        f3 = Primitives.UInt16.pack(guid.time_hi_version)
173 1
        f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
174 1
        f4b = Primitives.Byte.pack(guid.clock_seq_low)
175 1
        f4c = struct.pack('>Q', guid.node)[2:8]  # no primitive .pack available for 6 byte int
176 1
        f4 = f4a+f4b+f4c
177
        # concat byte fields
178 1
        b = f1+f2+f3+f4
179
180 1
        return b
181
182 1
    @staticmethod
183
    def unpack(data):
184
        # convert OPC UA 4 field format to python UUID bytes
185 1
        f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
186 1
        f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
187 1
        f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
188 1
        f4 = data.read(8)
189
        # concat byte fields
190 1
        b = f1 + f2 + f3 + f4
191
192 1
        return uuid.UUID(bytes=b)
193
194
195 1
class _Primitive1(_Primitive):
196 1
    def __init__(self, fmt):
197 1
        self.struct = struct.Struct(fmt)
198 1
        self.size = self.struct.size
199 1
        self.format = self.struct.format
200
201 1
    def pack(self, data):
202 1
        return struct.pack(self.format, data)
203
204 1
    def unpack(self, data):
205 1
        return struct.unpack(self.format, data.read(self.size))[0]
206
    
207
    #def pack_array(self, array):
208
        #"""
209
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
210
        #"""
211
        #if array is None:
212
            #return b'\xff\xff\xff\xff'
213
        #length = len(array)
214
        #if length == 0:
215
            #return b'\x00\x00\x00\x00'
216
        #if length == 1:
217
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
218
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
219
220
221 1
class Primitives1(object):
222 1
    Int8 = _Primitive1("<b")
223 1
    SByte = Int8
224 1
    Int16 = _Primitive1("<h")
225 1
    Int32 = _Primitive1("<i")
226 1
    Int64 = _Primitive1("<q")
227 1
    UInt8 = _Primitive1("<B")
228 1
    Char = UInt8
229 1
    Byte = UInt8
230 1
    UInt16 = _Primitive1("<H")
231 1
    UInt32 = _Primitive1("<I")
232 1
    UInt64 = _Primitive1("<Q")
233 1
    Boolean = _Primitive1("<?")
234 1
    Double = _Primitive1("<d")
235 1
    Float = _Primitive1("<f")
236
237
238 1
class Primitives(Primitives1):
239 1
    Null = _Null()
240 1
    String = _String()
241 1
    Bytes = _Bytes()
242 1
    ByteString = _Bytes()
243 1
    CharArray = _Bytes()
244 1
    DateTime = _DateTime()
245 1
    Guid = _Guid()
246
247
248 1
def pack_uatype_array(vtype, array):
249 1
    if array is None:
250 1
        return b'\xff\xff\xff\xff'
251 1
    length = len(array)
252 1
    b = [pack_uatype(vtype, val) for val in array]
253 1
    b.insert(0, Primitives.Int32.pack(length))
254 1
    return b"".join(b)
255
256
257 1
def pack_uatype(vtype, value):
258 1
    if hasattr(Primitives, vtype.name):
259 1
        return getattr(Primitives, vtype.name).pack(value)
260 1
    elif vtype.value > 25:
261 1
        return Primitives.Bytes.pack(value)
262 1
    elif vtype.name == "ExtensionObject":
263
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
264
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
265
        # Using local import to avoid import loop
266 1
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
267 1
        return extensionobject_to_binary(value)
268
    else:
269 1
        try:
270 1
            return value.to_binary()
271
        except AttributeError:
272
            raise UaError("{0} could not be packed with value {1}".format(vtype, value))
273
274
275 1
def unpack_uatype(vtype, data):
276 1
    if hasattr(Primitives, vtype.name):
277 1
        st = getattr(Primitives, vtype.name)
278 1
        return st.unpack(data)
279 1
    elif vtype.value > 25:
280 1
        return Primitives.Bytes.unpack(data)
281 1
    elif vtype.name == "ExtensionObject":
282
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
283
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
284
        # Using local import to avoid import loop
285 1
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
286 1
        return extensionobject_from_binary(data)
287
    else:
288 1
        from opcua.ua import uatypes
289 1
        if hasattr(uatypes, vtype.name):
290 1
            klass = getattr(uatypes, vtype.name)
291 1
            return klass.from_binary(data)
292
        else:
293
            raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
294
295
296 1
def unpack_uatype_array(vtype, data):
297 1
    if hasattr(Primitives, vtype.name):
298 1
        st = getattr(Primitives, vtype.name)
299 1
        return st.unpack_array(data)
300
    else:
301 1
        length = Primitives.Int32.unpack(data)
302 1
        if length == -1:
303
            return None
304
        else:
305 1
            return [unpack_uatype(vtype, data) for _ in range(length)]
306
307
308