Completed
Pull Request — master (#388)
by Olivier
05:04
created

_Primitive   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 18
Duplicated Lines 0 %

Test Coverage

Coverage 93.33%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 18
ccs 14
cts 15
cp 0.9333
rs 10
wmc 7

2 Methods

Rating   Name   Duplication   Size   Complexity  
A pack_array() 0 7 3
A unpack_array() 0 8 4
1
"""
2
Binary protocol specific functions and constants
3
"""
4
5 1
import sys
6 1
import struct
7 1
import logging
8 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
9 1
from calendar import timegm
10 1
import uuid
11
12 1
from opcua.ua.uaerrors import UaError
13
14
15 1
if sys.version_info.major > 2:
    # Python 3 has no separate ``unicode`` type; alias it to ``str`` so the
    # isinstance checks in this module work on both major versions.
    unicode = str

# Use the module's real name (the ``__name__`` variable, not the literal
# string '__name__') so records are attributed to this module and follow
# the logging configuration of the enclosing package.
logger = logging.getLogger(__name__)

EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000  # number of 100 ns intervals in one second
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)  # datetime of FILETIME value 0
23
24
25 1
def test_bit(data, offset):
26 1
    mask = 1 << offset
27 1
    return data & mask
28
29
30 1
def set_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` switched on.
    """
    return data | (1 << offset)
33
34
35 1
def unset_bit(data, offset):
    """
    Return ``data`` with the bit at position ``offset`` switched off.
    """
    return data & ~(1 << offset)
38
39
40 1
class UTC(tzinfo):
    """
    Time zone information for UTC: zero offset and no daylight saving.
    """

    def utcoffset(self, dt):
        """Offset from UTC, which is always zero."""
        return timedelta()

    def tzname(self, dt):
        """Name of the zone, always ``"UTC"``."""
        return "UTC"

    def dst(self, dt):
        """Daylight-saving adjustment, which is always zero."""
        return timedelta()
53
54
55
# method copied from David Buxton's sample code
56 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to a Windows FILETIME value.

    The result counts 100-nanosecond intervals since January 1, 1601.
    Naive datetimes, and datetimes whose tzinfo reports no offset, are
    taken to be UTC already; aware datetimes are converted to UTC first.

    :param dt: datetime to convert
    :returns: FILETIME as an integer
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # timegm() expects UTC fields. utctimetuple() applies the offset of an
    # aware datetime, whereas timetuple() would pass its local wall-clock
    # fields through unchanged and shift the result by that offset.
    ft = EPOCH_AS_FILETIME + (timegm(dt.utctimetuple()) * HUNDREDS_OF_NANOSECONDS)
    # timegm() only has second resolution; add the sub-second part
    # (1 microsecond == 10 intervals of 100 ns).
    return ft + (dt.microsecond * 10)
61
62
63 1
def win_epoch_to_datetime(epch):
    """
    Convert a Windows FILETIME value to a naive datetime.

    ``epch`` is a count of 100-nanosecond intervals; it is truncated to whole
    microseconds and added to ``FILETIME_EPOCH_AS_DATETIME``. If that
    overflows, a warning is logged and the latest representable datetime is
    returned instead.
    """
    try:
        elapsed = timedelta(microseconds=epch // 10)
        converted = FILETIME_EPOCH_AS_DATETIME + elapsed
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        converted = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
    return converted
70
71
72 1
def build_array_format_py2(prefix, length, fmtchar):
    """
    Build a struct format string: ``prefix``, then ``length`` as a repeat
    count, then ``fmtchar``, which is already a one-character string.
    """
    repeat = str(length)
    return prefix + repeat + fmtchar
74
75
76 1
def build_array_format_py3(prefix, length, fmtchar):
    """
    Build a struct format string: ``prefix``, then ``length`` as a repeat
    count, then the character whose code point is ``fmtchar``.
    """
    repeat = str(length)
    type_char = chr(fmtchar)
    return prefix + repeat + type_char
78
79
80 1
# Bind the format builder that matches the running interpreter.
if sys.version_info.major >= 3:
    build_array_format = build_array_format_py3
else:
    build_array_format = build_array_format_py2
84
85
86 1
class _Primitive(object):
    """
    Base class for the binary serializers in this module.

    Subclasses provide ``pack(value)`` and ``unpack(data)``; this class
    builds array handling on top of those two methods.
    """

    def pack_array(self, array):
        """
        Serialize ``array`` as an Int32 element count followed by each
        packed element. ``None`` is written as a count of -1.
        """
        if array is None:
            return b'\xff\xff\xff\xff'
        count = len(array)
        body = b"".join(self.pack(item) for item in array)
        return Primitives.Int32.pack(count) + body

    def unpack_array(self, data):
        """
        Read an Int32 element count from ``data`` and then that many
        elements. A count of -1 yields ``None``, a count of 0 an empty list.
        """
        count = Primitives.Int32.unpack(data)
        if count == -1:
            return None
        if count == 0:
            return []
        return [self.unpack(data) for _ in range(count)]
104
105
106 1
class _DateTime(_Primitive):
    """
    Serializer for datetime values, stored as an Int64 FILETIME.
    """

    @staticmethod
    def pack(dt):
        """Convert ``dt`` to a FILETIME and pack it as an Int64."""
        return Primitives.Int64.pack(datetime_to_win_epoch(dt))

    @staticmethod
    def unpack(data):
        """Read an Int64 FILETIME from ``data`` and convert it to a datetime."""
        return win_epoch_to_datetime(Primitives.Int64.unpack(data))
117
118
119 1
class _String(_Primitive):
    """
    Serializer for strings: an Int32 byte length followed by the bytes.
    """

    @staticmethod
    def pack(string):
        """
        Pack ``string`` with its length prefix. ``None`` is written as a
        length of -1; unicode text is encoded as UTF-8 first, anything else
        is written as given.
        """
        if string is None:
            return Primitives.Int32.pack(-1)
        payload = string.encode('utf-8') if isinstance(string, unicode) else string
        return Primitives.Int32.pack(len(payload)) + payload

    @staticmethod
    def unpack(data):
        """
        Read a length-prefixed string from ``data``.

        On Python 2 the raw bytes are returned unchanged; on Python 3 they
        are decoded as UTF-8. A null string comes back as ``None``.
        """
        raw = _Bytes.unpack(data)
        if sys.version_info.major < 3 or raw is None:
            return raw
        return raw.decode("utf-8")
139
140
141 1
class _Bytes(_Primitive):
    """
    Serializer for byte strings: an Int32 length followed by the bytes.
    """

    @staticmethod
    def pack(data):
        """Pack ``data`` using the same wire layout as ``_String.pack``."""
        return _String.pack(data)

    @staticmethod
    def unpack(data):
        """
        Read an Int32 length from ``data`` and then that many bytes.
        A length of -1 yields ``None``.
        """
        size = Primitives.Int32.unpack(data)
        return None if size == -1 else data.read(size)
153
154
155 1
class _Null(_Primitive):
    """
    Serializer for the null type: writes no bytes and reads nothing.
    """

    @staticmethod
    def pack(data):
        """Ignore ``data`` and return an empty byte string."""
        return bytes()

    @staticmethod
    def unpack(data):
        """Consume nothing from ``data``; the value is always ``None``."""
        return
164
165
166 1
class _Guid(_Primitive):
    """
    Serializer converting between python UUID objects and the OPC UA
    4 field Guid layout.
    """

    @staticmethod
    def pack(guid):
        """
        Pack ``guid`` (python UUID 6 field format) into the 16 byte OPC UA
        4 field format.
        """
        fields = (
            Primitives.UInt32.pack(guid.time_low),
            Primitives.UInt16.pack(guid.time_mid),
            Primitives.UInt16.pack(guid.time_hi_version),
            Primitives.Byte.pack(guid.clock_seq_hi_variant),
            Primitives.Byte.pack(guid.clock_seq_low),
            # no primitive .pack available for 6 byte int: pack as 8 bytes
            # big-endian and keep the low 6
            struct.pack('>Q', guid.node)[2:8],
        )
        return b"".join(fields)

    @staticmethod
    def unpack(data):
        """
        Read 16 bytes in OPC UA 4 field format from ``data`` and return the
        matching python UUID.
        """
        # the first three fields are read as integers and re-packed
        # big-endian, which is the order uuid.UUID(bytes=...) expects
        head = struct.pack(
            '>IHH',
            Primitives.UInt32.unpack(data),
            Primitives.UInt16.unpack(data),
            Primitives.UInt16.unpack(data),
        )
        # the last 8 bytes are used as read
        tail = data.read(8)
        return uuid.UUID(bytes=head + tail)
194
195
196 1
class _Primitive1(_Primitive):
    """
    Serializer for a single fixed-size value described by a struct format.

    The format is compiled once into a ``struct.Struct``, which is then
    reused for every pack and unpack call.
    """

    def __init__(self, fmt):
        """
        :param fmt: struct format string describing exactly one value
        """
        self.struct = struct.Struct(fmt)
        self.size = self.struct.size
        self.format = self.struct.format

    def pack(self, data):
        """Return ``data`` packed according to the format."""
        # use the precompiled Struct instead of re-resolving the format
        return self.struct.pack(data)

    def unpack(self, data):
        """Read ``self.size`` bytes from ``data`` and return the decoded value."""
        return self.struct.unpack(data.read(self.size))[0]

    # Disabled alternative implementation, kept for reference.
    #def pack_array(self, array):
        #"""
        #Basically the same as the method in _Primitive but MAYBE a bit more efficient....
        #"""
        #if array is None:
            #return b'\xff\xff\xff\xff'
        #length = len(array)
        #if length == 0:
            #return b'\x00\x00\x00\x00'
        #if length == 1:
            #return b'\x01\x00\x00\x00' + self.pack(array[0])
        #return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
220
221
222 1
class Primitives1(object):
    """
    Table of fixed-size serializers, one ``_Primitive1`` per type name.

    Every format uses the "<" prefix, i.e. little-endian byte order with
    standard sizes.
    """
    # signed integers
    Int8 = _Primitive1("<b")
    SByte = Int8  # alias of Int8 (same object)
    Int16 = _Primitive1("<h")
    Int32 = _Primitive1("<i")
    Int64 = _Primitive1("<q")
    # unsigned integers
    UInt8 = _Primitive1("<B")
    Char = UInt8  # alias of UInt8 (same object)
    Byte = UInt8  # alias of UInt8 (same object)
    UInt16 = _Primitive1("<H")
    UInt32 = _Primitive1("<I")
    UInt64 = _Primitive1("<Q")
    # boolean and floating point
    Boolean = _Primitive1("<?")
    Double = _Primitive1("<d")
    Float = _Primitive1("<f")
237
238
239 1
class Primitives(Primitives1):
    """
    Full serializer table: the fixed-size types inherited from
    ``Primitives1`` plus the variable-size and special types below.

    The pack/unpack helpers in this module look serializers up here by
    attribute name.
    """
    Null = _Null()
    String = _String()
    # Bytes, ByteString and CharArray each get their own _Bytes instance
    Bytes = _Bytes()
    ByteString = _Bytes()
    CharArray = _Bytes()
    DateTime = _DateTime()
    Guid = _Guid()
247
248
249 1
def pack_uatype_array(vtype, array):
    """
    Serialize ``array`` as an Int32 element count followed by every element
    packed with ``pack_uatype(vtype, ...)``. ``None`` is written as a count
    of -1.
    """
    if array is None:
        return b'\xff\xff\xff\xff'
    count = len(array)
    payload = b"".join(pack_uatype(vtype, item) for item in array)
    return Primitives.Int32.pack(count) + payload
256
257
258 1
def pack_uatype(vtype, value):
    """
    Serialize ``value`` according to ``vtype``.

    Lookup order: a serializer in ``Primitives`` named after ``vtype.name``;
    raw bytes when ``vtype.value`` is above 25; the extension object encoder
    for "ExtensionObject"; otherwise the value's own ``to_binary()``.

    :raises UaError: if the fallback hits an AttributeError
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).pack(value)
    if vtype.value > 25:
        return Primitives.Bytes.pack(value)
    if type_name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua import extensionobject_to_binary
        return extensionobject_to_binary(value)
    try:
        return value.to_binary()
    except AttributeError:
        raise UaError("{0} could not be packed with value {1}".format(vtype, value))
274
275
276 1
def unpack_uatype(vtype, data):
    """
    Read one value of type ``vtype`` from ``data``.

    Lookup order: a serializer in ``Primitives`` named after ``vtype.name``;
    raw bytes when ``vtype.value`` is above 25; the extension object decoder
    for "ExtensionObject"; otherwise ``from_binary`` of the class with that
    name in ``opcua.ua.uatypes``.

    :raises UaError: if no such class exists in ``uatypes``
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).unpack(data)
    if vtype.value > 25:
        return Primitives.Bytes.unpack(data)
    if type_name == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua import extensionobject_from_binary
        return extensionobject_from_binary(data)
    from opcua.ua import uatypes
    if hasattr(uatypes, type_name):
        return getattr(uatypes, type_name).from_binary(data)
    raise UaError("can not unpack unknown vtype {0!s}".format(vtype))
295
296
297 1
def unpack_uatype_array(vtype, data):
    """
    Read an array of ``vtype`` values from ``data``.

    Types with a serializer in ``Primitives`` use its ``unpack_array``.
    Otherwise an Int32 element count is read, followed by that many values
    via ``unpack_uatype``; a count of -1 yields ``None``.
    """
    type_name = vtype.name
    if hasattr(Primitives, type_name):
        return getattr(Primitives, type_name).unpack_array(data)
    count = Primitives.Int32.unpack(data)
    if count == -1:
        return None
    return [unpack_uatype(vtype, data) for _ in range(count)]
307
308
309