Passed
Push — dev ( 97f75d...d1bef8 )
by Olivier
03:40
created

opcua.ua.Variant.from_binary()   B

Complexity

Conditions 5

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 5.0061
Metric Value
cc 5
dl 0
loc 20
ccs 15
cts 16
cp 0.9375
crap 5.0061
rs 8.5454
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum
6 1
from datetime import datetime, timedelta, tzinfo
7 1
from calendar import timegm
8 1
import sys
9 1
import os
10 1
import uuid
11 1
import struct
12 1
if sys.version_info.major > 2:
13 1
    unicode = str
14
15 1
from opcua.ua import status_codes
16 1
from opcua.common.uaerrors import UAError
17 1
from opcua.common.uaerrors import UAStatusCodeError
18
19
20 1
logger = logging.getLogger('opcua.uaprotocol')
21
22
23
# types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases
24 1
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
25
26
27 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
28 1
HUNDREDS_OF_NANOSECONDS = 10000000
29 1
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
30
31
32 1
class UTC(tzinfo):
33
34
    """UTC"""
35
36 1
    def utcoffset(self, dt):
37
        return timedelta(0)
38
39 1
    def tzname(self, dt):
40
        return "UTC"
41
42 1
    def dst(self, dt):
43 1
        return timedelta(0)
44
45
46
# method copied from David Buxton <[email protected]> sample code
47 1
def datetime_to_win_epoch(dt):
48 1
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
49 1
        dt = dt.replace(tzinfo=UTC())
50 1
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
51 1
    return ft + (dt.microsecond * 10)
52
53
54 1
def win_epoch_to_datetime(epch):
55 1
    return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
56
57
58 1
def build_array_format_py2(prefix, length, fmtchar):
59
    return prefix + str(length) + fmtchar
60
61
62 1
def build_array_format_py3(prefix, length, fmtchar):
63 1
    return prefix + str(length) + chr(fmtchar)
64
65
66 1
if sys.version_info.major < 3:
67
    build_array_format = build_array_format_py2
68
else:
69 1
    build_array_format = build_array_format_py3
70
71
72 1
def pack_uatype_array_primitive(st, value, length):
73 1
    if length == 1:
74 1
        return b'\x01\x00\x00\x00' + st.pack(value[0])
75
    else:
76 1
        return struct.pack(build_array_format("<i", length, st.format[1]), length, *value)
77
78
79 1
def pack_uatype_array(uatype, value):
80 1
    if value is None:
81
        return b'\xff\xff\xff\xff'
82 1
    length = len(value)
83 1
    if length == 0:
84 1
        return b'\x00\x00\x00\x00'
85 1
    if uatype in uatype2struct:
86 1
        return pack_uatype_array_primitive(uatype2struct[uatype], value, length)
87 1
    b = []
88 1
    b.append(uatype_Int32.pack(length))
89 1
    for val in value:
90 1
        b.append(pack_uatype(uatype, val))
91 1
    return b"".join(b)
92
93
94 1
def pack_uatype(uatype, value):
95 1
    if uatype in uatype2struct:
96 1
        return uatype2struct[uatype].pack(value)
97 1
    elif uatype == "Null":
98 1
        return b''
99 1
    elif uatype == "String":
100 1
        return pack_string(value)
101 1
    elif uatype in ("CharArray", "ByteString", "Custom"):
102 1
        return pack_bytes(value)
103 1
    elif uatype == "DateTime":
104 1
        return pack_datetime(value)
105 1
    elif uatype == "ExtensionObject":
106
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
107
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
108
        # Using local import to avoid import loop
109 1
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
110 1
        return extensionobject_to_binary(value)
111
    else:
112 1
        return value.to_binary()
113
114 1
uatype_Int8 = struct.Struct("<b")
115 1
uatype_SByte = uatype_Int8
116 1
uatype_Int16 = struct.Struct("<h")
117 1
uatype_Int32 = struct.Struct("<i")
118 1
uatype_Int64 = struct.Struct("<q")
119 1
uatype_UInt8 = struct.Struct("<B")
120 1
uatype_Char = uatype_UInt8
121 1
uatype_Byte = uatype_UInt8
122 1
uatype_UInt16 = struct.Struct("<H")
123 1
uatype_UInt32 = struct.Struct("<I")
124 1
uatype_UInt64 = struct.Struct("<Q")
125 1
uatype_Boolean = struct.Struct("<?")
126 1
uatype_Double = struct.Struct("<d")
127 1
uatype_Float = struct.Struct("<f")
128
129 1
uatype2struct = {
130
    "Int8": uatype_Int8,
131
    "SByte": uatype_SByte,
132
    "Int16": uatype_Int16,
133
    "Int32": uatype_Int32,
134
    "Int64": uatype_Int64,
135
    "UInt8": uatype_UInt8,
136
    "Char": uatype_Char,
137
    "Byte": uatype_Byte,
138
    "UInt16": uatype_UInt16,
139
    "UInt32": uatype_UInt32,
140
    "UInt64": uatype_UInt64,
141
    "Boolean": uatype_Boolean,
142
    "Double": uatype_Double,
143
    "Float": uatype_Float,
144
}
145
146
147 1
def unpack_uatype(uatype, data):
148 1
    if uatype in uatype2struct:
149 1
        st = uatype2struct[uatype]
150 1
        return st.unpack(data.read(st.size))[0]
151 1
    elif uatype == "String":
152 1
        return unpack_string(data)
153 1
    elif uatype in ("CharArray", "ByteString", "Custom"):
154 1
        return unpack_bytes(data)
155 1
    elif uatype == "DateTime":
156 1
        return unpack_datetime(data)
157 1
    elif uatype == "ExtensionObject":
158
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
159
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
160
        # Using local import to avoid import loop
161 1
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
162 1
        return extensionobject_from_binary(data)
163
    else:
164 1
        glbs = globals()
165 1
        if uatype in glbs:
166 1
            klass = glbs[uatype]
167 1
            if hasattr(klass, 'from_binary'):
168 1
                return klass.from_binary(data)
169
        raise UAError("can not unpack unknown uatype %s" % uatype)
170
171
172 1
def unpack_uatype_array(uatype, data):
173 1
    length = uatype_Int32.unpack(data.read(4))[0]
174 1
    if length == -1:
175
        return None
176 1
    elif length == 0:
177 1
        return []
178 1
    elif uatype in uatype2struct:
179 1
        st = uatype2struct[uatype]
180 1
        if length == 1:
181 1
            return list(st.unpack(data.read(st.size)))
182
        else:
183 1
            arrst = struct.Struct(build_array_format("<", length, st.format[1]))
184 1
            return list(arrst.unpack(data.read(arrst.size)))
185
    else:
186 1
        result = []
187 1
        for _ in range(0, length):
188 1
            result.append(unpack_uatype(uatype, data))
189 1
        return result
190
191
192 1
def pack_datetime(value):
193 1
    epch = datetime_to_win_epoch(value)
194 1
    return uatype_Int64.pack(epch)
195
196
197 1
def unpack_datetime(data):
198 1
    epch = uatype_Int64.unpack(data.read(8))[0]
199 1
    return win_epoch_to_datetime(epch)
200
201
202 1
def pack_string(string):
203 1
    if isinstance(string, unicode):
204 1
        string = string.encode('utf-8')
205 1
    length = len(string)
206 1
    if length == 0:
207 1
        return b'\xff\xff\xff\xff'
208 1
    return uatype_Int32.pack(length) + string
209
210 1
pack_bytes = pack_string
211
212
213 1
def unpack_bytes(data):
214 1
    length = uatype_Int32.unpack(data.read(4))[0]
215 1
    if length == -1:
216 1
        return b''
217 1
    return data.read(length)
218
219
220 1
def py3_unpack_string(data):
221 1
    b = unpack_bytes(data)
222 1
    return b.decode("utf-8")
223
224
225 1
if sys.version_info.major < 3:
226
    unpack_string = unpack_bytes
227
else:
228 1
    unpack_string = py3_unpack_string
229
230
231 1
def test_bit(data, offset):
232 1
    mask = 1 << offset
233 1
    return data & mask
234
235
236 1
def set_bit(data, offset):
237 1
    mask = 1 << offset
238 1
    return data | mask
239
240
241 1
class _FrozenClass(object):
242
243
    """
244
    make it impossible to add members to a class.
245
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
246
    """
247 1
    _freeze = False
248
249 1
    def __setattr__(self, key, value):
250 1
        if self._freeze and not hasattr(self, key):
251
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
252 1
        object.__setattr__(self, key, value)
253
254 1
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
255
    # typo check is cpu consuming, but it will make debug easy.
256
    # if typo check is not need (in production), please set env PYOPCUA_NO_TYPO_CHECK.
257
    # this will make all uatype class inherit from object intead of _FrozenClass
258
    # and skip the typo check.
259
    FrozenClass = object
260
else:
261 1
    FrozenClass = _FrozenClass
262
263
264 1
class Guid(FrozenClass):
265
266 1
    def __init__(self):
267 1
        self.uuid = uuid.uuid4()
268 1
        self._freeze = True
269
270 1
    def to_binary(self):
