Completed
Pull Request — master (#255)
by
unknown
04:23
created

Variant.__init__()   B

Complexity

Conditions 6

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 14
ccs 13
cts 13
cp 1
crap 6
rs 8
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum, IntEnum
6 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
7 1
from calendar import timegm
8 1
import sys
9 1
import os
10 1
import uuid
11 1
import struct
12 1
import re
13 1
if sys.version_info.major > 2:
14 1
    unicode = str
15
16 1
from opcua.ua import status_codes
17 1
from opcua.common.uaerrors import UaError
18 1
from opcua.common.uaerrors import UaStatusCodeError
19 1
from opcua.common.uaerrors import UaStringParsingError
20
21
22 1
logger = logging.getLogger('opcua.uaprotocol')
23
24
25
# types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases
26 1
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
27
28
29 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
30 1
HUNDREDS_OF_NANOSECONDS = 10000000
31 1
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
32
33
34 1
class UTC(tzinfo):
35
36
    """UTC"""
37
38 1
    def utcoffset(self, dt):
39
        return timedelta(0)
40
41 1
    def tzname(self, dt):
42
        return "UTC"
43
44 1
    def dst(self, dt):
45 1
        return timedelta(0)
46
47
48
# method copied from David Buxton <[email protected]> sample code
49 1
def datetime_to_win_epoch(dt):
50 1
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
51 1
        dt = dt.replace(tzinfo=UTC())
52 1
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
53 1
    return ft + (dt.microsecond * 10)
54
55
56 1
def win_epoch_to_datetime(epch):
57 1
    try:
58 1
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
59
    except OverflowError:
60
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
61
        logger.warning("datetime overflow: {}".format(epch))
62
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
63
64
65
#  minimum datetime as in ua spec, used for history
66 1
DateTimeMinValue = datetime(1606, 1, 1, 12, 0, 0)
67
68
69 1
def build_array_format_py2(prefix, length, fmtchar):
70
    return prefix + str(length) + fmtchar
71
72
73 1
def build_array_format_py3(prefix, length, fmtchar):
74 1
    return prefix + str(length) + chr(fmtchar)
75
76
77 1
if sys.version_info.major < 3:
78
    build_array_format = build_array_format_py2
79
else:
80 1
    build_array_format = build_array_format_py3
81
82
83 1
def pack_uatype_array_primitive(st, value, length):
84 1
    if length == 1:
85 1
        return b'\x01\x00\x00\x00' + st.pack(value[0])
86
    else:
87 1
        return struct.pack(build_array_format("<i", length, st.format[1]), length, *value)
88
89
90 1
def pack_uatype_array(uatype, value):
91 1
    if value is None:
92
        return b'\xff\xff\xff\xff'
93 1
    length = len(value)
94 1
    if length == 0:
95 1
        return b'\x00\x00\x00\x00'
96 1
    if uatype in uatype2struct:
97 1
        return pack_uatype_array_primitive(uatype2struct[uatype], value, length)
98 1
    b = []
99 1
    b.append(uatype_Int32.pack(length))
100 1
    for val in value:
101 1
        b.append(pack_uatype(uatype, val))
102 1
    return b"".join(b)
103
104
105 1
def pack_uatype(uatype, value):
106 1
    if uatype in uatype2struct:
107 1
        if value is None:
108
            value = 0
109 1
        return uatype2struct[uatype].pack(value)
110 1
    elif uatype == "Null":
111 1
        return b''
112 1
    elif uatype == "String":
113 1
        return pack_string(value)
114 1
    elif uatype in ("CharArray", "ByteString", "Custom"):
115 1
        return pack_bytes(value)
116 1
    elif uatype == "DateTime":
117 1
        return pack_datetime(value)
118 1
    elif uatype == "LocalizedText":
119
        if isinstance(value, LocalizedText):
120
            return value.to_binary()
121
        else:
122 1
            return LocalizedText(value).to_binary()
123 1
    elif uatype == "ExtensionObject":
124
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
125 1
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
126
        # Using local import to avoid import loop
127 1
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
128 1
        return extensionobject_to_binary(value)
129 1
    else:
130 1
        return value.to_binary()
131 1
132 1
uatype_Int8 = struct.Struct("<b")
133 1
uatype_SByte = uatype_Int8
134 1
uatype_Int16 = struct.Struct("<h")
135 1
uatype_Int32 = struct.Struct("<i")
136 1
uatype_Int64 = struct.Struct("<q")
137 1
uatype_UInt8 = struct.Struct("<B")
138 1
uatype_Char = uatype_UInt8
139 1
uatype_Byte = uatype_UInt8
140 1
uatype_UInt16 = struct.Struct("<H")
141
uatype_UInt32 = struct.Struct("<I")
142 1
uatype_UInt64 = struct.Struct("<Q")
143
uatype_Boolean = struct.Struct("<?")
144
uatype_Double = struct.Struct("<d")
145
uatype_Float = struct.Struct("<f")
146
147
uatype2struct = {
148
    "Int8": uatype_Int8,
149
    "SByte": uatype_SByte,
150
    "Int16": uatype_Int16,
151
    "Int32": uatype_Int32,
152
    "Int64": uatype_Int64,
153
    "UInt8": uatype_UInt8,
154
    "Char": uatype_Char,
155
    "Byte": uatype_Byte,
156
    "UInt16": uatype_UInt16,
157
    "UInt32": uatype_UInt32,
158
    "UInt64": uatype_UInt64,
159
    "Boolean": uatype_Boolean,
160 1
    "Double": uatype_Double,
161 1
    "Float": uatype_Float,
162 1
}
163 1
164 1
165 1
def unpack_uatype(uatype, data):
166 1
    if uatype in uatype2struct:
167 1
        st = uatype2struct[uatype]
168 1
        return st.unpack(data.read(st.size))[0]
169 1
    elif uatype == "String":
170 1
        return unpack_string(data)
171
    elif uatype in ("CharArray", "ByteString", "Custom"):
172
        return unpack_bytes(data)
173
    elif uatype == "DateTime":
174 1
        return unpack_datetime(data)
175 1
    elif uatype == "ExtensionObject":
176
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
177 1
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
178 1
        # Using local import to avoid import loop
179 1
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
180 1
        return extensionobject_from_binary(data)
181 1
    else:
182
        glbs = globals()
183
        if uatype in glbs:
184
            klass = glbs[uatype]
185 1
            if hasattr(klass, 'from_binary'):
186 1
                return klass.from_binary(data)
187 1
        raise UaError("can not unpack unknown uatype %s" % uatype)
188
189 1
190 1
def unpack_uatype_array(uatype, data):
191 1
    length = uatype_Int32.unpack(data.read(4))[0]
192 1
    if length == -1:
193 1
        return None
194 1
    elif length == 0:
195
        return []
196 1
    elif uatype in uatype2struct:
197 1
        st = uatype2struct[uatype]
198
        if length == 1:
199 1
            return list(st.unpack(data.read(st.size)))
200 1
        else:
201 1
            arrst = struct.Struct(build_array_format("<", length, st.format[1]))
202 1
            return list(arrst.unpack(data.read(arrst.size)))
203
    else:
204
        result = []
205 1
        for _ in range(0, length):
206 1
            result.append(unpack_uatype(uatype, data))
207 1
        return result
208
209
210 1
def pack_datetime(value):
211 1
    epch = datetime_to_win_epoch(value)
212 1
    return uatype_Int64.pack(epch)
213
214
215 1
def unpack_datetime(data):
216 1
    epch = uatype_Int64.unpack(data.read(8))[0]
217 1
    return win_epoch_to_datetime(epch)
218 1
219 1
220 1
def pack_string(string):
221 1
    if string is None:
222
        return uatype_Int32.pack(-1)
223 1
    if isinstance(string, unicode):
224
        string = string.encode('utf-8')
225
    length = len(string)
226 1
    return uatype_Int32.pack(length) + string
227 1
228 1
pack_bytes = pack_string
229 1
230 1
231
def unpack_bytes(data):
232
    length = uatype_Int32.unpack(data.read(4))[0]
233 1
    if length == -1:
234 1
        return None
235 1
    return data.read(length)
236 1
237 1
238
def py3_unpack_string(data):
239
    b = unpack_bytes(data)
240 1
    if b is None:
241
        return b
242
    return b.decode("utf-8")
243 1
244
245
if sys.version_info.major < 3:
246 1
    unpack_string = unpack_bytes
247 1
else:
248 1
    unpack_string = py3_unpack_string
249
250
251 1
def test_bit(data, offset):
252 1
    mask = 1 << offset
253 1
    return data & mask
254
255
256 1
def set_bit(data, offset):
257 1
    mask = 1 << offset
258 1
    return data | mask
259
260
261 1
def unset_bit(data, offset):
262
    mask = 1 << offset
263
    return data & ~mask
264
265
266
class _FrozenClass(object):
267 1
268
    """
269 1
    make it impossible to add members to a class.
270 1
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
271
    """
272 1
    _freeze = False
273
274 1
    def __setattr__(self, key, value):
275
        if self._freeze and not hasattr(self, key):
276
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, self.__class__.__name__, self.__dict__.keys()))
277
        object.__setattr__(self, key, value)
