Passed
Push — dev ( ca56e9...c44e46 )
by Olivier
05:06 queued 02:39
created

opcua.ua.NodeId   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 178
Duplicated Lines 0 %

Test Coverage

Coverage 85.93%
Metric Value
dl 0
loc 178
ccs 116
cts 135
cp 0.8593
rs 8.6206
wmc 50

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __key() 0 5 2
B to_binary() 0 16 6
F from_binary() 0 30 9
D to_string() 0 23 10
A __eq__() 0 2 1
F _from_string() 0 38 11
A __hash__() 0 2 1
C __init__() 0 22 7
A from_string() 0 6 2
A __str__() 0 2 1

How to fix   Complexity   

Complex Class

Complex classes like opcua.ua.NodeId often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
7 1
from calendar import timegm
8 1
import sys
9 1
import os
10 1
import uuid
11 1
import struct
12 1
if sys.version_info.major > 2:
13 1
    unicode = str
14
15 1
from opcua.ua import status_codes
16 1
from opcua.common.uaerrors import UaError
17 1
from opcua.common.uaerrors import UaStatusCodeError
18 1
from opcua.common.uaerrors import UaStringParsingError
19
20
21 1
logger = logging.getLogger('opcua.uaprotocol')
22
23
24
# types that will be packed and unpacked directly using struct (string, bytes and datetime are handled as special cases)
25 1
# Names of the UA types that are packed/unpacked directly through struct.
UaTypes = (
    "Boolean",
    "SByte",
    "Byte",
    "Int8",
    "UInt8",
    "Int16",
    "UInt16",
    "Int32",
    "UInt32",
    "Int64",
    "UInt64",
    "Float",
    "Double",
)


# January 1, 1970 expressed as an MS file time value.
EPOCH_AS_FILETIME = 116444736000000000
# Multiplier turning whole seconds into file time units.
HUNDREDS_OF_NANOSECONDS = 10 * 1000 * 1000
# Origin of the file time scale as a naive datetime.
FILETIME_EPOCH_AS_DATETIME = datetime(year=1601, month=1, day=1)
31
32
33 1
class UTC(tzinfo):
    """Minimal tzinfo implementation: zero offset, zero DST, named "UTC"."""

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return timedelta()

    def tzname(self, dt):
        """Return the time zone name."""
        return "UTC"

    def dst(self, dt):
        """Return the daylight saving adjustment, which is always zero."""
        return timedelta()
