Completed
Push — dev ( 3d1b3a...abb445 )
by Olivier
02:17
created

opcua.NodeId.__hash__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 2
ccs 2
cts 2
cp 1
rs 10
cc 1
crap 1
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo
7 1
from calendar import timegm
8 1
import sys
9 1
if sys.version_info.major > 2:
10 1
    unicode = str
11
12 1
import uuid
13 1
import struct
14
15 1
import opcua.status_code as status_code
16 1
from opcua.object_ids import ObjectIds
17
18 1
logger = logging.getLogger('opcua.uaprotocol')
19
20
# types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases
21 1
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
22
23
24 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
25 1
HUNDREDS_OF_NANOSECONDS = 10000000
26 1
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
27
28
29 1
class UTC(tzinfo):
30
31
    """UTC"""
32
33 1
    def utcoffset(self, dt):
34
        return timedelta(0)
35
36 1
    def tzname(self, dt):
37
        return "UTC"
38
39 1
    def dst(self, dt):
40 1
        return timedelta(0)
41
42
43
# method copied from David Buxton <[email protected]> sample code
44 1
def datetime_to_win_epoch(dt):
45 1
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
46 1
        dt = dt.replace(tzinfo=UTC())
47 1
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
48 1
    return ft + (dt.microsecond * 10)
49
50
51 1
def win_epoch_to_datetime(epch):
52 1
    return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
53
54
55 1
def build_array_format_py2(prefix, length, fmtchar):
56
    return prefix + str(length) + fmtchar
57
58
59 1
def build_array_format_py3(prefix, length, fmtchar):
60 1
    return prefix + str(length) + chr(fmtchar)
61
62
63 1
if sys.version_info.major < 3:
64
    build_array_format = build_array_format_py2
65
else:
66 1
    build_array_format = build_array_format_py3
67
68
69 1
def pack_uatype_array_primitive(st, value, length):
70 1
    if length == 1:
71 1
        return b'\x01\x00\x00\x00' + st.pack(value[0])
72
    else:
73 1
        return struct.pack(build_array_format("<i", length, st.format[1]), length, *value)
74
75
76 1
def pack_uatype_array(uatype, value):
77 1
    if value is None:
78
        return b'\xff\xff\xff\xff'
79 1
    length = len(value)
80 1
    if length == 0:
81
        return b'\x00\x00\x00\x00'
82 1
    if uatype in uatype2struct:
83 1
        return pack_uatype_array_primitive(uatype2struct[uatype], value, length)
84 1
    b = []
85 1
    b.append(uatype_Int32.pack(length))
86 1
    for val in value:
87 1
        b.append(pack_uatype(uatype, val))
88 1
    return b"".join(b)
89
90
91 1
def pack_uatype(uatype, value):
92 1
    if uatype in uatype2struct:
93 1
        return uatype2struct[uatype].pack(value)
94 1
    elif uatype == "Null":
95 1
        return b''
96 1
    elif uatype == "String":
97 1
        return pack_string(value)
98 1
    elif uatype in ("CharArray", "ByteString"):
99 1
        return pack_bytes(value)
100 1
    elif uatype == "DateTime":
101 1
        return pack_datetime(value)
102 1
    elif uatype == "ExtensionObject":
103
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
104
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
105
        # Using local import to avoid import loop
106 1
        from opcua.uaprotocol_auto import extensionobject_to_binary
107 1
        return extensionobject_to_binary(value)
108
    else:
109 1
        return value.to_binary()