278
279
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
280
    # typo check is cpu consuming, but it will make debug easy.
281 1
    # if typo check is not need (in production), please set env PYOPCUA_NO_TYPO_CHECK.
282
    # this will make all uatype class inherit from object intead of _FrozenClass
283
    # and skip the typo check.
284 1
    FrozenClass = object
285
else:
286
    FrozenClass = _FrozenClass
287
288
289
class ValueRank(IntEnum):
290 1
    """
291 1
    Defines dimensions of a variable.
292 1
    This enum does not support all cases since ValueRank support any n>0
293 1
    but since it is an IntEnum it can be replace by a normal int
294 1
    """
295
    ScalarOrOneDimension = -3
296 1
    Any = -2
297 1
    Scalar = -1
298 1
    OneOrMoreDimensions = 0
299
    OneDimension = 1
300
    # the next names are not in spec but so common we express them here
301 1
    TwoDimensions = 2
302
    ThreeDimensions = 3
303
    FourDimensions = 3
304 1
305 1
306 1
class AccessLevel(IntEnum):
307 1
    """
308 1
    """
309
    CurrentRead = 0
310
    CurrentWrite = 1
311 1
    HistoryRead = 2
312
    HistoryWrite = 3
313
    SemanticChange = 4
314
315 1
316 1
class AccessLevelMask(IntEnum):
317 1
    """
318 1
    Mask for access level
319 1
    """
320
    CurrentRead = 1 << AccessLevel.CurrentRead
321
    CurrentWrite = 1 << AccessLevel.CurrentWrite
322 1
    HistoryRead = 1 << AccessLevel.HistoryRead