45
46
47
# method copied from David Buxton <[email protected]> sample code
48 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime into an MS file time integer.

    A datetime without usable offset information is tagged as UTC first.
    The result is whole seconds scaled by HUNDREDS_OF_NANOSECONDS plus the
    microsecond part times 10, shifted by EPOCH_AS_FILETIME.
    """
    offset_missing = dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None
    if offset_missing:
        dt = dt.replace(tzinfo=UTC())
    # NOTE(review): timetuple() does not apply the tzinfo offset, so an aware
    # non-UTC datetime is presumably treated as if it were UTC -- confirm.
    whole_seconds = timegm(dt.timetuple())
    ticks = EPOCH_AS_FILETIME + (whole_seconds * HUNDREDS_OF_NANOSECONDS)
    return ticks + (dt.microsecond * 10)
53
54
55 1
def win_epoch_to_datetime(epch):
    """
    Convert an MS file time integer into a naive datetime.

    Values too large for datetime are logged and clamped to the last
    representable microsecond of MAXYEAR.
    """
    try:
        elapsed = timedelta(microseconds=epch // 10)
        return FILETIME_EPOCH_AS_DATETIME + elapsed
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning("datetime overflow: {}".format(epch))
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
62
63
64 1
def build_array_format_py2(prefix, length, fmtchar):
    """Build a struct format string: prefix, repeat count, then the format character."""
    pieces = (prefix, str(length), fmtchar)
    return "".join(pieces)
66
67
68 1
def build_array_format_py3(prefix, length, fmtchar):
    """
    Build a struct format string: prefix, repeat count, then the format character.

    Args:
        prefix: leading part of the format (byte order and any header fields)
        length: number of repeated elements
        fmtchar: struct format character, either as an int code point
            (indexing a bytes format) or as a one character str

    Returns:
        The assembled format string.
    """
    # struct.Struct.format is bytes before Python 3.7 (indexing yields an int)
    # and str from 3.7 on (indexing yields a str); chr() only accepts the former.
    if not isinstance(fmtchar, str):
        fmtchar = chr(fmtchar)
    return prefix + str(length) + fmtchar
70
71
72 1
# Pick the format builder matching the running interpreter's major version.
build_array_format = (
    build_array_format_py2 if sys.version_info.major < 3 else build_array_format_py3
)
76
77
78 1
def pack_uatype_array_primitive(st, value, length):
    """
    Pack a non-empty array of primitive values preceded by its Int32 length.

    st is the struct.Struct of a single element, value the sequence of
    elements and length the number of elements.
    """
    if length != 1:
        # one struct call for the length header and all elements
        array_format = build_array_format("<i", length, st.format[1])
        return struct.pack(array_format, length, *value)
    # single element: literal little-endian length 1 followed by the element
    return b'\x01\x00\x00\x00' + st.pack(value[0])
83
84
85 1
def pack_uatype_array(uatype, value):
    """
    Pack a sequence of values of the given UA type as a length-prefixed array.

    None is written as length -1 and an empty sequence as length 0.
    """
    if value is None:
        return b'\xff\xff\xff\xff'
    count = len(value)
    if count == 0:
        return b'\x00\x00\x00\x00'
    if uatype in uatype2struct:
        return pack_uatype_array_primitive(uatype2struct[uatype], value, count)
    # non primitive types: length header, then every element packed on its own
    chunks = [uatype_Int32.pack(count)]
    chunks.extend(pack_uatype(uatype, item) for item in value)
    return b"".join(chunks)
98
99
100 1
def pack_uatype(uatype, value):
    """
    Pack a single value according to the UA type name.

    Primitive types go through uatype2struct, a few names have dedicated
    packers, and anything else is expected to provide to_binary().
    """
    if uatype in uatype2struct:
        return uatype2struct[uatype].pack(value)
    if uatype == "Null":
        return b''
    if uatype == "String":
        return pack_string(value)
    if uatype in ("CharArray", "ByteString", "Custom"):
        return pack_bytes(value)
    if uatype == "DateTime":
        return pack_datetime(value)
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)
    return value.to_binary()
119
120 1
# Precompiled little-endian structs, one per primitive UA type.
# signed integers
uatype_Int8 = struct.Struct("<b")
uatype_Int16 = struct.Struct("<h")
uatype_Int32 = struct.Struct("<i")
uatype_Int64 = struct.Struct("<q")
# unsigned integers
uatype_UInt8 = struct.Struct("<B")
uatype_UInt16 = struct.Struct("<H")
uatype_UInt32 = struct.Struct("<I")
uatype_UInt64 = struct.Struct("<Q")
# boolean and floating point
uatype_Boolean = struct.Struct("<?")
uatype_Double = struct.Struct("<d")
uatype_Float = struct.Struct("<f")
# aliases sharing the struct of the equally sized type
uatype_SByte = uatype_Int8
uatype_Char = uatype_UInt8
uatype_Byte = uatype_UInt8

# Lookup from UA type name to the struct used to pack/unpack it.
uatype2struct = dict(
    Int8=uatype_Int8,
    SByte=uatype_SByte,
    Int16=uatype_Int16,
    Int32=uatype_Int32,
    Int64=uatype_Int64,
    UInt8=uatype_UInt8,
    Char=uatype_Char,
    Byte=uatype_Byte,
    UInt16=uatype_UInt16,
    UInt32=uatype_UInt32,
    UInt64=uatype_UInt64,
    Boolean=uatype_Boolean,
    Double=uatype_Double,
    Float=uatype_Float,
)
151
152
153 1
def unpack_uatype(uatype, data):
    """
    Read a single value of the named UA type from a readable stream.

    Raises UaError when the name is neither a known type nor a module level
    object exposing from_binary().
    """
    if uatype in uatype2struct:
        primitive = uatype2struct[uatype]
        raw = data.read(primitive.size)
        return primitive.unpack(raw)[0]
    if uatype == "String":
        return unpack_string(data)
    if uatype in ("CharArray", "ByteString", "Custom"):
        return unpack_bytes(data)
    if uatype == "DateTime":
        return unpack_datetime(data)
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)
    # last resort: look for an object of that name defined in this module
    module_names = globals()
    if uatype in module_names and hasattr(module_names[uatype], 'from_binary'):
        return module_names[uatype].from_binary(data)
    raise UaError("can not unpack unknown uatype %s" % uatype)
176
177
178 1
def unpack_uatype_array(uatype, data):
    """
    Read a length-prefixed array of the named UA type from a readable stream.

    Returns None for length -1, an empty list for length 0, otherwise a list
    of the decoded elements.
    """
    count = uatype_Int32.unpack(data.read(4))[0]
    if count == -1:
        return None
    if count == 0:
        return []
    if uatype not in uatype2struct:
        # non primitive types are decoded element by element
        return [unpack_uatype(uatype, data) for _ in range(0, count)]
    element = uatype2struct[uatype]
    if count == 1:
        return list(element.unpack(data.read(element.size)))
    # primitive types: decode all elements with a single struct
    whole = struct.Struct(build_array_format("<", count, element.format[1]))
    return list(whole.unpack(data.read(whole.size)))
196
197
198 1
def pack_datetime(value):
    """Pack a datetime as a little-endian Int64 file time."""
    return uatype_Int64.pack(datetime_to_win_epoch(value))
201
202
203 1
def unpack_datetime(data):
    """Read an 8 byte Int64 file time from the stream and convert it to a datetime."""
    raw = data.read(8)
    (filetime,) = uatype_Int64.unpack(raw)
    return win_epoch_to_datetime(filetime)
206
207
208 1
def pack_string(string):
    """
    Pack a text or byte string as an Int32 length followed by the raw bytes.

    Args:
        string: unicode text (encoded as UTF-8), bytes, or None

    Returns:
        The packed bytes. None and empty values are both written as a null
        string, i.e. length -1 with no payload, which unpack_bytes reads
        back as b''.
    """
    # None used to fail in len(); treat it as the null string instead
    if string is None:
        return b'\xff\xff\xff\xff'
    if isinstance(string, unicode):
        string = string.encode('utf-8')
    length = len(string)
    if length == 0:
        return b'\xff\xff\xff\xff'
    return uatype_Int32.pack(length) + string
215
216 1
pack_bytes = pack_string
217
218
219 1
def unpack_bytes(data):
    """
    Read a length-prefixed byte string from a readable stream.

    A length of -1 yields b''; otherwise exactly that many bytes are read.
    """
    (size,) = uatype_Int32.unpack(data.read(4))
    return b'' if size == -1 else data.read(size)
224
225
226 1
def py3_unpack_string(data):
    """Read a length-prefixed byte string from the stream and decode it as UTF-8."""
    return unpack_bytes(data).decode("utf-8")
229
230
231 1
# Before Python 3 strings are returned as raw bytes, afterwards decoded to text.
unpack_string = unpack_bytes if sys.version_info.major < 3 else py3_unpack_string
235
236
237 1
def test_bit(data, offset):
238 1
    mask = 1 << offset
239 1
    return data & mask
240
241
242 1
def set_bit(data, offset):
    """Return data with the bit at position offset switched on."""
    return data | (1 << offset)
245
246
247 1
class _FrozenClass(object):
248
249
    """
