Completed
Push — master ( 9142a5...19ce54 )
by Olivier
52:28 queued 49:33
created

opcua.ua.UAError

Complexity

Total Complexity 0

Size/Duplication

Total Lines 2
Duplicated Lines 0 %

Test Coverage

Coverage 100%
Metric Value
dl 0
loc 2
ccs 2
cts 2
cp 1
wmc 0
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo
7 1
from calendar import timegm
8 1
import sys
9 1
import os
10 1
import uuid
11 1
import struct
12 1
if sys.version_info.major > 2:
13 1
    unicode = str
14
15 1
from opcua.ua import status_codes
16 1
from opcua.common.utils import UAError
17
18
19 1
logger = logging.getLogger('opcua.uaprotocol')
20
21
22
# types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases
23 1
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
24
25
26 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
27 1
HUNDREDS_OF_NANOSECONDS = 10000000
28 1
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
29
30
31 1
class UTC(tzinfo):
32
33
    """UTC"""
34
35 1
    def utcoffset(self, dt):
36
        return timedelta(0)
37
38 1
    def tzname(self, dt):
39
        return "UTC"
40
41 1
    def dst(self, dt):
42 1
        return timedelta(0)
43
44
45
# method copied from David Buxton <[email protected]> sample code
46 1
def datetime_to_win_epoch(dt):
47 1
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
48 1
        dt = dt.replace(tzinfo=UTC())
49 1
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
50 1
    return ft + (dt.microsecond * 10)
51
52
53 1
def win_epoch_to_datetime(epch):
54 1
    return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
55
56
57 1
def build_array_format_py2(prefix, length, fmtchar):
58
    return prefix + str(length) + fmtchar
59
60
61 1
def build_array_format_py3(prefix, length, fmtchar):
62 1
    return prefix + str(length) + chr(fmtchar)
63
64
65 1
if sys.version_info.major < 3:
66
    build_array_format = build_array_format_py2
67
else:
68 1
    build_array_format = build_array_format_py3
69
70
71 1
def pack_uatype_array_primitive(st, value, length):
72 1
    if length == 1:
73 1
        return b'\x01\x00\x00\x00' + st.pack(value[0])
74
    else:
75 1
        return struct.pack(build_array_format("<i", length, st.format[1]), length, *value)
76
77
78 1
def pack_uatype_array(uatype, value):
79 1
    if value is None:
80
        return b'\xff\xff\xff\xff'
81 1
    length = len(value)
82 1
    if length == 0:
83
        return b'\x00\x00\x00\x00'
84 1
    if uatype in uatype2struct:
85 1
        return pack_uatype_array_primitive(uatype2struct[uatype], value, length)
86 1
    b = []
87 1
    b.append(uatype_Int32.pack(length))
88 1
    for val in value:
89 1
        b.append(pack_uatype(uatype, val))
90 1
    return b"".join(b)
91
92
93 1
def pack_uatype(uatype, value):
94 1
    if uatype in uatype2struct:
95 1
        return uatype2struct[uatype].pack(value)
96 1
    elif uatype == "Null":
97 1
        return b''
98 1
    elif uatype == "String":
99 1
        return pack_string(value)
100 1
    elif uatype in ("CharArray", "ByteString"):
101 1
        return pack_bytes(value)
102 1
    elif uatype == "DateTime":
103 1
        return pack_datetime(value)
104 1
    elif uatype == "ExtensionObject":
105
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
106
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
107
        # Using local import to avoid import loop
108 1
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
109 1
        return extensionobject_to_binary(value)
110
    else:
111 1
        return value.to_binary()