323
    HistoryWrite = 1 << AccessLevel.HistoryWrite
324 1
    SemanticChange = 1 << AccessLevel.SemanticChange
325 1
326 1
327
class WriteMask(IntEnum):
328 1
    """
329 1
    Mask to indicate which attribute of a node is writable
330
    Rmq: This is not a mask but bit index....
331 1
    """
332
    AccessLevel = 0
333
    ArrayDimensions = 1
334 1
    BrowseName = 2
335
    ContainsNoLoops = 3
336 1
    DataType = 4
337 1
    Description = 5
338 1
    DisplayName = 6
339
    EventNotifier = 7
340 1
    Executable = 8
341 1
    Historizing = 9
342
    InverseName = 10
343
    IsAbstract = 11
344 1
    MinimumSamplingInterval = 12
345
    NodeClass = 13
346
    NodeId = 14
347
    Symmetric = 15
348
    UserAccessLevel = 16
349
    UserExecutable = 17
350
    UserWriteMask = 18
351
    ValueRank = 19
352
    WriteMask = 20
353
    ValueForVariableType = 21
354
355 1
356 1
class EventNotifier(IntEnum):
357 1
    """
358 1
    """
359
    SubscribeToEvents = 0
360 1
    HistoryRead = 2
361 1
    HistoryWrite = 3
362
363 1
364
class Guid(FrozenClass):
365 1
366 1
    def __init__(self):
367 1
        self.uuid = uuid.uuid4()
368
        self._freeze = True
369 1
370
    def to_binary(self):
371
        return self.uuid.bytes
372
373
    def __hash__(self):
374 1
        return hash(self.uuid.bytes)
375 1
376
    @staticmethod
377 1
    def from_binary(data):
378
        g = Guid()
379
        g.uuid = uuid.UUID(bytes=data.read(16))
380
        return g
381 1
382 1
    def __eq__(self, other):
383 1
        return isinstance(other, Guid) and self.uuid == other.uuid
384
385 1
386
class StatusCode(FrozenClass):
387 1
388
    """
389 1
    :ivar value:
390
    :vartype value: int
391
    :ivar name:
392 1
    :vartype name: string
393
    :ivar doc:
394
    :vartype doc: string
395
    """
396 1
397 1
    def __init__(self, value=0):
398 1
        if isinstance(value, str):
399 1
            self.name = value
400 1
            self.value = getattr(status_codes.StatusCodes, value)
401 1
        else:
402 1
            self.value = value
403
            self.name, self.doc = status_codes.get_name_and_doc(value)
404
        self._freeze = True
405 1
406
    def to_binary(self):
407
        return uatype_UInt32.pack(self.value)
408
409
    @staticmethod
410
    def from_binary(data):
411
        val = uatype_UInt32.unpack(data.read(4))[0]
412
        sc = StatusCode(val)
413
        return sc
414
415
    def check(self):
416
        """
417
        Raises an exception if the status code is anything else than 0 (good).
418
419
        Use the is_good() method if you do not want an exception.
420
        """
421
        if self.value != 0:
422
            raise UaStatusCodeError(self.value)
423
424
    def is_good(self):
425
        """
426 1
        return True if status is Good.
427 1
        """
428 1
        if self.value == 0:
429 1
            return True
430 1
        return False
431 1
432 1
    def __str__(self):
433 1
        return 'StatusCode({})'.format(self.name)
434 1
    __repr__ = __str__
435 1
436 1
    def __eq__(self, other):
437 1
        return self.value == other.value
438 1
439 1
    def __ne__(self, other):
440 1
        return not self.__eq__(other)
441 1
442 1
443 1
class NodeIdType(Enum):
444
    TwoByte = 0
445
    FourByte = 1
446
    Numeric = 2
447
    String = 3
448
    Guid = 4
449 1
    ByteString = 5
450 1
451 1
452
class NodeId(FrozenClass):
453 1
454
    """
455 1
    NodeId Object
456 1
457
    Args:
458 1
        identifier: The identifier might be an int, a string, bytes or a Guid
459 1
        namespaceidx(int): The index of the namespace
460
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
461 1
462 1
463
    :ivar Identifier:
464 1
    :vartype Identifier: NodeId
465 1
    :ivar NamespaceIndex:
466 1
    :vartype NamespaceIndex: Int
467 1
    :ivar NamespaceUri:
468
    :vartype NamespaceUri: String
469 1
    :ivar ServerIndex:
470 1
    :vartype ServerIndex: Int
471 1
    """
472 1
473 1
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
474 1
        self.Identifier = identifier
475
        self.NamespaceIndex = namespaceidx
476 1
        self.NodeIdType = nodeidtype
477
        self.NamespaceUri = ""
478 1
        self.ServerIndex = 0
479 1
        self._freeze = True
480 1
        if not isinstance(self.NamespaceIndex, int):
481 1
            raise UaError("NamespaceIndex must be an int")
482
        if self.Identifier is None:
483 1
            self.Identifier = 0
484
            self.NodeIdType = NodeIdType.TwoByte
485 1
            return
486 1
        if self.NodeIdType is None:
487 1
            if isinstance(self.Identifier, int):
488 1
                self.NodeIdType = NodeIdType.Numeric
489 1
            elif isinstance(self.Identifier, str):
490 1
                self.NodeIdType = NodeIdType.String
491 1
            elif isinstance(self.Identifier, bytes):
492 1
                self.NodeIdType = NodeIdType.ByteString