110
111 1
uatype_Int8 = struct.Struct("<b")
112 1
uatype_SByte = uatype_Int8
113 1
uatype_Int16 = struct.Struct("<h")
114 1
uatype_Int32 = struct.Struct("<i")
115 1
uatype_Int64 = struct.Struct("<q")
116 1
uatype_UInt8 = struct.Struct("<B")
117 1
uatype_Char = uatype_UInt8
118 1
uatype_Byte = uatype_UInt8
119 1
uatype_UInt16 = struct.Struct("<H")
120 1
uatype_UInt32 = struct.Struct("<I")
121 1
uatype_UInt64 = struct.Struct("<Q")
122 1
uatype_Boolean = struct.Struct("<?")
123 1
uatype_Double = struct.Struct("<d")
124 1
uatype_Float = struct.Struct("<f")
125
126 1
uatype2struct = {
127
    "Int8": uatype_Int8,
128
    "SByte": uatype_SByte,
129
    "Int16": uatype_Int16,
130
    "Int32": uatype_Int32,
131
    "Int64": uatype_Int64,
132
    "UInt8": uatype_UInt8,
133
    "Char": uatype_Char,
134
    "Byte": uatype_Byte,
135
    "UInt16": uatype_UInt16,
136
    "UInt32": uatype_UInt32,
137
    "UInt64": uatype_UInt64,
138
    "Boolean": uatype_Boolean,
139
    "Double": uatype_Double,
140
    "Float": uatype_Float,
141
}
142
143
144 1
def unpack_uatype(uatype, data):
145 1
    if uatype in uatype2struct:
146 1
        st = uatype2struct[uatype]
147 1
        return st.unpack(data.read(st.size))[0]
148 1
    elif uatype == "String":
149 1
        return unpack_string(data)
150 1
    elif uatype in ("CharArray", "ByteString"):
151 1
        return unpack_bytes(data)
152 1
    elif uatype == "DateTime":
153 1
        return unpack_datetime(data)
154 1
    elif uatype == "ExtensionObject":
155
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
156
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
157
        # Using local import to avoid import loop
158 1
        from opcua.uaprotocol_auto import extensionobject_from_binary
159 1
        return extensionobject_from_binary(data)
160
    else:
161 1
        glbs = globals()
162 1
        if uatype in glbs:
163 1
            klass = glbs[uatype]
164 1
            if hasattr(klass, 'from_binary'):
165 1
                return klass.from_binary(data)
166
        raise Exception("can not unpack unknown uatype %s" % uatype)
167
168
169 1
def unpack_uatype_array(uatype, data):
170 1
    length = uatype_Int32.unpack(data.read(4))[0]
171 1
    if length == -1:
172
        return None
173 1
    elif length == 0:
174 1
        return []
175 1
    elif uatype in uatype2struct:
176 1
        st = uatype2struct[uatype]
177 1
        if length == 1:
178 1
            return list(st.unpack(data.read(st.size)))
179
        else:
180 1
            arrst = struct.Struct(build_array_format("<", length, st.format[1]))
181 1
            return list(arrst.unpack(data.read(arrst.size)))
182
    else:
183 1
        result = []
184 1
        for _ in range(0, length):
185 1
            result.append(unpack_uatype(uatype, data))
186 1
        return result
187
188
189 1
def pack_datetime(value):
190 1
    epch = datetime_to_win_epoch(value)
191 1
    return uatype_Int64.pack(epch)
192
193
194 1
def unpack_datetime(data):
195 1
    epch = uatype_Int64.unpack(data.read(8))[0]
196 1
    return win_epoch_to_datetime(epch)
197
198
199 1
def pack_string(string):
200 1
    if type(string) is unicode:
201 1
        string = string.encode('utf-8')
202 1
    length = len(string)
203 1
    if length == 0:
204 1
        return b'\xff\xff\xff\xff'
205 1
    return uatype_Int32.pack(length) + string
206
207 1
pack_bytes = pack_string
208
209
210 1
def unpack_bytes(data):
211 1
    length = uatype_Int32.unpack(data.read(4))[0]
212 1
    if length == -1:
213 1
        return b''
214 1
    return data.read(length)
215
216
217 1
def py3_unpack_string(data):
218 1
    b = unpack_bytes(data)
219 1
    return b.decode("utf-8")