112
113 1
uatype_Int8 = struct.Struct("<b")
114 1
uatype_SByte = uatype_Int8
115 1
uatype_Int16 = struct.Struct("<h")
116 1
uatype_Int32 = struct.Struct("<i")
117 1
uatype_Int64 = struct.Struct("<q")
118 1
uatype_UInt8 = struct.Struct("<B")
119 1
uatype_Char = uatype_UInt8
120 1
uatype_Byte = uatype_UInt8
121 1
uatype_UInt16 = struct.Struct("<H")
122 1
uatype_UInt32 = struct.Struct("<I")
123 1
uatype_UInt64 = struct.Struct("<Q")
124 1
uatype_Boolean = struct.Struct("<?")
125 1
uatype_Double = struct.Struct("<d")
126 1
uatype_Float = struct.Struct("<f")
127
128 1
uatype2struct = {
129
    "Int8": uatype_Int8,
130
    "SByte": uatype_SByte,
131
    "Int16": uatype_Int16,
132
    "Int32": uatype_Int32,
133
    "Int64": uatype_Int64,
134
    "UInt8": uatype_UInt8,
135
    "Char": uatype_Char,
136
    "Byte": uatype_Byte,
137
    "UInt16": uatype_UInt16,
138
    "UInt32": uatype_UInt32,
139
    "UInt64": uatype_UInt64,
140
    "Boolean": uatype_Boolean,
141
    "Double": uatype_Double,
142
    "Float": uatype_Float,
143
}
144
145
146 1
def unpack_uatype(uatype, data):
147 1
    if uatype in uatype2struct:
148 1
        st = uatype2struct[uatype]
149 1
        return st.unpack(data.read(st.size))[0]
150 1
    elif uatype == "String":
151 1
        return unpack_string(data)
152 1
    elif uatype in ("CharArray", "ByteString"):
153 1
        return unpack_bytes(data)
154 1
    elif uatype == "DateTime":
155 1
        return unpack_datetime(data)
156 1
    elif uatype == "ExtensionObject":
157
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
158
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
159
        # Using local import to avoid import loop
160 1
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
161 1
        return extensionobject_from_binary(data)
162
    else:
163 1
        glbs = globals()
164 1
        if uatype in glbs:
165 1
            klass = glbs[uatype]
166 1
            if hasattr(klass, 'from_binary'):
167 1
                return klass.from_binary(data)
168
        raise UAError("can not unpack unknown uatype %s" % uatype)
169
170
171 1
def unpack_uatype_array(uatype, data):
172 1
    length = uatype_Int32.unpack(data.read(4))[0]
173 1
    if length == -1:
174
        return None
175 1
    elif length == 0:
176 1
        return []
177 1
    elif uatype in uatype2struct:
178 1
        st = uatype2struct[uatype]
179 1
        if length == 1:
180 1
            return list(st.unpack(data.read(st.size)))
181
        else:
182 1
            arrst = struct.Struct(build_array_format("<", length, st.format[1]))
183 1
            return list(arrst.unpack(data.read(arrst.size)))
184
    else:
185 1
        result = []
186 1
        for _ in range(0, length):
187 1
            result.append(unpack_uatype(uatype, data))
188 1
        return result
189
190
191 1
def pack_datetime(value):
192 1
    epch = datetime_to_win_epoch(value)
193 1
    return uatype_Int64.pack(epch)
194
195
196 1
def unpack_datetime(data):
197 1
    epch = uatype_Int64.unpack(data.read(8))[0]
198 1
    return win_epoch_to_datetime(epch)
199
200
201 1
def pack_string(string):
202 1
    if type(string) is unicode:
203 1
        string = string.encode('utf-8')
204 1
    length = len(string)
205 1
    if length == 0:
206 1
        return b'\xff\xff\xff\xff'
207 1
    return uatype_Int32.pack(length) + string
208
209 1
pack_bytes = pack_string
210
211
212 1
def unpack_bytes(data):
213 1
    length = uatype_Int32.unpack(data.read(4))[0]
214 1
    if length == -1:
215 1
        return b''
216 1
    return data.read(length)
217
218
219 1
def py3_unpack_string(data):
220 1
    b = unpack_bytes(data)
221 1
    return b.decode("utf-8")
222
223
224 1
if sys.version_info.major < 3:
225
    unpack_string = unpack_bytes