271 1
        return self.uuid.bytes
272
273 1
    def __hash__(self):
274
        return hash(self.uuid.bytes)
275
276 1
    @staticmethod
277
    def from_binary(data):
278 1
        g = Guid()
279 1
        g.uuid = uuid.UUID(bytes=data.read(16))
280 1
        return g
281
282 1
    def __eq__(self, other):
283 1
        return isinstance(other, Guid) and self.uuid == other.uuid
284
285
286 1
class StatusCode(FrozenClass):
287
288
    """
289
    :ivar value:
290
    :vartype value: int
291
    :ivar name:
292
    :vartype name: string
293
    :ivar doc:
294
    :vartype doc: string
295
    """
296
297 1
    def __init__(self, value=0):
298 1
        self.value = value
299 1
        self.name, self.doc = status_codes.get_name_and_doc(value)
300 1
        self._freeze = True
301
302 1
    def to_binary(self):
303 1
        return uatype_UInt32.pack(self.value)
304
305 1
    @staticmethod
306
    def from_binary(data):
307 1
        val = uatype_UInt32.unpack(data.read(4))[0]
308 1
        sc = StatusCode(val)
309 1
        return sc
310
311 1
    def check(self):
312
        """
313
        raise en exception if status code is anything else than 0
314
        use is is_good() method if not exception is desired
315
        """
316 1
        if self.value != 0:
317 1
            raise UAStatusCodeError("{}({})".format(self.doc, self.name))
318
319 1
    def is_good(self):
320
        """
321
        return True if status is Good.
322
        """
323 1
        if self.value == 0:
324 1
            return True
325 1
        return False
326
327 1
    def __str__(self):
328
        return 'StatusCode({})'.format(self.name)
329 1
    __repr__ = __str__
330
331
332 1
class NodeIdType(Enum):
333 1
    TwoByte = 0
334 1
    FourByte = 1
335 1
    Numeric = 2
336 1
    String = 3
337 1
    Guid = 4
338 1
    ByteString = 5
339
340
341 1
class NodeId(FrozenClass):
342
343
    """
344
    NodeId Object
345
346
    Args:
347
        identifier: The identifier might be an int, a string, bytes or a Guid
348
        namespaceidx(int): The index of the namespace
349
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
350
351
352
    :ivar Identifier:
353
    :vartype Identifier: NodeId
354
    :ivar NamespaceIndex:
355
    :vartype NamespaceIndex: Int
356
    :ivar NamespaceUri:
357
    :vartype NamespaceUri: String
358
    :ivar ServerIndex:
359
    :vartype ServerIndex: Int
360
    """
361
362 1
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
363 1
        self.Identifier = identifier
364 1
        self.NamespaceIndex = namespaceidx
365 1
        self.NodeIdType = nodeidtype
366 1
        self.NamespaceUri = ""
367 1
        self.ServerIndex = 0
368 1
        self._freeze = True
369 1
        if self.Identifier is None:
370 1
            self.Identifier = 0
371 1
            self.NodeIdType = NodeIdType.TwoByte
372 1
            return
373 1
        if self.NodeIdType is None:
374 1
            if isinstance(self.Identifier, int):
375 1
                self.NodeIdType = NodeIdType.Numeric
376 1
            elif isinstance(self.Identifier, str):
377 1
                self.NodeIdType = NodeIdType.String
378
            elif isinstance(self.Identifier, bytes):
379
                self.NodeIdType = NodeIdType.ByteString
380
            else:
381
                raise UAError("NodeId: Could not guess type of NodeId, set NodeIdType")
382
383 1
    def __key(self):
384 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
385 1
            return self.NamespaceIndex, self.Identifier
386
        else:
387 1
            return self.NodeIdType, self.NamespaceIndex, self.Identifier
388
389 1
    def __eq__(self, node):
390 1
        return isinstance(node, NodeId) and self.__key() == node.__key()
391
392 1
    def __hash__(self):
393 1
        return hash(self.__key())
394
395 1
    @staticmethod
396
    def from_string(string):
397 1
        l = string.split(";")
398 1
        identifier = None
399 1
        namespace = 0
400 1
        ntype = None
401 1
        srv = None
402 1
        nsu = None
403 1
        for el in l:
404 1
            if not el:
405 1
                continue
406 1
            k, v = el.split("=")
407 1
            k = k.strip()
408 1
            v = v.strip()
409 1
            if k == "ns":
410 1
                namespace = int(v)
411 1
            elif k == "i":
412 1
                ntype = NodeIdType.Numeric
413 1
                identifier = int(v)
414 1
            elif k == "s":
415 1
                ntype = NodeIdType.String
