Completed
Push — master ( eb5e78...c75f71 )
by Olivier
04:30
created

NodeId.has_null_identifier()   A

Complexity

Conditions 4

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 6
ccs 5
cts 5
cp 1
crap 4
rs 9.2
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
from enum import Enum, IntEnum, EnumMeta
6 1
from datetime import datetime, timedelta, tzinfo, MAXYEAR
7 1
from calendar import timegm
8 1
import sys
9 1
import os
10 1
import uuid
11 1
import struct
12 1
import re
13 1
import itertools
14 1
if sys.version_info.major > 2:
    # Python 3 has no separate ``unicode`` type: alias it to ``str`` so the
    # ``isinstance(..., unicode)`` checks in this module work on both versions.
    unicode = str
16 1
17 1
from opcua.ua import status_codes
18 1
from opcua.common.uaerrors import UaError
19 1
from opcua.common.uaerrors import UaStatusCodeError
20
from opcua.common.uaerrors import UaStringParsingError
21
22 1
23
logger = logging.getLogger('opcua.uaprotocol')
24
25
26 1
# types that will be packed and unpacked directly using struct (string, bytes and datetime are handled as special cases)
27
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
28
29 1
30 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000  # number of 100 ns ticks in one second
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)


class UTC(tzinfo):

    """UTC"""

    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)