220
221
222 1
if sys.version_info.major < 3:
223
    unpack_string = unpack_bytes
224
else:
225 1
    unpack_string = py3_unpack_string
226
227
228 1
def test_bit(data, offset):
229 1
    mask = 1 << offset
230 1
    return data & mask
231
232
233 1
def set_bit(data, offset):
234
    mask = 1 << offset
235
    return data | mask
236
237
238 1
class FrozenClass(object):
239
240
    """
241
    make it impossible to add members to a class.
242
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
243
    """
244 1
    __isfrozen = False
245
246 1
    def __setattr__(self, key, value):
247 1
        if self.__isfrozen and not hasattr(self, key):
248
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
249 1
        object.__setattr__(self, key, value)
250
251 1
    def _freeze(self):
252 1
        self.__isfrozen = True
253
254
255 1
class Guid(FrozenClass):
256
257 1
    def __init__(self):
258 1
        self.uuid = uuid.uuid4()
259 1
        self._freeze()
260
261 1
    def to_binary(self):
262 1
        return self.uuid.bytes
263
264 1
    @staticmethod
265
    def from_binary(data):
266 1
        g = Guid()
267 1
        g.uuid = uuid.UUID(bytes=data.read(16))
268 1
        return g
269
270 1
    def __eq__(self, other):
271 1
        return isinstance(other, Guid) and self.uuid == other.uuid
272
273
274 1
class StatusCode(FrozenClass):
275
276
    """
277
    :ivar value:
278
    :vartype value: int
279
    :ivar name:
280
    :vartype name: string
281
    :ivar doc:
282
    :vartype doc: string
283
    """
284
285 1
    def __init__(self, value=0):
286 1
        self.value = value
287 1
        self.name, self.doc = status_code.get_name_and_doc(value)
288 1
        self._freeze()
289
290 1
    def to_binary(self):
291 1
        return uatype_UInt32.pack(self.value)
292
293 1
    @staticmethod
294
    def from_binary(data):
295 1
        val = uatype_UInt32.unpack(data.read(4))[0]
296 1
        sc = StatusCode(val)
297 1
        return sc
298
299 1
    def check(self):
300
        """
301
        raise en exception if status code is anything else than 0
302
        use is is_good() method if not exception is desired
303
        """
304 1
        if self.value != 0:
305 1
            raise Exception("{}({})".format(self.doc, self.name))
306
307 1
    def is_good(self):
308
        """
309
        return True if status is Good.
310
        """
311 1
        if self.value == 0:
312 1
            return True
313 1
        return False
314
315 1
    def __str__(self):
316
        return 'StatusCode({})'.format(self.name)
317 1
    __repr__ = __str__
318
319
320 1
class NodeIdType(object):
321 1
    TwoByte = 0
322 1
    FourByte = 1
323 1
    Numeric = 2
324 1
    String = 3
325 1
    Guid = 4
326 1
    ByteString = 5
327
328
329 1
class NodeId(FrozenClass):
330
331
    """
332
    NodeId Object
333
334
    Args:
335
        identifier: The identifier might be an int, a string, bytes or a Guid
336
        namespaceidx(int): The index of the namespace
337
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
338
339
340
    :ivar Identifier:
341
    :vartype Identifier: NodeId
342
    :ivar NamespaceIndex:
343
    :vartype NamespaceIndex: Int
344
    :ivar NamespaceUri:
345
    :vartype NamespaceUri: String
346
    :ivar ServerIndex:
347
    :vartype ServerIndex: Int
348
    """
349
350 1
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
351 1
        self.Identifier = identifier
352 1
        self.NamespaceIndex = namespaceidx
353 1
        self.NodeIdType = nodeidtype
354 1
        self.NamespaceUri = ""
355 1
        self.ServerIndex = 0
356 1
        self._freeze()
357 1
        if self.Identifier is None:
358 1
            self.Identifier = 0
359 1
            self.NodeIdType = NodeIdType.TwoByte
360 1
            return
