Completed
Pull Request — master (#72)
by
unknown
01:41
created

opcua.Guid.to_binary()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 2
ccs 2
cts 2
cp 1
rs 10
cc 1
crap 1
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo
7 1
from calendar import timegm
8 1
import sys
9 1
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so the rest
# of the module can test against ``unicode`` on both major versions.
if sys.version_info[0] >= 3:
    unicode = str
11
12 1
import uuid
13 1
import struct
14
15 1
import opcua.status_code as status_code
16 1
from opcua.object_ids import ObjectIds
17
18 1
# Logger for this module, registered under the 'opcua.uaprotocol' name.
logger = logging.getLogger('opcua.uaprotocol')
19
20
# types that will be packed and unpacked directly using struct (string, bytes and datetime are handled as special cases)
21 1
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
22
23
24 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
25 1
# FILETIME ticks are 100 ns long, so this many ticks make up one second.
HUNDREDS_OF_NANOSECONDS = 10000000
26 1
# Origin of the FILETIME scale (January 1, 1601) as a naive datetime.
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
27
28
29 1
class UTC(tzinfo):

    """Time zone for UTC: no offset from UTC and no daylight saving."""

    def utcoffset(self, dt):
        # UTC is never shifted relative to itself.
        return timedelta()

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        # No daylight-saving adjustment ever applies.
        return timedelta()
41
42
43
# method copied from David Buxton <[email protected]> sample code
44 1
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to a Windows FILETIME value.

    :param dt: datetime to convert; a naive value (or one whose tzinfo
        reports no offset) is taken to be UTC already
    :returns: number of 100 ns ticks since January 1, 1601 (int)
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() shifts an aware datetime to UTC first; timetuple() would
    # hand timegm() the local wall-clock fields and be off by the UTC offset.
    seconds = timegm(dt.utctimetuple())
    ft = EPOCH_AS_FILETIME + (seconds * HUNDREDS_OF_NANOSECONDS)
    # the time tuple only has second resolution; add the rest in 100 ns units
    return ft + (dt.microsecond * 10)
49
50
51 1
def win_epoch_to_datetime(epch):
    """
    Turn a Windows FILETIME tick count back into a naive datetime.

    :param epch: 100 ns ticks counted from FILETIME_EPOCH_AS_DATETIME
    :returns: datetime at that distance from the FILETIME epoch; anything
        below one microsecond is dropped by the floor division
    """
    elapsed = timedelta(microseconds=epch // 10)
    return FILETIME_EPOCH_AS_DATETIME + elapsed
53
54
55 1
# struct format character for every UA type that is packed directly.
_UATYPE_FORMATS = {
    "Char": "B",
    "SByte": "b",  # signed byte (was wrongly mapped to the unsigned "B")
    "Int6": "b",  # misspelt legacy key, kept so existing callers do not break
    "Int8": "b",
    "Int16": "h",
    "Int32": "i",
    "Int64": "q",
    "UInt8": "B",
    "UInt16": "H",
    "UInt32": "I",
    "UInt64": "Q",
    "Boolean": "?",
    "Double": "d",
    "Float": "f",
    "Byte": "B",
}


def uatype_to_fmt(uatype):
    """
    Return the struct format character used to pack the given UA type.

    :param uatype: UA type name such as "Int32" or "Boolean"
    :returns: one-character struct format (without byte-order prefix)
    :raises Exception: if the type name is not a directly packable type
    """
    try:
        return _UATYPE_FORMATS[uatype]
    except (KeyError, TypeError):
        # format() instead of "+" so a non-string argument still yields
        # the intended error rather than a TypeError from concatenation
        raise Exception("Error unknown uatype: {}".format(uatype))
88
89
90 1
def pack_uatype_array(uatype, value):
    """
    Serialize a sequence of values of one UA type.

    :param uatype: UA type name of every element
    :param value: sequence of elements, or None for a null array
    :returns: bytes made of an int32 element count followed by each packed
        element; a null array is the count -1 alone
    """
    if value is None:
        return struct.pack("<i", -1)
    header = struct.pack("<i", len(value))
    elements = [pack_uatype(uatype, item) for item in value]
    return header + b"".join(elements)
98
99
100 1
def pack_uatype(uatype, value):
    """
    Serialize a single value according to its UA type name.

    :param uatype: UA type name ("Null", "String", "DateTime", a name from
        UaTypes, "ExtensionObject", or the name of a type with to_binary())
    :param value: the value to serialize
    :returns: the packed bytes
    """
    if uatype == "Null":
        return b''
    if uatype == "String":
        return pack_string(value)
    if uatype in ("CharArray", "ByteString"):
        return pack_bytes(value)
    if uatype == "DateTime":
        # DateTime travels as a signed 64 bit FILETIME tick count
        return struct.pack('<q', datetime_to_win_epoch(value))
    if uatype in UaTypes:
        return struct.pack('<' + uatype_to_fmt(uatype), value)
    if uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in
        # this file, but Variant can contain any object from uaprotocol_auto
        # as ExtensionObject. Import locally to avoid the import loop.
        from opcua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)
    # anything else is expected to know how to serialize itself
    return value.to_binary()
121
122
123 1
def unpack_uatype(uatype, data):
    """
    Read one value of the given UA type from a binary stream.

    :param uatype: UA type name, same vocabulary as pack_uatype()
    :param data: object with a read(size) method returning bytes
    :returns: the decoded value (None for "Null")
    :raises Exception: if the type name does not resolve to a class of this
        module that provides from_binary()
    """
    if uatype == "Null":
        # mirrors pack_uatype(), which writes nothing for Null
        return None
    elif uatype == "String":
        return unpack_string(data)
    elif uatype in ("CharArray", "ByteString"):
        return unpack_bytes(data)
    elif uatype == "DateTime":
        epch = struct.unpack('<q', data.read(8))[0]
        return win_epoch_to_datetime(epch)
    elif uatype in UaTypes:
        fmt = '<' + uatype_to_fmt(uatype)
        size = struct.calcsize(fmt)
        return struct.unpack(fmt, data.read(size))[0]
    elif uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in
        # this file, but Variant can contain any object from uaprotocol_auto
        # as ExtensionObject. Import locally to avoid the import loop.
        from opcua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)
    else:
        # Resolve the class by name in this module instead of eval()-ing a
        # string built from the type name: same lookup for valid names, but
        # a malformed name can no longer be executed as an expression.
        cls = globals().get(uatype)
        if cls is None or not hasattr(cls, "from_binary"):
            raise Exception("Error unknown uatype: {}".format(uatype))
        return cls.from_binary(data)
