Passed
Branch dev (949ba8)
by Olivier
03:17 queued 01:08
created

opcua.ExtensionObject.from_object()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1
Metric Value
dl 0
loc 7
ccs 6
cts 6
cp 1
rs 9.4286
cc 1
crap 1
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo
7 1
from calendar import timegm
8 1
import sys
9 1
if sys.version_info.major > 2:
10 1
    unicode = str
11
12 1
import uuid
13 1
import struct
14
15 1
import opcua.status_code as status_code
16 1
from opcua.object_ids import ObjectIds
17
18 1
logger = logging.getLogger('opcua.uaprotocol')
19
20
# types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases
21 1
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
22
23
24 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
25 1
HUNDREDS_OF_NANOSECONDS = 10000000
26
27
28 1
class UTC(tzinfo):
29
30
    """UTC"""
31
32 1
    def utcoffset(self, dt):
33
        return timedelta(0)
34
35 1
    def tzname(self, dt):
36
        return "UTC"
37
38 1
    def dst(self, dt):
39 1
        return timedelta(0)
40
41
42
# methods copied from  David Buxton <[email protected]> sample code
43 1
def datetime_to_win_epoch(dt):
44 1
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
45 1
        dt = dt.replace(tzinfo=UTC())
46 1
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
47 1
    return ft + (dt.microsecond * 10)
48
49
50 1
def win_epoch_to_datetime(epch):
51 1
    (s, ns100) = divmod(epch - EPOCH_AS_FILETIME, HUNDREDS_OF_NANOSECONDS)
52 1
    try:
53 1
        dt = datetime.utcfromtimestamp(s)
54
    except Exception as ex: #FIXME: find out what kind of exceptin is raised!!!
55
        logger.debug("Exception occurred during conversion within 'win_epoch_to_datetime'. %s", ex)
56
        return datetime.now()