361 1
        if self.NodeIdType is None:
362 1
            if isinstance(self.Identifier, int):
363 1
                self.NodeIdType = NodeIdType.Numeric
364 1
            elif isinstance(self.Identifier, str):
365 1
                self.NodeIdType = NodeIdType.String
366
            elif isinstance(self.Identifier, bytes):
367
                self.NodeIdType = NodeIdType.ByteString
368
            else:
369
                raise Exception("NodeId: Could not guess type of NodeId, set NodeIdType")
370
371 1
    def __key(self):
372 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
373 1
            return self.NamespaceIndex, self.Identifier
374
        else:
375 1
            return self.NodeIdType, self.NamespaceIndex, self.Identifier
376
377 1
    def __eq__(self, node):
378 1
        return isinstance(node, NodeId) and self.__key() == node.__key()
379
380 1
    def __hash__(self):
381 1
        return hash(self.__key())
382
383 1
    @staticmethod
384
    def from_string(string):
385 1
        l = string.split(";")
386 1
        identifier = None
387 1
        namespace = 0
388 1
        ntype = None
389 1
        srv = None
390 1
        nsu = None
391 1
        for el in l:
392 1
            if not el:
393 1
                continue
394 1
            k, v = el.split("=")
395 1
            k = k.strip()
396 1
            v = v.strip()
397 1
            if k == "ns":
398 1
                namespace = int(v)
399 1
            elif k == "i":
400 1
                ntype = NodeIdType.Numeric
401 1
                identifier = int(v)
402 1
            elif k == "s":
403 1
                ntype = NodeIdType.String
404 1
                identifier = v
405 1
            elif k == "g":
406
                ntype = NodeIdType.Guid
407
                identifier = v
408 1
            elif k == "b":
409
                ntype = NodeIdType.ByteString
410
                identifier = v
411 1
            elif k == "srv":
412 1
                srv = v
413
            elif k == "nsu":
414
                nsu = v
415 1
        if identifier is None:
416
            raise Exception("Could not parse nodeid string: " + string)
417 1
        nodeid = NodeId(identifier, namespace, ntype)
418 1
        nodeid.NamespaceUri = nsu
419 1
        nodeid.ServerIndex = srv
420 1
        return nodeid
421
422 1
    def to_string(self):
423 1
        string = ""
424 1
        if self.NamespaceIndex != 0:
425 1
            string += "ns={};".format(self.NamespaceIndex)
426 1
        ntype = None
427 1
        if self.NodeIdType == NodeIdType.Numeric:
428
            ntype = "i"
429 1
        elif self.NodeIdType == NodeIdType.String:
430 1
            ntype = "s"
431 1
        elif self.NodeIdType == NodeIdType.TwoByte:
432 1
            ntype = "i"
433
        elif self.NodeIdType == NodeIdType.FourByte:
434
            ntype = "i"
435
        elif self.NodeIdType == NodeIdType.Guid:
436
            ntype = "g"
437
        elif self.NodeIdType == NodeIdType.ByteString:
438
            ntype = "b"
439 1
        string += "{}={}".format(ntype, self.Identifier)
440 1
        if self.ServerIndex:
441
            string = "srv=" + str(self.ServerIndex) + string
442 1
        if self.NamespaceUri:
443
            string += "nsu={}".format(self.NamespaceUri)
444 1
        return string
445
446 1
    def __str__(self):
447 1
        return "NodeId({})".format(self.to_string())
448 1
    __repr__ = __str__
449
450 1
    def to_binary(self):
451 1
        if self.NodeIdType == NodeIdType.TwoByte:
452 1
            return struct.pack("<BB", self.NodeIdType, self.Identifier)
453 1
        elif self.NodeIdType == NodeIdType.FourByte:
454 1
            return struct.pack("<BBH", self.NodeIdType, self.NamespaceIndex, self.Identifier)
455 1
        elif self.NodeIdType == NodeIdType.Numeric:
456 1
            return struct.pack("<BHI", self.NodeIdType, self.NamespaceIndex, self.Identifier)
457 1
        elif self.NodeIdType == NodeIdType.String:
458 1
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
459
                pack_string(self.Identifier)
460 1
        elif self.NodeIdType == NodeIdType.ByteString:
461 1
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
462
                pack_bytes(self.Identifier)
463
        else:
464 1
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
465
                self.Identifier.to_binary()
466
467 1
    @staticmethod
468
    def from_binary(data):
469 1
        nid = NodeId()
470 1
        encoding = ord(data.read(1))
471 1
        nid.NodeIdType = encoding & 0b00111111
472
473 1
        if nid.NodeIdType == NodeIdType.TwoByte:
474 1
            nid.Identifier = ord(data.read(1))
475 1
        elif nid.NodeIdType == NodeIdType.FourByte:
476 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
477 1
        elif nid.NodeIdType == NodeIdType.Numeric:
478 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
479 1
        elif nid.NodeIdType == NodeIdType.String:
480 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
481 1
            nid.Identifier = unpack_string(data)
482 1
        elif nid.NodeIdType == NodeIdType.ByteString:
483 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
484 1
            nid.Identifier = unpack_bytes(data)
485 1
        elif nid.NodeIdType == NodeIdType.Guid:
486 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
487 1
            nid.Identifier = Guid.from_binary(data)
488
        else:
489
            raise Exception("Unknown NodeId encoding: " + str(nid.NodeIdType))
490
491 1
        if test_bit(encoding, 6):
492
            nid.NamespaceUri = unpack_string(data)
493 1
        if test_bit(encoding, 7):
494
            nid.ServerIndex = uatype_UInt32.unpack(data.read(4))[0]
495
496 1
        return nid
497
498
499 1
class TwoByteNodeId(NodeId):
500
501 1
    def __init__(self, identifier):
502 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
503
504
505 1
class FourByteNodeId(NodeId):
506
507 1
    def __init__(self, identifier, namespace=0):
508 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
509
510
511 1
class NumericNodeId(NodeId):
512
513 1
    def __init__(self, identifier, namespace=0):
514 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
515
516
517 1
class ByteStringNodeId(NodeId):
518
519 1
    def __init__(self, identifier, namespace=0):
520 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
521
522
523 1
class GuidNodeId(NodeId):
524
525 1
    def __init__(self, identifier, namespace=0):
526 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
527
528
529 1
class StringNodeId(NodeId):
530
531 1
    def __init__(self, identifier, namespace=0):
532 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
533
534
535 1
ExpandedNodeId = NodeId
536
537
538 1
class QualifiedName(FrozenClass):
539
540
    '''
541
    A string qualified with a namespace index.
542
    '''
543
544 1
    def __init__(self, name="", namespaceidx=0):
545 1
        self.NamespaceIndex = namespaceidx
546 1
        self.Name = name
547 1
        self._freeze()
548
549 1
    def to_string(self):
550 1
        return "{}:{}".format(self.NamespaceIndex, self.Name)
551
552 1
    @staticmethod
553
    def from_string(string):
554 1
        if ":" in string:
555 1
            idx, name = string.split(":", 1)
556
        else:
557 1
            idx = 0
558 1
            name = string
559 1
        return QualifiedName(name, int(idx))
560
561 1
    def to_binary(self):
562 1
        packet = []
563 1
        packet.append(uatype_UInt16.pack(self.NamespaceIndex))
564 1
        packet.append(pack_string(self.Name))
565 1
        return b''.join(packet)
566
567 1
    @staticmethod
568
    def from_binary(data):
569 1
        obj = QualifiedName()
570 1
        obj.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
571 1
        obj.Name = unpack_string(data)
572 1
        return obj
573
574 1
    def __eq__(self, bname):
575 1
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
576
577 1
    def __str__(self):
578
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)
579
580 1
    __repr__ = __str__