145
146
147 1
def unpack_uatype_array(uatype, data):
    """
    Read an array of values of one UA type from a binary stream.

    :param uatype: UA type name of every element
    :param data: object with a read(size) method returning bytes
    :returns: list of decoded elements, or None when the stored element
        count is -1 (null array)
    """
    (count,) = struct.unpack('<i', data.read(4))
    if count == -1:
        return None
    return [unpack_uatype(uatype, data) for _ in range(count)]
156
157
158 1
def pack_string(string):
    """
    Encode a string or byte string with a little-endian int32 length prefix.

    :param string: text (encoded as UTF-8), bytes, or None
    :returns: bytes; None and empty values are both written as the null
        marker (length -1, no payload)
    """
    if string is None:
        # used to crash in len(); None maps naturally onto the null marker
        return struct.pack("<i", -1)
    if isinstance(string, type(u"")):
        # text of either major version (str on Python 3, unicode on
        # Python 2), including subclasses that the old type() test missed
        string = string.encode('utf-8')
    length = len(string)
    if length == 0:
        return struct.pack("<i", -1)
    return struct.pack("<i", length) + string
165
166 1
# Byte strings are packed by the very same function as strings.
pack_bytes = pack_string
167
168
169 1
def unpack_bytes(data):
    """
    Read a length-prefixed byte string from a binary stream.

    :param data: object with a read(size) method returning bytes
    :returns: the payload bytes, or b'' when the stored length is negative
    """
    length = struct.unpack("<i", data.read(4))[0]
    if length < 0:
        # -1 is the null marker. Any other negative length is invalid and
        # must not reach read(), where a negative size means "read to EOF".
        return b''
    return data.read(length)
