Completed
Push — dev ( 3b343f...d958a8 )
by Olivier
04:35 queued 02:25
created

opcua.win_epoch_to_datetime()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
cc 1
dl 0
loc 2
ccs 2
cts 2
cp 1
crap 1
rs 10
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo
7 1
from calendar import timegm
8 1
import sys
9 1
if sys.version_info.major > 2:
10 1
    unicode = str
11
12 1
import uuid
13 1
import struct
14
15 1
import opcua.status_code as status_code
16 1
from opcua.object_ids import ObjectIds
17
18 1
logger = logging.getLogger('opcua.uaprotocol')


# Types that are packed and unpacked directly using struct
# (string, bytes and datetime are handled as special cases).
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")


EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000  # 100 ns ticks per second
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)  # origin of MS file time
27
28
29 1
class UTC(tzinfo):
    """Fixed tzinfo implementation for UTC: zero offset and no DST."""

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return timedelta(0)

    def tzname(self, dt):
        """Return the name of the time zone, always "UTC"."""
        return "UTC"

    def dst(self, dt):
        """Return the daylight-saving adjustment, which is always zero."""
        return timedelta(0)
41
42
43
# method copied from David Buxton <[email protected]> sample code
44 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to a Windows FILETIME value.

    The result counts 100-nanosecond intervals since January 1, 1601.
    Naive datetimes are treated as UTC; aware datetimes are converted to
    UTC before the conversion.
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() applies the UTC offset of aware datetimes; timetuple()
    # would feed the local wall-clock fields to timegm() and skew the result
    seconds = timegm(dt.utctimetuple())
    ft = EPOCH_AS_FILETIME + (seconds * HUNDREDS_OF_NANOSECONDS)
    # one microsecond is ten 100 ns ticks
    return ft + (dt.microsecond * 10)
49
50
51 1
def win_epoch_to_datetime(epch):
    """
    Convert a Windows FILETIME value to a naive datetime.

    epch is a count of 100-nanosecond intervals since January 1, 1601.
    Values that fall outside the range datetime can represent are clamped
    to datetime.max (or datetime.min for negative values) instead of
    raising OverflowError.
    """
    try:
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
    except OverflowError:
        # e.g. a FILETIME after year 9999 cannot be held by datetime
        logger.warning("datetime overflow while converting FILETIME %s", epch)
        return datetime.max if epch > 0 else datetime.min
53
54
55 1
# struct format character for every UA type name handled by uatype_to_fmt()
_UATYPE_FORMATS = {
    "Char": "B",
    "SByte": "b",  # SByte is the signed 8-bit type, so it needs signed "b"
    "Int6": "b",  # entry kept from the original lookup chain
    "Int8": "b",
    "Int16": "h",
    "Int32": "i",
    "Int64": "q",
    "UInt8": "B",
    "UInt16": "H",
    "UInt32": "I",
    "UInt64": "Q",
    "Boolean": "?",
    "Double": "d",
    "Float": "f",
    "Byte": "B",
}


def uatype_to_fmt(uatype):
    """
    Return the struct format character for a UA type name.

    Raises Exception if the type name is not known.
    """
    try:
        return _UATYPE_FORMATS[uatype]
    except (KeyError, TypeError):
        raise Exception("Error unknown uatype: " + str(uatype))
88
89
90 1
def pack_uatype_array(uatype, value):
    """
    Serialize a sequence of values of one UA type.

    The output is a little-endian Int32 element count followed by every
    element packed with pack_uatype(). None is encoded as a count of -1.
    """
    if value is None:
        return struct.pack("<i", -1)
    chunks = [struct.pack("<i", len(value))]
    chunks.extend(pack_uatype(uatype, item) for item in value)
    return b"".join(chunks)
