Completed
Push — master ( abb445...558fdd )
by Olivier
02:23
created

opcua._FrozenClass.__setattr__()   A

Complexity

Conditions 3

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 3.1406
Metric Value
dl 0
loc 4
ccs 3
cts 4
cp 0.75
rs 10
cc 3
crap 3.1406
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo
7 1
from calendar import timegm
8 1
import sys
9 1
import os
10 1
if sys.version_info[0] >= 3:
    # Python 3 has no separate ``unicode`` type: alias it to ``str`` so that
    # the type checks in this module read the same on both major versions.
    unicode = str
12
13 1
import uuid
14 1
import struct
15
16 1
import opcua.status_code as status_code
17 1
from opcua.object_ids import ObjectIds
18
19 1
# module-level logger, registered under the name 'opcua.uaprotocol'
logger = logging.getLogger('opcua.uaprotocol')
20
21
# types that will be packed and unpacked directly using struct (string, bytes and datetime are handled as special cases)
22 1
UaTypes = (
    "Boolean",
    "SByte", "Byte",
    "Int8", "UInt8",
    "Int16", "UInt16",
    "Int32", "UInt32",
    "Int64", "UInt64",
    "Float", "Double",
)
23
24
25 1
# January 1, 1970 (the Unix epoch) expressed as MS file time
EPOCH_AS_FILETIME = 116444736000000000
# factor used to turn whole seconds into 100-nanosecond file-time ticks
HUNDREDS_OF_NANOSECONDS = 10 ** 7
# origin of MS file time, as a naive datetime
FILETIME_EPOCH_AS_DATETIME = datetime(year=1601, month=1, day=1)
28
29
30 1
class UTC(tzinfo):

    """tzinfo implementation for UTC: zero offset and no daylight saving."""

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return timedelta()

    def tzname(self, dt):
        """Return the name of the zone, always "UTC"."""
        return "UTC"

    def dst(self, dt):
        """Return the daylight saving adjustment, which is always zero."""
        return timedelta()
42
43
44
# method copied from David Buxton <[email protected]> sample code
45 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to MS file time.

    The result is EPOCH_AS_FILETIME plus the seconds since the Unix epoch
    scaled by HUNDREDS_OF_NANOSECONDS, plus the microseconds scaled by 10.

    A naive datetime (or one whose tzinfo reports no offset) is treated as
    UTC. An aware datetime is converted to UTC first, so two datetimes that
    denote the same instant give the same result whatever their offset.
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # timegm() expects UTC fields: utctimetuple() subtracts the utc offset,
    # whereas timetuple() would hand over the local wall-clock fields of an
    # aware datetime and shift the result by that offset.
    ft = EPOCH_AS_FILETIME + (timegm(dt.utctimetuple()) * HUNDREDS_OF_NANOSECONDS)
    return ft + (dt.microsecond * 10)
50
51
52 1
def win_epoch_to_datetime(epch):
    """
    Convert an MS file time value to a naive datetime.

    The value is floor-divided by 10 to get microseconds, which are added
    to FILETIME_EPOCH_AS_DATETIME.
    """
    microseconds = epch // 10
    return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=microseconds)
54
55
56 1
def build_array_format_py2(prefix, length, fmtchar):
    """
    Build a struct format string for an array: prefix, then the element
    count, then the format character, which must already be a string.
    """
    return "".join((prefix, str(length), fmtchar))
58
59
60 1
def build_array_format_py3(prefix, length, fmtchar):
    """
    Build a struct format string for an array: prefix, then the element
    count, then the format character.

    fmtchar is normally ``st.format[1]`` of a struct.Struct. That is an int
    when ``format`` is a bytes object (Python 3 before 3.7) and a
    one-character str when ``format`` is a str (Python 3.7 and later), so
    both forms are accepted.
    """
    if isinstance(fmtchar, int):
        # indexing a bytes format yields the character code
        fmtchar = chr(fmtchar)
    return prefix + str(length) + fmtchar