226
else:
227 1
    unpack_string = py3_unpack_string
228
229
230 1
def test_bit(data, offset):
231 1
    mask = 1 << offset
232 1
    return data & mask
233
234
235 1
def set_bit(data, offset):
236
    mask = 1 << offset
237
    return data | mask
238
239
240 1
class _FrozenClass(object):
241
242
    """
243
    make it impossible to add members to a class.
244
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
245
    """
246 1
    _freeze = False
247
248 1
    def __setattr__(self, key, value):
249 1
        if self._freeze and not hasattr(self, key):
250
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
251 1
        object.__setattr__(self, key, value)
252
253 1
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
254
    # typo check is cpu consuming, but it will make debug easy.
255
    # if typo check is not need (in production), please set env PYOPCUA_NO_TYPO_CHECK.
256
    # this will make all uatype class inherit from object intead of _FrozenClass
257
    # and skip the typo check.
258
    FrozenClass = object
259
else:
260 1
    FrozenClass = _FrozenClass
261
262
263 1
class Guid(FrozenClass):
264
265 1
    def __init__(self):
266 1
        self.uuid = uuid.uuid4()
267 1
        self._freeze = True
268
269 1
    def to_binary(self):
270 1
        return self.uuid.bytes
271
272 1
    @staticmethod
273
    def from_binary(data):
274 1
        g = Guid()
275 1
        g.uuid = uuid.UUID(bytes=data.read(16))
276 1
        return g
277
278 1
    def __eq__(self, other):
279 1
        return isinstance(other, Guid) and self.uuid == other.uuid
280
281
282 1
class StatusCode(FrozenClass):
283
284
    """
285
    :ivar value:
286
    :vartype value: int
287
    :ivar name:
288
    :vartype name: string
289
    :ivar doc:
290
    :vartype doc: string
291
    """
292
293 1
    def __init__(self, value=0):
294 1
        self.value = value
295 1
        self.name, self.doc = status_codes.get_name_and_doc(value)
296 1
        self._freeze = True
297
298 1
    def to_binary(self):
299 1
        return uatype_UInt32.pack(self.value)
300
301 1
    @staticmethod
302
    def from_binary(data):
303 1
        val = uatype_UInt32.unpack(data.read(4))[0]
304 1
        sc = StatusCode(val)
305 1
        return sc
306
307 1
    def check(self):
308
        """
309
        raise en exception if status code is anything else than 0
310
        use is is_good() method if not exception is desired
311
        """
312 1
        if self.value != 0:
313 1
            raise UAError("{}({})".format(self.doc, self.name))
314
315 1
    def is_good(self):
316
        """
317
        return True if status is Good.
318
        """
319 1
        if self.value == 0:
320 1
            return True
321 1
        return False
322
323 1
    def __str__(self):
324
        return 'StatusCode({})'.format(self.name)
325 1
    __repr__ = __str__
326
327
328 1
class NodeIdType(object):
329 1
    TwoByte = 0
330 1
    FourByte = 1
331 1
    Numeric = 2
332 1
    String = 3
333 1
    Guid = 4
334 1
    ByteString = 5
335
336
337 1
class NodeId(FrozenClass):
338
339
    """
340
    NodeId Object
341
342
    Args:
343
        identifier: The identifier might be an int, a string, bytes or a Guid
344
        namespaceidx(int): The index of the namespace
345
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
346
347
348
    :ivar Identifier:
349
    :vartype Identifier: NodeId
350
    :ivar NamespaceIndex:
351
    :vartype NamespaceIndex: Int
352
    :ivar NamespaceUri:
353
    :vartype NamespaceUri: String
354
    :ivar ServerIndex:
355
    :vartype ServerIndex: Int
356
    """
357
358 1
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
359 1
        self.Identifier = identifier
360 1
        self.NamespaceIndex = namespaceidx
361 1
        self.NodeIdType = nodeidtype
362 1
        self.NamespaceUri = ""
363 1
        self.ServerIndex = 0