98
99
100 1
def pack_uatype(uatype, value):
    """
    Serialize a single value of the UA type named by uatype.

    "Null" produces no bytes. Strings, byte strings and DateTime have
    dedicated encoders, the names listed in UaTypes are packed with struct,
    and any other type is expected to provide a to_binary() method.
    """
    if uatype == "Null":
        return b''
    if uatype == "String":
        return pack_string(value)
    if uatype in ("CharArray", "ByteString"):
        return pack_bytes(value)
    if uatype == "DateTime":
        # DateTime travels as a little-endian Int64 FILETIME value
        return struct.pack('<q', datetime_to_win_epoch(value))
    if uatype in UaTypes:
        return struct.pack('<' + uatype_to_fmt(uatype), value)
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)
    return value.to_binary()
121
122
123 1
def unpack_uatype(uatype, data):
    """
    Read one value of the UA type named by uatype from the stream data.

    data must provide read(n). Strings, byte strings and DateTime have
    dedicated decoders, the names listed in UaTypes are unpacked with
    struct, and any other name is resolved to a class defined in this
    module whose from_binary() is called.

    Raises NameError if uatype does not name anything in this module.
    """
    if uatype == "String":
        return unpack_string(data)
    if uatype in ("CharArray", "ByteString"):
        return unpack_bytes(data)
    if uatype == "DateTime":
        # DateTime travels as a little-endian Int64 FILETIME value
        epch = struct.unpack('<q', data.read(8))[0]
        return win_epoch_to_datetime(epch)
    if uatype in UaTypes:
        fmt = '<' + uatype_to_fmt(uatype)
        size = struct.calcsize(fmt)
        return struct.unpack(fmt, data.read(size))[0]
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)
    # Resolve the class by name rather than eval()-ing a string built from
    # the type name, so an unexpected name can never execute code.
    try:
        cls = globals()[uatype]
    except KeyError:
        raise NameError("name '{}' is not defined".format(uatype))
    return cls.from_binary(data)
145
146
147 1
def unpack_uatype_array(uatype, data):
    """
    Read an array of values of one UA type from the stream data.

    The array starts with a little-endian Int32 element count; a count of
    -1 yields None, otherwise a list of values read with unpack_uatype()
    is returned.
    """
    length = struct.unpack('<i', data.read(4))[0]
    if length == -1:
        return None
    return [unpack_uatype(uatype, data) for _ in range(length)]
156
157
158 1
def pack_string(string):
    """
    Serialize a string or byte string.

    Text is encoded as UTF-8 first. The output is a little-endian Int32
    byte length followed by the bytes. None and empty values are both
    written as a length of -1 with no payload.
    """
    if string is None:
        return struct.pack("<i", -1)
    if not isinstance(string, (bytes, bytearray)):
        # anything that is not already bytes is text and must be encoded
        string = string.encode('utf-8')
    length = len(string)
    if length == 0:
        return struct.pack("<i", -1)
    return struct.pack("<i", length) + string


# byte strings use exactly the same wire format as strings
pack_bytes = pack_string
167
168
169 1
def unpack_bytes(data):
    """
    Read a length-prefixed byte string from the stream data.

    The prefix is a little-endian Int32. A negative length (the encoder
    writes -1 for empty values) yields b'' and consumes no payload.
    """
    length = struct.unpack("<i", data.read(4))[0]
    if length < 0:
        # read() with a negative size would swallow the rest of the stream
        return b''
    return data.read(length)
174
175
176 1
def unpack_string(data):
    """
    Read a length-prefixed string from the stream data.

    The payload is read with unpack_bytes(). On Python 3 it is decoded as
    UTF-8; on Python 2 it is returned as a byte str.
    """
    raw = unpack_bytes(data)
    if sys.version_info.major < 3:
        return str(raw)
    return raw.decode("utf-8")
181
182
183 1
def test_bit(data, offset):
184 1
    mask = 1 << offset
185 1
    return data & mask
186
187
188 1
def set_bit(data, offset):
    """Return data with the bit at position offset set to 1."""
    return data | (1 << offset)
191
192
193 1
class FrozenClass(object):
    """
    Make it impossible to add members to a class.

    Once _freeze() has been called, assigning to an attribute that does not
    exist yet raises TypeError; existing attributes can still be changed.
    This is a hack since most bugs were found to be due to misspelling a
    variable in the protocol code.
    """
    __isfrozen = False

    def __setattr__(self, key, value):
        """Set an attribute, refusing new names once the object is frozen."""
        if self.__isfrozen and not hasattr(self, key):
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
        object.__setattr__(self, key, value)

    def _freeze(self):
        """Forbid the creation of new attributes on this instance."""
        self.__isfrozen = True