62
63
64 1
# bind the helper matching the major version of the running interpreter
build_array_format = build_array_format_py2 if sys.version_info.major < 3 else build_array_format_py3
68
69
70 1
def pack_uatype_array_primitive(st, value, length):
    """
    Pack an array of primitive values: Int32 element count, then the elements.

    st is the struct.Struct of a single element, value the sequence to pack
    and length its number of elements. For more than one element a single
    format covering the count and all elements is built from st's format
    character; one element is packed directly with st.
    """
    if length != 1:
        fmt = build_array_format("<i", length, st.format[1])
        return struct.pack(fmt, length, *value)
    # count of 1 as a little-endian Int32, followed by the single element
    return b'\x01\x00\x00\x00' + st.pack(value[0])
75
76
77 1
def pack_uatype_array(uatype, value):
    """
    Pack a sequence of values of type uatype as an array.

    None is packed as a length of -1 and an empty sequence as a length of 0.
    Types found in uatype2struct go through pack_uatype_array_primitive;
    every other type is packed element by element with pack_uatype after an
    Int32 element count.
    """
    if value is None:
        return b'\xff\xff\xff\xff'
    length = len(value)
    if length == 0:
        return b'\x00\x00\x00\x00'
    st = uatype2struct.get(uatype)
    if st is not None:
        return pack_uatype_array_primitive(st, value, length)
    chunks = [uatype_Int32.pack(length)]
    chunks.extend(pack_uatype(uatype, val) for val in value)
    return b"".join(chunks)
90
91
92 1
def pack_uatype(uatype, value):
    """
    Pack a single value according to the name of its ua type.

    Types found in uatype2struct are packed with their struct; "Null",
    "String", "CharArray"/"ByteString", "DateTime" and "ExtensionObject" are
    special-cased; for any other type the value's own to_binary() is used.
    """
    st = uatype2struct.get(uatype)
    if st is not None:
        return st.pack(value)
    if uatype == "Null":
        return b''
    if uatype == "String":
        return pack_string(value)
    if uatype in ("CharArray", "ByteString"):
        return pack_bytes(value)
    if uatype == "DateTime":
        return pack_datetime(value)
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)
    return value.to_binary()
111
112 1
# One pre-compiled little-endian struct.Struct per primitive ua type.
# signed integers
uatype_Int8 = struct.Struct("<b")
uatype_Int16 = struct.Struct("<h")
uatype_Int32 = struct.Struct("<i")
uatype_Int64 = struct.Struct("<q")
# unsigned integers
uatype_UInt8 = struct.Struct("<B")
uatype_UInt16 = struct.Struct("<H")
uatype_UInt32 = struct.Struct("<I")
uatype_UInt64 = struct.Struct("<Q")
# boolean and floating point
uatype_Boolean = struct.Struct("<?")
uatype_Float = struct.Struct("<f")
uatype_Double = struct.Struct("<d")
# aliases sharing the struct object of the type they map onto
uatype_SByte = uatype_Int8
uatype_Char = uatype_UInt8
uatype_Byte = uatype_UInt8

# lookup table: ua type name -> struct used to pack and unpack that type
uatype2struct = dict(
    Int8=uatype_Int8,
    SByte=uatype_SByte,
    Int16=uatype_Int16,
    Int32=uatype_Int32,
    Int64=uatype_Int64,
    UInt8=uatype_UInt8,
    Char=uatype_Char,
    Byte=uatype_Byte,
    UInt16=uatype_UInt16,
    UInt32=uatype_UInt32,
    UInt64=uatype_UInt64,
    Boolean=uatype_Boolean,
    Double=uatype_Double,
    Float=uatype_Float,
)
143
144
145 1
def unpack_uatype(uatype, data):
    """
    Read one value of the named ua type from data, an object with a read() method.

    Types found in uatype2struct are read with their struct; "String",
    "CharArray"/"ByteString", "DateTime" and "ExtensionObject" are
    special-cased. Any other name is looked up among this module's globals
    and decoded with that object's from_binary(); an Exception is raised
    when no such decoder exists.
    """
    st = uatype2struct.get(uatype)
    if st is not None:
        return st.unpack(data.read(st.size))[0]
    if uatype == "String":
        return unpack_string(data)
    if uatype in ("CharArray", "ByteString"):
        return unpack_bytes(data)
    if uatype == "DateTime":
        return unpack_datetime(data)
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)
    klass = globals().get(uatype)
    if klass is not None and hasattr(klass, 'from_binary'):
        return klass.from_binary(data)
    raise Exception("can not unpack unknown uatype %s" % uatype)