416 1
                identifier = v
417 1
            elif k == "g":
418
                ntype = NodeIdType.Guid
419
                identifier = v
420 1
            elif k == "b":
421
                ntype = NodeIdType.ByteString
422
                identifier = v
423 1
            elif k == "srv":
424 1
                srv = v
425
            elif k == "nsu":
426
                nsu = v
427 1
        if identifier is None:
428
            raise UAError("Could not parse nodeid string: " + string)
429 1
        nodeid = NodeId(identifier, namespace, ntype)
430 1
        nodeid.NamespaceUri = nsu
431 1
        nodeid.ServerIndex = srv
432 1
        return nodeid
433
434 1
    def to_string(self):
435 1
        string = ""
436 1
        if self.NamespaceIndex != 0:
437 1
            string += "ns={};".format(self.NamespaceIndex)
438 1
        ntype = None
439 1
        if self.NodeIdType == NodeIdType.Numeric:
440
            ntype = "i"
441 1
        elif self.NodeIdType == NodeIdType.String:
442 1
            ntype = "s"
443 1
        elif self.NodeIdType == NodeIdType.TwoByte:
444 1
            ntype = "i"
445 1
        elif self.NodeIdType == NodeIdType.FourByte:
446 1
            ntype = "i"
447
        elif self.NodeIdType == NodeIdType.Guid:
448
            ntype = "g"
449
        elif self.NodeIdType == NodeIdType.ByteString:
450
            ntype = "b"
451 1
        string += "{}={}".format(ntype, self.Identifier)
452 1
        if self.ServerIndex:
453
            string = "srv=" + str(self.ServerIndex) + string
454 1
        if self.NamespaceUri:
455
            string += "nsu={}".format(self.NamespaceUri)
456 1
        return string
457
458 1
    def __str__(self):
459 1
        return "{}NodeId({})".format(self.NodeIdType.name, self.to_string())
460 1
    __repr__ = __str__
461
462 1
    def to_binary(self):
463 1
        if self.NodeIdType == NodeIdType.TwoByte:
464 1
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
465 1
        elif self.NodeIdType == NodeIdType.FourByte:
466 1
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
467 1
        elif self.NodeIdType == NodeIdType.Numeric:
468 1
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
469 1
        elif self.NodeIdType == NodeIdType.String:
470 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
471
                pack_string(self.Identifier)
472 1
        elif self.NodeIdType == NodeIdType.ByteString:
473 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
474
                pack_bytes(self.Identifier)
475
        else:
476 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
477
                self.Identifier.to_binary()
478
        #FIXME: Missing NNamespaceURI and ServerIndex
479
480 1
    @staticmethod
481
    def from_binary(data):
482 1
        nid = NodeId()
483 1
        encoding = ord(data.read(1))
484 1
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)
485
486 1
        if nid.NodeIdType == NodeIdType.TwoByte:
487 1
            nid.Identifier = ord(data.read(1))
488 1
        elif nid.NodeIdType == NodeIdType.FourByte:
489 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
490 1
        elif nid.NodeIdType == NodeIdType.Numeric:
491 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
492 1
        elif nid.NodeIdType == NodeIdType.String:
493 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
494 1
            nid.Identifier = unpack_string(data)
495 1
        elif nid.NodeIdType == NodeIdType.ByteString:
496 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
497 1
            nid.Identifier = unpack_bytes(data)
498 1
        elif nid.NodeIdType == NodeIdType.Guid:
499 1
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
500 1
            nid.Identifier = Guid.from_binary(data)
501
        else:
502
            raise UAError("Unknown NodeId encoding: " + str(nid.NodeIdType))
503
504 1
        if test_bit(encoding, 7):
505
            nid.NamespaceUri = unpack_string(data)
506 1
        if test_bit(encoding, 6):
507
            nid.ServerIndex = uatype_UInt32.unpack(data.read(4))[0]
508
509 1
        return nid
510
511
512 1
class TwoByteNodeId(NodeId):
513
514 1
    def __init__(self, identifier):
515 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
516
517
518 1
class FourByteNodeId(NodeId):
519
520 1
    def __init__(self, identifier, namespace=0):
521 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
522
523
524 1
class NumericNodeId(NodeId):
525
526 1
    def __init__(self, identifier, namespace=0):
527 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
528
529
530 1
class ByteStringNodeId(NodeId):
531
532 1
    def __init__(self, identifier, namespace=0):
533 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
534
535
536 1
class GuidNodeId(NodeId):
537
538 1
    def __init__(self, identifier, namespace=0):
539 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
540
541
542 1
class StringNodeId(NodeId):
543
544 1
    def __init__(self, identifier, namespace=0):