493 1
            else:
494 1
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
495 1
496 1
    def __key(self):
497 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
498 1
            return self.NamespaceIndex, self.Identifier
499 1
        else:
500 1
            return self.NodeIdType, self.NamespaceIndex, self.Identifier
501 1
502 1
    def __eq__(self, node):
503 1
        return isinstance(node, NodeId) and self.__key() == node.__key()
504 1
505 1
    def __ne__(self, other):
506
        return not self.__eq__(other)
507
508 1
    def __hash__(self):
509
        return hash(self.__key())
510
511 1
    def is_null(self):
512 1
        if self.NamespaceIndex != 0:
513
            return False
514
        return self.has_null_identifier()
515 1
516 1
    def has_null_identifier(self):
517 1
        if not self.Identifier:
518 1
            return True
519 1
        if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
520 1
            return True
521
        return False
522 1
523 1
    @staticmethod
524 1
    def from_string(string):
525 1
        try:
526 1
            return NodeId._from_string(string)
527 1
        except ValueError as ex:
528 1
            raise UaStringParsingError("Error parsing string {}".format(string), ex)
529 1
530 1
    @staticmethod
531 1
    def _from_string(string):
532 1
        l = string.split(";")
533 1
        identifier = None
534 1
        namespace = 0
535
        ntype = None
536
        srv = None
537
        nsu = None
538
        for el in l:
539 1
            if not el:
540 1
                continue
541
            k, v = el.split("=", 1)
542 1
            k = k.strip()
543
            v = v.strip()
544 1
            if k == "ns":
545
                namespace = int(v)
546 1
            elif k == "i":
547 1
                ntype = NodeIdType.Numeric
548 1
                identifier = int(v)
549
            elif k == "s":
550 1
                ntype = NodeIdType.String
551 1
                identifier = v
552 1
            elif k == "g":
553 1
                ntype = NodeIdType.Guid
554 1
                identifier = v
555 1
            elif k == "b":
556 1
                ntype = NodeIdType.ByteString
557 1
                identifier = v
558 1
            elif k == "srv":
559
                srv = v
560 1
            elif k == "nsu":
561 1
                nsu = v
562
        if identifier is None:
563
            raise UaStringParsingError("Could not find identifier in string: " + string)
564 1
        nodeid = NodeId(identifier, namespace, ntype)
565
        nodeid.NamespaceUri = nsu
566
        nodeid.ServerIndex = srv
567
        return nodeid
568 1
569
    def to_string(self):
570 1
        string = ""
571 1
        if self.NamespaceIndex != 0:
572 1
            string += "ns={};".format(self.NamespaceIndex)
573
        ntype = None
574 1
        if self.NodeIdType == NodeIdType.Numeric:
575 1
            ntype = "i"
576 1
        elif self.NodeIdType == NodeIdType.String:
577 1
            ntype = "s"
578 1
        elif self.NodeIdType == NodeIdType.TwoByte:
579 1
            ntype = "i"
580 1
        elif self.NodeIdType == NodeIdType.FourByte:
581 1
            ntype = "i"
582 1
        elif self.NodeIdType == NodeIdType.Guid:
583 1
            ntype = "g"
584 1
        elif self.NodeIdType == NodeIdType.ByteString:
585 1
            ntype = "b"
586 1
        string += "{}={}".format(ntype, self.Identifier)
587 1
        if self.ServerIndex:
588 1
            string = "srv=" + str(self.ServerIndex) + string
589
        if self.NamespaceUri:
590
            string += "nsu={}".format(self.NamespaceUri)
591
        return string
592 1
593
    def __str__(self):
594 1
        return "{}NodeId({})".format(self.NodeIdType.name, self.to_string())
595
    __repr__ = __str__
596
597 1
    def to_binary(self):
598
        if self.NodeIdType == NodeIdType.TwoByte:
599
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
600 1
        elif self.NodeIdType == NodeIdType.FourByte:
601
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
602 1
        elif self.NodeIdType == NodeIdType.Numeric:
603 1
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
604
        elif self.NodeIdType == NodeIdType.String:
605
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
606 1
                pack_string(self.Identifier)
607
        elif self.NodeIdType == NodeIdType.ByteString:
608 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
609 1
                pack_bytes(self.Identifier)
610
        else:
611
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
612 1
                self.Identifier.to_binary()
613
        #FIXME: Missing NNamespaceURI and ServerIndex
614 1
615 1
    @staticmethod
616
    def from_binary(data):
617
        nid = NodeId()
618 1
        encoding = ord(data.read(1))
619
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)
620 1
621 1
        if nid.NodeIdType == NodeIdType.TwoByte:
622
            nid.Identifier = ord(data.read(1))
623
        elif nid.NodeIdType == NodeIdType.FourByte:
624 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
625
        elif nid.NodeIdType == NodeIdType.Numeric:
626 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
627 1
        elif nid.NodeIdType == NodeIdType.String:
628
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
629
            nid.Identifier = unpack_string(data)
630 1
        elif nid.NodeIdType == NodeIdType.ByteString:
631
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
632 1
            nid.Identifier = unpack_bytes(data)
633 1
        elif nid.NodeIdType == NodeIdType.Guid:
634
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
635
            nid.Identifier = Guid.from_binary(data)
636 1
        else:
637
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
638
639 1
        if test_bit(encoding, 7):
640
            nid.NamespaceUri = unpack_string(data)
641
        if test_bit(encoding, 6):