364 1
        self._freeze = True
365 1
        if self.Identifier is None:
366 1
            self.Identifier = 0
367 1
            self.NodeIdType = NodeIdType.TwoByte
368 1
            return
369 1
        if self.NodeIdType is None:
370 1
            if isinstance(self.Identifier, int):
371 1
                self.NodeIdType = NodeIdType.Numeric
372 1
            elif isinstance(self.Identifier, str):
373 1
                self.NodeIdType = NodeIdType.String
374
            elif isinstance(self.Identifier, bytes):
375
                self.NodeIdType = NodeIdType.ByteString
376
            else:
377
                raise UAError("NodeId: Could not guess type of NodeId, set NodeIdType")
378
379 1
    def __key(self):
380 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
381 1
            return self.NamespaceIndex, self.Identifier
382
        else:
383 1
            return self.NodeIdType, self.NamespaceIndex, self.Identifier
384
385 1
    def __eq__(self, node):
386 1
        return isinstance(node, NodeId) and self.__key() == node.__key()
387
388 1
    def __hash__(self):
389 1
        return hash(self.__key())
390
391 1
    @staticmethod
392
    def from_string(string):
393 1
        l = string.split(";")
394 1
        identifier = None
395 1
        namespace = 0
396 1
        ntype = None
397 1
        srv = None
398 1
        nsu = None
399 1
        for el in l:
400 1
            if not el:
401 1
                continue
402 1
            k, v = el.split("=")
403 1
            k = k.strip()
404 1
            v = v.strip()
405 1
            if k == "ns":
406 1
                namespace = int(v)
407 1
            elif k == "i":
408 1
                ntype = NodeIdType.Numeric
409 1
                identifier = int(v)
410 1
            elif k == "s":
411 1
                ntype = NodeIdType.String
412 1
                identifier = v
413 1
            elif k == "g":
414
                ntype = NodeIdType.Guid
415
                identifier = v
416 1
            elif k == "b":
417
                ntype = NodeIdType.ByteString
418
                identifier = v
419 1
            elif k == "srv":
420 1
                srv = v
421
            elif k == "nsu":
422
                nsu = v
423 1
        if identifier is None:
424
            raise UAError("Could not parse nodeid string: " + string)
425 1
        nodeid = NodeId(identifier, namespace, ntype)
426 1
        nodeid.NamespaceUri = nsu
427 1
        nodeid.ServerIndex = srv
428 1
        return nodeid
429
430 1
    def to_string(self):
431 1
        string = ""
432 1
        if self.NamespaceIndex != 0:
433 1
            string += "ns={};".format(self.NamespaceIndex)
434 1
        ntype = None
435 1
        if self.NodeIdType == NodeIdType.Numeric:
436
            ntype = "i"
437 1
        elif self.NodeIdType == NodeIdType.String:
438 1
            ntype = "s"
439 1
        elif self.NodeIdType == NodeIdType.TwoByte:
440 1
            ntype = "i"
441 1
        elif self.NodeIdType == NodeIdType.FourByte:
442 1
            ntype = "i"
443
        elif self.NodeIdType == NodeIdType.Guid:
444
            ntype = "g"
445
        elif self.NodeIdType == NodeIdType.ByteString:
446
            ntype = "b"
447 1
        string += "{}={}".format(ntype, self.Identifier)
448 1
        if self.ServerIndex:
449
            string = "srv=" + str(self.ServerIndex) + string
450 1
        if self.NamespaceUri:
451
            string += "nsu={}".format(self.NamespaceUri)
452 1
        return string
453
454 1
    def __str__(self):
455 1
        return "NodeId({})".format(self.to_string())
456 1
    __repr__ = __str__
457
458 1
    def to_binary(self):
459 1
        if self.NodeIdType == NodeIdType.TwoByte:
460 1
            return struct.pack("<BB", self.NodeIdType, self.Identifier)
461 1
        elif self.NodeIdType == NodeIdType.FourByte:
462 1
            return struct.pack("<BBH", self.NodeIdType, self.NamespaceIndex, self.Identifier)
463 1
        elif self.NodeIdType == NodeIdType.Numeric:
464 1
            return struct.pack("<BHI", self.NodeIdType, self.NamespaceIndex, self.Identifier)
465 1
        elif self.NodeIdType == NodeIdType.String:
466 1
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
467
                pack_string(self.Identifier)
468 1
        elif self.NodeIdType == NodeIdType.ByteString:
469 1
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
470
                pack_bytes(self.Identifier)
471
        else:
472 1
            return struct.pack("<BH", self.NodeIdType, self.NamespaceIndex) + \
473
                self.Identifier.to_binary()
474
475 1
    @staticmethod
476
    def from_binary(data):
477 1
        nid = NodeId()
478 1
        encoding = ord(data.read(1))
479 1
        nid.NodeIdType = encoding & 0b00111111
480
481 1
        if nid.NodeIdType == NodeIdType.TwoByte:
482 1
            nid.Identifier = ord(data.read(1))
483 1
        elif nid.NodeIdType == NodeIdType.FourByte:
484 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
485 1
        elif nid.NodeIdType == NodeIdType.Numeric:
486 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
487 1
        elif nid.NodeIdType == NodeIdType.String:
488 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
489 1
            nid.Identifier = unpack_string(data)
490 1
        elif nid.NodeIdType == NodeIdType.ByteString:
491 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
492 1
            nid.Identifier = unpack_bytes(data)
493 1
        elif nid.NodeIdType == NodeIdType.Guid:
494 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
495 1
            nid.Identifier = Guid.from_binary(data)
496
        else:
497
            raise UAError("Unknown NodeId encoding: " + str(nid.NodeIdType))
498
499 1
        if test_bit(encoding, 6):
500
            nid.NamespaceUri = unpack_string(data)
501 1
        if test_bit(encoding, 7):
502
            nid.ServerIndex = uatype_UInt32.unpack(data.read(4))[0]
503
504 1
        return nid
505
506
507 1
class TwoByteNodeId(NodeId):
508
509 1
    def __init__(self, identifier):
510 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
511
512
513 1
class FourByteNodeId(NodeId):
514
515 1
    def __init__(self, identifier, namespace=0):
516 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
517
518
519 1
class NumericNodeId(NodeId):
520
521 1
    def __init__(self, identifier, namespace=0):
522 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
523
524
525 1
class ByteStringNodeId(NodeId):
526
527 1
    def __init__(self, identifier, namespace=0):
528 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
529
530
531 1
class GuidNodeId(NodeId):
532
533 1
    def __init__(self, identifier, namespace=0):
534 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
535
536
537 1
class StringNodeId(NodeId):
538
539 1
    def __init__(self, identifier, namespace=0):
540 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
541
542
543 1
ExpandedNodeId = NodeId
544
545
546 1
class QualifiedName(FrozenClass):
547
548
    '''
549
    A string qualified with a namespace index.
550
    '''
551
552 1
    def __init__(self, name="", namespaceidx=0):
553 1
        self.NamespaceIndex = namespaceidx
554 1
        self.Name = name
555 1
        self._freeze = True
556
557 1
    def to_string(self):
558 1
        return "{}:{}".format(self.NamespaceIndex, self.Name)
559
560 1
    @staticmethod
561
    def from_string(string):
562 1
        if ":" in string:
563 1
            idx, name = string.split(":", 1)
564
        else:
565 1
            idx = 0
566 1
            name = string
567 1
        return QualifiedName(name, int(idx))
568
569 1
    def to_binary(self):
570 1
        packet = []
571 1
        packet.append(uatype_UInt16.pack(self.NamespaceIndex))
572 1
        packet.append(pack_string(self.Name))
573 1
        return b''.join(packet)
574
575 1
    @staticmethod
576
    def from_binary(data):
577 1
        obj = QualifiedName()
578 1
        obj.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