581
582
583 1
class LocalizedText(FrozenClass):
584
585
    '''
586
    A string qualified with a namespace index.
587
    '''
588
589 1
    def __init__(self, text=""):
590 1
        self.Encoding = 0
591 1
        self.Text = text
592 1
        if type(self.Text) is unicode:
593 1
            self.Text = self.Text.encode('utf-8')
594 1
        if self.Text:
595 1
            self.Encoding |= (1 << 1)
596 1
        self.Locale = b''
597 1
        self._freeze()
598
599 1
    def to_binary(self):
600 1
        packet = []
601 1
        if self.Locale:
602
            self.Encoding |= (1 << 0)
603 1
        if self.Text:
604 1
            self.Encoding |= (1 << 1)
605 1
        packet.append(uatype_UInt8.pack(self.Encoding))
606 1
        if self.Locale:
607
            packet.append(pack_bytes(self.Locale))
608 1
        if self.Text:
609 1
            packet.append(pack_bytes(self.Text))
610 1
        return b''.join(packet)
611
612 1
    @staticmethod
613
    def from_binary(data):
614 1
        obj = LocalizedText()
615 1
        obj.Encoding = ord(data.read(1))
616 1
        if obj.Encoding & (1 << 0):
617
            obj.Locale = unpack_bytes(data)
618 1
        if obj.Encoding & (1 << 1):
619 1
            obj.Text = unpack_bytes(data)
620 1
        return obj
621
622 1
    def to_string(self):
623
        # FIXME: use local
624
        return self.Text.decode()
625
626 1
    def __str__(self):
627
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
628
            'Locale:' + str(self.Locale) + ', ' + \
629
            'Text:' + str(self.Text) + ')'
630 1
    __repr__ = __str__
631
632 1
    def __eq__(self, other):
633 1
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
634 1
            return True
635 1
        return False
636
637
638 1
class VariantType(Enum):
639
640
    '''
641
    The possible types of a variant.
642
643
    :ivar Null:
644
    :ivar Boolean:
645
    :ivar SByte:
646
    :ivar Byte:
647
    :ivar Int16:
648
    :ivar UInt16:
649
    :ivar Int32:
650
    :ivar UInt32:
651
    :ivar Int64:
652
    :ivar UInt64:
653
    :ivar Float:
654
    :ivar Double:
655
    :ivar String:
656
    :ivar DateTime:
657
    :ivar Guid:
658
    :ivar ByteString:
659
    :ivar XmlElement:
660
    :ivar NodeId:
661
    :ivar ExpandedNodeId:
662
    :ivar StatusCode:
663
    :ivar QualifiedName:
664
    :ivar LocalizedText:
665
    :ivar ExtensionObject:
666
    :ivar DataValue:
667
    :ivar Variant:
668
    :ivar DiagnosticInfo:
669
670
671
672
    '''
673 1
    Null = 0
674 1
    Boolean = 1
675 1
    SByte = 2
676 1
    Byte = 3
677 1
    Int16 = 4
678 1
    UInt16 = 5
679 1
    Int32 = 6
680 1
    UInt32 = 7
681 1
    Int64 = 8
682 1
    UInt64 = 9
683 1
    Float = 10
684 1
    Double = 11
685 1
    String = 12
686 1
    DateTime = 13
687 1
    Guid = 14
688 1
    ByteString = 15
689 1
    XmlElement = 16
690 1
    NodeId = 17
691 1
    ExpandedNodeId = 18
692 1
    StatusCode = 19
693 1
    QualifiedName = 20
694 1
    LocalizedText = 21
695 1
    ExtensionObject = 22
696 1
    DataValue = 23
697 1
    Variant = 24
698 1
    DiagnosticInfo = 25