642
            nid.ServerIndex = uatype_UInt32.unpack(data.read(4))[0]
643
644
        return nid
645 1
646 1
647
class TwoByteNodeId(NodeId):
648 1
649 1
    def __init__(self, identifier):
650 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
651
652 1
653 1
class FourByteNodeId(NodeId):
654
655 1
    def __init__(self, identifier, namespace=0):
656
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
657 1
658 1
659 1
class NumericNodeId(NodeId):
660 1
661 1
    def __init__(self, identifier, namespace=0):
662 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
663
664 1
665 1
class ByteStringNodeId(NodeId):
666 1
667
    def __init__(self, identifier, namespace=0):
668 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
669 1
670 1
671 1
class GuidNodeId(NodeId):
672 1
673
    def __init__(self, identifier, namespace=0):
674 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
675
676 1
677 1
class StringNodeId(NodeId):
678 1
679 1
    def __init__(self, identifier, namespace=0):
680
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
681 1
682 1
683
ExpandedNodeId = NodeId
684 1
685
686
class QualifiedName(FrozenClass):
687 1
688
    '''
689
    A string qualified with a namespace index.
690 1
    '''
691
692
    def __init__(self, name=None, namespaceidx=0):
693 1
        if not isinstance(namespaceidx, int):
694
            raise UaError("namespaceidx must be an int")
695
        self.NamespaceIndex = namespaceidx
696
        self.Name = name
697
        self._freeze = True
698
699 1
    def to_string(self):
700 1
        return "{}:{}".format(self.NamespaceIndex, self.Name)
701 1
702 1
    @staticmethod
703 1
    def from_string(string):
704 1
        if ":" in string:
705 1
            try:
706 1
                idx, name = string.split(":", 1)
707 1
                idx = int(idx)
708
            except (TypeError, ValueError) as ex:
709 1
                raise UaStringParsingError("Error parsing string {}".format(string), ex)
710 1
        else:
711 1
            idx = 0
712
            name = string
713 1
        return QualifiedName(name, idx)
714 1
715 1
    def to_binary(self):
716 1
        packet = []
717
        packet.append(uatype_UInt16.pack(self.NamespaceIndex))
718 1
        packet.append(pack_string(self.Name))
719 1
        return b''.join(packet)
720 1
721
    @staticmethod
722 1
    def from_binary(data):
723
        obj = QualifiedName()
724 1
        obj.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
725 1
        obj.Name = unpack_string(data)
726 1
        return obj
727
728 1
    def __eq__(self, bname):
729 1
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
730 1
731
    def __ne__(self, other):
732 1
        return not self.__eq__(other)
733
734
    def __lt__(self, other):
735
        if not isinstance(other, QualifiedName):
736 1
            raise TypeError("Cannot compare QualifiedName and {}".format(other))
737
        if self.NamespaceIndex == other.NamespaceIndex:
738
            return self.Name < other.Name
739
        else:
740 1
            return self.NamespaceIndex < other.NamespaceIndex
741
742 1
    def __str__(self):
743 1
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)
744 1
745 1
    __repr__ = __str__
746
747 1
748 1
class LocalizedText(FrozenClass):
749
750
    '''
751 1
    A string qualified with a namespace index.
752
    '''
753
754
    def __init__(self, text=None):
755
        self.Encoding = 0
756
        self.Text = text
757
        if isinstance(self.Text, unicode):
758
            self.Text = self.Text.encode('utf-8')
759
        if self.Text:
760
            self.Encoding |= (1 << 1)
761
        self.Locale = None 
762
        self._freeze = True
763
764
    def to_binary(self):
765 1
        packet = []
766 1
        if self.Locale:
767 1
            self.Encoding |= (1 << 0)
768 1
        if self.Text:
769 1
            self.Encoding |= (1 << 1)
770
        packet.append(uatype_UInt8.pack(self.Encoding))
771 1
        if self.Locale:
772 1
            packet.append(pack_bytes(self.Locale))
773 1
        if self.Text:
774 1
            packet.append(pack_bytes(self.Text))
775 1
        return b''.join(packet)
776 1
777 1
    @staticmethod
778 1
    def from_binary(data):
779 1
        obj = LocalizedText()
780
        obj.Encoding = ord(data.read(1))
781 1
        if obj.Encoding & (1 << 0):
782
            obj.Locale = unpack_bytes(data)
783
        if obj.Encoding & (1 << 1):
784
            obj.Text = unpack_bytes(data)
785
        return obj
786
787
    def to_string(self):
788
        # FIXME: use local
789
        if self.Text is None:
790 1
            return ""
791
        return self.Text.decode('utf-8')
792
793
    def __str__(self):
794
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
795
            'Locale:' + str(self.Locale) + ', ' + \
796
            'Text:' + str(self.Text) + ')'
797
    __repr__ = __str__
798 1
799
    def __eq__(self, other):
800
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
801
            return True
802 1
        return False
803
804
    def __ne__(self, other):
805 1 View Code Duplication
        return not self.__eq__(other)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
806
807
808
class ExtensionObject(FrozenClass):
809
810
    '''
811
812
    Any UA object packed as an ExtensionObject
813
814
815
    :ivar TypeId:
816
    :vartype TypeId: NodeId
817
    :ivar Body:
818
    :vartype Body: bytes
819
820
    '''
821
822
    def __init__(self):
823
        self.TypeId = NodeId()
824
        self.Encoding = 0
825
        self.Body = b''
