Completed
Pull Request — master (#122)
by Olivier
02:23
created

opcua.ua.Variant   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 103
Duplicated Lines 0 %

Test Coverage

Coverage 97.4%
Metric Value
dl 0
loc 103
ccs 75
cts 77
cp 0.974
rs 9.6
wmc 32

6 Methods

Rating   Name   Duplication   Size   Complexity  
A to_binary() 0 16 4
A __str__() 0 2 1
B __init__() 0 14 6
A __eq__() 0 4 4
F _guess_type() 0 27 12
B from_binary() 0 20 5
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
7 1
from calendar import timegm
8 1
import sys
9 1
import os
10 1
import uuid
11 1
import struct
12 1
if sys.version_info.major > 2:
13 1
    unicode = str
14
15 1
from opcua.ua import status_codes
16 1
from opcua.common.uaerrors import UAError
17 1
from opcua.common.uaerrors import UAStatusCodeError
18
19
20 1
logger = logging.getLogger('opcua.uaprotocol')


# Types that are packed and unpacked directly using struct
# (string, bytes and datetime are handled as special cases).
UaTypes = (
    "Boolean",
    "SByte", "Byte",
    "Int8", "UInt8",
    "Int16", "UInt16",
    "Int32", "UInt32",
    "Int64", "UInt64",
    "Float", "Double",
)


# January 1, 1970 expressed as an MS file time (100 ns ticks since 1601-01-01).
EPOCH_AS_FILETIME = 116444736000000000
# Number of 100 ns ticks in one second.
HUNDREDS_OF_NANOSECONDS = 10000000
# Origin of the MS file time scale as a naive datetime.
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
30
31
32 1
class UTC(tzinfo):

    """tzinfo implementation with a zero UTC offset and no DST."""

    def dst(self, dt):
        # No daylight-saving adjustment.
        return timedelta()

    def tzname(self, dt):
        return "UTC"

    def utcoffset(self, dt):
        # Zero offset whatever the datetime is.
        return timedelta()
44
45
46
# method copied from David Buxton <[email protected]> sample code
47 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime into an MS file time integer (100 ns ticks).

    A datetime without tzinfo, or whose tzinfo reports no offset, is tagged
    with UTC first. Whole seconds come from ``timegm(dt.timetuple())`` and
    the microsecond part is added afterwards (1 microsecond = 10 ticks).
    """
    has_no_offset = (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None)
    if has_no_offset:
        dt = dt.replace(tzinfo=UTC())
    whole_seconds = timegm(dt.timetuple())
    ticks = EPOCH_AS_FILETIME + (whole_seconds * HUNDREDS_OF_NANOSECONDS)
    ticks = ticks + (dt.microsecond * 10)
    return ticks
52
53
54 1
def win_epoch_to_datetime(epch):
    """
    Convert an MS file time integer (100 ns ticks) into a naive datetime.

    On OverflowError a warning is logged and the largest representable
    datetime (31 Dec MAXYEAR, 23:59:59.999999) is returned instead.
    """
    try:
        offset = timedelta(microseconds=epch // 10)
        converted = FILETIME_EPOCH_AS_DATETIME + offset
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: {}".format(epch))
        converted = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
    return converted
61
62
63 1
def build_array_format_py2(prefix, length, fmtchar):
    """Return the struct format ``prefix + <length> + fmtchar`` (fmtchar is already a string)."""
    count = str(length)
    return prefix + count + fmtchar
65
66
67 1
def build_array_format_py3(prefix, length, fmtchar):
    """
    Return the struct format ``prefix + <length> + <format character>``.

    ``fmtchar`` is taken from ``struct.Struct.format[1]`` by the callers.
    Depending on the Python 3 version that attribute is ``bytes`` (indexing
    yields an int) or ``str`` (indexing yields a one-character string), so
    int, bytes and str are all accepted here.
    """
    if isinstance(fmtchar, int):
        fmtchar = chr(fmtchar)
    elif isinstance(fmtchar, bytes):
        fmtchar = fmtchar.decode("ascii")
    return prefix + str(length) + fmtchar
69
70
71 1
# Pick the array-format builder matching the running interpreter.
build_array_format = (
    build_array_format_py2 if sys.version_info.major < 3 else build_array_format_py3
)
75
76
77 1
def pack_uatype_array_primitive(st, value, length):
    """
    Pack a non-empty array of primitive values: Int32 length followed by the items.

    ``st`` is the struct.Struct of one item. A single-item array uses the
    pre-built length prefix and ``st.pack``; longer arrays are packed in one
    ``struct.pack`` call with a format built from the item's format character.
    """
    if length != 1:
        array_fmt = build_array_format("<i", length, st.format[1])
        return struct.pack(array_fmt, length, *value)
    return b'\x01\x00\x00\x00' + st.pack(value[0])
82
83
84 1
def pack_uatype_array(uatype, value):
    """
    Pack a sequence of ``uatype`` items as Int32 length + items.

    ``None`` is encoded as length -1 and an empty sequence as length 0.
    Types found in ``uatype2struct`` go through the primitive fast path;
    everything else is packed item by item with ``pack_uatype``.
    """
    if value is None:
        return b'\xff\xff\xff\xff'
    length = len(value)
    if length == 0:
        return b'\x00\x00\x00\x00'
    if uatype in uatype2struct:
        return pack_uatype_array_primitive(uatype2struct[uatype], value, length)
    chunks = [uatype_Int32.pack(length)]
    chunks.extend(pack_uatype(uatype, item) for item in value)
    return b"".join(chunks)
97
98
99 1
def pack_uatype(uatype, value):
    """
    Pack a single value of the UA type named ``uatype`` into bytes.

    Types listed in ``uatype2struct`` are packed with their struct; "Null",
    "String", byte-like types, "DateTime" and "ExtensionObject" have dedicated
    handling; any other type is expected to provide ``to_binary()``.
    """
    st = uatype2struct.get(uatype)
    if st is not None:
        return st.pack(value)
    if uatype == "Null":
        return b''
    if uatype == "String":
        return pack_string(value)
    if uatype in ("CharArray", "ByteString", "Custom"):
        return pack_bytes(value)
    if uatype == "DateTime":
        return pack_datetime(value)
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)
    return value.to_binary()
118
119 1
# Pre-compiled little-endian structs, one per primitive UA type.
uatype_Int8 = struct.Struct("<b")
uatype_SByte = uatype_Int8
uatype_Int16 = struct.Struct("<h")
uatype_Int32 = struct.Struct("<i")
uatype_Int64 = struct.Struct("<q")
uatype_UInt8 = struct.Struct("<B")
uatype_Char = uatype_UInt8
uatype_Byte = uatype_UInt8
uatype_UInt16 = struct.Struct("<H")
uatype_UInt32 = struct.Struct("<I")
uatype_UInt64 = struct.Struct("<Q")
uatype_Boolean = struct.Struct("<?")
uatype_Double = struct.Struct("<d")
uatype_Float = struct.Struct("<f")

# Lookup from UA type name to the struct used to (un)pack a single value.
uatype2struct = dict(
    Int8=uatype_Int8,
    SByte=uatype_SByte,
    Int16=uatype_Int16,
    Int32=uatype_Int32,
    Int64=uatype_Int64,
    UInt8=uatype_UInt8,
    Char=uatype_Char,
    Byte=uatype_Byte,
    UInt16=uatype_UInt16,
    UInt32=uatype_UInt32,
    UInt64=uatype_UInt64,
    Boolean=uatype_Boolean,
    Double=uatype_Double,
    Float=uatype_Float,
)
150
151
152 1
def unpack_uatype(uatype, data):
    """
    Read one value of the UA type named ``uatype`` from ``data``.

    ``data`` must provide ``read(n)``. Types without dedicated handling are
    looked up by name in this module's globals and decoded through their
    ``from_binary``; UAError is raised when no such class exists.
    """
    if uatype in uatype2struct:
        st = uatype2struct[uatype]
        raw = data.read(st.size)
        return st.unpack(raw)[0]
    if uatype == "String":
        return unpack_string(data)
    if uatype in ("CharArray", "ByteString", "Custom"):
        return unpack_bytes(data)
    if uatype == "DateTime":
        return unpack_datetime(data)
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)
    klass = globals().get(uatype)
    if hasattr(klass, 'from_binary'):
        return klass.from_binary(data)
    raise UAError("can not unpack unknown uatype %s" % uatype)
175
176
177 1
def unpack_uatype_array(uatype, data):
    """
    Read an Int32 length followed by that many ``uatype`` items from ``data``.

    Returns None for length -1 and an empty list for length 0. Primitive
    types are unpacked in a single struct call, other types item by item.
    """
    length = uatype_Int32.unpack(data.read(4))[0]
    if length == -1:
        return None
    if length == 0:
        return []
    if uatype not in uatype2struct:
        return [unpack_uatype(uatype, data) for _ in range(0, length)]
    st = uatype2struct[uatype]
    if length == 1:
        return list(st.unpack(data.read(st.size)))
    arrst = struct.Struct(build_array_format("<", length, st.format[1]))
    return list(arrst.unpack(data.read(arrst.size)))
195
196
197 1
def pack_datetime(value):
    """Pack a datetime as a little-endian Int64 MS file time."""
    return uatype_Int64.pack(datetime_to_win_epoch(value))
200
201
202 1
def unpack_datetime(data):
    """Read an 8-byte Int64 MS file time from ``data`` and return it as a datetime."""
    raw = data.read(8)
    (epch,) = uatype_Int64.unpack(raw)
    return win_epoch_to_datetime(epch)
205
206
207 1
def pack_string(string):
    """
    Pack a string (or bytes) as Int32 length followed by the raw bytes.

    Unicode text is encoded as UTF-8 first. ``None`` and empty values are
    both encoded as a null string (length -1); the empty-value behaviour is
    kept as it was so the bytes produced for existing callers do not change.
    """
    if string is None:
        # A missing value has no length to measure; emit the null encoding.
        return b'\xff\xff\xff\xff'
    if isinstance(string, unicode):
        string = string.encode('utf-8')
    length = len(string)
    if length == 0:
        return b'\xff\xff\xff\xff'
    return uatype_Int32.pack(length) + string

# Byte strings share the exact same wire format.
pack_bytes = pack_string
216
217
218 1
def unpack_bytes(data):
    """
    Read an Int32 length followed by that many bytes from ``data``.

    A length of -1 (null) yields an empty bytes object.
    """
    (length,) = uatype_Int32.unpack(data.read(4))
    if length != -1:
        return data.read(length)
    return b''
223
224
225 1
def py3_unpack_string(data):
    """Read a length-prefixed byte string from ``data`` and decode it as UTF-8."""
    return unpack_bytes(data).decode("utf-8")
228
229
230 1
# On Python 2 strings are returned as raw bytes; on Python 3 they are decoded.
unpack_string = unpack_bytes if sys.version_info.major < 3 else py3_unpack_string
234
235
236 1
def test_bit(data, offset):
237 1
    mask = 1 << offset
238 1
    return data & mask
239
240
241 1
def set_bit(data, offset):
    """Return ``data`` with bit ``offset`` switched on."""
    return data | (1 << offset)
244
245
246 1
class _FrozenClass(object):
247
248
    """
249
    make it impossible to add members to a class.
250
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
251
    """
252 1
    _freeze = False
253
254 1
    def __setattr__(self, key, value):
255 1
        if self._freeze and not hasattr(self, key):
256
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
257 1
        object.__setattr__(self, key, value)
258
259 1
# The typo check is CPU consuming, but it makes debugging easy.
# If the typo check is not needed (in production), set the environment
# variable PYOPCUA_NO_TYPO_CHECK: all uatype classes then inherit from
# object instead of _FrozenClass and the check is skipped.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
267
268
269 1
class Guid(FrozenClass):

    """
    Wrapper around a ``uuid.UUID`` with binary (16 raw bytes) encoding.

    A new instance holds a random UUID (``uuid.uuid4()``).
    """

    def __init__(self):
        self.uuid = uuid.uuid4()
        self._freeze = True

    @staticmethod
    def from_binary(data):
        """Build a Guid from the next 16 bytes read from ``data``."""
        guid = Guid()
        guid.uuid = uuid.UUID(bytes=data.read(16))
        return guid

    def to_binary(self):
        """Return the 16 raw bytes of the UUID."""
        return self.uuid.bytes

    def __eq__(self, other):
        if not isinstance(other, Guid):
            return False
        return self.uuid == other.uuid

    def __hash__(self):
        return hash(self.uuid.bytes)
289
290
291 1
class StatusCode(FrozenClass):

    """
    A status code value together with the name and documentation looked up
    through ``status_codes.get_name_and_doc``.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    def __init__(self, value=0):
        self.value = value
        name_and_doc = status_codes.get_name_and_doc(value)
        self.name, self.doc = name_and_doc
        self._freeze = True

    def to_binary(self):
        """Pack the numeric value as a little-endian UInt32."""
        return uatype_UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Read a 4-byte UInt32 from ``data`` and wrap it in a StatusCode."""
        (val,) = uatype_UInt32.unpack(data.read(4))
        return StatusCode(val)

    def check(self):
        """
        Raise an exception if the status code is anything else than 0.
        Use the is_good() method if no exception is desired.
        """
        if self.value == 0:
            return
        raise UAStatusCodeError("{}({})".format(self.doc, self.name))

    def is_good(self):
        """
        Return True if the status is Good (value 0), False otherwise.
        """
        return True if self.value == 0 else False

    def __str__(self):
        return 'StatusCode({})'.format(self.name)
    __repr__ = __str__
335
336
337 1
class NodeIdType(Enum):
    """Encoding kinds of a NodeId; the value is the type code written in the encoding byte."""
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
344
345
346 1
class NodeId(FrozenClass):

    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed
            or you want something special like a twobyte or fourbyte nodeid

    :ivar Identifier:
    :vartype Identifier: int, string, bytes or Guid
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if self.Identifier is None:
            # No identifier at all: default to the two-byte node 0.
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # Guess the encoding from the Python type of the identifier.
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, (str, unicode)):
                # unicode is checked too so Python 2 text identifiers are
                # accepted, like in Variant._guess_type.
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            else:
                raise UAError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
            return self.NamespaceIndex, self.Identifier
        else:
            return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __hash__(self):
        return hash(self.__key())

    @staticmethod
    def from_string(string):
        """
        Parse a string made of ``;``-separated ``key=value`` elements.

        Recognised keys: ``ns`` (namespace index), ``i``/``s``/``g``/``b``
        (numeric/string/guid/bytestring identifier), ``srv`` (server index)
        and ``nsu`` (namespace uri). Raises UAError when no identifier
        element is present.
        """
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            # Split on the first "=" only: the value itself may contain "=".
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                srv = v
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UAError("Could not parse nodeid string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # Only override the constructor defaults when the element was given,
        # so the attributes never end up as None.
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = int(srv)
        return nodeid

    def to_string(self):
        """
        Return the ``;``-separated string form understood by from_string().

        Layout: ``[srv=<idx>;][ns=<idx>;]<type>=<identifier>[;nsu=<uri>]``.
        """
        prefixes = {
            NodeIdType.Numeric: "i",
            NodeIdType.String: "s",
            NodeIdType.TwoByte: "i",
            NodeIdType.FourByte: "i",
            NodeIdType.Guid: "g",
            NodeIdType.ByteString: "b",
        }
        parts = []
        if self.ServerIndex:
            parts.append("srv={}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={}".format(self.NamespaceIndex))
        parts.append("{}={}".format(prefixes.get(self.NodeIdType), self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={}".format(self.NamespaceUri))
        # Every element is separated by ";" so the result can be parsed back.
        return ";".join(parts)

    def __str__(self):
        return "{}NodeId({})".format(self.NodeIdType.name, self.to_string())
    __repr__ = __str__

    def to_binary(self):
        """
        Encode the node id: one type byte, then namespace and identifier in
        the layout of the type.

        NOTE: NamespaceUri and ServerIndex are not written (known limitation).
        """
        if self.NodeIdType == NodeIdType.TwoByte:
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
        elif self.NodeIdType == NodeIdType.FourByte:
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.Numeric:
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.String:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                pack_string(self.Identifier)
        elif self.NodeIdType == NodeIdType.ByteString:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                pack_bytes(self.Identifier)
        else:
            # Remaining case (Guid): the identifier encodes itself.
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                self.Identifier.to_binary()

    @staticmethod
    def from_binary(data):
        """
        Decode a node id from ``data`` (an object providing ``read(n)``).

        The low 6 bits of the first byte select the NodeIdType; bit 7 marks a
        trailing namespace uri and bit 6 a trailing server index.
        """
        nid = NodeId()
        encoding = ord(data.read(1))
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = ord(data.read(1))
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = unpack_string(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = unpack_bytes(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = Guid.from_binary(data)
        else:
            raise UAError("Unknown NodeId encoding: " + str(nid.NodeIdType))

        if test_bit(encoding, 7):
            nid.NamespaceUri = unpack_string(data)
        if test_bit(encoding, 6):
            nid.ServerIndex = uatype_UInt32.unpack(data.read(4))[0]

        return nid
515
516
517 1
class TwoByteNodeId(NodeId):

    """NodeId forced to the TwoByte encoding (namespace index 0)."""

    def __init__(self, identifier):
        super(TwoByteNodeId, self).__init__(identifier, namespaceidx=0, nodeidtype=NodeIdType.TwoByte)
521
522
523 1
class FourByteNodeId(NodeId):

    """NodeId forced to the FourByte encoding."""

    def __init__(self, identifier, namespace=0):
        super(FourByteNodeId, self).__init__(identifier, namespaceidx=namespace, nodeidtype=NodeIdType.FourByte)
527
528
529 1
class NumericNodeId(NodeId):

    """NodeId forced to the Numeric encoding."""

    def __init__(self, identifier, namespace=0):
        super(NumericNodeId, self).__init__(identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Numeric)
533
534
535 1
class ByteStringNodeId(NodeId):

    """NodeId forced to the ByteString encoding."""

    def __init__(self, identifier, namespace=0):
        super(ByteStringNodeId, self).__init__(identifier, namespaceidx=namespace, nodeidtype=NodeIdType.ByteString)
539
540
541 1
class GuidNodeId(NodeId):

    """NodeId forced to the Guid encoding."""

    def __init__(self, identifier, namespace=0):
        super(GuidNodeId, self).__init__(identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Guid)
545
546
547 1
class StringNodeId(NodeId):

    """NodeId forced to the String encoding."""

    def __init__(self, identifier, namespace=0):
        super(StringNodeId, self).__init__(identifier, namespaceidx=namespace, nodeidtype=NodeIdType.String)


# Expanded node ids are handled by the very same class.
ExpandedNodeId = NodeId
554
555
556 1
class QualifiedName(FrozenClass):

    '''
    A name qualified with a namespace index.

    The string form is "<NamespaceIndex>:<Name>"; the binary form is a
    UInt16 namespace index followed by the packed name.
    '''

    def __init__(self, name="", namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UAError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """Parse "<idx>:<name>"; a string without ":" is a name in namespace 0."""
        if ":" not in string:
            return QualifiedName(string, int(0))
        idx, name = string.split(":", 1)
        return QualifiedName(name, int(idx))

    def to_binary(self):
        pieces = (
            uatype_UInt16.pack(self.NamespaceIndex),
            pack_string(self.Name),
        )
        return b''.join(pieces)

    @staticmethod
    def from_binary(data):
        """Read the UInt16 namespace index, then the name, from ``data``."""
        qname = QualifiedName()
        qname.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
        qname.Name = unpack_string(data)
        return qname

    def __eq__(self, bname):
        if not isinstance(bname, QualifiedName):
            return False
        return self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
601
602
603 1
class LocalizedText(FrozenClass):

    '''
    A text with an optional locale.

    ``Encoding`` is a bit mask: bit 0 marks a Locale field and bit 1 a Text
    field in the binary form. Unicode text given to the constructor is
    stored UTF-8 encoded.
    '''

    def __init__(self, text=""):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = b''
        self._freeze = True

    def to_binary(self):
        """Return the encoding byte followed by the fields that are set (Locale, then Text)."""
        # Refresh the mask from the current field contents before writing it.
        if self.Locale:
            self.Encoding |= (1 << 0)
        if self.Text:
            self.Encoding |= (1 << 1)
        packet = [uatype_UInt8.pack(self.Encoding)]
        for field in (self.Locale, self.Text):
            if field:
                packet.append(pack_bytes(field))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the encoding byte, then the Locale and Text fields it announces."""
        ltext = LocalizedText()
        ltext.Encoding = ord(data.read(1))
        if ltext.Encoding & (1 << 0):
            ltext.Locale = unpack_bytes(data)
        if ltext.Encoding & (1 << 1):
            ltext.Text = unpack_bytes(data)
        return ltext

    def to_string(self):
        # FIXME: use local
        return self.Text.decode()

    def __str__(self):
        return 'LocalizedText(Encoding:{}, Locale:{}, Text:{})'.format(
            str(self.Encoding), str(self.Locale), str(self.Text))
    __repr__ = __str__

    def __eq__(self, other):
        if not isinstance(other, LocalizedText):
            return False
        if self.Locale == other.Locale and self.Text == other.Text:
            return True
        return False
656
657
658 1
class ExtensionObject(FrozenClass):

    '''
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Encoding: bit 0 is set when a Body is present
    :vartype Encoding: int
    :ivar Body:
    :vartype Body: bytes
    '''

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = b''
        self._freeze = True

    def to_binary(self):
        """Return TypeId + encoding byte, followed by the Body as a ByteString when non-empty."""
        has_body = bool(self.Body)
        if has_body:
            self.Encoding |= (1 << 0)
        packet = [
            self.TypeId.to_binary(),
            pack_uatype('UInt8', self.Encoding),
        ]
        if has_body:
            packet.append(pack_uatype('ByteString', self.Body))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read TypeId and encoding byte from ``data``; read the Body only when bit 0 is set."""
        ext = ExtensionObject()
        ext.TypeId = NodeId.from_binary(data)
        ext.Encoding = unpack_uatype('UInt8', data)
        if ext.Encoding & (1 << 0):
            ext.Body = unpack_uatype('ByteString', data)
        return ext

    @staticmethod
    def from_object(obj):
        """
        Wrap ``obj`` in an ExtensionObject, using ``obj.to_binary()`` as Body
        and the "<ClassName>_Encoding_DefaultBinary" id as TypeId.
        """
        # NOTE(review): ObjectIds is not imported in the visible part of this
        # module - confirm it is in scope, otherwise this raises NameError.
        attr_name = "{}_Encoding_DefaultBinary".format(obj.__class__.__name__)
        ext = ExtensionObject()
        oid = getattr(ObjectIds, attr_name)
        ext.TypeId = FourByteNodeId(oid)
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        return 'ExtensionObject(TypeId:{}, Encoding:{}, {} bytes)'.format(
            str(self.TypeId), str(self.Encoding), str(len(self.Body)))

    __repr__ = __str__
710
711
712 1
class VariantType(Enum):

    '''
    The possible types of a variant.

    The member value is the type code stored in the low 6 bits of a
    Variant's encoding byte.

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:
    '''
    # No value / numeric primitives
    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    # Text, time and byte-like types
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    # Structured built-in types
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
773
774
775 1
class VariantTypeCustom(object):

    """
    Stand-in for a VariantType whose type code is not a VariantType member.

    Exposes the same ``name`` / ``value`` attributes as an enum member;
    ``name`` is always "Custom".
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            # The type code must fit in the low 6 bits of the encoding byte.
            raise UAError("Cannot create VariantType. VariantType must be %s > x > %s" % (0b111111, 25))

    def __str__(self):
        return "VariantType.Custom:{}".format(self.value)
    __repr__ = __str__

    def __eq__(self, other):
        # Objects without a ``value`` attribute (None, plain ints, ...) are
        # simply unequal instead of raising AttributeError.
        return hasattr(other, "value") and self.value == other.value
788
789
790 1
class Variant(FrozenClass):

    """
    Create an OPC-UA Variant object.
    If no argument is given, a Null Variant is created.
    If no variant type is given, the type is guessed from the Python type.
    If a Variant is given as value, the new object takes over its Value
    and VariantType.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions: shape of a nested list/tuple Value; computed from the
        value when not given, and only kept when there is more than one dimension
    :vartype Dimensions: list of int or None
    """

    def __init__(self, value=None, varianttype=None, dimensions=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self._freeze = True
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Dimensions is None and type(self.Value) in (list, tuple):
            dims = get_shape(self.Value)
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def _guess_type(self, val):
        """
        Guess the VariantType of ``val`` from its Python type.

        For (nested) lists and tuples the first innermost element decides;
        an empty sequence cannot be guessed and raises UAError.
        """
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UAError("could not guess UA variable type")
            val = val[0]
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # bool must be tested before int: bool is a subclass of int.
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, int):
            return VariantType.Int64
        elif type(val) in (str, unicode):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        # Any other object: use the VariantType member named like its class,
        # falling back to ExtensionObject when there is none.
        try:
            return getattr(VariantType, val.__class__.__name__)
        except AttributeError:
            return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """
        Encode the variant: one encoding byte followed by the value.

        The low 6 bits of the encoding byte hold the type code; for list and
        tuple values bit 7 is set (array) and, when Dimensions is set, bit 6
        is set too and the dimensions are appended as an Int32 array.
        """
        b = []
        encoding = self.VariantType.value & 0b111111
        if type(self.Value) in (list, tuple):
            if self.Dimensions is not None:
                encoding = set_bit(encoding, 6)
            encoding = set_bit(encoding, 7)
            b.append(uatype_UInt8.pack(encoding))
            b.append(pack_uatype_array(self.VariantType.name, flatten(self.Value)))
            if self.Dimensions is not None:
                b.append(pack_uatype_array("Int32", self.Dimensions))
        else:
            b.append(uatype_UInt8.pack(encoding))
            b.append(pack_uatype(self.VariantType.name, self.Value))

        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """
        Decode a variant from ``data`` (an object providing ``read(n)``).

        Type codes above 25 are represented with VariantTypeCustom.
        """
        dimensions = None
        encoding = ord(data.read(1))
        int_type = encoding & 0b00111111
        if int_type > 25:
            vtype = VariantTypeCustom(int_type)
        else:
            vtype = VariantType(int_type)
        if vtype == VariantType.Null:
            # A Null variant has no value and no dimensions; the encoding
            # byte must not be passed on as the dimensions argument.
            return Variant(None, vtype)
        if test_bit(encoding, 7):
            value = unpack_uatype_array(vtype.name, data)
        else:
            value = unpack_uatype(vtype.name, data)
        if test_bit(encoding, 6):
            dimensions = unpack_uatype_array("Int32", data)
            value = reshape(value, dimensions)

        return Variant(value, vtype, dimensions)
893
894
895 1
def reshape(flat, dims):
    """
    Rebuild a nested list with shape ``dims`` from the flat list ``flat``.

    When ``flat`` holds fewer items than the shape needs it is padded in
    place with empty lists. A dimension of 0 counts as 1 when computing the
    size of a sub-block.
    """
    inner_dims = dims[1:]
    block = 1
    for extent in inner_dims:
        block *= 1 if extent == 0 else extent
    wanted = dims[0] * block
    while wanted > len(flat):
        flat.append([])
    if not inner_dims or inner_dims == [0]:
        return flat
    starts = range(0, len(flat), block)
    return [reshape(flat[start: start + block], inner_dims) for start in starts]
907
908
909 1
def _split_list(l, n):
910
    n = max(1, n)
911
    return [l[i:i + n] for i in range(0, len(l), n)]
912
913
914 1
def flatten_and_get_shape(mylist):
    """
    Flatten a nested list/tuple and collect its dimensions at the same time.

    Only the first element of each nesting level is inspected to decide
    whether another level exists and how long it is.

    :param mylist: list or tuple, possibly nested
    :returns: tuple (flat, dims) where dims lists the length found at each
        level, outermost first. An empty input is returned unchanged
        together with [0].
    """
    dims = [len(mylist)]
    if dims[0] == 0:
        # nothing to look into; same guard as flatten()
        return mylist, dims
    while isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
        if len(mylist) == 0:
            # the level just removed held only empty sequences
            break
    return mylist, dims
923
924
925 1
def flatten(mylist):
    """
    Collapse a nested list/tuple into a single flat list.

    One nesting level is removed per pass, for as long as the first element
    is itself a list or tuple. An input that is empty or already flat is
    returned as is.
    """
    flat = mylist
    while len(flat) != 0 and isinstance(flat[0], (list, tuple)):
        flat = [leaf for branch in flat for leaf in branch]
    return flat
933
934
935 1
def get_shape(mylist):
    """
    Return the list of lengths found while descending through the first
    element of each nesting level of a list/tuple.

    Descent stops at the first non-sequence or at an empty sequence, whose
    length 0 is still recorded. A non-sequence input gives [].
    """
    shape = []
    level = mylist
    while isinstance(level, (list, tuple)):
        size = len(level)
        shape.append(size)
        if size == 0:
            return shape
        level = level[0]
    return shape
943
944
945 1
class XmlElement(FrozenClass):
    '''
    An XML element encoded as an UTF-8 string.

    :ivar Value: content read with unpack_string, or an empty list when the
        object is created without binary data
    '''

    def __init__(self, binary=None):
        # either decode from the given stream or start from the empty default
        if binary is None:
            self.Value = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        '''Serialize Value with pack_string.'''
        packed = pack_string(self.Value)
        return packed

    @staticmethod
    def from_binary(data):
        '''Build an XmlElement from a binary stream.'''
        return XmlElement(binary=data)

    def _binary_init(self, data):
        '''Read Value from data with unpack_string.'''
        self.Value = unpack_string(data)

    def __str__(self):
        return ''.join(('XmlElement(Value:', str(self.Value), ')'))

    __repr__ = __str__
971
972
973
974
975 1
class DataValue(FrozenClass):

    '''
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int

    '''

    def __init__(self, variant=None, status=None):
        '''
        :param variant: a Variant, or any value that Variant() accepts
        :param status: a StatusCode; a default StatusCode() is used when None
        '''
        self.Encoding = 0
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        if status is None:
            self.StatusCode = StatusCode()
        else:
            self.StatusCode = status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        '''
        Serialize to bytes: one encoding-mask byte followed by every field
        that is set, in mask-bit order.

        The mask is rebuilt from the current field values on each call and
        stored in self.Encoding. Bits left over from from_binary() or from a
        previous call are therefore never emitted for a field that is not
        written. A field is written when it is truthy, so a picoseconds
        value of 0 is omitted.
        '''
        encoding = 0
        fields = []
        if self.Value:
            encoding |= (1 << 0)
            fields.append(self.Value.to_binary())
        if self.StatusCode:
            encoding |= (1 << 1)
            fields.append(self.StatusCode.to_binary())
        if self.SourceTimestamp:
            encoding |= (1 << 2)
            fields.append(pack_datetime(self.SourceTimestamp))
        if self.ServerTimestamp:
            encoding |= (1 << 3)
            fields.append(pack_datetime(self.ServerTimestamp))
        if self.SourcePicoseconds:
            encoding |= (1 << 4)
            fields.append(uatype_UInt16.pack(self.SourcePicoseconds))
        if self.ServerPicoseconds:
            encoding |= (1 << 5)
            fields.append(uatype_UInt16.pack(self.ServerPicoseconds))
        # keep the attribute in sync with what is actually sent
        self.Encoding = encoding
        return b''.join([uatype_UInt8.pack(encoding)] + fields)

    @staticmethod
    def from_binary(data):
        '''
        Decode a DataValue from a binary stream.

        The first byte is the encoding mask; each set bit (0 to 5) announces
        one field, read in that order. The mask is kept in Encoding.

        :param data: stream-like object providing read()
        '''
        encoding = ord(data.read(1))
        if encoding & (1 << 0):
            value = Variant.from_binary(data)
        else:
            value = None
        if encoding & (1 << 1):
            status = StatusCode.from_binary(data)
        else:
            status = None
        obj = DataValue(value, status)
        obj.Encoding = encoding
        if obj.Encoding & (1 << 2):
            obj.SourceTimestamp = unpack_datetime(data)
        if obj.Encoding & (1 << 3):
            obj.ServerTimestamp = unpack_datetime(data)
        if obj.Encoding & (1 << 4):
            obj.SourcePicoseconds = uatype_UInt16.unpack(data.read(2))[0]
        if obj.Encoding & (1 << 5):
            obj.ServerPicoseconds = uatype_UInt16.unpack(data.read(2))[0]
        return obj

    def __str__(self):
        s = 'DataValue(Value:{}'.format(self.Value)
        # optional fields are listed only when they are not None
        for name in ('StatusCode', 'SourceTimestamp', 'ServerTimestamp',
                     'SourcePicoseconds', 'ServerPicoseconds'):
            val = getattr(self, name)
            if val is not None:
                s += ', {}:{}'.format(name, val)
        s += ')'
        return s

    __repr__ = __str__
1079
1080
1081 1
# Module-level counter used by generate_nodeid(); it is incremented before
# each use, so the first generated value is 2001.
__nodeid_counter = 2000
1082
1083
1084 1
def generate_nodeid(idx):
    """
    Build a NodeId from the next value of the module-level counter.

    The counter is incremented first, then passed to NodeId as first
    argument with idx as second argument.
    """
    global __nodeid_counter
    __nodeid_counter = __nodeid_counter + 1
    return NodeId(__nodeid_counter, idx)
1088