250
    make it impossible to add members to a class.
251
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
252
    """
253 1
    _freeze = False
254
255 1
    def __setattr__(self, key, value):
256 1
        if self._freeze and not hasattr(self, key):
257
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
258 1
        object.__setattr__(self, key, value)
259
260 1
# typo check is cpu consuming, but it will make debug easy.
# if typo check is not needed (in production), please set env PYOPCUA_NO_TYPO_CHECK.
# this will make all uatype classes inherit from object instead of _FrozenClass
# and skip the typo check.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
268
269
270 1
class Guid(FrozenClass):
    """
    Wrapper around a uuid.UUID with binary (de)serialization.

    :ivar uuid:
    :vartype uuid: uuid.UUID
    """

    def __init__(self):
        # a fresh random UUID; from_binary() overwrites it with the decoded one
        self.uuid = uuid.uuid4()
        self._freeze = True

    def to_binary(self):
        """Return the 16 raw bytes of the UUID."""
        return self.uuid.bytes

    def __hash__(self):
        return hash(self.uuid.bytes)

    @staticmethod
    def from_binary(data):
        """Build a Guid from the next 16 bytes of a readable stream."""
        g = Guid()
        g.uuid = uuid.UUID(bytes=data.read(16))
        return g

    def __eq__(self, other):
        return isinstance(other, Guid) and self.uuid == other.uuid

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)
290
291
292 1
class StatusCode(FrozenClass):
    """
    A numeric status code together with its symbolic name and description.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    def __init__(self, value=0):
        self.value = value
        # name and description are looked up once from the status code table
        name, doc = status_codes.get_name_and_doc(value)
        self.name = name
        self.doc = doc
        self._freeze = True

    def to_binary(self):
        """Pack the numeric value as an UInt32."""
        return uatype_UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Read an UInt32 from the stream and wrap it in a StatusCode."""
        (val,) = uatype_UInt32.unpack(data.read(4))
        return StatusCode(val)

    def check(self):
        """
        Raise UaStatusCodeError if the status code is anything other than 0.
        Use is_good() when no exception is desired.
        """
        if self.value == 0:
            return
        raise UaStatusCodeError("{}({})".format(self.doc, self.name))

    def is_good(self):
        """
        Return True if the value is 0, False otherwise.
        """
        return self.value == 0

    def __str__(self):
        return 'StatusCode({})'.format(self.name)

    __repr__ = __str__
336
337
338 1
class NodeIdType(Enum):
    """Encoding kinds of a NodeId; the value is the type code stored in the encoding byte."""

    # numeric identifiers, from most compact to widest
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    # non numeric identifiers
    String = 3
    Guid = 4
    ByteString = 5
345
346
347 1
class NodeId(FrozenClass):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want something special like a twobyte or fourbyte nodeid

    :ivar Identifier:
    :vartype Identifier: int, string, bytes or Guid
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NodeIdType:
    :vartype NodeIdType: NodeIdType
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # no identifier at all: default to the two byte node id 0
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the encoding from the python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        """Return the tuple used for equality and hashing."""
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
            return self.NamespaceIndex, self.Identifier
        else:
            return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __ne__(self, node):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(node)

    def __hash__(self):
        return hash(self.__key())

    @staticmethod
    def from_string(string):
        """
        Parse a string such as "ns=2;i=45" into a NodeId.

        Raises UaStringParsingError when the string cannot be parsed.
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {}".format(string), ex)

    @staticmethod
    def _from_string(string):
        """
        Do the actual parsing of the ";" separated "key=value" elements.

        Recognized keys: ns, i, s, g, b, srv, nsu. Malformed elements raise
        ValueError, which from_string() converts.
        """
        l = string.split(";")
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in l:
            if not el:
                continue
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                # the identifier is kept as the text found in the string
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                srv = v
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # only override the constructor defaults ("" and 0) when the string
        # actually carried the element; ServerIndex is kept numeric
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = int(srv)
        return nodeid

    def to_string(self):
        """
        Return the textual form of the node id, e.g. "ns=2;i=45".

        Elements are joined with ";" so that the result can be parsed again
        by from_string(), including the optional srv and nsu elements.
        """
        type_letters = {
            NodeIdType.Numeric: "i",
            NodeIdType.TwoByte: "i",
            NodeIdType.FourByte: "i",
            NodeIdType.String: "s",
            NodeIdType.Guid: "g",
            NodeIdType.ByteString: "b",
        }
        ntype = type_letters.get(self.NodeIdType)
        parts = []
        if self.ServerIndex:
            parts.append("srv={}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={}".format(self.NamespaceIndex))
        parts.append("{}={}".format(ntype, self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={}".format(self.NamespaceUri))
        return ";".join(parts)

    def __str__(self):
        return "{}NodeId({})".format(self.NodeIdType.name, self.to_string())

    __repr__ = __str__

    def to_binary(self):
        """
        Serialize the node id: encoding byte, namespace index (except for
        two byte ids) and identifier.
        """
        # FIXME: Missing NamespaceUri and ServerIndex
        if self.NodeIdType == NodeIdType.TwoByte:
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
        elif self.NodeIdType == NodeIdType.FourByte:
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.Numeric:
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.String:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                pack_string(self.Identifier)
        elif self.NodeIdType == NodeIdType.ByteString:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                pack_bytes(self.Identifier)
        else:
            # remaining case: the identifier serializes itself (e.g. a Guid)
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                self.Identifier.to_binary()

    @staticmethod
    def from_binary(data):
        """
        Read a node id from a readable stream.

        The low 6 bits of the first byte select the NodeIdType; bit 7 and
        bit 6 announce a trailing NamespaceUri and ServerIndex respectively.
        """
        nid = NodeId()
        encoding = ord(data.read(1))
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = ord(data.read(1))
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = unpack_string(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = unpack_bytes(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = Guid.from_binary(data)
        else:
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

        if test_bit(encoding, 7):
            nid.NamespaceUri = unpack_string(data)
        if test_bit(encoding, 6):
            nid.ServerIndex = uatype_UInt32.unpack(data.read(4))[0]

        return nid
525
526
527 1
class TwoByteNodeId(NodeId):
    """NodeId forced to the TwoByte encoding, always in namespace 0."""

    def __init__(self, identifier):
        NodeId.__init__(self, identifier=identifier, namespaceidx=0, nodeidtype=NodeIdType.TwoByte)
531
532
533 1
class FourByteNodeId(NodeId):
    """NodeId forced to the FourByte encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.FourByte)
537
538
539 1
class NumericNodeId(NodeId):
    """NodeId forced to the Numeric encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Numeric)
543
544
545 1
class ByteStringNodeId(NodeId):
    """NodeId forced to the ByteString encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.ByteString)
549
550
551 1
class GuidNodeId(NodeId):
    """NodeId forced to the Guid encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Guid)
555
556
557 1
class StringNodeId(NodeId):
    """NodeId forced to the String encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.String)
561
562
563 1
ExpandedNodeId = NodeId
564
565
566 1
class QualifiedName(FrozenClass):
    """
    A string qualified with a namespace index.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    """

    def __init__(self, name="", namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return the "index:name" textual form."""
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse "index:name" (or a bare name, meaning namespace 0).

        Raises UaStringParsingError when the index part is not an integer.
        """
        if ":" in string:
            try:
                # split on the first ":" only, the name itself may contain more
                idx, name = string.split(":", 1)
                idx = int(idx)
            except (TypeError, ValueError) as ex:
                raise UaStringParsingError("Error parsing string {}".format(string), ex)
        else:
            idx = 0
            name = string
        return QualifiedName(name, idx)

    def to_binary(self):
        """Serialize as UInt16 namespace index followed by the packed name."""
        return b''.join((uatype_UInt16.pack(self.NamespaceIndex), pack_string(self.Name)))

    @staticmethod
    def from_binary(data):
        """Read a QualifiedName from a readable stream."""
        obj = QualifiedName()
        obj.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
        obj.Name = unpack_string(data)
        return obj

    def __eq__(self, bname):
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, bname):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(bname)

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
615
616
617 1
class LocalizedText(FrozenClass):
    """
    A text with an optional locale.

    :ivar Encoding: bit mask, bit 0 set when Locale is present, bit 1 when Text is present
    :vartype Encoding: int
    :ivar Locale:
    :vartype Locale: bytes
    :ivar Text: unicode input is stored UTF-8 encoded
    :vartype Text: bytes
    """

    def __init__(self, text=""):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = b''
        self._freeze = True

    def to_binary(self):
        """
        Serialize as encoding mask followed by the present fields.

        The two presence bits of Encoding are recomputed from the current
        Locale and Text so that the mask always matches the fields actually
        written, even if a field was emptied after construction.
        """
        for bit, present in ((0, self.Locale), (1, self.Text)):
            if present:
                self.Encoding |= (1 << bit)
            else:
                self.Encoding &= ~(1 << bit)
        packet = [uatype_UInt8.pack(self.Encoding)]
        if self.Locale:
            packet.append(pack_bytes(self.Locale))
        if self.Text:
            packet.append(pack_bytes(self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read a LocalizedText from a readable stream."""
        obj = LocalizedText()
        obj.Encoding = ord(data.read(1))
        if obj.Encoding & (1 << 0):
            obj.Locale = unpack_bytes(data)
        if obj.Encoding & (1 << 1):
            obj.Text = unpack_bytes(data)
        return obj

    def to_string(self):
        """Return Text as a text string, decoding stored bytes as UTF-8."""
        # FIXME: use local
        if isinstance(self.Text, unicode):
            # Text was assigned as text after construction, nothing to decode
            return self.Text
        # Text is stored UTF-8 encoded; the default codec is ASCII on Python 2
        return self.Text.decode('utf-8')

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) + ')'

    __repr__ = __str__

    def __eq__(self, other):
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
            return True
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)
670
671
672 1
class ExtensionObject(FrozenClass):
    """
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Encoding: bit 0 is set when a Body is present
    :vartype Encoding: int
    :ivar Body:
    :vartype Body: bytes
    """

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = b''
        self._freeze = True

    def to_binary(self):
        """Serialize as TypeId, encoding byte and, when not empty, the Body as ByteString."""
        has_body = bool(self.Body)
        if has_body:
            self.Encoding |= (1 << 0)
        parts = [self.TypeId.to_binary(), pack_uatype('UInt8', self.Encoding)]
        if has_body:
            parts.append(pack_uatype('ByteString', self.Body))
        return b''.join(parts)

    @staticmethod
    def from_binary(data):
        """Read an ExtensionObject from a readable stream."""
        ext = ExtensionObject()
        ext.TypeId = NodeId.from_binary(data)
        ext.Encoding = unpack_uatype('UInt8', data)
        body_present = ext.Encoding & (1 << 0)
        if body_present:
            ext.Body = unpack_uatype('ByteString', data)
        return ext

    @staticmethod
    def from_object(obj):
        """
        Wrap obj: the Body is obj.to_binary() and the TypeId is looked up
        from the class name of obj.
        """
        # NOTE(review): ObjectIds is not defined in the visible part of this
        # file -- presumably imported elsewhere, confirm.
        attr_name = "{}_Encoding_DefaultBinary".format(obj.__class__.__name__)
        ext = ExtensionObject()
        ext.TypeId = FourByteNodeId(getattr(ObjectIds, attr_name))
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
            'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'

    __repr__ = __str__