826
        self._freeze = True
827
828
    def to_binary(self):
829
        packet = []
830
        if self.Body:
831
            self.Encoding |= (1 << 0)
832
        packet.append(self.TypeId.to_binary())
833
        packet.append(pack_uatype('UInt8', self.Encoding))
834
        if self.Body:
835
            packet.append(pack_uatype('ByteString', self.Body))
836
        return b''.join(packet)
837
838
    @staticmethod
839
    def from_binary(data):
840 1
        obj = ExtensionObject()
841 1
        obj.TypeId = NodeId.from_binary(data)
842 1
        obj.Encoding = unpack_uatype('UInt8', data)
843 1
        if obj.Encoding & (1 << 0):
844 1
            obj.Body = unpack_uatype('ByteString', data)
845 1
        return obj
846 1
847 1
    @staticmethod
848 1
    def from_object(obj):
849 1
        ext = ExtensionObject()
850 1
        oid = getattr(ObjectIds, "{}_Encoding_DefaultBinary".format(obj.__class__.__name__))
851 1
        ext.TypeId = FourByteNodeId(oid)
852 1
        ext.Body = obj.to_binary()
853 1
        return ext
854 1
855 1
    def __str__(self):
856 1
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
857 1
            'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'
858 1
859 1
    __repr__ = __str__
860 1
861 1
862 1
class VariantType(Enum):
863 1
864 1
    '''
865 1
    The possible types of a variant.
866
867
    :ivar Null:
868 1
    :ivar Boolean:
869 1
    :ivar SByte:
870 1
    :ivar Byte:
871 1
    :ivar Int16:
872 1
    :ivar UInt16:
873 1
    :ivar Int32:
874
    :ivar UInt32:
875 1
    :ivar Int64:
876
    :ivar UInt64:
877 1
    :ivar Float:
878
    :ivar Double:
879 1
    :ivar String:
880 1
    :ivar DateTime:
881
    :ivar Guid:
882
    :ivar ByteString:
883 1
    :ivar XmlElement:
884
    :ivar NodeId:
885
    :ivar ExpandedNodeId:
886
    :ivar StatusCode:
887
    :ivar QualifiedName:
888
    :ivar LocalizedText:
889
    :ivar ExtensionObject:
890
    :ivar DataValue:
891
    :ivar Variant:
892
    :ivar DiagnosticInfo:
893
894
895
896
    '''
897 1
    Null = 0
898 1
    Boolean = 1
899 1
    SByte = 2
900 1
    Byte = 3
901 1
    Int16 = 4
902 1
    UInt16 = 5
903 1
    Int32 = 6
904 1
    UInt32 = 7
905 1
    Int64 = 8
906 1
    UInt64 = 9
907 1
    Float = 10
908 1
    Double = 11
909 1
    String = 12
910 1
    DateTime = 13
911
    Guid = 14
912 1
    ByteString = 15
913 1
    XmlElement = 16
914 1
    NodeId = 17
915 1
    ExpandedNodeId = 18
916
    StatusCode = 19
917 1
    QualifiedName = 20
918 1
    LocalizedText = 21
919
    ExtensionObject = 22
920 1
    DataValue = 23
921 1
    Variant = 24
922 1
    DiagnosticInfo = 25
923 1
924 1
925
class VariantTypeCustom(object):
926 1
    def __init__(self, val):
927 1
        self.name = "Custom"
928 1
        self.value = val
929 1
        if self.value > 0b00111111:
930 1
            raise UaError("Cannot create VariantType. VariantType must be {} > x > {}, received {}".format(0b111111, 25, val))
931 1
932 1
    def __str__(self):
933 1
        return "VariantType.Custom:{}".format(self.value)
934 1
    __repr__ = __str__
935 1
936 1
    def __eq__(self, other):
937 1
        return self.value == other.value
938 1
939 1
940 1
class Variant(FrozenClass):
941
942 1
    """
943 1
    Create an OPC-UA Variant object.
944 1
    if no argument a Null Variant is created.
945 1
    if not variant type is given, attemps to guess type from python type
946 1
    if a variant is given as value, the new objects becomes a copy of the argument
947
948
    :ivar Value:
949
    :vartype Value: Any supported type
950 1
    :ivar VariantType:
951 1
    :vartype VariantType: VariantType
952 1
    """
953
954 1
    def __init__(self, value=None, varianttype=None, dimensions=None):
955 1
        self.Value = value
956 1
        self.VariantType = varianttype
957 1
        self.Dimensions = dimensions
958 1
        self._freeze = True
959 1
        if isinstance(value, Variant):
960 1
            self.Value = value.Value
961 1
            self.VariantType = value.VariantType
962 1
        if self.VariantType is None:
963 1
            self.VariantType = self._guess_type(self.Value)
964 1
        if self.Dimensions is None and type(self.Value) in (list, tuple):
965
            dims = get_shape(self.Value)
966 1
            if len(dims) > 1:
967 1
                self.Dimensions = dims
968
969 1
    def __eq__(self, other):
970
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
971 1
            return True
972
        return False
973 1
974 1
    def __ne__(self, other):
975 1
        return not self.__eq__(other)
976 1
977 1
    def _guess_type(self, val):
978
        if isinstance(val, (list, tuple)):
979 1
            error_val = val
980 1
        while isinstance(val, (list, tuple)):
981 1
            if len(val) == 0:
982 1
                raise UaError("could not guess UA type of variable {}".format(error_val))