57 1
    dt = dt.replace(microsecond=(ns100 // 10))
58 1
    return dt
59
60
61 1
def uatype_to_fmt(uatype):
62 1
    if uatype == "Char":
63
        return "B"
64 1
    elif uatype == "SByte":
65
        return "B"
66 1
    elif uatype == "Int6":
67
        return "b"
68 1
    elif uatype == "Int8":
69
        return "b"
70 1
    elif uatype == "Int16":
71
        return "h"
72 1
    elif uatype == "Int32":
73 1
        return "i"
74 1
    elif uatype == "Int64":
75 1
        return "q"
76 1
    elif uatype == "UInt8":
77 1
        return "B"
78 1
    elif uatype == "UInt16":
79
        return "H"
80 1
    elif uatype == "UInt32":
81 1
        return "I"
82 1
    elif uatype == "UInt64":
83
        return "Q"
84 1
    elif uatype == "Boolean":
85 1
        return "?"
86 1
    elif uatype == "Double":
87 1
        return "d"
88 1
    elif uatype == "Float":
89
        return "f"
90 1
    elif uatype == "Byte":
91 1
        return "B"
92
    else:
93
        raise Exception("Error unknown uatype: " + uatype)
94
95
96 1
def pack_uatype_array(uatype, value):
97 1
    if value is None:
98
        return struct.pack("<i", -1)
99 1
    b = []
100 1
    b.append(struct.pack("<i", len(value)))
101 1
    for val in value:
102 1
        b.append(pack_uatype(uatype, val))
103 1
    return b"".join(b)
104
105
106 1
def pack_uatype(uatype, value):
107 1
    if uatype == "Null":
108 1
        return b''
109 1
    elif uatype == "String":
110 1
        return pack_string(value)
111 1
    elif uatype in ("CharArray", "ByteString"):
112 1
        return pack_bytes(value)
113 1
    elif uatype == "DateTime":
114 1
        epch = datetime_to_win_epoch(value)
115 1
        return struct.pack('<q', epch)
116 1
    elif uatype in UaTypes:
117 1
        fmt = '<' + uatype_to_fmt(uatype)
118 1
        return struct.pack(fmt, value)
119 1
    elif uatype == "ExtensionObject":
120
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
121
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
122
        # Using local import to avoid import loop
123 1
        from opcua.uaprotocol_auto import extensionobject_to_binary
124 1
        return extensionobject_to_binary(value)
125
    else:
126 1
        return value.to_binary()
127
128
129 1
def unpack_uatype(uatype, data):
130 1
    if uatype == "String":
131 1
        return unpack_string(data)
132 1
    elif uatype in ("CharArray", "ByteString"):
133 1
        return unpack_bytes(data)
134 1
    elif uatype == "DateTime":
135 1
        epch = struct.unpack('<q', data.read(8))[0]
136 1
        return win_epoch_to_datetime(epch)
137 1
    elif uatype in UaTypes:
138 1
        fmt = '<' + uatype_to_fmt(uatype)
139 1
        size = struct.calcsize(fmt)
140 1
        return struct.unpack(fmt, data.read(size))[0]
141 1
    elif uatype == "ExtensionObject":
142
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
143
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
144
        # Using local import to avoid import loop
145 1
        from opcua.uaprotocol_auto import extensionobject_from_binary
146 1
        return extensionobject_from_binary(data)
147
    else:
148 1
        code = "{}.from_binary(data)".format(uatype)
149 1
        tmp = eval(code)
150 1
        return tmp
151
152
153 1
def unpack_uatype_array(uatype, data):
154 1
    length = struct.unpack('<i', data.read(4))[0]
155 1
    if length == -1:
156
        return None
157
    else:
158 1
        result = []
159 1
        for _ in range(0, length):
160 1
            result.append(unpack_uatype(uatype, data))
161 1
        return result
162
163
164 1
def pack_string(string):
165 1
    if type(string) is unicode:
166 1
        string = string.encode('utf-8')
167 1
    length = len(string)
168 1
    if length == 0:
169 1
        return struct.pack("<i", -1)
170 1
    return struct.pack("<i", length) + string
171
172 1
pack_bytes = pack_string
173
174
175 1
def unpack_bytes(data):
176 1
    length = struct.unpack("<i", data.read(4))[0]
177 1
    if length == -1:
178 1
        return b''
179 1
    return data.read(length)
180
181
182 1
def unpack_string(data):
183 1
    b = unpack_bytes(data)
184 1
    if sys.version_info.major < 3:
185
        return str(b)
186 1
    return b.decode("utf-8")
187
188
189 1
def test_bit(data, offset):
190 1
    mask = 1 << offset
191 1
    return data & mask
192
193
194 1
def set_bit(data, offset):
195
    mask = 1 << offset
196
    return data | mask
197
198
199 1
class FrozenClass(object):
200
201
    """
202
    make it impossible to add members to a class.
203
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
204
    """
205 1
    __isfrozen = False
206
207 1
    def __setattr__(self, key, value):
208 1
        if self.__isfrozen and not hasattr(self, key):
209
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
210 1
        object.__setattr__(self, key, value)
211
212 1
    def _freeze(self):
213 1
        self.__isfrozen = True
214
215
216 1
class Guid(object):
217
218 1
    def __init__(self):
219 1
        self.uuid = uuid.uuid4()
220
221 1
    def to_binary(self):
222 1
        return self.uuid.bytes
223
224 1
    @staticmethod
225
    def from_binary(data):
226 1
        g = Guid()
227 1
        g.uuid = uuid.UUID(bytes=data.read(16))
228 1
        return g
229
230 1
    def __eq__(self, other):
231 1
        return isinstance(other, Guid) and self.uuid == other.uuid
232
233
234 1
class StatusCode(FrozenClass):
235
236
    """
237
    :ivar value:
238
    :vartype value: int
239
    :ivar name:
240
    :vartype name: string
241
    :ivar doc:
242
    :vartype doc: string
243
    """
244
245 1
    def __init__(self, value=0):
246 1
        self.value = value
247 1
        self.name, self.doc = status_code.get_name_and_doc(value)
248
249 1
    def to_binary(self):
250 1
        return struct.pack("<I", self.value)
251
252 1
    @staticmethod
253
    def from_binary(data):
254 1
        val = struct.unpack("<I", data.read(4))[0]
255 1
        sc = StatusCode(val)
256 1
        return sc
257
258 1
    def check(self):
259
        """
260
        raise en exception if status code is anything else than 0
261
        use is is_good() method if not exception is desired
262
        """
263 1
        if self.value != 0:
264 1
            raise Exception("{}({})".format(self.doc, self.name))
265
266 1
    def is_good(self):
267
        """
268
        return True if status is Good.
269
        """
270 1
        if self.value == 0:
271 1
            return True
272 1
        return False
273
274 1
    def __str__(self):
275
        return 'StatusCode({})'.format(self.name)
276 1
    __repr__ = __str__
277
278
279 1
class NodeIdType(object):
280 1
    TwoByte = 0
281 1
    FourByte = 1
282 1
    Numeric = 2
283 1
    String = 3
284 1
    Guid = 4
285 1
    ByteString = 5
286
287
288 1
class NodeId(FrozenClass):
289
290
    """
291
    NodeId Object
292
293
    Args:
294
        identifier: The identifier might be an int, a string, bytes or a Guid
295
        namespaceidx(int): The index of the namespace
296
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
297
298
299
    :ivar Identifier:
300
    :vartype Identifier: NodeId
301
    :ivar NamespaceIndex:
302
    :vartype NamespaceIndex: Int
303
    :ivar NamespaceUri:
304
    :vartype NamespaceUri: String
305
    :ivar ServerIndex:
306
    :vartype ServerIndex: Int
307
    """
308
309 1
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
310 1
        self.Identifier = identifier
311 1
        self.NamespaceIndex = namespaceidx
312 1
        self.NodeIdType = nodeidtype
313 1
        self.NamespaceUri = ""
314 1
        self.ServerIndex = 0
315 1
        if self.Identifier is None:
316 1
            self.Identifier = 0
317 1
            self.NodeIdType = NodeIdType.TwoByte
318 1
            return
319 1
        if self.NodeIdType is None:
320 1
            if isinstance(self.Identifier, int):
321 1
                self.NodeIdType = NodeIdType.Numeric
322 1
            elif isinstance(self.Identifier, str):
323 1
                self.NodeIdType = NodeIdType.String
324
            elif isinstance(self.Identifier, bytes):
325
                self.NodeIdType = NodeIdType.ByteString
326
            else:
327
                raise Exception("NodeId: Could not guess type of NodeId, set NodeIdType")
328
329 1
    def __key(self):
330 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
331 1
            return self.NamespaceIndex, self.Identifier
332
        else:
333 1
            return self.NodeIdType, self.NamespaceIndex, self.Identifier
334
335 1
    def __eq__(self, node):
336 1
        return isinstance(node, NodeId) and self.__key() == node.__key()
337
338 1
    def __hash__(self):
339 1
        return hash(self.__key())
340
341 1
    @staticmethod
342
    def from_string(string):
343 1
        l = string.split(";")
344 1
        identifier = None
345 1
        namespace = 0
346 1
        ntype = None
347 1
        srv = None
348 1
        nsu = None
349 1
        for el in l:
350 1
            if not el:
351 1
                continue
352 1
            k, v = el.split("=")
353 1
            k = k.strip()
354 1
            v = v.strip()
355 1
            if k == "ns":
356 1
                namespace = int(v)
357 1
            elif k == "i":
358 1
                ntype = NodeIdType.Numeric
359 1
                identifier = int(v)
360 1
            elif k == "s":
361 1
                ntype = NodeIdType.String
362 1
                identifier = v
363 1
            elif k == "g":
364
                ntype = NodeIdType.Guid
365
                identifier = v
366 1
            elif k == "b":
367
                ntype = NodeIdType.ByteString
368
                identifier = v
369 1
            elif k == "srv":
370 1
                srv = v
371
            elif k == "nsu":
372
                nsu = v
373 1
        if identifier is None:
374
            raise Exception("Could not parse nodeid string: " + string)
375 1
        nodeid = NodeId(identifier, namespace, ntype)
376 1
        nodeid.NamespaceUri = nsu
377 1
        nodeid.ServerIndex = srv
378 1
        return nodeid
379
380 1
    def to_string(self):
381 1
        string = ""
382 1
        if self.NamespaceIndex != 0:
383 1
            string += "ns={};".format(self.NamespaceIndex)
384 1
        ntype = None
385 1
        if self.NodeIdType == NodeIdType.Numeric:
386
            ntype = "i"
387 1
        elif self.NodeIdType == NodeIdType.String:
388 1
            ntype = "s"
389 1
        elif self.NodeIdType == NodeIdType.TwoByte:
390 1
            ntype = "i"
391
        elif self.NodeIdType == NodeIdType.FourByte:
392
            ntype = "i"
393
        elif self.NodeIdType == NodeIdType.Guid:
394
            ntype = "g"
395
        elif self.NodeIdType == NodeIdType.ByteString:
396
            ntype = "b"
397 1
        string += "{}={}".format(ntype, self.Identifier)
398 1
        if self.ServerIndex:
399
            string = "srv=" + str(self.ServerIndex) + string
400 1
        if self.NamespaceUri:
401
            string += "nsu={}".format(self.NamespaceUri)
402 1
        return string
403
404 1
    def __str__(self):
405 1
        return "NodeId({})".format(self.to_string())
406 1
    __repr__ = __str__
407
408 1
    def to_binary(self):
409 1
        b = []
410 1
        b.append(struct.pack("<B", self.NodeIdType))
411 1
        if self.NodeIdType == NodeIdType.TwoByte:
412 1
            b.append(struct.pack("<B", self.Identifier))
413 1
        elif self.NodeIdType == NodeIdType.FourByte:
414 1
            b.append(struct.pack("<BH", self.NamespaceIndex, self.Identifier))
415 1
        elif self.NodeIdType == NodeIdType.Numeric:
416 1
            b.append(struct.pack("<HI", self.NamespaceIndex, self.Identifier))
417 1
        elif self.NodeIdType == NodeIdType.String:
418 1
            b.append(struct.pack("<H", self.NamespaceIndex))
419 1
            b.append(pack_string(self.Identifier))
420 1
        elif self.NodeIdType == NodeIdType.ByteString:
421 1
            b.append(struct.pack("<H", self.NamespaceIndex))
422 1
            b.append(pack_bytes(self.Identifier))
423
        else:
424 1
            b.append(struct.pack("<H", self.NamespaceIndex))
425 1
            b.append(self.Identifier.to_binary())
426 1
        return b"".join(b)
427
428 1
    @staticmethod
429
    def from_binary(data):
430 1
        nid = NodeId()
431 1
        encoding = struct.unpack("<B", data.read(1))[0]
432 1
        nid.NodeIdType = encoding & 0b00111111
433
434 1
        if nid.NodeIdType == NodeIdType.TwoByte:
435 1
            nid.Identifier = struct.unpack("<B", data.read(1))[0]
436 1
        elif nid.NodeIdType == NodeIdType.FourByte:
437 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
438 1
        elif nid.NodeIdType == NodeIdType.Numeric:
439 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
440 1
        elif nid.NodeIdType == NodeIdType.String:
441 1
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
442 1
            nid.Identifier = unpack_string(data)
443 1
        elif nid.NodeIdType == NodeIdType.ByteString:
444 1
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
445 1
            nid.Identifier = unpack_bytes(data)
446 1
        elif nid.NodeIdType == NodeIdType.Guid:
447 1
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
448 1
            nid.Identifier = Guid.from_binary(data)
449
        else:
450
            raise Exception("Unknown NodeId encoding: " + str(nid.NodeIdType))
451
452 1
        if test_bit(encoding, 6):
453
            nid.NamespaceUri = unpack_string(data)
454 1
        if test_bit(encoding, 7):
455
            nid.ServerIndex = struct.unpack("<I", data.read(4))[0]
456
457 1
        return nid
458
459
460 1
class TwoByteNodeId(NodeId):
461
462 1
    def __init__(self, identifier):
463 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
464
465
466 1
class FourByteNodeId(NodeId):
467
468 1
    def __init__(self, identifier, namespace=0):
469 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
470
471
472 1
class NumericNodeId(NodeId):
473
474 1
    def __init__(self, identifier, namespace=0):
475 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
476
477
478 1
class ByteStringNodeId(NodeId):
479
480 1
    def __init__(self, identifier, namespace=0):
481 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
482
483
484 1
class GuidNodeId(NodeId):
485
486 1
    def __init__(self, identifier, namespace=0):
487 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
488
489
490 1
class StringNodeId(NodeId):
491
492 1
    def __init__(self, identifier, namespace=0):
493 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
494
495
496 1
ExpandedNodeId = NodeId
497
498
499 1
class QualifiedName(FrozenClass):
500
501
    '''
502
    A string qualified with a namespace index.
503
    '''
504
505 1
    def __init__(self, name="", namespaceidx=0):
506 1
        self.NamespaceIndex = namespaceidx
507 1
        self.Name = name
508
509 1
    def to_string(self):
510 1
        return "{}:{}".format(self.NamespaceIndex, self.Name)
511
512 1
    @staticmethod
513
    def from_string(string):
514 1
        if ":" in string:
515 1
            idx, name = string.split(":", 1)
516
        else:
517 1
            idx = 0
518 1
            name = string
519 1
        return QualifiedName(name, int(idx))
520
521 1
    def to_binary(self):
522 1
        packet = []
523 1
        fmt = '<H'
524 1
        packet.append(struct.pack(fmt, self.NamespaceIndex))
525 1
        packet.append(pack_string(self.Name))
526 1
        return b''.join(packet)
527
528 1
    @staticmethod
529
    def from_binary(data):
530 1
        obj = QualifiedName()
531 1
        fmt = '<H'
532 1
        obj.NamespaceIndex = struct.unpack(fmt, data.read(2))[0]
533 1
        obj.Name = unpack_string(data)
534 1
        return obj
535
536 1
    def __eq__(self, bname):
537 1
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
538
539 1
    def __str__(self):
540
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)
541
542 1
    __repr__ = __str__
543
544
545 1
class LocalizedText(FrozenClass):
546
547
    '''
548
    A string qualified with a namespace index.
549
    '''
550
551 1
    def __init__(self, text=""):
552 1
        self.Encoding = 0
553 1
        self.Text = text
554 1
        if type(self.Text) is unicode:
555 1
            self.Text = self.Text.encode('utf-8')
556 1
        if self.Text:
557 1
            self.Encoding |= (1 << 1)
558 1
        self.Locale = b''
559 1
        self._freeze()
560
561 1
    def to_binary(self):
562 1
        packet = []
563 1
        if self.Locale:
564
            self.Encoding |= (1 << 0)
565 1
        if self.Text:
566 1
            self.Encoding |= (1 << 1)
567 1
        packet.append(pack_uatype('UInt8', self.Encoding))
568 1
        if self.Locale:
569
            packet.append(pack_uatype('CharArray', self.Locale))
570 1
        if self.Text:
571 1
            packet.append(pack_uatype('CharArray', self.Text))
572 1
        return b''.join(packet)
573
574 1
    @staticmethod
575
    def from_binary(data):
576 1
        obj = LocalizedText()
577 1
        obj.Encoding = unpack_uatype('UInt8', data)
578 1
        if obj.Encoding & (1 << 0):
579
            obj.Locale = unpack_uatype('CharArray', data)
580 1
        if obj.Encoding & (1 << 1):
581 1
            obj.Text = unpack_uatype('CharArray', data)
582 1
        return obj
583
584 1
    def to_string(self):
585
        # FIXME: use local
586
        return self.Text.decode()
587
588 1
    def __str__(self):
589
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
590
            'Locale:' + str(self.Locale) + ', ' + \
591
            'Text:' + str(self.Text) + ')'
592 1
    __repr__ = __str__
593
594 1
    def __eq__(self, other):
595 1
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
596 1
            return True
597 1
        return False
598
599
600
601 1
class VariantType(Enum):
602
603
    '''
604
    The possible types of a variant.
605
606
    :ivar Null:
607
    :ivar Boolean:
608
    :ivar SByte:
609
    :ivar Byte:
610
    :ivar Int16:
611
    :ivar UInt16:
612
    :ivar Int32:
613
    :ivar UInt32:
614
    :ivar Int64:
615
    :ivar UInt64:
616
    :ivar Float:
617
    :ivar Double:
618
    :ivar String:
619
    :ivar DateTime:
620
    :ivar Guid:
621
    :ivar ByteString:
622
    :ivar XmlElement:
623
    :ivar NodeId:
624
    :ivar ExpandedNodeId:
625
    :ivar StatusCode:
626
    :ivar QualifiedName:
627
    :ivar LocalizedText:
628
    :ivar ExtensionObject:
629
    :ivar DataValue:
630
    :ivar Variant:
631
    :ivar DiagnosticInfo:
632
633
634
635
    '''
636 1
    Null = 0
637 1
    Boolean = 1
638 1
    SByte = 2
639 1
    Byte = 3
640 1
    Int16 = 4
641 1
    UInt16 = 5
642 1
    Int32 = 6
643 1
    UInt32 = 7
644 1
    Int64 = 8
645 1
    UInt64 = 9
646 1
    Float = 10
647 1
    Double = 11
648 1
    String = 12
649 1
    DateTime = 13
650 1
    Guid = 14
651 1
    ByteString = 15
652 1
    XmlElement = 16
653 1
    NodeId = 17
654 1
    ExpandedNodeId = 18
655 1
    StatusCode = 19
656 1
    QualifiedName = 20
657 1
    LocalizedText = 21
658 1
    ExtensionObject = 22
659 1
    DataValue = 23
660 1
    Variant = 24
661 1
    DiagnosticInfo = 25
662
663
664 1
class Variant(FrozenClass):
665
666
    """
667
    Create an OPC-UA Variant object.
668
    if no argument a Null Variant is created.
669
    if not variant type is given, attemps to guess type from python type
670
    if a variant is given as value, the new objects becomes a copy of the argument
671
672
    :ivar Value:
673
    :vartype Value: Any supported type
674
    :ivar VariantType:
675
    :vartype VariantType: VariantType
676
    """
677
678 1
    def __init__(self, value=None, varianttype=None):
679 1
        self.Encoding = 0
680 1
        self.Value = value
681 1
        self.VariantType = varianttype
682 1
        if isinstance(value, Variant):
683 1
            self.Value = value.Value
684 1
            self.VariantType = value.VariantType
685 1
        if self.VariantType is None:
686 1
            if type(self.Value) in (list, tuple):
687 1
                if len(self.Value) == 0:
688
                    raise Exception("could not guess UA variable type")
689 1
                self.VariantType = self._guess_type(self.Value[0])
690
            else:
691 1
                self.VariantType = self._guess_type(self.Value)
692
693 1
    def __eq__(self, other):
694 1
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
695 1
            return True
696
        return False
697
698 1
    def _guess_type(self, val):
699 1
        if val is None:
700 1
            return VariantType.Null
701 1
        elif isinstance(val, bool):
702 1
            return VariantType.Boolean
703 1
        elif isinstance(val, float):
704 1
            return VariantType.Double
705 1
        elif isinstance(val, int):
706 1
            return VariantType.Int64
707 1
        elif type(val) in (str, unicode):
708 1
            return VariantType.String
709 1
        elif isinstance(val, bytes):
710 1
            return VariantType.ByteString
711 1
        elif isinstance(val, datetime):
712 1
            return VariantType.DateTime
713
        else:
714 1
            if isinstance(val, object):
715 1
                try:
716 1
                    return getattr(VariantType, val.__class__.__name__)
717 1
                except AttributeError:
718 1
                    return VariantType.ExtensionObject
719
            else:
720
                raise Exception("Could not guess UA type of {} with type {}, specify UA type".format(val, type(val)))
721
722 1
    def __str__(self):
723 1
        return "Variant(val:{},type:{})".format(self.Value, self.VariantType)
724 1
    __repr__ = __str__
725
726 1
    def to_binary(self):
727 1
        b = []
728 1
        mask = self.Encoding & 0b01111111
729 1
        self.Encoding = (self.VariantType.value | mask)
730 1
        if type(self.Value) in (list, tuple):
731 1
            self.Encoding |= (1 << 7)
732 1
            b.append(pack_uatype_array(self.VariantType.name, self.Value))
733
        else:
734 1
            b.append(pack_uatype(self.VariantType.name, self.Value))
735 1
        b.insert(0, struct.pack("<B", self.Encoding))
736 1
        return b"".join(b)
737
738 1
    @staticmethod
739
    def from_binary(data):
740 1
        obj = Variant()
741 1
        obj.Encoding = unpack_uatype('UInt8', data)
742 1
        val = obj.Encoding & 0b01111111
743 1
        obj.VariantType = VariantType(val)
744 1
        if obj.VariantType == VariantType.Null:
745 1
            return obj
746 1
        if obj.Encoding & (1 << 7):
747 1
            obj.Value = unpack_uatype_array(obj.VariantType.name, data)
748
        else:
749 1
            obj.Value = unpack_uatype(obj.VariantType.name, data)
750 1
        return obj
751
752
753 1
class DataValue(FrozenClass):
754
755
    '''
756
    A value with an associated timestamp, and quality.
757
    Automatically generated from xml , copied and modified here to fix errors in xml spec
758
759
    :ivar Value:
760
    :vartype Value: Variant
761
    :ivar StatusCode:
762
    :vartype StatusCode: StatusCode
763
    :ivar SourceTimestamp:
764
    :vartype SourceTimestamp: datetime
765
    :ivar SourcePicoSeconds:
766
    :vartype SourcePicoSeconds: int
767
    :ivar ServerTimestamp:
768
    :vartype ServerTimestamp: datetime
769
    :ivar ServerPicoseconds:
770
    :vartype ServerPicoseconds: int
771
772
    '''
773
774 1
    def __init__(self, variant=None):
775 1
        self.Encoding = 0
776 1
        if not isinstance(variant, Variant):
777 1
            variant = Variant(variant)
778 1
        self.Value = variant
779 1
        self.StatusCode = StatusCode()
780 1
        self.SourceTimestamp = None  # DateTime()
781 1
        self.SourcePicoseconds = None
782 1
        self.ServerTimestamp = None  # DateTime()
783 1
        self.ServerPicoseconds = None
784
785 1
    def to_binary(self):
786 1
        packet = []
787 1
        if self.Value:
788 1
            self.Encoding |= (1 << 0)
789 1
        if self.StatusCode:
790 1
            self.Encoding |= (1 << 1)
791 1
        if self.SourceTimestamp:
792 1
            self.Encoding |= (1 << 2)
793 1
        if self.ServerTimestamp:
794 1
            self.Encoding |= (1 << 3)
795 1
        if self.SourcePicoseconds:
796
            self.Encoding |= (1 << 4)
797 1
        if self.ServerPicoseconds:
798
            self.Encoding |= (1 << 5)
799 1
        packet.append(pack_uatype('UInt8', self.Encoding))
800 1
        if self.Value:
801 1
            packet.append(self.Value.to_binary())
802 1
        if self.StatusCode:
803 1
            packet.append(self.StatusCode.to_binary())
804 1
        if self.SourceTimestamp:
805 1
            packet.append(pack_uatype('DateTime', self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
806 1
        if self.ServerTimestamp:
807 1
            packet.append(pack_uatype('DateTime', self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
808 1
        if self.SourcePicoseconds:
809
            packet.append(pack_uatype('UInt16', self.SourcePicoseconds))
810 1
        if self.ServerPicoseconds:
811
            packet.append(pack_uatype('UInt16', self.ServerPicoseconds))
812 1
        return b''.join(packet)
813
814 1
    @staticmethod
815
    def from_binary(data):
816 1
        obj = DataValue()
817 1
        obj.Encoding = unpack_uatype('UInt8', data)
818 1
        if obj.Encoding & (1 << 0):
819 1
            obj.Value = Variant.from_binary(data)
820 1
        if obj.Encoding & (1 << 1):
821 1
            obj.StatusCode = StatusCode.from_binary(data)
822 1
        if obj.Encoding & (1 << 2):
823 1
            obj.SourceTimestamp = unpack_uatype('DateTime', data)  # DateTime.from_binary(data)
824 1
        if obj.Encoding & (1 << 3):
825 1
            obj.ServerTimestamp = unpack_uatype('DateTime', data)  # DateTime.from_binary(data)
826 1
        if obj.Encoding & (1 << 4):
827
            obj.SourcePicoseconds = unpack_uatype('UInt16', data)
828 1
        if obj.Encoding & (1 << 5):
829
            obj.ServerPicoseconds = unpack_uatype('UInt16', data)
830 1
        return obj
831
832 1
    def __str__(self):
833
        s = 'DataValue(Value:{}'.format(self.Value)
834
        if self.StatusCode is not None:
835
            s += ', StatusCode:{}'.format(self.StatusCode)
836
        if self.SourceTimestamp is not None:
837
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
838
        if self.ServerTimestamp is not None:
839
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
840
        if self.SourcePicoseconds is not None:
841
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
842
        if self.ServerPicoseconds is not None:
843
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
844
        s += ')'
845
        return s
846
847 1
    __repr__ = __str__
848
849
850 1
__nodeid_counter = 2000
851
852
853 1
def generate_nodeid(idx):
854
    global __nodeid_counter
855 1
    __nodeid_counter += 1
856 1
    return NodeId(__nodeid_counter, idx)
857
858
859