174
175
176 1
def unpack_string(data):
    """
    Read a length-prefixed string from a binary stream.

    :param data: object with a read(size) method returning bytes
    :returns: the payload decoded as UTF-8 on Python 3; on Python 2 the raw
        bytes converted with str()
    """
    raw = unpack_bytes(data)
    if sys.version_info.major >= 3:
        return raw.decode("utf-8")
    return str(raw)
181
182
183 1
def test_bit(data, offset):
184 1
    mask = 1 << offset
185 1
    return data & mask
186
187
188 1
def set_bit(data, offset):
    """
    Return a copy of an integer with one bit switched on.

    :param data: integer to start from
    :param offset: bit position, 0 being the least significant bit
    :returns: data with bit ``offset`` set
    """
    return data | (1 << offset)
191
192
193 1
class FrozenClass(object):

    """
    Base class that can forbid the creation of new attributes.

    Once _freeze() has been called on an instance, assigning to an attribute
    that does not exist yet raises TypeError; existing attributes can still
    be reassigned. This is a hack meant to catch misspelt member names in
    protocol code.
    """
    __isfrozen = False

    def __setattr__(self, key, value):
        if self.__isfrozen:
            # only unknown names are rejected, known ones stay writable
            if not hasattr(self, key):
                raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
        object.__setattr__(self, key, value)

    def _freeze(self):
        """Lock the set of attributes of this instance."""
        self.__isfrozen = True
208
209
210 1
class Guid(object):

    """
    Wrapper around a uuid.UUID with binary (de)serialization.

    :ivar uuid:
    :vartype uuid: uuid.UUID
    """

    def __init__(self):
        # a new Guid starts out as a random (version 4) UUID
        self.uuid = uuid.uuid4()

    def to_binary(self):
        """Return the 16 bytes of the UUID in little-endian field order."""
        return self.uuid.bytes_le

    @staticmethod
    def from_binary(data):
        """
        Read 16 bytes from ``data`` and build a Guid from them.

        :param data: object with a read(size) method returning bytes
        """
        g = Guid()
        # must mirror to_binary(): the serialized form is bytes_le, parsing it
        # with ``bytes=`` scrambled the first three UUID fields
        g.uuid = uuid.UUID(bytes_le=data.read(16))
        return g

    @staticmethod
    def from_string(identifier):
        """Build a Guid from a hexadecimal UUID string."""
        g = Guid()
        g.uuid = uuid.UUID(hex=identifier)
        return g

    def __eq__(self, other):
        return isinstance(other, Guid) and self.uuid == other.uuid

    def __ne__(self, other):
        # Python 2 does not derive != from ==
        return not self.__eq__(other)

    def __hash__(self):
        # defined together with __eq__ so equal Guids hash alike and a Guid
        # stays usable inside hashed keys on Python 3
        return hash(self.uuid)