# method adapted from David Buxton's sample code
def datetime_to_win_epoch(dt):
    """
    Convert a datetime to a Windows FILETIME tick count.

    :param dt: datetime to convert; a naive datetime (or one whose tzinfo
        yields no offset) is interpreted as UTC
    :return: number of 100-nanosecond intervals since 1601-01-01 (int)
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() applies the tz offset; timetuple() would silently drop it
    # and produce a wrong value for aware datetimes that are not in UTC.
    ft = EPOCH_AS_FILETIME + (timegm(dt.utctimetuple()) * HUNDREDS_OF_NANOSECONDS)
    return ft + (dt.microsecond * 10)
55
56 1
57 1
def win_epoch_to_datetime(epch):
    """
    Convert a Windows FILETIME tick count to a naive datetime.

    :param epch: number of 100-nanosecond intervals since 1601-01-01
    :return: the matching datetime (sub-microsecond ticks are truncated).
        Values that do not fit into ``datetime`` are logged and clamped to the
        largest representable datetime, or to ``datetime.min`` when negative.
    """
    try:
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 (or before year 1) can't be converted to datetime
        logger.warning("datetime overflow: %s", epch)
        if epch < 0:
            return datetime.min
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
64
65
66 1
#  minimum datetime as in ua spec, used for history
67
DateTimeMinValue = datetime(1606, 1, 1, 12, 0, 0)
68
69 1
70
def build_array_format_py2(prefix, length, fmtchar):
    """Build a struct format such as ``"<i3h"``; *fmtchar* is a 1-char string."""
    return prefix + str(length) + fmtchar


def build_array_format_py3(prefix, length, fmtchar):
    """
    Build a struct format such as ``"<i3h"`` on Python 3.

    *fmtchar* comes from indexing ``Struct.format``. That attribute is ``bytes``
    on Python 3.0-3.6 (indexing yields an int) and ``str`` from 3.7 on (indexing
    yields a 1-char string), so both forms are accepted.
    """
    if isinstance(fmtchar, int):
        fmtchar = chr(fmtchar)
    return prefix + str(length) + fmtchar


if sys.version_info.major < 3:
    build_array_format = build_array_format_py2
else:
    build_array_format = build_array_format_py3
82
83 1
84 1
def pack_uatype_array_primitive(st, value, length):
    """
    Pack a non-empty array of one primitive type.

    :param st: ``struct.Struct`` describing a single element
    :param value: sequence of elements
    :param length: number of elements in *value*
    :return: little-endian Int32 length followed by the packed elements
    """
    if length == 1:
        # single element: the Int32 length prefix (1) is a constant
        return b'\x01\x00\x00\x00' + st.pack(value[0])
    # pack length and all elements in one call using a repeated format char
    return struct.pack(build_array_format("<i", length, st.format[1]), length, *value)
89
90 1
91 1
def pack_uatype_array(uatype, value):
    """
    Pack a sequence of values of type *uatype* as a length-prefixed array.

    :param uatype: name of the element type (e.g. ``"Int32"``, ``"String"``)
    :param value: sequence of elements, or None
    :return: bytes; None is written as length -1 and an empty sequence as length 0
    """
    if value is None:
        return b'\xff\xff\xff\xff'
    length = len(value)
    if length == 0:
        return b'\x00\x00\x00\x00'
    if uatype in uatype2struct:
        return pack_uatype_array_primitive(uatype2struct[uatype], value, length)
    chunks = [uatype_Int32.pack(length)]
    chunks.extend(pack_uatype(uatype, val) for val in value)
    return b"".join(chunks)
104
105 1
106 1
def pack_uatype(uatype, value):
    """
    Pack a single value according to the name of its ua type.

    :param uatype: type name; primitives listed in ``uatype2struct`` are packed
        with struct, a few names are special-cased, and anything else is
        expected to provide a ``to_binary()`` method
    :param value: the value to pack (None is packed as 0 for primitives)
    :return: bytes
    """
    if uatype in uatype2struct:
        if value is None:
            value = 0
        return uatype2struct[uatype].pack(value)
    elif uatype == "Null":
        return b''
    elif uatype == "String":
        return pack_string(value)
    elif uatype in ("CharArray", "ByteString", "Custom"):
        return pack_bytes(value)
    elif uatype == "DateTime":
        return pack_datetime(value)
    elif uatype == "LocalizedText":
        # accept a plain text value and wrap it on the fly
        if isinstance(value, LocalizedText):
            return value.to_binary()
        else:
            return LocalizedText(value).to_binary()
    elif uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_to_binary
        return extensionobject_to_binary(value)
    else:
        return value.to_binary()
132 1
133 1
# Pre-compiled little-endian struct packers, one per primitive ua type.
uatype_Int8 = struct.Struct("<b")
uatype_SByte = uatype_Int8
uatype_Int16 = struct.Struct("<h")
uatype_Int32 = struct.Struct("<i")
uatype_Int64 = struct.Struct("<q")
uatype_UInt8 = struct.Struct("<B")
uatype_Char = uatype_UInt8
uatype_Byte = uatype_UInt8
uatype_UInt16 = struct.Struct("<H")
uatype_UInt32 = struct.Struct("<I")
uatype_UInt64 = struct.Struct("<Q")
uatype_Boolean = struct.Struct("<?")
uatype_Double = struct.Struct("<d")
uatype_Float = struct.Struct("<f")

# Lookup from ua type name to its packer; membership in this dict is what
# the pack/unpack helpers use to decide whether a type is a primitive.
uatype2struct = {
    "Int8": uatype_Int8,
    "SByte": uatype_SByte,
    "Int16": uatype_Int16,
    "Int32": uatype_Int32,
    "Int64": uatype_Int64,
    "UInt8": uatype_UInt8,
    "Char": uatype_Char,
    "Byte": uatype_Byte,
    "UInt16": uatype_UInt16,
    "UInt32": uatype_UInt32,
    "UInt64": uatype_UInt64,
    "Boolean": uatype_Boolean,
    "Double": uatype_Double,
    "Float": uatype_Float,
}
164 1
165 1
166 1
def unpack_uatype(uatype, data):
    """
    Read a single value of type *uatype* from *data*.

    :param uatype: type name, see ``pack_uatype``
    :param data: object with a ``read(n)`` method returning bytes
    :return: the decoded value
    :raises UaError: if *uatype* is neither special-cased nor the name of a
        module-level object providing ``from_binary``
    """
    if uatype in uatype2struct:
        st = uatype2struct[uatype]
        return st.unpack(data.read(st.size))[0]
    elif uatype == "String":
        return unpack_string(data)
    elif uatype in ("CharArray", "ByteString", "Custom"):
        return unpack_bytes(data)
    elif uatype == "DateTime":
        return unpack_datetime(data)
    elif uatype == "ExtensionObject":
        # dependency loop: classes in uaprotocol_auto use Variant defined in this file,
        # but Variant can contain any object from uaprotocol_auto as ExtensionObject.
        # Using local import to avoid import loop
        from opcua.ua.uaprotocol_auto import extensionobject_from_binary
        return extensionobject_from_binary(data)
    # fall back to a class of that name defined in this module
    klass = globals().get(uatype)
    if klass is not None and hasattr(klass, 'from_binary'):
        return klass.from_binary(data)
    raise UaError("can not unpack unknown uatype %s" % uatype)
189 1
190 1
191 1
def unpack_uatype_array(uatype, data):
    """
    Read a length-prefixed array of *uatype* values from *data*.

    :param uatype: name of the element type
    :param data: object with a ``read(n)`` method returning bytes
    :return: None for length -1, otherwise a list of decoded elements
    """
    length = uatype_Int32.unpack(data.read(4))[0]
    if length == -1:
        return None
    if length == 0:
        return []
    if uatype in uatype2struct:
        st = uatype2struct[uatype]
        if length == 1:
            return list(st.unpack(data.read(st.size)))
        # decode all primitive elements with a single struct call
        arrst = struct.Struct(build_array_format("<", length, st.format[1]))
        return list(arrst.unpack(data.read(arrst.size)))
    return [unpack_uatype(uatype, data) for _ in range(length)]
209
210 1
211 1
def pack_datetime(value):
    """Pack a datetime as a little-endian Int64 FILETIME tick count."""
    return uatype_Int64.pack(datetime_to_win_epoch(value))
214
215 1
216 1
def unpack_datetime(data):
    """Read an Int64 FILETIME tick count from *data* and return it as a datetime."""
    ticks = uatype_Int64.unpack(data.read(8))[0]
    return win_epoch_to_datetime(ticks)
219 1
220 1
221 1
def pack_string(string):
    """
    Pack a text or byte string with an Int32 length prefix.

    :param string: unicode text (encoded as UTF-8), bytes, or None
    :return: bytes; None is written as length -1 with no payload
    """
    if string is None:
        return uatype_Int32.pack(-1)
    if isinstance(string, unicode):
        string = string.encode('utf-8')
    # the length is the number of encoded bytes, not of characters
    return uatype_Int32.pack(len(string)) + string

# byte strings share the wire format of text strings
pack_bytes = pack_string
230 1
231
232
def unpack_bytes(data):
    """
    Read an Int32 length-prefixed byte string from *data*.

    :return: None if the length is -1, otherwise the result of ``data.read(length)``
    """
    length = uatype_Int32.unpack(data.read(4))[0]
    if length == -1:
        return None
    return data.read(length)
237 1
238
239
def py3_unpack_string(data):
    """Read a length-prefixed string from *data* and decode it as UTF-8 (None stays None)."""
    raw = unpack_bytes(data)
    if raw is None:
        return raw
    return raw.decode("utf-8")


# On Python 2 strings are returned as the raw byte string, on Python 3 as text.
if sys.version_info.major < 3:
    unpack_string = unpack_bytes
else:
    unpack_string = py3_unpack_string
250
251 1
252 1
def test_bit(data, offset):
253 1
    mask = 1 << offset
254
    return data & mask
255
256 1
257 1
def set_bit(data, offset):
    """Return *data* with bit number *offset* set."""
    return data | (1 << offset)
260
261 1
262
def unset_bit(data, offset):
    """Return *data* with bit number *offset* cleared."""
    return data & ~(1 << offset)
265
266
267 1
class _FrozenClass(object):

    """
    Make it impossible to add members to a class.
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol

    Subclasses assign all their attributes in ``__init__`` and then set
    ``self._freeze = True``; from then on only existing attributes can be
    assigned.
    """
    _freeze = False

    def __setattr__(self, key, value):
        # hasattr() also finds class attributes, so ``_freeze`` itself stays assignable
        if self._freeze and not hasattr(self, key):
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(
                key, self.__class__.__name__, self.__dict__.keys()))
        object.__setattr__(self, key, value)


if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
    # The typo check costs cpu time, but it makes debugging easy.
    # If the typo check is not needed (in production), set the environment
    # variable PYOPCUA_NO_TYPO_CHECK: all uatype classes then inherit from
    # object instead of _FrozenClass and the check is skipped.
    FrozenClass = object
else:
    FrozenClass = _FrozenClass
289
290 1
291 1
class ValueRank(IntEnum):
    """
    Defines dimensions of a variable.
    This enum does not support all cases since ValueRank supports any n>0,
    but since it is an IntEnum it can be replaced by a normal int
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # the next names are not in spec but so common we express them here
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
306 1
307 1
308 1
class _MaskEnum(IntEnum):
    """Base class for enums whose member values are bit positions in a bit field."""

    @classmethod
    def parse_bitfield(cls, the_int):
        """ Take an integer and interpret it as a set of enum values. """
        assert isinstance(the_int, int)

        return {cls(b) for b in cls._bits(the_int)}

    @classmethod
    def to_bitfield(cls, collection):
        """ Takes some enum values and creates an integer from them. """
        # materialise first: the collection may be a one-shot iterator
        members = list(collection)
        # make sure all elements are of the correct type
        assert all(isinstance(x, cls) for x in members)

        # OR instead of sum so that a member given twice still sets only its own bit
        bitfield = 0
        for member in members:
            bitfield |= member.mask
        return bitfield

    @property
    def mask(self):
        """The integer with only this member's bit set."""
        return 1 << self.value

    @staticmethod
    def _bits(n):
        """ Iterate over the bits in n.

            e.g. bits(44) yields at 2, 3, 5
        """
        assert n >= 0  # avoid infinite loop (floor division never reaches 0 for n < 0)

        pos = 0
        while n:
            if n & 0x1:
                yield pos
            n = n // 2
            pos += 1