579 1
        obj.Name = unpack_string(data)
580 1
        return obj
581
582 1
    def __eq__(self, bname):
583 1
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
584
585 1
    def __str__(self):
586
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)
587
588 1
    __repr__ = __str__
589
590
591 1
class LocalizedText(FrozenClass):
592
593
    '''
594
    A string qualified with a namespace index.
595
    '''
596
597 1
    def __init__(self, text=""):
598 1
        self.Encoding = 0
599 1
        self.Text = text
600 1
        if type(self.Text) is unicode:
601 1
            self.Text = self.Text.encode('utf-8')
602 1
        if self.Text:
603 1
            self.Encoding |= (1 << 1)
604 1
        self.Locale = b''
605 1
        self._freeze = True
606
607 1
    def to_binary(self):
608 1
        packet = []
609 1
        if self.Locale:
610
            self.Encoding |= (1 << 0)
611 1
        if self.Text:
612 1
            self.Encoding |= (1 << 1)
613 1
        packet.append(uatype_UInt8.pack(self.Encoding))
614 1
        if self.Locale:
615
            packet.append(pack_bytes(self.Locale))
616 1
        if self.Text:
617 1
            packet.append(pack_bytes(self.Text))
618 1
        return b''.join(packet)
619
620 1
    @staticmethod
621
    def from_binary(data):
622 1
        obj = LocalizedText()
623 1
        obj.Encoding = ord(data.read(1))
624 1
        if obj.Encoding & (1 << 0):
625
            obj.Locale = unpack_bytes(data)
626 1
        if obj.Encoding & (1 << 1):
627 1
            obj.Text = unpack_bytes(data)
628 1
        return obj
629
630 1
    def to_string(self):
631
        # FIXME: use local
632
        return self.Text.decode()
633
634 1
    def __str__(self):
635
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
636
            'Locale:' + str(self.Locale) + ', ' + \
637
            'Text:' + str(self.Text) + ')'
638 1
    __repr__ = __str__
639
640 1
    def __eq__(self, other):
641 1
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
642 1
            return True
643 1
        return False
644
645
646 1
class VariantType(Enum):
647
648
    '''
649
    The possible types of a variant.
650
651
    :ivar Null:
652
    :ivar Boolean:
653
    :ivar SByte:
654
    :ivar Byte:
655
    :ivar Int16:
656
    :ivar UInt16:
657
    :ivar Int32:
658
    :ivar UInt32:
659
    :ivar Int64:
660
    :ivar UInt64:
661
    :ivar Float:
662
    :ivar Double:
663
    :ivar String:
664
    :ivar DateTime:
665
    :ivar Guid:
666
    :ivar ByteString:
667
    :ivar XmlElement:
668
    :ivar NodeId:
669
    :ivar ExpandedNodeId:
670
    :ivar StatusCode:
671
    :ivar QualifiedName:
672
    :ivar LocalizedText:
673
    :ivar ExtensionObject:
674
    :ivar DataValue:
675
    :ivar Variant:
676
    :ivar DiagnosticInfo:
677
678
679
680
    '''
681 1
    Null = 0
682 1
    Boolean = 1
683 1
    SByte = 2
684 1
    Byte = 3
685 1
    Int16 = 4
686 1
    UInt16 = 5
687 1
    Int32 = 6
688 1
    UInt32 = 7
689 1
    Int64 = 8
690 1
    UInt64 = 9
691 1
    Float = 10
692 1
    Double = 11
693 1
    String = 12
694 1
    DateTime = 13
695 1
    Guid = 14
696 1
    ByteString = 15
697 1
    XmlElement = 16
698 1
    NodeId = 17
699 1
    ExpandedNodeId = 18
700 1
    StatusCode = 19
701 1
    QualifiedName = 20
702 1
    LocalizedText = 21
703 1
    ExtensionObject = 22
704 1
    DataValue = 23
705 1
    Variant = 24
706 1
    DiagnosticInfo = 25