724
725
726 1
class VariantType(Enum):
    """
    The possible types of a variant.

    The value of each member is the type code stored in the low bits of a
    Variant encoding byte.

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:
    """

    Null = 0

    # booleans and numbers
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11

    # strings, time and raw data
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16

    # structured types
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
787
788
789 1
class VariantTypeCustom(object):
    """
    Stand-in for a variant type code that is not a member of VariantType.

    Mimics the enum member interface used by Variant: name is always
    "Custom" and value is the numeric type code.

    Raises:
        UaError: if val does not fit in the 6 bits reserved for the type code
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            # format the message here: extra positional arguments to an
            # exception are never interpolated into the message
            raise UaError("Cannot create VariantType. VariantType must be {} > x > {}".format(0b111111, 25))

    def __str__(self):
        return "VariantType.Custom:{}".format(self.value)

    __repr__ = __str__

    def __eq__(self, other):
        # compare by type code with anything exposing .value (custom types and
        # VariantType members); other objects are simply not equal
        try:
            return self.value == other.value
        except AttributeError:
            return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)
802
803
804 1
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    If no argument is given a Null Variant is created.
    If no variant type is given, attempts to guess the type from the python type.
    If a variant is given as value, the new object becomes a copy of the argument.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions: shape of a multi dimensional array value, None otherwise
    :vartype Dimensions: list of int
    """

    def __init__(self, value=None, varianttype=None, dimensions=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self._freeze = True
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Dimensions is None and type(self.Value) in (list, tuple):
            # only nested sequences carry explicit dimensions
            dims = get_shape(self.Value)
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType from the python type of val.

        For (nested) sequences the first element decides. Raises UaError for
        an empty sequence. Objects of other classes map to the VariantType
        member named like their class, or ExtensionObject if there is none.
        """
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA variable type")
            val = val[0]
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # bool must be tested before int, it is a subclass of int
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, int):
            return VariantType.Int64
        elif type(val) in (str, unicode):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        else:
            try:
                return getattr(VariantType, val.__class__.__name__)
            except AttributeError:
                return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)

    __repr__ = __str__

    def to_binary(self):
        """
        Serialize the variant.

        The encoding byte holds the type code in its low 6 bits; bit 7 marks
        an array value and bit 6 the presence of dimensions.
        """
        b = []
        encoding = self.VariantType.value & 0b111111
        if type(self.Value) in (list, tuple):
            if self.Dimensions is not None:
                encoding = set_bit(encoding, 6)
            encoding = set_bit(encoding, 7)
            b.append(uatype_UInt8.pack(encoding))
            b.append(pack_uatype_array(self.VariantType.name, flatten(self.Value)))
            if self.Dimensions is not None:
                b.append(pack_uatype_array("Int32", self.Dimensions))
        else:
            b.append(uatype_UInt8.pack(encoding))
            b.append(pack_uatype(self.VariantType.name, self.Value))

        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """Read a Variant from a readable stream."""
        dimensions = None
        encoding = ord(data.read(1))
        int_type = encoding & 0b00111111
        if int_type > 25:
            vtype = VariantTypeCustom(int_type)
        else:
            vtype = VariantType(int_type)
        if vtype == VariantType.Null:
            # a Null variant has no payload; the encoding byte used to be
            # passed here by mistake as the dimensions argument
            return Variant(None, vtype)
        if test_bit(encoding, 7):
            value = unpack_uatype_array(vtype.name, data)
        else:
            value = unpack_uatype(vtype.name, data)
        if test_bit(encoding, 6):
            dimensions = unpack_uatype_array("Int32", data)
            value = reshape(value, dimensions)

        return Variant(value, vtype, dimensions)
907
908
909 1
def reshape(flat, dims):
    """
    Rebuild nested lists of shape dims from the flat list flat.

    A dimension of 0 after the first one counts as 1 when computing the
    size of a row. If flat holds fewer items than the shape requires it is
    padded IN PLACE with empty lists. When there is no inner dimension left
    (or the only inner dimension is 0) flat itself is returned.
    """
    inner_dims = dims[1:]
    row_size = 1
    for extent in inner_dims:
        row_size *= extent if extent != 0 else 1

    # pad the caller's list so that every expected row exists
    missing = dims[0] * row_size - len(flat)
    if missing > 0:
        flat.extend([] for _ in range(missing))

    if inner_dims and inner_dims != [0]:
        return [reshape(flat[start: start + row_size], inner_dims)
                for start in range(0, len(flat), row_size)]
    return flat
921
922
923 1
def _split_list(l, n):
924
    n = max(1, n)
925
    return [l[i:i + n] for i in range(0, len(l), n)]
926
927
928 1
def flatten_and_get_shape(mylist):
    """
    Flatten nested lists/tuples and report their shape in one pass.

    :param mylist: a list or tuple, possibly nested; nesting is detected
        by looking at the first element of each level only
    :return: (flat, dims) where flat is the flattened sequence and dims
        the list of lengths found at each nesting level

    An empty input yields (mylist, [0]) instead of raising IndexError,
    in line with flatten() and get_shape().
    """
    dims = [len(mylist)]
    # the length check guards mylist[0] for an empty input and also stops
    # the loop once flattening has produced an empty list
    while len(mylist) != 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
937
938
939 1
def flatten(mylist):
    """
    Collapse nested lists/tuples into a single flat list.

    Nesting is detected from the first element only. One level is removed
    per iteration until the first element is no longer a list or tuple, or
    nothing is left. An input that is empty or not nested is returned
    unchanged (same object).
    """
    current = mylist
    while len(current) != 0 and isinstance(current[0], (list, tuple)):
        merged = []
        for sub in current:
            merged.extend(sub)
        current = merged
    return current
947
948
949 1
def get_shape(mylist):
    """
    Return the list of lengths of mylist at each nesting level.

    Only the first element of every level is inspected. Anything that is
    not a list or tuple has the empty shape [], and an empty sequence ends
    the shape with 0.
    """
    if not isinstance(mylist, (list, tuple)):
        return []
    if len(mylist) == 0:
        return [0]
    return [len(mylist)] + get_shape(mylist[0])
957
958
959 1
class XmlElement(FrozenClass):
    '''
    An XML element encoded as an UTF-8 string.

    :ivar Value: content of the element; an empty list when the object is
        created without binary data
    '''

    def __init__(self, binary=None):
        # either decode from the given stream or start with the default
        if binary is None:
            self.Value = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def _binary_init(self, data):
        # the element is read from the stream as a single string
        self.Value = unpack_string(data)

    @staticmethod
    def from_binary(data):
        '''
        Build an XmlElement from a binary stream.
        '''
        return XmlElement(data)

    def to_binary(self):
        '''
        Pack Value with pack_string.
        '''
        return pack_string(self.Value)

    def __str__(self):
        return ''.join(('XmlElement(Value:', str(self.Value), ')'))

    __repr__ = __str__
985
986
987
988
989 1
class DataValue(FrozenClass):
    '''
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    :ivar Encoding: mask of the fields present in the binary form, one bit
        per field in the order listed above (bit 0 is Value)
    :vartype Encoding: int
    '''

    def __init__(self, variant=None, status=None):
        '''
        :param variant: the value; anything that is not already a Variant
            is wrapped in one
        :param status: the StatusCode; a default StatusCode() is used when
            None
        '''
        self.Encoding = 0
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        if status is None:
            self.StatusCode = StatusCode()
        else:
            self.StatusCode = status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        '''
        Serialize to bytes: the encoding mask followed by every field that
        is set (truthy), in mask bit order.

        The mask is rebuilt from the current field values on every call and
        stored in Encoding. OR-ing into the previous Encoding would keep the
        bit of a field that was present earlier (for example after
        from_binary) and has since been cleared, announcing data that is
        not written.
        '''
        encoding = 0
        body = []
        if self.Value:
            encoding |= (1 << 0)
            body.append(self.Value.to_binary())
        if self.StatusCode:
            encoding |= (1 << 1)
            body.append(self.StatusCode.to_binary())
        if self.SourceTimestamp:
            encoding |= (1 << 2)
            body.append(pack_datetime(self.SourceTimestamp))
        if self.ServerTimestamp:
            encoding |= (1 << 3)
            body.append(pack_datetime(self.ServerTimestamp))
        if self.SourcePicoseconds:
            encoding |= (1 << 4)
            body.append(uatype_UInt16.pack(self.SourcePicoseconds))
        if self.ServerPicoseconds:
            encoding |= (1 << 5)
            body.append(uatype_UInt16.pack(self.ServerPicoseconds))
        self.Encoding = encoding
        return b''.join([uatype_UInt8.pack(encoding)] + body)

    @staticmethod
    def from_binary(data):
        '''
        Decode a DataValue from a binary stream.

        :param data: object with a read(n) method positioned on the
            encoding mask
        :return: a new DataValue whose Encoding is the mask that was read
        '''
        encoding = ord(data.read(1))
        if encoding & (1 << 0):
            value = Variant.from_binary(data)
        else:
            value = None
        if encoding & (1 << 1):
            status = StatusCode.from_binary(data)
        else:
            status = None
        # absent value/status are replaced by the constructor defaults
        obj = DataValue(value, status)
        obj.Encoding = encoding
        if obj.Encoding & (1 << 2):
            obj.SourceTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
        if obj.Encoding & (1 << 3):
            obj.ServerTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
        if obj.Encoding & (1 << 4):
            obj.SourcePicoseconds = uatype_UInt16.unpack(data.read(2))[0]
        if obj.Encoding & (1 << 5):
            obj.ServerPicoseconds = uatype_UInt16.unpack(data.read(2))[0]
        return obj

    def __str__(self):
        # fields that are None are left out of the representation
        s = 'DataValue(Value:{}'.format(self.Value)
        if self.StatusCode is not None:
            s += ', StatusCode:{}'.format(self.StatusCode)
        if self.SourceTimestamp is not None:
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
        if self.ServerTimestamp is not None:
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
        if self.SourcePicoseconds is not None:
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
        if self.ServerPicoseconds is not None:
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
        s += ')'
        return s

    __repr__ = __str__
1093
1094
1095 1
# last numeric identifier handed out by generate_nodeid()
__nodeid_counter = 2000


def generate_nodeid(idx):
    """
    Return a new NodeId built from the next value of the module level
    counter and idx. The counter is incremented before use, so the first
    identifier handed out is 2001.
    """
    global __nodeid_counter
    next_identifier = __nodeid_counter + 1
    __nodeid_counter = next_identifier
    return NodeId(next_identifier, idx)
1102