class AccessLevel(_MaskEnum):
    """
    Bit index to indicate what the access level is.

    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
    """
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
359
360 1
class WriteMask(_MaskEnum):
    """
    Bit index to indicate which attribute of a node is writable

    Spec Part 3, Paragraph 5.2.7 WriteMask
    """
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
388
389 1
390
class EventNotifier(_MaskEnum):
    """
    Bit index to indicate how a node can be used for events.

    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
    """
    SubscribeToEvents = 0
    # Reserved        = 1
    HistoryRead = 2
    HistoryWrite = 3
400 1
401 1
402 1
class Guid(FrozenClass):

    """
    A GUID value wrapping a ``uuid.UUID``.

    :ivar uuid: the wrapped value; a new instance starts with a random uuid4
    :vartype uuid: uuid.UUID
    """

    def __init__(self):
        self.uuid = uuid.uuid4()
        self._freeze = True

    def to_binary(self):
        # OPC-UA encodes the first three GUID fields little-endian, which is
        # the layout of ``bytes_le`` (``bytes`` is big-endian throughout).
        return self.uuid.bytes_le

    def __hash__(self):
        return hash(self.uuid.bytes)

    @staticmethod
    def from_binary(data):
        """Read 16 bytes from *data* and return them as a Guid."""
        g = Guid()
        g.uuid = uuid.UUID(bytes_le=data.read(16))
        return g

    def __eq__(self, other):
        return isinstance(other, Guid) and self.uuid == other.uuid

    def __ne__(self, other):
        # needed on Python 2, where != is not derived from __eq__
        return not self.__eq__(other)