168
169
170 1
def unpack_uatype_array(uatype, data):
    """
    Read an array of values of the named ua type from data.

    The array starts with an Int32 length: -1 yields None and 0 an empty
    list. Types found in uatype2struct are read in a single struct call;
    every other type is read element by element with unpack_uatype.
    """
    (length,) = uatype_Int32.unpack(data.read(4))
    if length == -1:
        return None
    if length == 0:
        return []
    if uatype not in uatype2struct:
        return [unpack_uatype(uatype, data) for _ in range(0, length)]
    st = uatype2struct[uatype]
    if length != 1:
        # one struct covering all the elements at once
        st = struct.Struct(build_array_format("<", length, st.format[1]))
    return list(st.unpack(data.read(st.size)))
188
189
190 1
def pack_datetime(value):
    """Pack a datetime as an Int64 holding its MS file time."""
    return uatype_Int64.pack(datetime_to_win_epoch(value))
193
194
195 1
def unpack_datetime(data):
    """Read an Int64 MS file time from data and convert it to a datetime."""
    (ticks,) = uatype_Int64.unpack(data.read(8))
    return win_epoch_to_datetime(ticks)
198
199
200 1
def pack_string(string):
    """
    Pack a string or bytes object as an Int32 length followed by the data.

    Unicode text is encoded as UTF-8 first. None and empty input are both
    packed as a length of -1 with no data.
    """
    if string is None:
        # nothing to measure or encode: emit the null marker directly
        return b'\xff\xff\xff\xff'
    if isinstance(string, unicode):
        string = string.encode('utf-8')
    length = len(string)
    if length == 0:
        return b'\xff\xff\xff\xff'
    return uatype_Int32.pack(length) + string

# byte strings share the same length-prefixed layout
pack_bytes = pack_string
209
210
211 1
def unpack_bytes(data):
    """
    Read a length-prefixed byte string from data.

    An Int32 length of -1 yields b''; otherwise that many bytes are read.
    """
    (length,) = uatype_Int32.unpack(data.read(4))
    return b'' if length == -1 else data.read(length)
216
217
218 1
def py3_unpack_string(data):
    """Read a length-prefixed byte string from data and decode it as UTF-8."""
    return unpack_bytes(data).decode("utf-8")
221
222
223 1
# before Python 3 strings are returned as the raw bytes; from 3 on they are decoded
unpack_string = unpack_bytes if sys.version_info.major < 3 else py3_unpack_string
227
228
229 1
def test_bit(data, offset):
230 1
    mask = 1 << offset
231 1
    return data & mask
232
233
234 1
def set_bit(data, offset):
    """Return data with the bit at position offset set."""
    return data | (1 << offset)
237
238
239 1
class _FrozenClass(object):
240
241
    """
242
    make it impossible to add members to a class.
243
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
244
    """
245 1
    _freeze = False
246
247 1
    def __setattr__(self, key, value):
248 1
        if self._freeze and not hasattr(self, key):
249
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
250 1
        object.__setattr__(self, key, value)
251
252 1
# The typo check is cpu consuming, but it makes debugging easy.
# If it is not needed (in production), set the PYOPCUA_NO_TYPO_CHECK environment
# variable: all uatype classes then inherit from object instead of _FrozenClass
# and the typo check is skipped.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
260
261
262 1
class Guid(FrozenClass):

    """
    Guid value wrapping a uuid.UUID.

    A new instance holds a random (version 4) uuid. Instances compare equal
    when their uuids are equal and hash accordingly, so they can be used in
    sets, as dict keys, and inside the hash key of a NodeId.

    :ivar uuid:
    :vartype uuid: uuid.UUID
    """

    def __init__(self):
        self.uuid = uuid.uuid4()
        self._freeze = True

    def to_binary(self):
        """Return the 16 bytes of the uuid."""
        # NOTE(review): this emits uuid.bytes (big-endian field order); confirm
        # against the OPC-UA binary Guid layout whether bytes_le is expected.
        return self.uuid.bytes

    @staticmethod
    def from_binary(data):
        """Read 16 bytes from data and return the matching Guid."""
        g = Guid()
        g.uuid = uuid.UUID(bytes=data.read(16))
        return g

    def __eq__(self, other):
        return isinstance(other, Guid) and self.uuid == other.uuid

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)

    def __hash__(self):
        # defining __eq__ alone would make instances unhashable on Python 3
        return hash(self.uuid)