232
233
234
class StatusCode(FrozenClass):

    """
    Numeric status code together with its symbolic name and description.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    def __init__(self, value=0):
        self.value = value
        # name and description are looked up once, from the numeric value
        self.name, self.doc = status_code.get_name_and_doc(value)

    def to_binary(self):
        """Serialize the numeric value as an unsigned 32 bit integer."""
        return struct.pack("<I", self.value)

    @staticmethod
    def from_binary(data):
        """Read an unsigned 32 bit value from ``data`` and wrap it."""
        (raw,) = struct.unpack("<I", data.read(4))
        return StatusCode(raw)

    def check(self):
        """
        Raise an exception if the status code is anything else than 0.
        Use is_good() if no exception is desired.
        """
        if self.value == 0:
            return
        raise Exception("{}({})".format(self.doc, self.name))

    def is_good(self):
        """
        Return True if the status value is 0, False otherwise.
        """
        return bool(self.value == 0)

    def __str__(self):
        return 'StatusCode({})'.format(self.name)
    __repr__ = __str__
277 1
278 1
279 1
class NodeIdType(object):

    """Numeric codes of the NodeId encodings handled by NodeId."""

    # TwoByte=0, FourByte=1, Numeric=2, String=3, Guid=4, ByteString=5
    TwoByte, FourByte, Numeric, String, Guid, ByteString = range(6)
286
287
288
class NodeId(FrozenClass):

    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want something special like a twobyte or fourbyte nodeid


    :ivar Identifier:
    :vartype Identifier: int, string, bytes or Guid
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NodeIdType:
    :vartype NodeIdType: one of the NodeIdType constants
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        if self.Identifier is None:
            # no identifier at all: the null node id, in its shortest form
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, (str, unicode)):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, Guid):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise Exception("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
            return self.NamespaceIndex, self.Identifier
        else:
            return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __ne__(self, node):
        # Python 2 does not derive != from ==
        return not self.__eq__(node)

    def __hash__(self):
        return hash(self.__key())

    @staticmethod
    def from_string(string):
        """
        Parse a string such as "ns=2;i=42" into a NodeId.

        Recognised keys are ns, i, s, g, b, srv and nsu, separated by ';'.

        :raises Exception: if no identifier (i, s, g or b) is present
        """
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            # split on the first '=' only, the value itself may contain '='
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = Guid.from_string(v)
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is numeric everywhere else in this class
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise Exception("Could not parse nodeid string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # keep the constructor defaults ("" and 0) when the string carries no
        # nsu/srv part instead of overwriting them with None
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """Format the node id in the notation understood by from_string()."""
        prefixes = {
            NodeIdType.Numeric: "i",
            NodeIdType.String: "s",
            NodeIdType.TwoByte: "i",
            NodeIdType.FourByte: "i",
            NodeIdType.Guid: "g",
            NodeIdType.ByteString: "b",
        }
        ntype = prefixes.get(self.NodeIdType)
        identifier = self.Identifier
        if isinstance(identifier, Guid):
            # print the UUID itself, Guid has no string form of its own
            identifier = identifier.uuid
        string = ""
        if self.NamespaceIndex != 0:
            string += "ns={};".format(self.NamespaceIndex)
        string += "{}={}".format(ntype, identifier)
        # every part is separated by ';' so from_string() can split it again
        if self.ServerIndex:
            string = "srv={};".format(self.ServerIndex) + string
        if self.NamespaceUri:
            string += ";nsu={}".format(self.NamespaceUri)
        return string

    def __str__(self):
        return "NodeId({})".format(self.to_string())
    __repr__ = __str__

    def to_binary(self):
        """
        Serialize the encoding byte, namespace index and identifier.

        NamespaceUri and ServerIndex are not written by this method.
        """
        b = []
        b.append(struct.pack("<B", self.NodeIdType))
        if self.NodeIdType == NodeIdType.TwoByte:
            b.append(struct.pack("<B", self.Identifier))
        elif self.NodeIdType == NodeIdType.FourByte:
            b.append(struct.pack("<BH", self.NamespaceIndex, self.Identifier))
        elif self.NodeIdType == NodeIdType.Numeric:
            b.append(struct.pack("<HI", self.NamespaceIndex, self.Identifier))
        elif self.NodeIdType == NodeIdType.String:
            b.append(struct.pack("<H", self.NamespaceIndex))
            b.append(pack_string(self.Identifier))
        elif self.NodeIdType == NodeIdType.ByteString:
            b.append(struct.pack("<H", self.NamespaceIndex))
            b.append(pack_bytes(self.Identifier))
        else:
            # Guid (or any identifier object providing to_binary())
            b.append(struct.pack("<H", self.NamespaceIndex))
            b.append(self.Identifier.to_binary())
        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """
        Read a NodeId, including optional NamespaceUri and ServerIndex.

        :param data: object with a read(size) method returning bytes
        :raises Exception: if the encoding byte holds an unknown type
        """
        nid = NodeId()
        encoding = struct.unpack("<B", data.read(1))[0]
        # the low six bits select the identifier type, the top two are flags
        nid.NodeIdType = encoding & 0b00111111

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = struct.unpack("<B", data.read(1))[0]
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
            nid.Identifier = unpack_string(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
            nid.Identifier = unpack_bytes(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = struct.unpack("<H", data.read(2))[0]
            nid.Identifier = Guid.from_binary(data)
        else:
            raise Exception("Unknown NodeId encoding: " + str(nid.NodeIdType))

        # ExpandedNodeId flags per the OPC UA binary encoding: 0x80 (bit 7)
        # announces a NamespaceUri, 0x40 (bit 6) a ServerIndex. The two bit
        # tests used to be swapped.
        if test_bit(encoding, 7):
            nid.NamespaceUri = unpack_string(data)
        if test_bit(encoding, 6):
            nid.ServerIndex = struct.unpack("<I", data.read(4))[0]

        return nid
458
459
460 1
class TwoByteNodeId(NodeId):

    """NodeId forced to the TwoByte encoding, always in namespace 0."""

    def __init__(self, identifier):
        super(TwoByteNodeId, self).__init__(identifier, 0, NodeIdType.TwoByte)
464
465
466 1
class FourByteNodeId(NodeId):

    """NodeId forced to the FourByte encoding."""

    def __init__(self, identifier, namespace=0):
        super(FourByteNodeId, self).__init__(identifier, namespace, NodeIdType.FourByte)
470
471
472 1
class NumericNodeId(NodeId):

    """NodeId forced to the Numeric encoding."""

    def __init__(self, identifier, namespace=0):
        super(NumericNodeId, self).__init__(identifier, namespace, NodeIdType.Numeric)
476
477
478 1
class ByteStringNodeId(NodeId):

    """NodeId forced to the ByteString encoding."""

    def __init__(self, identifier, namespace=0):
        super(ByteStringNodeId, self).__init__(identifier, namespace, NodeIdType.ByteString)
482
483
484 1
class GuidNodeId(NodeId):

    """NodeId forced to the Guid encoding."""

    def __init__(self, identifier, namespace=0):
        super(GuidNodeId, self).__init__(identifier, namespace, NodeIdType.Guid)
488
489
490 1
class StringNodeId(NodeId):

    """NodeId forced to the String encoding."""

    def __init__(self, identifier, namespace=0):
        super(StringNodeId, self).__init__(identifier, namespace, NodeIdType.String)
494
495
496
# ExpandedNodeId is an alias: NodeId itself carries NamespaceUri and ServerIndex.
ExpandedNodeId = NodeId
497
498
499 1
class QualifiedName(FrozenClass):

    '''
    A string qualified with a namespace index.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    '''

    def __init__(self, name="", namespaceidx=0):
        self.NamespaceIndex = namespaceidx
        self.Name = name

    def to_string(self):
        """Return the name as "<namespace index>:<name>"."""
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse "<namespace index>:<name>"; without ':' the index is 0.
        """
        if ":" in string:
            # only the first ':' separates the index, the name may hold more
            idx, name = string.split(":", 1)
        else:
            idx = 0
            name = string
        return QualifiedName(name, int(idx))

    def to_binary(self):
        """Serialize as uint16 namespace index followed by the packed name."""
        packet = []
        fmt = '<H'
        packet.append(struct.pack(fmt, self.NamespaceIndex))
        packet.append(pack_string(self.Name))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """
        Read a QualifiedName from ``data``.

        :param data: object with a read(size) method returning bytes
        """
        obj = QualifiedName()
        fmt = '<H'
        obj.NamespaceIndex = struct.unpack(fmt, data.read(2))[0]
        obj.Name = unpack_string(data)
        return obj

    def __eq__(self, bname):
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, bname):
        # Python 2 does not derive != from ==
        return not self.__eq__(bname)

    def __hash__(self):
        # kept in step with __eq__ so equal names hash alike; without it the
        # class is unhashable on Python 3
        return hash((self.NamespaceIndex, self.Name))

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
543
544
545 1
class LocalizedText(FrozenClass):

    '''
    A text with an optional locale.

    :ivar Encoding: bit mask written in front of the fields; bit 0 is set
        when Locale is present, bit 1 when Text is present
    :vartype Encoding: int
    :ivar Locale:
    :vartype Locale: bytes
    :ivar Text:
    :vartype Text: bytes
    '''

    def __init__(self, text=""):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            # text is stored as UTF-8 encoded bytes
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = b''
        self._freeze()

    def to_binary(self):
        """Serialize the mask byte followed by the fields that are set."""
        # Rebuild the mask from the current fields. OR-ing onto the previous
        # value left bits set for fields that had been emptied since, which
        # announced data that was then not written.
        encoding = 0
        if self.Locale:
            encoding |= (1 << 0)
        if self.Text:
            encoding |= (1 << 1)
        self.Encoding = encoding
        packet = [pack_uatype('UInt8', self.Encoding)]
        if self.Locale:
            packet.append(pack_uatype('CharArray', self.Locale))
        if self.Text:
            packet.append(pack_uatype('CharArray', self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """
        Read a LocalizedText from ``data``.

        :param data: object with a read(size) method returning bytes
        """
        obj = LocalizedText()
        obj.Encoding = unpack_uatype('UInt8', data)
        if obj.Encoding & (1 << 0):
            obj.Locale = unpack_uatype('CharArray', data)
        if obj.Encoding & (1 << 1):
            obj.Text = unpack_uatype('CharArray', data)
        return obj

    def to_string(self):
        """Return the text decoded from UTF-8."""
        # FIXME: use local
        # decode with the codec used for encoding; the default codec is
        # ASCII on Python 2 and fails on any non-ASCII text
        return self.Text.decode('utf-8')

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) + ')'
    __repr__ = __str__

    def __eq__(self, other):
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
            return True
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==
        return not self.__eq__(other)
598
599
600
601
class VariantType(Enum):

    '''
    The possible types of a variant.

    Each member's value is the numeric type id stored in the encoding byte
    of a Variant; each member's name is the type name handed to
    pack_uatype() and unpack_uatype().
    '''
    # fixed-size scalar types
    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    # variable-size and structured types
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
662
663
664
class Variant(FrozenClass):

    """
    Create an OPC-UA Variant object.
    If no argument is given a Null Variant is created.
    If no variant type is given, attempts to guess the type from the python type.
    If a variant is given as value, the new object becomes a copy of the argument.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Encoding: encoding byte as last read by from_binary() or written
        by to_binary()
    :vartype Encoding: int
    """

    def __init__(self, value=None, varianttype=None):
        self.Encoding = 0
        self.Value = value
        self.VariantType = varianttype
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
        if self.VariantType is None:
            if type(self.Value) in (list, tuple):
                # the type of an array is guessed from its first element
                if len(self.Value) == 0:
                    raise Exception("could not guess UA variable type")
                self.VariantType = self._guess_type(self.Value[0])
            else:
                self.VariantType = self._guess_type(self.Value)

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==
        return not self.__eq__(other)

    def _guess_type(self, val):
        """Map a python value to the VariantType used to encode it."""
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # bool must be tested before int, bool is a subclass of int
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, int):
            return VariantType.Int64
        elif type(val) in (str, unicode):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        # Any other object: use the member named like its class if there is
        # one, otherwise treat it as an ExtensionObject. (The former
        # "could not guess" branch sat behind isinstance(val, object), which
        # is always true, and could never run.)
        try:
            return getattr(VariantType, val.__class__.__name__)
        except AttributeError:
            return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{},type:{})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """Serialize the encoding byte followed by the value or array."""
        # Build the encoding byte from scratch. The previous code OR-ed the
        # type id onto the low bits of the old byte, so a Variant whose type
        # changed after from_binary() was written with a corrupted type id.
        self.Encoding = self.VariantType.value
        if type(self.Value) in (list, tuple):
            self.Encoding |= (1 << 7)
            body = pack_uatype_array(self.VariantType.name, self.Value)
        else:
            body = pack_uatype(self.VariantType.name, self.Value)
        return struct.pack("<B", self.Encoding) + body

    @staticmethod
    def from_binary(data):
        """
        Read a Variant from ``data``.

        :param data: object with a read(size) method returning bytes
        """
        obj = Variant()
        obj.Encoding = unpack_uatype('UInt8', data)
        # bits 0-5 hold the type id; bit 6 flags array dimensions and bit 7
        # flags an array, neither belongs to the type id
        val = obj.Encoding & 0b00111111
        obj.VariantType = VariantType(val)
        if obj.VariantType == VariantType.Null:
            return obj
        if obj.Encoding & (1 << 7):
            obj.Value = unpack_uatype_array(obj.VariantType.name, data)
        else:
            obj.Value = unpack_uatype(obj.VariantType.name, data)
        if obj.Encoding & (1 << 6):
            # the dimensions (an Int32 array) follow the value; they are not
            # stored, but must be consumed to keep the stream in sync
            unpack_uatype_array("Int32", data)
        return obj