699
700
701 1
class Variant(FrozenClass):
702
703
    """
704
    Create an OPC-UA Variant object.
705
    if no argument a Null Variant is created.
706
    if not variant type is given, attemps to guess type from python type
707
    if a variant is given as value, the new objects becomes a copy of the argument
708
709
    :ivar Value:
710
    :vartype Value: Any supported type
711
    :ivar VariantType:
712
    :vartype VariantType: VariantType
713
    """
714
715 1
    def __init__(self, value=None, varianttype=None, encoding=0):
716 1
        self.Encoding = encoding
717 1
        self.Value = value
718 1
        self.VariantType = varianttype
719 1
        if isinstance(value, Variant):
720 1
            self.Value = value.Value
721 1
            self.VariantType = value.VariantType
722 1
        if self.VariantType is None:
723 1
            if type(self.Value) in (list, tuple):
724 1
                if len(self.Value) == 0:
725
                    raise Exception("could not guess UA variable type")
726 1
                self.VariantType = self._guess_type(self.Value[0])
727
            else:
728 1
                self.VariantType = self._guess_type(self.Value)
729 1
        self._freeze()
730
731 1
    def __eq__(self, other):
732 1
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
733 1
            return True
734 1
        return False
735
736 1
    def _guess_type(self, val):
737 1
        if val is None:
738 1
            return VariantType.Null
739 1
        elif isinstance(val, bool):
740 1
            return VariantType.Boolean
741 1
        elif isinstance(val, float):
742 1
            return VariantType.Double
743 1
        elif isinstance(val, int):
744 1
            return VariantType.Int64
745 1
        elif type(val) in (str, unicode):
746 1
            return VariantType.String
747 1
        elif isinstance(val, bytes):
748 1
            return VariantType.ByteString
749 1
        elif isinstance(val, datetime):
750 1
            return VariantType.DateTime
751
        else:
752 1
            if isinstance(val, object):
753 1
                try:
754 1
                    return getattr(VariantType, val.__class__.__name__)
755 1
                except AttributeError:
756 1
                    return VariantType.ExtensionObject
757
            else:
758
                raise Exception("Could not guess UA type of {} with type {}, specify UA type".format(val, type(val)))
759
760 1
    def __str__(self):
761 1
        return "Variant(val:{},type:{})".format(self.Value, self.VariantType)
762 1
    __repr__ = __str__
763
764 1
    def to_binary(self):
765 1
        b = []
766 1
        mask = self.Encoding & 0b01111111
767 1
        self.Encoding = (self.VariantType.value | mask)
768 1
        if type(self.Value) in (list, tuple):
769 1
            self.Encoding |= (1 << 7)
770 1
            b.append(uatype_UInt8.pack(self.Encoding))
771 1
            b.append(pack_uatype_array(self.VariantType.name, self.Value))
772
        else:
773 1
            b.append(uatype_UInt8.pack(self.Encoding))
774 1
            b.append(pack_uatype(self.VariantType.name, self.Value))
775 1
        return b"".join(b)
776
777 1
    @staticmethod
778
    def from_binary(data):
779 1
        encoding = ord(data.read(1))
780 1
        vtype = VariantType(encoding & 0b01111111)
781 1
        if vtype == VariantType.Null:
782 1
            return Variant(None, vtype, encoding)
783 1
        if encoding & (1 << 7):
784 1
            value = unpack_uatype_array(vtype.name, data)
785
        else:
786 1
            value = unpack_uatype(vtype.name, data)
787 1
        return Variant(value, vtype, encoding)
788
789
790 1
class DataValue(FrozenClass):
791
792
    '''
793
    A value with an associated timestamp, and quality.
794
    Automatically generated from xml , copied and modified here to fix errors in xml spec
795
796
    :ivar Value:
797
    :vartype Value: Variant
798
    :ivar StatusCode:
799
    :vartype StatusCode: StatusCode
800
    :ivar SourceTimestamp:
801
    :vartype SourceTimestamp: datetime
802
    :ivar SourcePicoSeconds:
803
    :vartype SourcePicoSeconds: int
804
    :ivar ServerTimestamp:
805
    :vartype ServerTimestamp: datetime
806
    :ivar ServerPicoseconds:
807
    :vartype ServerPicoseconds: int
808
809
    '''