545 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
546
547
548 1
ExpandedNodeId = NodeId
549
550
551 1
class QualifiedName(FrozenClass):
552
553
    '''
554
    A string qualified with a namespace index.
555
    '''
556
557 1
    def __init__(self, name="", namespaceidx=0):
558 1
        if not isinstance(namespaceidx, int):
559
            raise UAError("namespaceidx must be an int")
560 1
        self.NamespaceIndex = namespaceidx
561 1
        self.Name = name
562 1
        self._freeze = True
563
564 1
    def to_string(self):
565 1
        return "{}:{}".format(self.NamespaceIndex, self.Name)
566
567 1
    @staticmethod
568
    def from_string(string):
569 1
        if ":" in string:
570 1
            idx, name = string.split(":", 1)
571
        else:
572 1
            idx = 0
573 1
            name = string
574 1
        return QualifiedName(name, int(idx))
575
576 1
    def to_binary(self):
577 1
        packet = []
578 1
        packet.append(uatype_UInt16.pack(self.NamespaceIndex))
579 1
        packet.append(pack_string(self.Name))
580 1
        return b''.join(packet)
581
582 1
    @staticmethod
583
    def from_binary(data):
584 1
        obj = QualifiedName()
585 1
        obj.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
586 1
        obj.Name = unpack_string(data)
587 1
        return obj
588
589 1
    def __eq__(self, bname):
590 1
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
591
592 1
    def __str__(self):
593
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)
594
595 1
    __repr__ = __str__
596
597
598 1
class LocalizedText(FrozenClass):
599
600
    '''
601
    A string qualified with a namespace index.
602
    '''
603
604 1
    def __init__(self, text=""):
605 1
        self.Encoding = 0
606 1
        self.Text = text
607 1
        if isinstance(self.Text, unicode):
608 1
            self.Text = self.Text.encode('utf-8')
609 1
        if self.Text:
610 1
            self.Encoding |= (1 << 1)
611 1
        self.Locale = b''
612 1
        self._freeze = True
613
614 1
    def to_binary(self):
615 1
        packet = []
616 1
        if self.Locale:
617
            self.Encoding |= (1 << 0)
618 1
        if self.Text:
619 1
            self.Encoding |= (1 << 1)
620 1
        packet.append(uatype_UInt8.pack(self.Encoding))
621 1
        if self.Locale:
622
            packet.append(pack_bytes(self.Locale))
623 1
        if self.Text:
624 1
            packet.append(pack_bytes(self.Text))
625 1
        return b''.join(packet)
626
627 1
    @staticmethod
628
    def from_binary(data):
629 1
        obj = LocalizedText()
630 1
        obj.Encoding = ord(data.read(1))
631 1
        if obj.Encoding & (1 << 0):
632
            obj.Locale = unpack_bytes(data)
633 1
        if obj.Encoding & (1 << 1):
634 1
            obj.Text = unpack_bytes(data)
635 1
        return obj
636
637 1
    def to_string(self):
638
        # FIXME: use local
639
        return self.Text.decode()
640
641 1
    def __str__(self):
642
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
643
            'Locale:' + str(self.Locale) + ', ' + \
644
            'Text:' + str(self.Text) + ')'
645 1
    __repr__ = __str__
646
647 1
    def __eq__(self, other):
648 1
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
649 1
            return True
650 1
        return False
651
652
653 1
class ExtensionObject(FrozenClass):
654
655
    '''
656
657
    Any UA object packed as an ExtensionObject
658
659
660
    :ivar TypeId:
661
    :vartype TypeId: NodeId
662
    :ivar Body:
663
    :vartype Body: bytes
664
665
    '''
666
667 1
    def __init__(self):
668 1
        self.TypeId = NodeId()
669 1
        self.Encoding = 0
670 1
        self.Body = b''
671 1
        self._freeze = True
672
673 1
    def to_binary(self):
674 1
        packet = []
675 1
        if self.Body:
676 1
            self.Encoding |= (1 << 0)
677 1
        packet.append(self.TypeId.to_binary())
678 1
        packet.append(pack_uatype('UInt8', self.Encoding))
679 1
        if self.Body:
680 1
            packet.append(pack_uatype('ByteString', self.Body))
681 1
        return b''.join(packet)
682
683 1
    @staticmethod
684
    def from_binary(data):
685
        obj = ExtensionObject()
686
        obj.TypeId = NodeId.from_binary(data)
687
        obj.Encoding = unpack_uatype('UInt8', data)
688
        if obj.Encoding & (1 << 0):
689
            obj.Body = unpack_uatype('ByteString', data)
690
        return obj