208
209
210 1
class Guid(object):
    """
    Wrapper around a uuid.UUID with binary (de)serialization.

    :ivar uuid: the wrapped identifier, random for a new instance
    :vartype uuid: uuid.UUID
    """

    def __init__(self):
        self.uuid = uuid.uuid4()

    def to_binary(self):
        """Return the 16 bytes of the wrapped UUID."""
        return self.uuid.bytes

    @staticmethod
    def from_binary(data):
        """Build a Guid from the next 16 bytes read from the stream data."""
        g = Guid()
        g.uuid = uuid.UUID(bytes=data.read(16))
        return g

    def __eq__(self, other):
        return isinstance(other, Guid) and self.uuid == other.uuid

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)

    def __hash__(self):
        # defining __eq__ alone makes the class unhashable on Python 3
        return hash(self.uuid)
226
227
228 1
class StatusCode(FrozenClass):
    """
    Numeric status code with its symbolic name and description.

    :ivar value: the numeric status code
    :vartype value: int
    :ivar name: name returned by status_code.get_name_and_doc for value
    :vartype name: string
    :ivar doc: description returned by status_code.get_name_and_doc for value
    :vartype doc: string
    """

    def __init__(self, value=0):
        self.value = value
        self.name, self.doc = status_code.get_name_and_doc(value)

    def to_binary(self):
        """Return the code packed as a little-endian UInt32."""
        return struct.pack("<I", self.value)

    @staticmethod
    def from_binary(data):
        """Read a little-endian UInt32 from data and wrap it in a StatusCode."""
        val = struct.unpack("<I", data.read(4))[0]
        return StatusCode(val)

    def check(self):
        """
        Raise an exception if the status code is anything else than 0.

        Use the is_good() method if no exception is desired.
        """
        if self.value != 0:
            raise Exception("{}({})".format(self.doc, self.name))

    def is_good(self):
        """
        Return True if status is Good, i.e. the value is 0.
        """
        return self.value == 0

    def __str__(self):
        return 'StatusCode({})'.format(self.name)
    __repr__ = __str__