810
811 1
    def __init__(self, variant=None, status=None):
812 1
        self.Encoding = 0
813 1
        if not isinstance(variant, Variant):
814 1
            variant = Variant(variant)
815 1
        self.Value = variant
816 1
        if status is None:
817 1
            self.StatusCode = StatusCode()
818
        else:
819 1
            self.StatusCode = status
820 1
        self.SourceTimestamp = None  # DateTime()
821 1
        self.SourcePicoseconds = None
822 1
        self.ServerTimestamp = None  # DateTime()
823 1
        self.ServerPicoseconds = None
824 1
        self._freeze()
825
826 1
    def to_binary(self):
827 1
        packet = []
828 1
        if self.Value:
829 1
            self.Encoding |= (1 << 0)
830 1
        if self.StatusCode:
831 1
            self.Encoding |= (1 << 1)
832 1
        if self.SourceTimestamp:
833 1
            self.Encoding |= (1 << 2)
834 1
        if self.ServerTimestamp:
835 1
            self.Encoding |= (1 << 3)
836 1
        if self.SourcePicoseconds:
837
            self.Encoding |= (1 << 4)
838 1
        if self.ServerPicoseconds:
839
            self.Encoding |= (1 << 5)
840 1
        packet.append(uatype_UInt8.pack(self.Encoding))
841 1
        if self.Value:
842 1
            packet.append(self.Value.to_binary())
843 1
        if self.StatusCode:
844 1
            packet.append(self.StatusCode.to_binary())
845 1
        if self.SourceTimestamp:
846 1
            packet.append(pack_datetime(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
847 1
        if self.ServerTimestamp:
848 1
            packet.append(pack_datetime(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
849 1
        if self.SourcePicoseconds:
850
            packet.append(uatype_UInt16.pack(self.SourcePicoseconds))
851 1
        if self.ServerPicoseconds:
852
            packet.append(uatype_UInt16.pack(self.ServerPicoseconds))
853 1
        return b''.join(packet)
854
855 1
    @staticmethod
856
    def from_binary(data):
857 1
        encoding = ord(data.read(1))
858 1
        if encoding & (1 << 0):
859 1
            value = Variant.from_binary(data)
860
        else:
861
            value = None
862 1
        if encoding & (1 << 1):
863 1
            status = StatusCode.from_binary(data)
864
        else:
865
            status = None
866 1
        obj = DataValue(value, status)
867 1
        obj.Encoding = encoding
868 1
        if obj.Encoding & (1 << 2):
869 1
            obj.SourceTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
870 1
        if obj.Encoding & (1 << 3):
871 1
            obj.ServerTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
872 1
        if obj.Encoding & (1 << 4):
873
            obj.SourcePicoseconds = uatype_UInt16.unpack(data.read(2))[0]
874 1
        if obj.Encoding & (1 << 5):
875
            obj.ServerPicoseconds = uatype_UInt16.unpack(data.read(2))[0]
876 1
        return obj
877
878 1
    def __str__(self):
879
        s = 'DataValue(Value:{}'.format(self.Value)
880
        if self.StatusCode is not None:
881
            s += ', StatusCode:{}'.format(self.StatusCode)
882
        if self.SourceTimestamp is not None:
883
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
884
        if self.ServerTimestamp is not None:
885
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
886
        if self.SourcePicoseconds is not None:
887
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
888
        if self.ServerPicoseconds is not None:
889
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
890
        s += ')'
891
        return s
892
893 1
    __repr__ = __str__
894
895
896 1
__nodeid_counter = 2000
897
898
899 1
def generate_nodeid(idx):
900
    global __nodeid_counter
901 1
    __nodeid_counter += 1
902
    return NodeId(__nodeid_counter, idx)
903