983 1
            val = val[0]
984
        if val is None:
985 1
            return VariantType.Null
986 1
        elif isinstance(val, bool):
987 1
            return VariantType.Boolean
988 1
        elif isinstance(val, float):
989
            return VariantType.Double
990 1
        elif isinstance(val, int):
991
            return VariantType.Int64
992
        elif type(val) in (str, unicode):
993 1
            return VariantType.String
994 1
        elif isinstance(val, bytes):
995 1
            return VariantType.ByteString
996 1
        elif isinstance(val, datetime):
997 1
            return VariantType.DateTime
998 1
        else:
999 1
            if isinstance(val, object):
1000 1
                try:
1001 1
                    return getattr(VariantType, val.__class__.__name__)
1002 1
                except AttributeError:
1003 1
                    return VariantType.ExtensionObject
1004 1
            else:
1005
                raise UaError("Could not guess UA type of {} with type {}, specify UA type".format(val, type(val)))
1006
1007 1
    def __str__(self):
1008
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
1009
    __repr__ = __str__
1010
1011
    def to_binary(self):
1012 1
        b = []
1013
        encoding = self.VariantType.value & 0b111111
1014
        if type(self.Value) in (list, tuple):
1015
            if self.Dimensions is not None:
1016
                encoding = set_bit(encoding, 6)
1017
            encoding = set_bit(encoding, 7)
1018
            b.append(uatype_UInt8.pack(encoding))
1019
            b.append(pack_uatype_array(self.VariantType.name, flatten(self.Value)))
1020
            if self.Dimensions is not None:
1021
                b.append(pack_uatype_array("Int32", self.Dimensions))
1022
        else:
1023 1
            b.append(uatype_UInt8.pack(encoding))
1024 1
            b.append(pack_uatype(self.VariantType.name, self.Value))
1025 1
1026 1
        return b"".join(b)
1027 1
1028 1
    @staticmethod
1029 1
    def from_binary(data):
1030 1
        dimensions = None
1031
        encoding = ord(data.read(1))
1032
        int_type = encoding & 0b00111111
1033 1
        vtype = DataType_to_VariantType(int_type)
1034 1
        if vtype == VariantType.Null:
1035 1
            return Variant(None, vtype, encoding)
1036 1
        if test_bit(encoding, 7):
1037 1
            value = unpack_uatype_array(vtype.name, data)
1038 1
        else:
1039 1
            value = unpack_uatype(vtype.name, data)
1040 1
        if test_bit(encoding, 6):
1041
            dimensions = unpack_uatype_array("Int32", data)
1042
            value = reshape(value, dimensions)
1043 1 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1044
        return Variant(value, vtype, dimensions)
1045
1046
1047 1
def reshape(flat, dims):
1048
    subdims = dims[1:]
1049
    subsize = 1
1050
    for i in subdims:
1051
        if i == 0:
1052
            i = 1
1053
        subsize *= i
1054
    while dims[0] * subsize > len(flat):
1055 1
        flat.append([])
1056
    if not subdims or subdims == [0]:
1057
        return flat
1058 1
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
1059
1060
1061
def _split_list(l, n):
1062 1
    n = max(1, n)
1063
    return [l[i:i + n] for i in range(0, len(l), n)]
1064
1065 1
1066
def flatten_and_get_shape(mylist):
1067
    dims = []
1068 1
    dims.append(len(mylist))
1069
    while isinstance(mylist[0], (list, tuple)):
1070
        dims.append(len(mylist[0]))
1071 1
        mylist = [item for sublist in mylist for item in sublist]
1072
        if len(mylist) == 0:
1073
            break
1074
    return mylist, dims
1075
1076
1077
def flatten(mylist):
1078
    if len(mylist) == 0:
1079
        return mylist
1080
    while isinstance(mylist[0], (list, tuple)):
1081
        mylist = [item for sublist in mylist for item in sublist]
1082
        if len(mylist) == 0:
1083
            break
1084
    return mylist
1085
1086
1087
def get_shape(mylist):
1088
    dims = []
1089
    while isinstance(mylist, (list, tuple)):
1090
        dims.append(len(mylist))
1091
        if len(mylist) == 0:
1092 1
            break
1093 1
        mylist = mylist[0]
1094 1
    return dims
1095 1
1096 1
1097 1
class XmlElement(FrozenClass):
1098 1
    '''
1099
    An XML element encoded as an UTF-8 string.
1100 1
    '''
1101 1
    def __init__(self, binary=None):
1102 1
        if binary is not None:
1103 1
            self._binary_init(binary)
1104 1
            self._freeze = True
1105 1
            return
1106
        self.Value = []
1107 1
        self._freeze = True
1108 1
1109 1
    def to_binary(self):
1110 1
        return pack_string(self.Value)
1111 1
1112 1
    @staticmethod
1113 1
    def from_binary(data):
1114 1
        return XmlElement(data)
1115 1
1116 1
    def _binary_init(self, data):
1117 1
        self.Value = unpack_string(data)
1118
1119 1
    def __str__(self):
1120
        return 'XmlElement(Value:' + str(self.Value) + ')'
1121 1
1122 1
    __repr__ = __str__