271
272
273 1
class NodeIdType(object):
    """Numeric codes for the NodeId encodings; written as the first byte by NodeId.to_binary()."""
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
280
281
282 1
class NodeId(FrozenClass):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed
            or you want something special like a twobyte or fourbyte nodeid

    :ivar Identifier:
    :vartype Identifier: int, string, bytes or Guid
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NodeIdType:
    :vartype NodeIdType: NodeIdType
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        if self.Identifier is None:
            # no identifier at all: smallest encoding with identifier 0
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the encoding from the python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, (str, unicode)):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            else:
                raise Exception("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
            return self.NamespaceIndex, self.Identifier
        return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __ne__(self, node):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(node)

    def __hash__(self):
        return hash(self.__key())

    @staticmethod
    def from_string(string):
        """
        Parse a NodeId from its string form, e.g. "ns=2;i=42".

        The string is a ";" separated list of key=value elements. Supported
        keys are ns, i, s, g, b, srv and nsu; other keys are ignored.
        Raises Exception if no identifier element (i, s, g or b) is present.
        """
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            # split on the first "=" only so values may contain "="
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise Exception("Could not parse nodeid string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # keep the constructor defaults when the element is absent
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the string form of the NodeId, e.g. "ns=2;i=42".

        Elements are joined with ";" so that the result can be parsed back
        by from_string().
        """
        prefixes = {
            NodeIdType.Numeric: "i",
            NodeIdType.String: "s",
            NodeIdType.TwoByte: "i",
            NodeIdType.FourByte: "i",
            NodeIdType.Guid: "g",
            NodeIdType.ByteString: "b",
        }
        ntype = prefixes.get(self.NodeIdType)
        parts = []
        if self.ServerIndex:
            parts.append("srv={}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={}".format(self.NamespaceIndex))
        parts.append("{}={}".format(ntype, self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={}".format(self.NamespaceUri))
        return ";".join(parts)

    def __str__(self):
        return "NodeId({})".format(self.to_string())
    __repr__ = __str__

    def to_binary(self):
        """
        Serialize the NodeId.

        The first byte is the NodeIdType; the layout of the namespace index
        and identifier that follow depends on that type.
        """
        b = []
        b.append(struct.pack("<B", self.NodeIdType))
        if self.NodeIdType == NodeIdType.TwoByte:
            b.append(struct.pack("<B", self.Identifier))
        elif self.NodeIdType == NodeIdType.FourByte:
            b.append(struct.pack("<BH", self.NamespaceIndex, self.Identifier))
        elif self.NodeIdType == NodeIdType.Numeric:
            b.append(struct.pack("<HI", self.NamespaceIndex, self.Identifier))
        elif self.NodeIdType == NodeIdType.String:
            b.append(struct.pack("<H", self.NamespaceIndex))
            b.append(pack_string(self.Identifier))
        elif self.NodeIdType == NodeIdType.ByteString:
            b.append(struct.pack("<H", self.NamespaceIndex))
            b.append(pack_bytes(self.Identifier))
        else:
            # remaining case: the identifier serializes itself (e.g. a Guid)
            b.append(struct.pack("<H", self.NamespaceIndex))
            b.append(self.Identifier.to_binary())
        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """
        Read a NodeId from the stream data.

        The low six bits of the first byte select the NodeIdType; bit 6
        announces a trailing NamespaceUri and bit 7 a trailing ServerIndex.
        Raises Exception for an unknown NodeIdType.
        """
        nid = NodeId()
        encoding = struct.unpack("<B", data.read(1))[0]
        nid.NodeIdType = encoding & 0b00111111

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = struct.unpack("<B", data.read(1))[0]
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
            nid.Identifier = unpack_string(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
            nid.Identifier = unpack_bytes(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
            nid.Identifier = Guid.from_binary(data)
        else:
            raise Exception("Unknown NodeId encoding: " + str(nid.NodeIdType))

        if test_bit(encoding, 6):
            nid.NamespaceUri = unpack_string(data)
        if test_bit(encoding, 7):
            nid.ServerIndex = struct.unpack("<I", data.read(4))[0]

        return nid
452
453
454 1
class TwoByteNodeId(NodeId):
    """NodeId in namespace 0 forced to the TwoByte encoding."""

    def __init__(self, identifier):
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
458
459
460 1
class FourByteNodeId(NodeId):
    """NodeId forced to the FourByte encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
464
465
466 1
class NumericNodeId(NodeId):
    """NodeId forced to the Numeric encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
470
471
472 1
class ByteStringNodeId(NodeId):
    """NodeId forced to the ByteString encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
476
477
478 1
class GuidNodeId(NodeId):
    """NodeId forced to the Guid encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
482
483
484 1
class StringNodeId(NodeId):
    """NodeId forced to the String encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
488
489
490 1
# ExpandedNodeId is an alias: NodeId already carries NamespaceUri and ServerIndex
ExpandedNodeId = NodeId
491
492
493 1
class QualifiedName(FrozenClass):
    '''
    A string qualified with a namespace index.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    '''

    def __init__(self, name="", namespaceidx=0):
        self.NamespaceIndex = namespaceidx
        self.Name = name

    def to_string(self):
        """Return the name in "index:name" form."""
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse "index:name"; a string without ":" gets namespace index 0.

        Only the first ":" separates index and name.
        """
        if ":" in string:
            idx, name = string.split(":", 1)
        else:
            idx = 0
            name = string
        return QualifiedName(name, int(idx))

    def to_binary(self):
        """Return the UInt16 namespace index followed by the packed name."""
        return struct.pack('<H', self.NamespaceIndex) + pack_string(self.Name)

    @staticmethod
    def from_binary(data):
        """Read a UInt16 namespace index and a string from the stream data."""
        obj = QualifiedName()
        obj.NamespaceIndex = struct.unpack('<H', data.read(2))[0]
        obj.Name = unpack_string(data)
        return obj

    def __eq__(self, bname):
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, bname):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(bname)

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
537
538
539 1
class LocalizedText(FrozenClass):
    '''
    A text with an optional locale.

    :ivar Encoding: bit mask, bit 0 set if Locale is present, bit 1 if Text is
    :vartype Encoding: int
    :ivar Text: the text, stored as UTF-8 encoded bytes
    :vartype Text: bytes
    :ivar Locale:
    :vartype Locale: bytes
    '''

    def __init__(self, text=""):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = b''
        self._freeze()

    def to_binary(self):
        """
        Serialize as the encoding byte followed by the fields it announces.

        The mask is rebuilt from the current fields so that a bit left over
        from an earlier state cannot announce a field that is not written.
        """
        encoding = 0
        if self.Locale:
            encoding |= (1 << 0)
        if self.Text:
            encoding |= (1 << 1)
        self.Encoding = encoding
        packet = [pack_uatype('UInt8', self.Encoding)]
        if self.Locale:
            packet.append(pack_uatype('CharArray', self.Locale))
        if self.Text:
            packet.append(pack_uatype('CharArray', self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the encoding byte and the fields it announces from data."""
        obj = LocalizedText()
        obj.Encoding = unpack_uatype('UInt8', data)
        if obj.Encoding & (1 << 0):
            obj.Locale = unpack_uatype('CharArray', data)
        if obj.Encoding & (1 << 1):
            obj.Text = unpack_uatype('CharArray', data)
        return obj

    def to_string(self):
        """Return the text decoded from UTF-8."""
        # FIXME: use local
        if isinstance(self.Text, bytes):
            # decode explicitly: the default codec is ascii on Python 2
            return self.Text.decode('utf-8')
        return self.Text

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) + ')'
    __repr__ = __str__

    def __eq__(self, other):
        return isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)