691
692 1
    @staticmethod
693
    def from_object(obj):
694
        ext = ExtensionObject()
695
        oid = getattr(ObjectIds, "{}_Encoding_DefaultBinary".format(obj.__class__.__name__))
696
        ext.TypeId = FourByteNodeId(oid)
697
        ext.Body = obj.to_binary()
698
        return ext
699
700 1
    def __str__(self):
701
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
702
            'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'
703
704 1
    __repr__ = __str__
705
706
707 1
class VariantType(Enum):
708
709
    '''
710
    The possible types of a variant.
711
712
    :ivar Null:
713
    :ivar Boolean:
714
    :ivar SByte:
715
    :ivar Byte:
716
    :ivar Int16:
717
    :ivar UInt16:
718
    :ivar Int32:
719
    :ivar UInt32:
720
    :ivar Int64:
721
    :ivar UInt64:
722
    :ivar Float:
723
    :ivar Double:
724
    :ivar String:
725
    :ivar DateTime:
726
    :ivar Guid:
727
    :ivar ByteString:
728
    :ivar XmlElement:
729
    :ivar NodeId:
730
    :ivar ExpandedNodeId:
731
    :ivar StatusCode:
732
    :ivar QualifiedName:
733
    :ivar LocalizedText:
734
    :ivar ExtensionObject:
735
    :ivar DataValue:
736
    :ivar Variant:
737
    :ivar DiagnosticInfo:
738
739
740
741
    '''
742 1
    Null = 0
743 1
    Boolean = 1
744 1
    SByte = 2
745 1
    Byte = 3
746 1
    Int16 = 4
747 1
    UInt16 = 5
748 1
    Int32 = 6
749 1
    UInt32 = 7
750 1
    Int64 = 8
751 1
    UInt64 = 9
752 1
    Float = 10
753 1
    Double = 11
754 1
    String = 12
755 1
    DateTime = 13
756 1
    Guid = 14
757 1
    ByteString = 15
758 1
    XmlElement = 16
759 1
    NodeId = 17
760 1
    ExpandedNodeId = 18
761 1
    StatusCode = 19
762 1
    QualifiedName = 20
763 1
    LocalizedText = 21
764 1
    ExtensionObject = 22
765 1
    DataValue = 23
766 1
    Variant = 24
767 1
    DiagnosticInfo = 25
768
769
770 1
class VariantTypeCustom(object):
771 1
    def __init__(self, val):
772
        self.name = "Custom"
773
        self.value = val
774
775 1
    def __str__(self):
776
        return "VariantType.Custom:{}".format(self.value)
777 1
    __repr__ = __str__
778
779
780 1
class Variant(FrozenClass):
781
782
    """
783
    Create an OPC-UA Variant object.
784
    if no argument a Null Variant is created.
785
    if not variant type is given, attemps to guess type from python type
786
    if a variant is given as value, the new objects becomes a copy of the argument
787
788
    :ivar Value:
789
    :vartype Value: Any supported type
790
    :ivar VariantType:
791
    :vartype VariantType: VariantType
792
    """
793
794 1
    def __init__(self, value=None, varianttype=None, encoding=0, dimensions=None):
795 1
        self.Encoding = encoding
796 1
        self.Value = value
797 1
        self.VariantType = varianttype
798 1
        self.Dimensions = dimensions
799 1
        self._freeze = True
800 1
        if isinstance(value, Variant):
801 1
            self.Value = value.Value
802 1
            self.VariantType = value.VariantType
803 1
        if self.VariantType is None:
804 1
            self.VariantType = self._guess_type(self.Value)
805 1
        if self.Dimensions is None and type(self.Value) in (list, tuple):
806 1
            dims = get_shape(self.Value)
807 1
            if len(dims) > 1:
808 1
                self.Dimensions = dims
809
810 1
    def __eq__(self, other):
811 1
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
812 1
            return True
813 1
        return False
814
815 1
    def _guess_type(self, val):
816 1
        while isinstance(val, (list, tuple)):
817 1
            if len(val) == 0:
818
                raise UAError("could not guess UA variable type")
819 1
            val = val[0]
820 1
        if val is None:
821 1
            return VariantType.Null
822 1
        elif isinstance(val, bool):
823 1
            return VariantType.Boolean
824 1
        elif isinstance(val, float):
825 1
            return VariantType.Double
826 1
        elif isinstance(val, int):
827 1
            return VariantType.Int64
828 1
        elif type(val) in (str, unicode):
829 1
            return VariantType.String
830 1
        elif isinstance(val, bytes):
831 1
            return VariantType.ByteString
832 1
        elif isinstance(val, datetime):