707
708
709 1
class Variant(FrozenClass):
710
711
    """
712
    Create an OPC-UA Variant object.
713
    if no argument a Null Variant is created.
714
    if not variant type is given, attemps to guess type from python type
715
    if a variant is given as value, the new objects becomes a copy of the argument
716
717
    :ivar Value:
718
    :vartype Value: Any supported type
719
    :ivar VariantType:
720
    :vartype VariantType: VariantType
721
    """
722
723 1
    def __init__(self, value=None, varianttype=None, encoding=0):
724 1
        self.Encoding = encoding
725 1
        self.Value = value
726 1
        self.VariantType = varianttype
727 1
        if isinstance(value, Variant):
728 1
            self.Value = value.Value
729 1
            self.VariantType = value.VariantType
730 1
        if self.VariantType is None:
731 1
            if type(self.Value) in (list, tuple):
732 1
                if len(self.Value) == 0:
733
                    raise UAError("could not guess UA variable type")
734 1
                self.VariantType = self._guess_type(self.Value[0])
735
            else:
736 1
                self.VariantType = self._guess_type(self.Value)
737 1
        self._freeze = True
738
739 1
    def __eq__(self, other):
740 1
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
741 1
            return True
742 1
        return False
743
744 1
    def _guess_type(self, val):
745 1
        if val is None:
746 1
            return VariantType.Null
747 1
        elif isinstance(val, bool):
748 1
            return VariantType.Boolean
749 1
        elif isinstance(val, float):
750 1
            return VariantType.Double
751 1
        elif isinstance(val, int):
752 1
            return VariantType.Int64
753 1
        elif type(val) in (str, unicode):
754 1
            return VariantType.String
755 1
        elif isinstance(val, bytes):
756 1
            return VariantType.ByteString
757 1
        elif isinstance(val, datetime):
758 1
            return VariantType.DateTime
759
        else:
760 1
            if isinstance(val, object):
761 1
                try:
762 1
                    return getattr(VariantType, val.__class__.__name__)
763 1
                except AttributeError:
764 1
                    return VariantType.ExtensionObject
765
            else:
766
                raise UAError("Could not guess UA type of {} with type {}, specify UA type".format(val, type(val)))
767
768 1
    def __str__(self):
769 1
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
770 1
    __repr__ = __str__
771
772 1
    def to_binary(self):
773 1
        b = []
774 1
        mask = self.Encoding & 0b01111111
775 1
        self.Encoding = (self.VariantType.value | mask)
776 1
        if type(self.Value) in (list, tuple):
777 1
            self.Encoding |= (1 << 7)
778 1
            b.append(uatype_UInt8.pack(self.Encoding))
779 1
            b.append(pack_uatype_array(self.VariantType.name, self.Value))
780
        else:
781 1
            b.append(uatype_UInt8.pack(self.Encoding))
782 1
            b.append(pack_uatype(self.VariantType.name, self.Value))
783 1
        return b"".join(b)
784
785 1
    @staticmethod
786
    def from_binary(data):
787 1
        encoding = ord(data.read(1))
788 1
        vtype = VariantType(encoding & 0b01111111)
789 1
        if vtype == VariantType.Null:
790 1
            return Variant(None, vtype, encoding)
791 1
        if encoding & (1 << 7):
792 1
            value = unpack_uatype_array(vtype.name, data)
793
        else:
794 1
            value = unpack_uatype(vtype.name, data)
795 1
        return Variant(value, vtype, encoding)
796
797
798 1
class DataValue(FrozenClass):
799
800
    '''
801
    A value with an associated timestamp, and quality.
802
    Automatically generated from xml , copied and modified here to fix errors in xml spec
803
804
    :ivar Value:
805
    :vartype Value: Variant
806
    :ivar StatusCode:
807
    :vartype StatusCode: StatusCode
808
    :ivar SourceTimestamp:
809
    :vartype SourceTimestamp: datetime
810
    :ivar SourcePicoSeconds:
811
    :vartype SourcePicoSeconds: int
812
    :ivar ServerTimestamp:
813
    :vartype ServerTimestamp: datetime
814
    :ivar ServerPicoseconds:
815
    :vartype ServerPicoseconds: int
816
817
    '''