592
593
594
595 1
class VariantType(Enum):
    '''
    The possible types of a variant.

    The member value is the numeric type id that Variant stores in the low
    bits of its encoding byte.

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:
    '''
    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
656
657
658 1
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    If no argument is given, a Null Variant is created.
    If no variant type is given, attempts to guess the type from the python type.
    If a variant is given as value, the new object becomes a copy of the argument.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    """

    def __init__(self, value=None, varianttype=None):
        self.Encoding = 0
        self.Value = value
        self.VariantType = varianttype
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
        if self.VariantType is None:
            if isinstance(self.Value, (list, tuple)):
                # arrays take the type of their first element
                if len(self.Value) == 0:
                    raise Exception("could not guess UA variable type")
                self.VariantType = self._guess_type(self.Value[0])
            else:
                self.VariantType = self._guess_type(self.Value)

    def __eq__(self, other):
        return isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        return not self.__eq__(other)

    def _guess_type(self, val):
        """Return the VariantType matching the python type of val."""
        if val is None:
            return VariantType.Null
        # bool must be tested before int: bool is a subclass of int
        if isinstance(val, bool):
            return VariantType.Boolean
        if isinstance(val, float):
            return VariantType.Double
        if isinstance(val, int):
            return VariantType.Int64
        if isinstance(val, (str, unicode)):
            return VariantType.String
        if isinstance(val, bytes):
            return VariantType.ByteString
        if isinstance(val, datetime):
            return VariantType.DateTime
        # any other object: use the member named like its class, else
        # fall back to ExtensionObject
        try:
            return getattr(VariantType, val.__class__.__name__)
        except AttributeError:
            return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{},type:{})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """
        Serialize as the encoding byte followed by the value.

        The encoding byte is rebuilt from scratch: the type id, plus bit 7
        when the value is a list or tuple. Reusing bits of the previous
        Encoding would OR an old type id into the new one.
        """
        is_array = isinstance(self.Value, (list, tuple))
        self.Encoding = self.VariantType.value
        if is_array:
            self.Encoding |= (1 << 7)
            payload = pack_uatype_array(self.VariantType.name, self.Value)
        else:
            payload = pack_uatype(self.VariantType.name, self.Value)
        return struct.pack("<B", self.Encoding) + payload

    @staticmethod
    def from_binary(data):
        """
        Read a Variant from the stream data.

        The low seven bits of the encoding byte give the VariantType; bit 7
        marks an array. A Null variant carries no value.
        """
        obj = Variant()
        obj.Encoding = unpack_uatype('UInt8', data)
        obj.VariantType = VariantType(obj.Encoding & 0b01111111)
        if obj.VariantType == VariantType.Null:
            return obj
        if obj.Encoding & (1 << 7):
            obj.Value = unpack_uatype_array(obj.VariantType.name, data)
        else:
            obj.Value = unpack_uatype(obj.VariantType.name, data)
        return obj
745
746
747 1
class DataValue(FrozenClass):
    '''
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    '''

    def __init__(self, variant=None):
        self.Encoding = 0
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        self.StatusCode = StatusCode()
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None

    def to_binary(self):
        """
        Serialize as the encoding byte followed by the fields it announces.

        The mask is rebuilt on every call so that bits left over from a
        previous encode or decode cannot announce fields that are not
        written.
        """
        encoding = 0
        if self.Value:
            encoding |= (1 << 0)
        if self.StatusCode:
            encoding |= (1 << 1)
        if self.SourceTimestamp:
            encoding |= (1 << 2)
        if self.ServerTimestamp:
            encoding |= (1 << 3)
        if self.SourcePicoseconds:
            encoding |= (1 << 4)
        if self.ServerPicoseconds:
            encoding |= (1 << 5)
        self.Encoding = encoding

        packet = [pack_uatype('UInt8', self.Encoding)]
        # NOTE(review): fields are written in the same order from_binary()
        # reads them; confirm this order against the OPC-UA binary mapping
        if self.Value:
            packet.append(self.Value.to_binary())
        if self.StatusCode:
            packet.append(self.StatusCode.to_binary())
        if self.SourceTimestamp:
            packet.append(pack_uatype('DateTime', self.SourceTimestamp))
        if self.ServerTimestamp:
            packet.append(pack_uatype('DateTime', self.ServerTimestamp))
        if self.SourcePicoseconds:
            packet.append(pack_uatype('UInt16', self.SourcePicoseconds))
        if self.ServerPicoseconds:
            packet.append(pack_uatype('UInt16', self.ServerPicoseconds))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the encoding byte and the fields it announces from data."""
        obj = DataValue()
        obj.Encoding = unpack_uatype('UInt8', data)
        if obj.Encoding & (1 << 0):
            obj.Value = Variant.from_binary(data)
        if obj.Encoding & (1 << 1):
            obj.StatusCode = StatusCode.from_binary(data)
        if obj.Encoding & (1 << 2):
            obj.SourceTimestamp = unpack_uatype('DateTime', data)
        if obj.Encoding & (1 << 3):
            obj.ServerTimestamp = unpack_uatype('DateTime', data)
        if obj.Encoding & (1 << 4):
            obj.SourcePicoseconds = unpack_uatype('UInt16', data)
        if obj.Encoding & (1 << 5):
            obj.ServerPicoseconds = unpack_uatype('UInt16', data)
        return obj

    def __str__(self):
        s = 'DataValue(Value:{}'.format(self.Value)
        optional_fields = (
            ('StatusCode', self.StatusCode),
            ('SourceTimestamp', self.SourceTimestamp),
            ('ServerTimestamp', self.ServerTimestamp),
            ('SourcePicoseconds', self.SourcePicoseconds),
            ('ServerPicoseconds', self.ServerPicoseconds),
        )
        for label, field in optional_fields:
            if field is not None:
                s += ', {}:{}'.format(label, field)
        s += ')'
        return s

    __repr__ = __str__
842
843
844 1
# last numeric identifier handed out by generate_nodeid()
__nodeid_counter = 2000


def generate_nodeid(idx):
    """
    Return a new NodeId in namespace idx with a fresh numeric identifier.

    Identifiers come from a module-wide counter that is incremented on
    every call, so the first generated identifier is 2001.
    """
    global __nodeid_counter
    __nodeid_counter += 1
    return NodeId(__nodeid_counter, idx)
851
852
853