751
752
753
class DataValue(FrozenClass):

    '''
    A value with an associated timestamp, and quality.
    Automatically generated from xml, copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int

    '''

    def __init__(self, variant=None):
        self.Encoding = 0
        if not isinstance(variant, Variant):
            # plain python values are wrapped, the type is guessed by Variant
            variant = Variant(variant)
        self.Value = variant
        self.StatusCode = StatusCode()
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None

    def to_binary(self):
        """Serialize the mask byte followed by every field that is set."""
        # Rebuild the mask from the current fields. OR-ing onto the previous
        # value kept bits of fields that had been cleared since (for instance
        # after from_binary()), announcing data that was then not written.
        encoding = 0
        if self.Value:
            encoding |= (1 << 0)
        if self.StatusCode:
            encoding |= (1 << 1)
        if self.SourceTimestamp:
            encoding |= (1 << 2)
        if self.ServerTimestamp:
            encoding |= (1 << 3)
        if self.SourcePicoseconds:
            encoding |= (1 << 4)
        if self.ServerPicoseconds:
            encoding |= (1 << 5)
        self.Encoding = encoding
        packet = [pack_uatype('UInt8', self.Encoding)]
        # NOTE(review): fields are written in mask-bit order; confirm against
        # the OPC UA binary encoding that the picoseconds fields really
        # follow both timestamps rather than their own timestamp.
        if self.Value:
            packet.append(self.Value.to_binary())
        if self.StatusCode:
            packet.append(self.StatusCode.to_binary())
        if self.SourceTimestamp:
            packet.append(pack_uatype('DateTime', self.SourceTimestamp))
        if self.ServerTimestamp:
            packet.append(pack_uatype('DateTime', self.ServerTimestamp))
        if self.SourcePicoseconds:
            packet.append(pack_uatype('UInt16', self.SourcePicoseconds))
        if self.ServerPicoseconds:
            packet.append(pack_uatype('UInt16', self.ServerPicoseconds))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """
        Read a DataValue from ``data``; fields absent from the mask keep
        their constructor defaults.

        :param data: object with a read(size) method returning bytes
        """
        obj = DataValue()
        obj.Encoding = unpack_uatype('UInt8', data)
        if obj.Encoding & (1 << 0):
            obj.Value = Variant.from_binary(data)
        if obj.Encoding & (1 << 1):
            obj.StatusCode = StatusCode.from_binary(data)
        if obj.Encoding & (1 << 2):
            obj.SourceTimestamp = unpack_uatype('DateTime', data)
        if obj.Encoding & (1 << 3):
            obj.ServerTimestamp = unpack_uatype('DateTime', data)
        if obj.Encoding & (1 << 4):
            obj.SourcePicoseconds = unpack_uatype('UInt16', data)
        if obj.Encoding & (1 << 5):
            obj.ServerPicoseconds = unpack_uatype('UInt16', data)
        return obj

    def __str__(self):
        s = 'DataValue(Value:{}'.format(self.Value)
        if self.StatusCode is not None:
            s += ', StatusCode:{}'.format(self.StatusCode)
        if self.SourceTimestamp is not None:
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
        if self.ServerTimestamp is not None:
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
        if self.SourcePicoseconds is not None:
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
        if self.ServerPicoseconds is not None:
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
        s += ')'
        return s

    __repr__ = __str__
848
849 1
850 1
# Last numeric identifier handed out by generate_nodeid().
__nodeid_counter = 2000


def generate_nodeid(idx):
    """
    Create a NodeId with the next unused numeric identifier.

    :param idx: namespace index of the new node id
    :returns: NodeId whose identifier is one above the previous one issued
        (the first call yields 2001)
    """
    global __nodeid_counter
    __nodeid_counter = __nodeid_counter + 1
    return NodeId(__nodeid_counter, idx)
857