422
423
424
class StatusCode(FrozenClass):

    """
    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    def __init__(self, value=0):
        """
        :param value: either the numeric status code or the name of an
            attribute of ``status_codes.StatusCodes``
        """
        if isinstance(value, str):
            self.name = value
            self.value = getattr(status_codes.StatusCodes, value)
            # also fill ``doc``: once frozen, a missing attribute could neither
            # be read nor added later
            _, self.doc = status_codes.get_name_and_doc(self.value)
        else:
            self.value = value
            self.name, self.doc = status_codes.get_name_and_doc(value)
        self._freeze = True

    def to_binary(self):
        return uatype_UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Read a UInt32 from *data* and wrap it in a StatusCode."""
        val = uatype_UInt32.unpack(data.read(4))[0]
        return StatusCode(val)

    def check(self):
        """
        Raises an exception if the status code is anything else than 0 (good).

        Use the is_good() method if you do not want an exception.
        """
        if self.value != 0:
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        return True if status is Good.
        """
        return self.value == 0

    def __str__(self):
        return 'StatusCode({})'.format(self.name)
    __repr__ = __str__

    def __eq__(self, other):
        # comparing with a foreign object is False instead of an AttributeError
        return isinstance(other, StatusCode) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)
479 1
480 1
481 1
class NodeIdType(Enum):
    """Encoding kinds of a NodeId; the values are written as the first byte by NodeId.to_binary()."""
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
488 1
489 1
490 1
class NodeId(FrozenClass):

    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want something special like twobyte nodeid or fourbytenodeid


    :ivar Identifier:
    :vartype Identifier: NodeId
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # no identifier: this is the null node id
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the type from the python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
            return self.NamespaceIndex, self.Identifier
        else:
            return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.__key())

    def is_null(self):
        """Return True if the namespace index is 0 and the identifier is null."""
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """Return True if the identifier is empty/zero or, for Guid node ids, the all-zero GUID."""
        if not self.Identifier:
            return True
        if self.NodeIdType == NodeIdType.Guid:
            return self._is_null_guid(self.Identifier)
        return False

    @staticmethod
    def _is_null_guid(identifier):
        """Return True if *identifier* (Guid, uuid.UUID or textual GUID) is the all-zero GUID."""
        if isinstance(identifier, Guid):
            identifier = identifier.uuid
        if isinstance(identifier, uuid.UUID):
            return identifier.int == 0
        if isinstance(identifier, bytes) and not isinstance(identifier, str):
            # Python 3 bytes holding the textual form
            try:
                identifier = identifier.decode("ascii")
            except UnicodeDecodeError:
                return False
        try:
            return uuid.UUID(identifier).int == 0
        except (AttributeError, TypeError, ValueError):
            # not parseable as a GUID, so certainly not the null GUID
            return False

    @staticmethod
    def from_string(string):
        """
        Parse a string such as ``"ns=2;i=42"`` into a NodeId.

        :raises UaStringParsingError: if the string can not be parsed
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {}".format(string), ex)

    @staticmethod
    def _from_string(string):
        l = string.split(";")
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in l:
            if not el:
                continue
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                srv = v
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # only override the defaults ("" and 0) when the string carried a value
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = int(srv)
        return nodeid

    def to_string(self):
        """Return the textual form, e.g. ``"ns=2;i=42"``; parts are separated by ``;``."""
        type_letters = {
            NodeIdType.Numeric: "i",
            NodeIdType.String: "s",
            NodeIdType.TwoByte: "i",
            NodeIdType.FourByte: "i",
            NodeIdType.Guid: "g",
            NodeIdType.ByteString: "b",
        }
        parts = []
        if self.ServerIndex:
            parts.append("srv={}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={}".format(self.NamespaceIndex))
        parts.append("{}={}".format(type_letters.get(self.NodeIdType), self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={}".format(self.NamespaceUri))
        return ";".join(parts)

    def __str__(self):
        return "{}NodeId({})".format(self.NodeIdType.name, self.to_string())
    __repr__ = __str__

    def to_binary(self):
        if self.NodeIdType == NodeIdType.TwoByte:
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
        elif self.NodeIdType == NodeIdType.FourByte:
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.Numeric:
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.String:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                pack_string(self.Identifier)
        elif self.NodeIdType == NodeIdType.ByteString:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                pack_bytes(self.Identifier)
        else:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                self.Identifier.to_binary()
        #FIXME: Missing NamespaceUri and ServerIndex

    @staticmethod
    def from_binary(data):
        """Read a NodeId from *data*, an object with a ``read(n)`` method returning bytes."""
        nid = NodeId()
        encoding = ord(data.read(1))
        # the low six bits select the encoding, bits 6 and 7 are flags (see below)
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = ord(data.read(1))
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = unpack_string(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = unpack_bytes(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
            nid.Identifier = Guid.from_binary(data)
        else:
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

        # bit 7: a namespace uri follows, bit 6: a server index follows
        if test_bit(encoding, 7):
            nid.NamespaceUri = unpack_string(data)
        if test_bit(encoding, 6):
            nid.ServerIndex = uatype_UInt32.unpack(data.read(4))[0]

        return nid
683
684 1
685
class TwoByteNodeId(NodeId):

    """NodeId in namespace 0 with the TwoByte encoding."""

    def __init__(self, identifier):
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
689
690 1
691
class FourByteNodeId(NodeId):

    """NodeId with the FourByte encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
695
696
697
class NumericNodeId(NodeId):

    """NodeId with the Numeric encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
701 1
702 1
703 1
class ByteStringNodeId(NodeId):

    """NodeId with the ByteString encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
707 1
708
709 1
class GuidNodeId(NodeId):

    """NodeId with the Guid encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
713 1
714 1
715 1
class StringNodeId(NodeId):

    """NodeId with the String encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
719 1
720 1
721
ExpandedNodeId = NodeId
722 1
723
724 1
class QualifiedName(FrozenClass):

    '''
    A string qualified with a namespace index.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    '''

    def __init__(self, name=None, namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse ``"<index>:<name>"``; a string without ``:`` is a name in namespace 0.

        :raises UaStringParsingError: if the part before the first ``:`` is not an int
        """
        if ":" in string:
            try:
                idx, name = string.split(":", 1)
                idx = int(idx)
            except (TypeError, ValueError) as ex:
                raise UaStringParsingError("Error parsing string {}".format(string), ex)
        else:
            idx = 0
            name = string
        return QualifiedName(name, idx)

    def to_binary(self):
        return uatype_UInt16.pack(self.NamespaceIndex) + pack_string(self.Name)

    @staticmethod
    def from_binary(data):
        obj = QualifiedName()
        obj.NamespaceIndex = uatype_UInt16.unpack(data.read(2))[0]
        obj.Name = unpack_string(data)
        return obj

    def __eq__(self, bname):
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        # order by namespace index first, then by name
        if not isinstance(other, QualifiedName):
            raise TypeError("Cannot compare QualifiedName and {}".format(other))
        if self.NamespaceIndex == other.NamespaceIndex:
            return self.Name < other.Name
        else:
            return self.NamespaceIndex < other.NamespaceIndex

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
784
785
786
class LocalizedText(FrozenClass):

    '''
    A text with an optional locale.

    :ivar Encoding: bit 0 is set when a locale is present, bit 1 when a text is present
    :vartype Encoding: int
    :ivar Locale:
    :vartype Locale: bytes
    :ivar Text: unicode text given to the constructor is stored UTF-8 encoded
    :vartype Text: bytes
    '''

    def __init__(self, text=None):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = None
        self._freeze = True

    def to_binary(self):
        # Recompute the two presence bits from the current attribute values.
        # A bit left over from an earlier state would announce a field that
        # is not written.
        encoding = self.Encoding & ~0b11
        if self.Locale:
            encoding |= (1 << 0)
        if self.Text:
            encoding |= (1 << 1)
        self.Encoding = encoding

        packet = [uatype_UInt8.pack(self.Encoding)]
        if self.Locale:
            packet.append(pack_bytes(self.Locale))
        if self.Text:
            packet.append(pack_bytes(self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        obj = LocalizedText()
        obj.Encoding = ord(data.read(1))
        if obj.Encoding & (1 << 0):
            obj.Locale = unpack_bytes(data)
        if obj.Encoding & (1 << 1):
            obj.Text = unpack_bytes(data)
        return obj

    def to_string(self):
        # FIXME: use local
        if self.Text is None:
            return ""
        return self.Text.decode('utf-8')

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) + ')'
    __repr__ = __str__

    def __eq__(self, other):
        # Encoding is derived from Locale and Text, so it is not compared
        return isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text

    def __ne__(self, other):
        return not self.__eq__(other)
844 1
845 1
846 1
class ExtensionObject(FrozenClass):

    '''
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Body:
    :vartype Body: bytes
    '''

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = b''
        self._freeze = True

    def to_binary(self):
        # Keep bit 0 ("a body follows") in sync with Body in both directions.
        # A stale bit would announce a body that is not written.
        if self.Body:
            self.Encoding |= (1 << 0)
        else:
            self.Encoding &= ~(1 << 0)
        packet = [
            self.TypeId.to_binary(),
            pack_uatype('UInt8', self.Encoding),
        ]
        if self.Body:
            packet.append(pack_uatype('ByteString', self.Body))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        obj = ExtensionObject()
        obj.TypeId = NodeId.from_binary(data)
        obj.Encoding = unpack_uatype('UInt8', data)
        if obj.Encoding & (1 << 0):
            obj.Body = unpack_uatype('ByteString', data)
        return obj

    @staticmethod
    def from_object(obj):
        """Wrap *obj*, using its ``to_binary()`` output as body."""
        ext = ExtensionObject()
        # NOTE(review): ObjectIds is not imported in the visible part of this
        # module - confirm it is in scope, otherwise this raises NameError.
        oid = getattr(ObjectIds, "{}_Encoding_DefaultBinary".format(obj.__class__.__name__))
        ext.TypeId = FourByteNodeId(oid)
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
            'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'

    __repr__ = __str__
898 1
899 1
900 1
class VariantType(Enum):

    '''
    The possible types of a variant.

    Each member's integer value (0 to 25) is the type identifier that
    Variant.to_binary() writes into the low six bits of the encoding byte.

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:

    '''
    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
961 1
962 1
963 1
class VariantTypeCustom(object):
    """
    Stand-in for a variant type identifier that is not a member of the
    VariantType enum.

    It mimics the two attributes used on enum members: ``name`` (always
    "Custom") and ``value`` (the integer identifier).

    :raises UaError: if *val* does not fit in six bits (val > 63)
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            raise UaError("Cannot create VariantType. VariantType must be {} > x > {}, received {}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{}".format(self.value)
    __repr__ = __str__

    def __eq__(self, other):
        # Duck-typed on ``.value`` so comparison with VariantType members
        # keeps working; operands without that attribute (None, ints, ...)
        # compare unequal instead of raising AttributeError.
        try:
            return self.value == other.value
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        # Explicit because Python 2 does not derive != from ==.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable on Python 3.
        return hash(self.value)
976 1
977 1
978
class Variant(FrozenClass):

    """
    Create an OPC-UA Variant object.
    if no argument is given a Null Variant is created.
    if no variant type is given, attempts to guess the type from the python type
    if a variant is given as value, the new object becomes a copy of the argument

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions: shape of a multi-dimensional array value, None otherwise
    :vartype Dimensions: list of int or None
    """

    def __init__(self, value=None, varianttype=None, dimensions=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self._freeze = True
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
            # A copy must also carry the source dimensions, unless the caller
            # explicitly supplied its own.
            if self.Dimensions is None:
                self.Dimensions = value.Dimensions
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Dimensions is None and type(self.Value) in (list, tuple):
            dims = get_shape(self.Value)
            # one-dimensional arrays do not need explicit dimensions
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType of a python value.

        For (nested) lists and tuples the type of the first innermost
        element is used. Objects of unrecognised python types are mapped to
        the VariantType member named after their class, falling back to
        ExtensionObject.

        :raises UaError: if an empty list or tuple is met while descending
        """
        error_val = val
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA type of variable {}".format(error_val))
            val = val[0]
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # bool must be tested before int: bool is a subclass of int
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, int):
            return VariantType.Int64
        elif type(val) in (str, unicode):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        try:
            return getattr(VariantType, val.__class__.__name__)
        except AttributeError:
            return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """
        Serialize the variant: one encoding byte (type id in the low six
        bits, bit 7 = array, bit 6 = dimensions present) followed by the
        value, and by the dimensions when they are set on an array.
        """
        b = []
        encoding = self.VariantType.value & 0b111111
        if type(self.Value) in (list, tuple):
            if self.Dimensions is not None:
                encoding = set_bit(encoding, 6)
            encoding = set_bit(encoding, 7)
            b.append(uatype_UInt8.pack(encoding))
            b.append(pack_uatype_array(self.VariantType.name, flatten(self.Value)))
            if self.Dimensions is not None:
                b.append(pack_uatype_array("Int32", self.Dimensions))
        else:
            b.append(uatype_UInt8.pack(encoding))
            b.append(pack_uatype(self.VariantType.name, self.Value))

        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """
        Read a Variant from the stream *data*; a multi-dimensional array is
        rebuilt as nested lists from the flat array and its dimensions.
        """
        dimensions = None
        encoding = ord(data.read(1))
        int_type = encoding & 0b00111111
        vtype = DataType_to_VariantType(int_type)
        if vtype == VariantType.Null:
            # A Null variant carries no value and no dimensions; the encoding
            # byte must not be handed over as the ``dimensions`` argument.
            return Variant(None, vtype)
        if test_bit(encoding, 7):
            value = unpack_uatype_array(vtype.name, data)
        else:
            value = unpack_uatype(vtype.name, data)
        if test_bit(encoding, 6):
            dimensions = unpack_uatype_array("Int32", data)
            value = reshape(value, dimensions)

        return Variant(value, vtype, dimensions)
1083
1084
1085
def reshape(flat, dims):
    """
    Rebuild nested lists of shape *dims* from the flat list *flat*.

    When *flat* holds fewer items than the shape requires, it is padded
    in place with empty lists. A dimension of 0 in the trailing dimensions
    counts as 1 when computing the chunk size.
    """
    outer, inner_dims = dims[0], dims[1:]
    chunk = 1
    for extent in inner_dims:
        chunk *= 1 if extent == 0 else extent
    shortfall = outer * chunk - len(flat)
    if shortfall > 0:
        # pad the caller's list so every expected slot exists
        flat.extend([] for _ in range(shortfall))
    if not inner_dims or inner_dims == [0]:
        return flat
    rows = []
    for start in range(0, len(flat), chunk):
        rows.append(reshape(flat[start:start + chunk], inner_dims))
    return rows
1097 1
1098 1
1099
def _split_list(l, n):
    """
    Cut *l* into consecutive slices of *n* items (the last one may be
    shorter). A chunk size below 1 is treated as 1.
    """
    step = n if n > 1 else 1
    chunks = []
    for start in range(0, len(l), step):
        chunks.append(l[start:start + step])
    return chunks
1102 1
1103 1
1104 1
def flatten_and_get_shape(mylist):
    """
    Flatten nested lists/tuples and report their shape.

    The shape is read from the first element of every nesting level, so
    the input is expected to be rectangular.

    :param mylist: a (possibly nested) list or tuple
    :returns: tuple (flat list, list of dimensions); an empty input yields
        (mylist, [0]) like get_shape and flatten do, instead of raising
        IndexError
    """
    dims = [len(mylist)]
    # ``mylist and`` guards the [0] access for an empty input and also ends
    # the loop once flattening has produced an empty list
    while mylist and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
1113 1
1114 1
1115 1
def flatten(mylist):
    """
    Collapse nested lists/tuples into a single flat list.

    Nesting is detected from the first element only; one level is removed
    per pass until the first element is no longer a list or tuple, or
    nothing is left. An empty input is returned as is.
    """
    current = mylist
    while len(current) != 0 and isinstance(current[0], (list, tuple)):
        merged = []
        for inner in current:
            merged.extend(inner)
        current = merged
    return current
1123 1
1124 1
1125 1
def get_shape(mylist):
    """
    Return the dimensions of nested lists/tuples, outermost first.

    Only the first element of each level is inspected. Descent stops at
    the first empty level (its 0 is included) or at the first element
    that is neither a list nor a tuple.
    """
    shape = []
    level = mylist
    while isinstance(level, (list, tuple)):
        size = len(level)
        shape.append(size)
        if size == 0:
            return shape
        level = level[0]
    return shape
1133
1134 1
1135
class XmlElement(FrozenClass):
    '''
    An XML element encoded as an UTF-8 string.

    :ivar Value: content read by unpack_string, or an empty list when the
        object is created without binary data
    '''
    def __init__(self, binary=None):
        if binary is None:
            # NOTE(review): default is a list although to_binary hands Value
            # to pack_string - confirm this default is intended
            self.Value = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        '''Pack Value with pack_string.'''
        return pack_string(self.Value)

    @staticmethod
    def from_binary(data):
        '''Build an XmlElement from the stream *data*.'''
        return XmlElement(data)

    def _binary_init(self, data):
        # Value is whatever unpack_string reads from the stream
        self.Value = unpack_string(data)

    def __str__(self):
        return 'XmlElement(Value:{})'.format(str(self.Value))

    __repr__ = __str__
1161
1162
1163
class DataValue(FrozenClass):

    '''
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    :ivar Encoding: mask telling which of the fields below are serialized
    :vartype Encoding: int
    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int

    '''

    def __init__(self, variant=None, status=None):
        self.Encoding = 0
        # anything that is not already a Variant is wrapped into one
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        if status is None:
            self.StatusCode = StatusCode()
        else:
            self.StatusCode = status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        '''
        Serialize the encoding mask followed by every field that is set.

        The six field bits of Encoding are rebuilt from the current field
        values, so a flag left over from from_binary or from an earlier call
        can no longer announce a field that is not written. Fields are
        written in the order required by the OPC UA binary encoding: Value,
        StatusCode, SourceTimestamp, SourcePicoseconds, ServerTimestamp,
        ServerPicoseconds.
        '''
        # keep the two upper bits untouched, recompute bits 0-5
        encoding = self.Encoding & 0b11000000
        if self.Value:
            encoding |= (1 << 0)
        if self.StatusCode:
            encoding |= (1 << 1)
        if self.SourceTimestamp:
            encoding |= (1 << 2)
        if self.ServerTimestamp:
            encoding |= (1 << 3)
        if self.SourcePicoseconds:
            encoding |= (1 << 4)
        if self.ServerPicoseconds:
            encoding |= (1 << 5)
        self.Encoding = encoding

        packet = [uatype_UInt8.pack(self.Encoding)]
        if self.Value:
            packet.append(self.Value.to_binary())
        if self.StatusCode:
            packet.append(self.StatusCode.to_binary())
        if self.SourceTimestamp:
            packet.append(pack_datetime(self.SourceTimestamp))
        # the source picoseconds directly follow the source timestamp on the
        # wire, although their flag (bit 4) comes after the server timestamp
        # flag (bit 3)
        if self.SourcePicoseconds:
            packet.append(uatype_UInt16.pack(self.SourcePicoseconds))
        if self.ServerTimestamp:
            packet.append(pack_datetime(self.ServerTimestamp))
        if self.ServerPicoseconds:
            packet.append(uatype_UInt16.pack(self.ServerPicoseconds))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        '''
        Read a DataValue from the stream *data*, consuming the fields in
        the same order to_binary writes them.
        '''
        encoding = ord(data.read(1))
        if encoding & (1 << 0):
            value = Variant.from_binary(data)
        else:
            value = None
        if encoding & (1 << 1):
            status = StatusCode.from_binary(data)
        else:
            status = None
        obj = DataValue(value, status)
        obj.Encoding = encoding
        if obj.Encoding & (1 << 2):
            obj.SourceTimestamp = unpack_datetime(data)
        if obj.Encoding & (1 << 4):
            obj.SourcePicoseconds = uatype_UInt16.unpack(data.read(2))[0]
        if obj.Encoding & (1 << 3):
            obj.ServerTimestamp = unpack_datetime(data)
        if obj.Encoding & (1 << 5):
            obj.ServerPicoseconds = uatype_UInt16.unpack(data.read(2))[0]
        return obj

    def __str__(self):
        s = 'DataValue(Value:{}'.format(self.Value)
        if self.StatusCode is not None:
            s += ', StatusCode:{}'.format(self.StatusCode)
        if self.SourceTimestamp is not None:
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
        if self.ServerTimestamp is not None:
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
        if self.SourcePicoseconds is not None:
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
        if self.ServerPicoseconds is not None:
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
        s += ')'
        return s

    __repr__ = __str__
1267
1268
1269
def DataType_to_VariantType(int_type):
    """
    Map a data type identifier to a variant type.

    Accepts either an int or a NodeId, whose Identifier is then used.
    Identifiers up to 25 are looked up in the VariantType enum, anything
    above is wrapped in a VariantTypeCustom, which rejects values above 63
    because of the VariantType encoding.
    """
    type_id = int_type.Identifier if isinstance(int_type, NodeId) else int_type
    if type_id <= 25:
        return VariantType(type_id)
    return VariantTypeCustom(type_id)
1281