279
280
281 1
class StatusCode(FrozenClass):

    """
    Numeric status code together with its symbolic name and description.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    def __init__(self, value=0):
        self.value = value
        name_and_doc = status_code.get_name_and_doc(value)
        self.name, self.doc = name_and_doc
        self._freeze = True

    def to_binary(self):
        """Pack the value as an unsigned 32 bit integer."""
        return uatype_UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Read an unsigned 32 bit integer from data and wrap it in a StatusCode."""
        (val,) = uatype_UInt32.unpack(data.read(4))
        return StatusCode(val)

    def check(self):
        """
        Raise an exception if the status code is anything other than 0.
        Use the is_good() method if no exception is desired.
        """
        if self.value != 0:
            raise Exception("{}({})".format(self.doc, self.name))

    def is_good(self):
        """
        Return True if the status is Good (value 0), False otherwise.
        """
        return True if self.value == 0 else False

    def __str__(self):
        return 'StatusCode({})'.format(self.name)
    __repr__ = __str__
325
326
327 1
class NodeIdType(object):

    """Integer constants naming the encodings a NodeId can have (0 to 5)."""

    TwoByte, FourByte, Numeric, String, Guid, ByteString = range(6)
334
335
336 1
class NodeId(FrozenClass):

    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want something special like twobyte nodeid or fourbytenodeid


    :ivar Identifier:
    :vartype Identifier: int, string, bytes or Guid
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if self.Identifier is None:
            # no identifier at all: default to the two byte nodeid 0
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the encoding from the python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, (str, unicode)):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, Guid):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise Exception("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
            return self.NamespaceIndex, self.Identifier
        else:
            return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __ne__(self, node):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(node)

    def __hash__(self):
        return hash(self.__key())

    @staticmethod
    def from_string(string):
        """
        Parse a nodeid from its string form, e.g. "ns=2;i=42" or "s=name".

        The string is a ';' separated list of key=value elements. Recognised
        keys are ns, i, s, g, b, srv and nsu; other keys are ignored. An
        Exception is raised when no identifier (i, s, g or b) is present.
        """
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            # split on the first '=' only: the value itself may contain '='
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is an integer everywhere else in this class
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise Exception("Could not parse nodeid string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # only override the constructor defaults when the string provided a value
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the string form of the nodeid, in the layout from_string() parses.

        A namespace index of 0, a server index of 0 and an empty namespace
        uri are left out.
        """
        prefixes = {
            NodeIdType.Numeric: "i",
            NodeIdType.String: "s",
            NodeIdType.TwoByte: "i",
            NodeIdType.FourByte: "i",
            NodeIdType.Guid: "g",
            NodeIdType.ByteString: "b",
        }
        string = ""
        if self.NamespaceIndex != 0:
            string += "ns={};".format(self.NamespaceIndex)
        string += "{}={}".format(prefixes.get(self.NodeIdType), self.Identifier)
        # every element is ';' separated, otherwise from_string() cannot split them
        if self.ServerIndex:
            string = "srv={};".format(self.ServerIndex) + string
        if self.NamespaceUri:
            string += ";nsu={}".format(self.NamespaceUri)
        return string

    def __str__(self):
        return "NodeId({})".format(self.to_string())
    __repr__ = __str__

    def to_binary(self):
        """
        Pack the nodeid: one byte for the type, then the namespace index
        and identifier in the layout belonging to that type.

        NamespaceUri and ServerIndex are not written by this method.
        """
        if self.NodeIdType == NodeIdType.TwoByte:
            return struct.pack("<BB", self.NodeIdType, self.Identifier)
        elif self.NodeIdType == NodeIdType.FourByte:
            return struct.pack("<BBH", self.NodeIdType, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.Numeric:
            return struct.pack("<BHI", self.NodeIdType, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.String:
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
                pack_string(self.Identifier)
        elif self.NodeIdType == NodeIdType.ByteString:
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
                pack_bytes(self.Identifier)
        else:
            # remaining case: the identifier packs itself (e.g. a Guid)
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
                self.Identifier.to_binary()

    @staticmethod
    def from_binary(data):
        """
        Read a nodeid from data, an object with a read() method.

        The low six bits of the first byte select the type; bit 6 signals a
        trailing namespace uri and bit 7 a trailing server index. An
        Exception is raised for an unknown type.
        """
        nid = NodeId()
        encoding = ord(data.read(1))
        nid.NodeIdType = encoding & 0b00111111

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = ord(data.read(1))
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = unpack_string(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = unpack_bytes(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = Guid.from_binary(data)
        else:
            raise Exception("Unknown NodeId encoding: " + str(nid.NodeIdType))

        if test_bit(encoding, 6):
            nid.NamespaceUri = unpack_string(data)
        if test_bit(encoding, 7):
            nid.ServerIndex = uatype_UInt32.unpack(data.read(4))[0]

        return nid
504
505
506 1
class TwoByteNodeId(NodeId):

    """NodeId of type TwoByte; the namespace index is always 0."""

    def __init__(self, identifier):
        super(TwoByteNodeId, self).__init__(identifier, 0, NodeIdType.TwoByte)
510
511
512 1
class FourByteNodeId(NodeId):

    """NodeId of type FourByte."""

    def __init__(self, identifier, namespace=0):
        super(FourByteNodeId, self).__init__(identifier, namespace, NodeIdType.FourByte)
516
517
518 1
class NumericNodeId(NodeId):

    """NodeId of type Numeric."""

    def __init__(self, identifier, namespace=0):
        super(NumericNodeId, self).__init__(identifier, namespace, NodeIdType.Numeric)
522
523
524 1
class ByteStringNodeId(NodeId):

    """NodeId of type ByteString."""

    def __init__(self, identifier, namespace=0):
        super(ByteStringNodeId, self).__init__(identifier, namespace, NodeIdType.ByteString)
528
529
530 1
class GuidNodeId(NodeId):

    """NodeId of type Guid."""

    def __init__(self, identifier, namespace=0):
        super(GuidNodeId, self).__init__(identifier, namespace, NodeIdType.Guid)
534
535
536 1
class StringNodeId(NodeId):

    """NodeId of type String."""

    def __init__(self, identifier, namespace=0):
        super(StringNodeId, self).__init__(identifier, namespace, NodeIdType.String)
540
541
542 1
# ExpandedNodeId is an alias: it is the very same class as NodeId
ExpandedNodeId = NodeId
543
544
545 1
class QualifiedName(FrozenClass):

    '''
    A string qualified with a namespace index.

    Instances compare equal when both the name and the namespace index are
    equal, and hash accordingly.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    '''

    def __init__(self, name="", namespaceidx=0):
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return the name in the form "<namespace index>:<name>"."""
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse "<namespace index>:<name>"; without a ':' the whole string is
        the name and the namespace index is 0.
        """
        if ":" in string:
            # split once only: the name itself may contain ':'
            idx, name = string.split(":", 1)
        else:
            idx = 0
            name = string
        return QualifiedName(name, int(idx))

    def to_binary(self):
        """Pack the namespace index as UInt16 followed by the name as a string."""
        packet = []
        packet.append(uatype_UInt16.pack(self.NamespaceIndex))
        packet.append(pack_string(self.Name))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read a UInt16 namespace index and a string name from data."""
        obj = QualifiedName()
        obj.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
        obj.Name = unpack_string(data)
        return obj

    def __eq__(self, bname):
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, bname):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(bname)

    def __hash__(self):
        # defining __eq__ alone would make instances unhashable on Python 3
        return hash((self.NamespaceIndex, self.Name))

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
588
589
590 1
class LocalizedText(FrozenClass):

    '''
    A text with an optional locale.

    Text given as unicode is stored UTF-8 encoded. Bit 0 of Encoding flags
    the presence of Locale and bit 1 the presence of Text in the binary form.

    :ivar Encoding:
    :vartype Encoding: int
    :ivar Locale:
    :vartype Locale: bytes
    :ivar Text:
    :vartype Text: bytes
    '''

    def __init__(self, text=""):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = b''
        self._freeze = True

    def to_binary(self):
        """Pack the encoding mask followed by the fields that are present."""
        # Rebuild the mask from the current fields: keeping bits from a
        # previous state would announce a field that is no longer written.
        self.Encoding = 0
        if self.Locale:
            self.Encoding |= (1 << 0)
        if self.Text:
            self.Encoding |= (1 << 1)
        packet = [uatype_UInt8.pack(self.Encoding)]
        if self.Locale:
            packet.append(pack_bytes(self.Locale))
        if self.Text:
            packet.append(pack_bytes(self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the encoding mask from data, then the fields it announces."""
        obj = LocalizedText()
        obj.Encoding = ord(data.read(1))
        if obj.Encoding & (1 << 0):
            obj.Locale = unpack_bytes(data)
        if obj.Encoding & (1 << 1):
            obj.Text = unpack_bytes(data)
        return obj

    def to_string(self):
        """Return the text decoded from UTF-8."""
        # FIXME: use local
        # explicit codec: the text is stored UTF-8 encoded, and the default
        # codec of bytes.decode() is ascii on Python 2
        return self.Text.decode('utf-8')

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) + ')'
    __repr__ = __str__

    def __eq__(self, other):
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
            return True
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)

    def __hash__(self):
        # defining __eq__ alone would make instances unhashable on Python 3
        return hash((self.Locale, self.Text))
643
644
645 1
class VariantType(Enum):

    '''
    The possible types of a variant.

    Each member's value is the numeric id of the type, from 0 (Null) to
    25 (DiagnosticInfo).

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:
    '''

    # no value
    Null = 0
    # boolean and numbers
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    # text, time and raw data
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    # structured types
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
706
707
708 1
class Variant(FrozenClass):

    """
    Create an OPC-UA Variant object.
    If no argument is given, a Null Variant is created.
    If no variant type is given, the type is guessed from the python type
    (from the first element for a list or tuple).
    If a variant is given as value, the new object becomes a copy of the argument.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    """

    def __init__(self, value=None, varianttype=None, encoding=0):
        self.Encoding = encoding
        self.Value = value
        self.VariantType = varianttype
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
        if self.VariantType is None:
            if type(self.Value) in (list, tuple):
                if len(self.Value) == 0:
                    raise Exception("could not guess UA variable type")
                self.VariantType = self._guess_type(self.Value[0])
            else:
                self.VariantType = self._guess_type(self.Value)
        self._freeze = True

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)

    def _guess_type(self, val):
        """Return the VariantType matching the python type of val."""
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # bool must be tested before int, of which it is a subclass
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, int):
            return VariantType.Int64
        elif isinstance(val, (str, unicode)):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        # any other object: use the member named like its class, if there is
        # one, and fall back to ExtensionObject otherwise
        try:
            return getattr(VariantType, val.__class__.__name__)
        except AttributeError:
            return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{},type:{})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """
        Pack the variant: one encoding byte followed by the value.

        The encoding byte is the id of the variant type, with bit 7 set when
        the value is a list or tuple and is therefore packed as an array.
        """
        # Start from the type id alone: or-ing it onto bits left over from a
        # previous encoding could produce another type id, or announce array
        # dimensions that this method never writes.
        self.Encoding = self.VariantType.value
        if type(self.Value) in (list, tuple):
            self.Encoding |= (1 << 7)
            payload = pack_uatype_array(self.VariantType.name, self.Value)
        else:
            payload = pack_uatype(self.VariantType.name, self.Value)
        return uatype_UInt8.pack(self.Encoding) + payload

    @staticmethod
    def from_binary(data):
        """
        Read a variant from data, an object with a read() method.

        Bits 0-5 of the encoding byte hold the type id, bit 6 announces
        array dimensions after the value and bit 7 marks an array value.
        """
        encoding = ord(data.read(1))
        # only the low six bits carry the type id
        vtype = VariantType(encoding & 0b00111111)
        if vtype == VariantType.Null:
            return Variant(None, vtype, encoding)
        if encoding & (1 << 7):
            value = unpack_uatype_array(vtype.name, data)
        else:
            value = unpack_uatype(vtype.name, data)
        if encoding & (1 << 6):
            # The dimensions are an Int32 array. They are read to keep the
            # stream aligned for whatever follows, but are not applied: the
            # value stays a flat list.
            unpack_uatype_array("Int32", data)
        return Variant(value, vtype, encoding)
795
796
797 1
class DataValue(FrozenClass):

    '''
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    '''

    def __init__(self, variant=None, status=None):
        self.Encoding = 0
        if not isinstance(variant, Variant):
            # wrap plain python values (None included) in a Variant
            variant = Variant(variant)
        self.Value = variant
        if status is None:
            self.StatusCode = StatusCode()
        else:
            self.StatusCode = status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        """
        Pack the encoding mask followed by every field that is set.

        Bits 0 to 5 of the mask flag, in this order: Value, StatusCode,
        SourceTimestamp, ServerTimestamp, SourcePicoseconds and
        ServerPicoseconds.
        """
        # Rebuild the mask from the current fields: keeping bits from a
        # previous state (e.g. after from_binary) would announce a field
        # that has since been reset and is no longer written.
        self.Encoding = 0
        if self.Value:
            self.Encoding |= (1 << 0)
        if self.StatusCode:
            self.Encoding |= (1 << 1)
        if self.SourceTimestamp:
            self.Encoding |= (1 << 2)
        if self.ServerTimestamp:
            self.Encoding |= (1 << 3)
        if self.SourcePicoseconds:
            self.Encoding |= (1 << 4)
        if self.ServerPicoseconds:
            self.Encoding |= (1 << 5)
        packet = [uatype_UInt8.pack(self.Encoding)]
        if self.Value:
            packet.append(self.Value.to_binary())
        if self.StatusCode:
            packet.append(self.StatusCode.to_binary())
        if self.SourceTimestamp:
            packet.append(pack_datetime(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
        if self.ServerTimestamp:
            packet.append(pack_datetime(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
        if self.SourcePicoseconds:
            packet.append(uatype_UInt16.pack(self.SourcePicoseconds))
        if self.ServerPicoseconds:
            packet.append(uatype_UInt16.pack(self.ServerPicoseconds))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the encoding mask from data, then the fields it announces."""
        encoding = ord(data.read(1))
        if encoding & (1 << 0):
            value = Variant.from_binary(data)
        else:
            value = None
        if encoding & (1 << 1):
            status = StatusCode.from_binary(data)
        else:
            status = None
        obj = DataValue(value, status)
        obj.Encoding = encoding
        if obj.Encoding & (1 << 2):
            obj.SourceTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
        if obj.Encoding & (1 << 3):
            obj.ServerTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
        if obj.Encoding & (1 << 4):
            obj.SourcePicoseconds = uatype_UInt16.unpack(data.read(2))[0]
        if obj.Encoding & (1 << 5):
            obj.ServerPicoseconds = uatype_UInt16.unpack(data.read(2))[0]
        return obj

    def __str__(self):
        s = 'DataValue(Value:{}'.format(self.Value)
        if self.StatusCode is not None:
            s += ', StatusCode:{}'.format(self.StatusCode)
        if self.SourceTimestamp is not None:
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
        if self.ServerTimestamp is not None:
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
        if self.SourcePicoseconds is not None:
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
        if self.ServerPicoseconds is not None:
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
        s += ')'
        return s

    __repr__ = __str__
901
902
903 1
# last numeric identifier handed out by generate_nodeid(), which increments it before each use
__nodeid_counter = 2000
904
905
906 1
def generate_nodeid(idx):
    """
    Return a new NodeId in namespace idx.

    The identifier comes from the module-level counter, which is
    incremented on every call.
    """
    global __nodeid_counter
    __nodeid_counter = __nodeid_counter + 1
    return NodeId(__nodeid_counter, idx)
910