1123 1
1124 1
1125 1
class DataValue(FrozenClass):
1126 1
1127 1
    '''
1128 1
    A value with an associated timestamp, and quality.
1129 1
    Automatically generated from xml , copied and modified here to fix errors in xml spec
1130 1
1131
    :ivar Value:
1132 1
    :vartype Value: Variant
1133
    :ivar StatusCode:
1134 1
    :vartype StatusCode: StatusCode
1135
    :ivar SourceTimestamp:
1136 1
    :vartype SourceTimestamp: datetime
1137
    :ivar SourcePicoSeconds:
1138 1
    :vartype SourcePicoSeconds: int
1139 1
    :ivar ServerTimestamp:
1140 1
    :vartype ServerTimestamp: datetime
1141
    :ivar ServerPicoseconds:
1142
    :vartype ServerPicoseconds: int
1143 1
1144 1
    '''
1145
1146
    def __init__(self, variant=None, status=None):
1147 1
        self.Encoding = 0
1148 1
        if not isinstance(variant, Variant):
1149 1
            variant = Variant(variant)
1150 1
        self.Value = variant
1151 1
        if status is None:
1152 1
            self.StatusCode = StatusCode()
1153 1
        else:
1154
            self.StatusCode = status
1155 1
        self.SourceTimestamp = None  # DateTime()
1156
        self.SourcePicoseconds = None
1157 1
        self.ServerTimestamp = None  # DateTime()
1158
        self.ServerPicoseconds = None
1159 1
        self._freeze = True
1160
1161
    def to_binary(self):
1162
        packet = []
1163
        if self.Value:
1164
            self.Encoding |= (1 << 0)
1165
        if self.StatusCode:
1166
            self.Encoding |= (1 << 1)
1167
        if self.SourceTimestamp:
1168
            self.Encoding |= (1 << 2)
1169
        if self.ServerTimestamp:
1170
            self.Encoding |= (1 << 3)
1171
        if self.SourcePicoseconds:
1172
            self.Encoding |= (1 << 4)
1173
        if self.ServerPicoseconds:
1174 1
            self.Encoding |= (1 << 5)
1175
        packet.append(uatype_UInt8.pack(self.Encoding))
1176
        if self.Value:
1177
            packet.append(self.Value.to_binary())
1178
        if self.StatusCode:
1179
            packet.append(self.StatusCode.to_binary())
1180
        if self.SourceTimestamp:
1181
            packet.append(pack_datetime(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
1182
        if self.ServerTimestamp:
1183
            packet.append(pack_datetime(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
1184
        if self.SourcePicoseconds:
1185
            packet.append(uatype_UInt16.pack(self.SourcePicoseconds))
1186
        if self.ServerPicoseconds:
1187
            packet.append(uatype_UInt16.pack(self.ServerPicoseconds))
1188
        return b''.join(packet)
1189
1190
    @staticmethod
1191
    def from_binary(data):
1192
        encoding = ord(data.read(1))
1193
        if encoding & (1 << 0):
1194
            value = Variant.from_binary(data)
1195
        else:
1196
            value = None
1197
        if encoding & (1 << 1):
1198
            status = StatusCode.from_binary(data)
1199
        else:
1200
            status = None
1201
        obj = DataValue(value, status)
1202
        obj.Encoding = encoding
1203
        if obj.Encoding & (1 << 2):
1204
            obj.SourceTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
1205
        if obj.Encoding & (1 << 3):
1206
            obj.ServerTimestamp = unpack_datetime(data)  # DateTime.from_binary(data)
1207
        if obj.Encoding & (1 << 4):
1208
            obj.SourcePicoseconds = uatype_UInt16.unpack(data.read(2))[0]
1209
        if obj.Encoding & (1 << 5):
1210
            obj.ServerPicoseconds = uatype_UInt16.unpack(data.read(2))[0]
1211
        return obj
1212
1213
    def __str__(self):
1214
        s = 'DataValue(Value:{}'.format(self.Value)
1215
        if self.StatusCode is not None:
1216
            s += ', StatusCode:{}'.format(self.StatusCode)
1217
        if self.SourceTimestamp is not None:
1218
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
1219
        if self.ServerTimestamp is not None:
1220
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
1221
        if self.SourcePicoseconds is not None:
1222
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
1223
        if self.ServerPicoseconds is not None:
1224
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
1225
        s += ')'
1226
        return s
1227
1228
    __repr__ = __str__
1229
1230
1231
def DataType_to_VariantType(int_type):
1232
    """
1233
    Takes a NodeId or int and return a VariantType
1234
    This is only supported if int_type < 63 due to VariantType encoding
1235
    """
1236
    if isinstance(int_type, NodeId):
1237
        int_type = int_type.Identifier
1238
1239
    if int_type <= 25:
1240
        return VariantType(int_type)
1241
    else:
1242
        return VariantTypeCustom(int_type)
1243
1244
1245
def int_to_AccessLevel(level):
1246
    """
1247
    take an int and return a list of AccessLevel Enum
1248
    """
1249
    res = []
1250
    for val in AccessLevel:
1251
        test_bit(level, val.value)
1252
        res.append(val)
1253
    return res
1254
1255
1256
def int_to_WriteMask(level):
1257
    """
1258
    take an int and return a list of WriteMask Enum
1259
    """
1260
    res = []
1261
    for val in WriteMask:
1262
        test_bit(level, val.value)
1263
        res.append(val)
1264
    return res
1265
1266
1267
def int_to_EventNotifier(level):
1268
    """
1269
    take an int and return a list of EventNotifier Enum
1270
    """
1271
    res = []
1272
    for val in EventNotifier:
1273
        test_bit(level, val.value)
1274
        res.append(val)
1275
    return res
1276