833 1
            return VariantType.DateTime
834
        else:
835 1
            if isinstance(val, object):
836 1
                try:
837 1
                    return getattr(VariantType, val.__class__.__name__)
838 1
                except AttributeError:
839 1
                    return VariantType.ExtensionObject
840
            else:
841
                raise UAError("Could not guess UA type of {} with type {}, specify UA type".format(val, type(val)))
842
843 1
    def __str__(self):
844 1
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
845 1
    __repr__ = __str__
846
847 1
    def to_binary(self):
848 1
        b = []
849 1
        mask = self.Encoding & 0b00111111
850 1
        self.Encoding = (self.VariantType.value | mask)
851 1
        if type(self.Value) in (list, tuple):
852 1
            if self.Dimensions is not None:
853 1
                self.Encoding = set_bit(self.Encoding, 6)
854 1
            self.Encoding = set_bit(self.Encoding, 7)
855 1
            b.append(uatype_UInt8.pack(self.Encoding))
856 1
            b.append(pack_uatype_array(self.VariantType.name, flatten(self.Value)))
857 1
            if self.Dimensions is not None:
858 1
                b.append(pack_uatype_array("Int32", self.Dimensions))
859
        else:
860 1
            b.append(uatype_UInt8.pack(self.Encoding))
861 1
            b.append(pack_uatype(self.VariantType.name, self.Value))
862
863 1
        return b"".join(b)
864
865 1
    @staticmethod
866
    def from_binary(data):
867 1
        dimensions = None
868 1
        encoding = ord(data.read(1))
869 1
        int_type = encoding & 0b00111111
870 1
        if int_type > 25:
871
            vtype = VariantTypeCustom(int_type)
872
        else:
873 1
            vtype = VariantType(int_type)
874 1
        if vtype == VariantType.Null:
875 1
            return Variant(None, vtype, encoding)
876 1
        if test_bit(encoding, 7):
877 1
            value = unpack_uatype_array(vtype.name, data)
878
        else:
879 1
            value = unpack_uatype(vtype.name, data)
880 1
        if test_bit(encoding, 6):
881 1
            dimensions = unpack_uatype_array("Int32", data)
882 1
            value = reshape(value, dimensions)
883
884 1
        return Variant(value, vtype, encoding, dimensions)
885
886
887 1
def reshape(flat, dims):
888 1
    subdims = dims[1:]
889 1
    subsize = 1
890 1
    for i in subdims:
891 1
        if i == 0:
892 1
            i = 1
893 1
        subsize *= i
894 1
    while dims[0] * subsize > len(flat):
895 1
        flat.append([])
896 1
    if not subdims or subdims == [0]:
897 1
        return flat
898 1
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
899
900
901 1
def _split_list(l, n):
902
    n = max(1, n)
903
    return [l[i:i + n] for i in range(0, len(l), n)]
904
905
906 1
def flatten_and_get_shape(mylist):
907
    dims = []
908
    dims.append(len(mylist))
909
    while isinstance(mylist[0], (list, tuple)):
910
        dims.append(len(mylist[0]))
911
        mylist = [item for sublist in mylist for item in sublist]
912
        if len(mylist) == 0:
913
            break
914
    return mylist, dims
915
916
917 1
def flatten(mylist):
918 1
    if len(mylist) == 0:
919
        return mylist
920 1
    while isinstance(mylist[0], (list, tuple)):
921 1
        mylist = [item for sublist in mylist for item in sublist]
922 1
        if len(mylist) == 0:
923 1
            break
924 1
    return mylist
925
926
927 1
def get_shape(mylist):
928 1
    dims = []
929 1
    while isinstance(mylist, (list, tuple)):
930 1
        dims.append(len(mylist))
931 1
        if len(mylist) == 0:
932 1
            break
933 1
        mylist = mylist[0]
934 1
    return dims
935
936
937 1
class XmlElement(FrozenClass):
938
    '''
939
    An XML element encoded as an UTF-8 string.
940
    '''
941 1
    def __init__(self, binary=None):
942
        if binary is not None:
943
            self._binary_init(binary)
944
            self._freeze = True
945
            return
946
        self.Value = []
947
        self._freeze = True
948
949 1
    def to_binary(self):
950
        return pack_string(self.Value)
951
952 1
    @staticmethod
953
    def from_binary(data):
954
        return XmlElement(data)
955
956 1
    def _binary_init(self, data):
957
        self.Value = unpack_string(data)
958
959 1
    def __str__(self):
960
        return 'XmlElement(Value:' + str(self.Value) + ')'
961
962 1
    __repr__ = __str__