818
819 1
    def __init__(self, variant=None, status=None):
820 1
        self.Encoding = 0
821 1
        if not isinstance(variant, Variant):
822 1
            variant = Variant(variant)
823 1
        self.Value = variant
824 1
        if status is None:
825 1
            self.StatusCode = StatusCode()
826
        else:
827 1
            self.StatusCode = status
828 1
        self.SourceTimestamp = None  # DateTime()
829 1
        self.SourcePicoseconds = None
830 1
        self.ServerTimestamp = None  # DateTime()
831 1
        self.ServerPicoseconds = None
832 1
        self._freeze = True
833
834 1
    def to_binary(self):
835 1
        packet = []
836 1
        if self.Value:
837 1
            self.Encoding |= (1 << 0)
838 1
        if self.StatusCode:
839 1
            self.Encoding |= (1 << 1)
840 1
        if self.SourceTimestamp:
841 1
            self.Encoding |= (1 << 2)
842 1
        if self.ServerTimestamp:
843 1
            self.Encoding |= (1 << 3)
844 1
        if self.SourcePicoseconds:
845
            self.Encoding |= (1 << 4)
846 1
        if self.ServerPicoseconds:
847
            self.Encoding |= (1 << 5)
848 1
        packet.append(uatype_UInt8.pack(self.Encoding))
849 1
        if self.Value:
850 1
            packet.append(self.Value.to_binary())
851 1
        if self.StatusCode:
852 1
            packet.append(self.StatusCode.to_binary())
853 1
        if self.SourceTimestamp:
854 1
            packet.append(pack_datetime(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
855 1
        if self.ServerTimestamp:
856 1
            packet.append(pack_datetime(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
857 1
        if self.SourcePicoseconds:
858
            packet.append(uatype_UInt16.pack(self.SourcePicoseconds))
859 1
        if self.ServerPicoseconds:
860
            packet.append(uatype_UInt16.pack(self.ServerPicoseconds))
861 1
        return b''.join(packet)
862
863 1
    @staticmethod
864
    def from_binary(data):
865 1
        encoding = ord(data.read(1))
866 1
        if encoding & (1 << 0):
867 1
            value = Variant.from_binary(data)
868
        else:
869
            value = None
870 1
        if encoding & (1 << 1):
871 1
            status = StatusCode.from_binary(data)
872
        else:
873
            status = None
874 1
        obj = DataValue(value, status)
875 1
        obj.Encoding = encoding
876 1
        if obj.Encoding & (1 << 2):
877 1
            obj.SourceTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
878 1
        if obj.Encoding & (1 << 3):
879 1
            obj.ServerTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
880 1
        if obj.Encoding & (1 << 4):
881
            obj.SourcePicoseconds = uatype_UInt16.unpack(data.read(2))[0]
882 1
        if obj.Encoding & (1 << 5):
883
            obj.ServerPicoseconds = uatype_UInt16.unpack(data.read(2))[0]
884 1
        return obj
885
886 1
    def __str__(self):
887
        s = 'DataValue(Value:{}'.format(self.Value)
888
        if self.StatusCode is not None:
889
            s += ', StatusCode:{}'.format(self.StatusCode)
890
        if self.SourceTimestamp is not None:
891
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
892
        if self.ServerTimestamp is not None:
893
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
894
        if self.SourcePicoseconds is not None:
895
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
896
        if self.ServerPicoseconds is not None:
897
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
898
        s += ')'
899
        return s
900
901 1
    __repr__ = __str__
902
903
904 1
__nodeid_counter = 2000
905
906
907 1
def generate_nodeid(idx):
908
    global __nodeid_counter
909 1
    __nodeid_counter += 1
910
    return NodeId(__nodeid_counter, idx)
911