963
964
965
966
967 1
class DataValue(FrozenClass):
968
969
    '''
970
    A value with an associated timestamp, and quality.
971
    Automatically generated from xml , copied and modified here to fix errors in xml spec
972
973
    :ivar Value:
974
    :vartype Value: Variant
975
    :ivar StatusCode:
976
    :vartype StatusCode: StatusCode
977
    :ivar SourceTimestamp:
978
    :vartype SourceTimestamp: datetime
979
    :ivar SourcePicoSeconds:
980
    :vartype SourcePicoSeconds: int
981
    :ivar ServerTimestamp:
982
    :vartype ServerTimestamp: datetime
983
    :ivar ServerPicoseconds:
984
    :vartype ServerPicoseconds: int
985
986
    '''
987
988 1
    def __init__(self, variant=None, status=None):
989 1
        self.Encoding = 0
990 1
        if not isinstance(variant, Variant):
991 1
            variant = Variant(variant)
992 1
        self.Value = variant
993 1
        if status is None:
994 1
            self.StatusCode = StatusCode()
995
        else:
996 1
            self.StatusCode = status
997 1
        self.SourceTimestamp = None  # DateTime()
998 1
        self.SourcePicoseconds = None
999 1
        self.ServerTimestamp = None  # DateTime()
1000 1
        self.ServerPicoseconds = None
1001 1
        self._freeze = True
1002
1003 1
    def to_binary(self):
1004 1
        packet = []
1005 1
        if self.Value:
1006 1
            self.Encoding |= (1 << 0)
1007 1
        if self.StatusCode:
1008 1
            self.Encoding |= (1 << 1)
1009 1
        if self.SourceTimestamp:
1010 1
            self.Encoding |= (1 << 2)
1011 1
        if self.ServerTimestamp:
1012 1
            self.Encoding |= (1 << 3)
1013 1
        if self.SourcePicoseconds:
1014
            self.Encoding |= (1 << 4)
1015 1
        if self.ServerPicoseconds:
1016
            self.Encoding |= (1 << 5)
1017 1
        packet.append(uatype_UInt8.pack(self.Encoding))
1018 1
        if self.Value:
1019 1
            packet.append(self.Value.to_binary())
1020 1
        if self.StatusCode:
1021 1
            packet.append(self.StatusCode.to_binary())
1022 1
        if self.SourceTimestamp:
1023 1
            packet.append(pack_datetime(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
1024 1
        if self.ServerTimestamp:
1025 1
            packet.append(pack_datetime(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
1026 1
        if self.SourcePicoseconds:
1027
            packet.append(uatype_UInt16.pack(self.SourcePicoseconds))
1028 1
        if self.ServerPicoseconds:
1029
            packet.append(uatype_UInt16.pack(self.ServerPicoseconds))
1030 1
        return b''.join(packet)
1031
1032 1
    @staticmethod
1033
    def from_binary(data):
1034 1
        encoding = ord(data.read(1))
1035 1
        if encoding & (1 << 0):
1036 1
            value = Variant.from_binary(data)
1037
        else:
1038
            value = None
1039 1
        if encoding & (1 << 1):
1040 1
            status = StatusCode.from_binary(data)
1041
        else:
1042
            status = None
1043 1
        obj = DataValue(value, status)
1044 1
        obj.Encoding = encoding
1045 1
        if obj.Encoding & (1 << 2):
1046 1
            obj.SourceTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
1047 1
        if obj.Encoding & (1 << 3):
1048 1
            obj.ServerTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
1049 1
        if obj.Encoding & (1 << 4):
1050
            obj.SourcePicoseconds = uatype_UInt16.unpack(data.read(2))[0]
1051 1
        if obj.Encoding & (1 << 5):
1052
            obj.ServerPicoseconds = uatype_UInt16.unpack(data.read(2))[0]
1053 1
        return obj
1054
1055 1
    def __str__(self):
1056
        s = 'DataValue(Value:{}'.format(self.Value)
1057
        if self.StatusCode is not None:
1058
            s += ', StatusCode:{}'.format(self.StatusCode)
1059
        if self.SourceTimestamp is not None:
1060
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
1061
        if self.ServerTimestamp is not None:
1062
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
1063
        if self.SourcePicoseconds is not None:
1064
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
1065
        if self.ServerPicoseconds is not None:
1066
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
1067
        s += ')'
1068
        return s
1069
1070 1
    __repr__ = __str__
1071
1072
1073 1
__nodeid_counter = 2000
1074
1075
1076 1
def generate_nodeid(idx):
1077
    global __nodeid_counter
1078 1
    __nodeid_counter += 1
1079
    return NodeId(__nodeid_counter, idx)
1080