Completed
Push — master ( 129344...8d583b )
by Olivier
05:38 queued 03:18
created

asyncua.ua.uatypes   F

Complexity

Total Complexity 213

Size/Duplication

Total Lines 1046
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 664
dl 0
loc 1046
rs 1.936
c 0
b 0
f 0
wmc 213

12 Functions

Rating   Name   Duplication   Size   Complexity  
A get_win_epoch() 0 2 1
A win_epoch_to_datetime() 0 7 2
A datetime_to_win_epoch() 0 6 3
F get_default_value() 0 42 19
A flatten() 0 10 5
A get_shape() 0 8 3
A datatype_to_varianttype() 0 14 3
A get_extensionobject_class_type() 0 8 2
A register_enum() 0 9 1
A register_extension_object() 0 13 2
A flatten_and_get_shape() 0 8 3
A _split_list() 0 3 1

64 Methods

Rating   Name   Duplication   Size   Complexity  
A _FrozenClass.__setattr__() 0 4 3
A UTC.utcoffset() 0 2 1
A _MaskEnum.parse_bitfield() 0 6 2
A _MaskEnum._bits() 0 14 4
A StatusCode.__str__() 0 2 1
A StatusCode.check() 0 8 2
A StatusCode.is_good() 0 9 2
A StatusCode.name() 0 4 1
A _MaskEnum.mask() 0 3 1
A UTC.tzname() 0 2 1
A StatusCode.__ne__() 0 2 1
A UTC.dst() 0 2 1
A _MaskEnum.to_bitfield() 0 9 2
A StatusCode.__init__() 0 6 2
A StatusCode.doc() 0 4 1
A StatusCode.__eq__() 0 2 1
B NodeId.__init__() 0 28 8
A StringNodeId.__init__() 0 2 1
A NodeId.__hash__() 0 2 1
A NodeId.from_string() 0 6 2
C NodeId.to_string() 0 23 10
C NodeId._from_string() 0 38 11
A ExtensionObject.__bool__() 0 2 1
A QualifiedName.__eq__() 0 2 1
A ExtensionObject.__str__() 0 3 2
A LocalizedText.__eq__() 0 4 4
A NodeId.check_identifier_type_compatibility() 0 14 4
A QualifiedName.__str__() 0 2 1
A VariantTypeCustom.__eq__() 0 2 1
A GuidNodeId.__init__() 0 2 1
A LocalizedText.Text() 0 3 3
A QualifiedName.from_string() 0 12 3
A NumericNodeId.__init__() 0 2 1
A NodeId.to_binary() 0 3 1
A NodeId.is_null() 0 4 2
A VariantTypeCustom.__str__() 0 2 1
A LocalizedText.Locale() 0 3 3
A LocalizedText.from_string() 0 9 4
A NodeId.__ne__() 0 2 1
A LocalizedText.__str__() 0 2 1
A QualifiedName.__init__() 0 6 2
A QualifiedName.__lt__() 0 7 3
A VariantTypeCustom.__init__() 0 5 2
A LocalizedText.__init__() 0 9 3
A NodeId.has_null_identifier() 0 6 4
A ExtensionObject.__init__() 0 5 1
A LocalizedText.to_string() 0 6 3
A NodeId.__str__() 0 2 1
A ByteStringNodeId.__init__() 0 2 1
A TwoByteNodeId.__init__() 0 2 1
A NodeId.__repr__() 0 2 1
A FourByteNodeId.__init__() 0 2 1
A LocalizedText.__ne__() 0 2 1
A QualifiedName.to_string() 0 2 1
A NodeId.__lt__() 0 4 2
A NodeId.__eq__() 0 2 1
A QualifiedName.__ne__() 0 2 1
B DataValue.__str__() 0 13 6
A Variant.__ne__() 0 2 1
A Variant.__eq__() 0 4 4
D Variant.__init__() 0 25 13
A Variant.__str__() 0 2 1
A DataValue.__init__() 0 14 3
F Variant._guess_type() 0 33 15

How to fix   Complexity   

Complexity

Complex classes like asyncua.ua.uatypes often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
implement ua datatypes
3
"""
4
5
import logging
6
from enum import Enum, IntEnum
7
from calendar import timegm
8
import os
9
import uuid
10
import re
11
import itertools
12
from datetime import datetime, timedelta, MAXYEAR, tzinfo
13
14
from asyncua.ua import status_codes
15
from .uaerrors import UaError, UaStatusCodeError, UaStringParsingError
16
17
logger = logging.getLogger(__name__)
18
19
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
20
HUNDREDS_OF_NANOSECONDS = 10000000
21
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
22
23
24
class UTC(tzinfo):
25
    """
26
    UTC
27
    """
28
    def utcoffset(self, dt):
29
        return timedelta(0)
30
31
    def tzname(self, dt):
32
        return "UTC"
33
34
    def dst(self, dt):
35
        return timedelta(0)
36
37
38
def datetime_to_win_epoch(dt: datetime):
39
    """method copied from David Buxton <[email protected]> sample code"""
40
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
41
        dt = dt.replace(tzinfo=UTC())
42
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
43
    return ft + (dt.microsecond * 10)
44
45
46
def get_win_epoch():
47
    return win_epoch_to_datetime(0)
48
49
50
def win_epoch_to_datetime(epch):
51
    try:
52
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
53
    except OverflowError:
54
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
55
        logger.warning('datetime overflow: %s', epch)
56
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
57
58
59
class _FrozenClass(object):
60
    """
61
    Make it impossible to add members to a class.
62
    Not pythonic at all but we found out it prevents many many
63
    bugs in use of protocol structures
64
    """
65
    _freeze = False
66
67
    def __setattr__(self, key, value):
68
        if self._freeze and not hasattr(self, key):
69
            raise TypeError(f"Error adding member '{key}' to class '{self.__class__.__name__}'," f" class is frozen, members are {self.__dict__.keys()}")
70
        object.__setattr__(self, key, value)
71
72
73
if "PYOPCUA_TYPO_CHECK" in os.environ:
74
    # typo check is cpu consuming, but it will make debug easy.
75
    # set PYOPCUA_TYPO_CHECK will make all uatype classes inherit from _FrozenClass
76
    logger.warning('uaypes typo checking is active')
77
    FrozenClass = _FrozenClass
78
else:
79
    FrozenClass = object
80
81
82
class ValueRank(IntEnum):
83
    """
84
    Defines dimensions of a variable.
85
    This enum does not support all cases since ValueRank support any n>0
86
    but since it is an IntEnum it can be replace by a normal int
87
    """
88
    ScalarOrOneDimension = -3
89
    Any = -2
90
    Scalar = -1
91
    OneOrMoreDimensions = 0
92
    OneDimension = 1
93
    # the next names are not in spec but so common we express them here
94
    TwoDimensions = 2
95
    ThreeDimensions = 3
96
    FourDimensions = 4
97
98
99
class _MaskEnum(IntEnum):
100
    @classmethod
101
    def parse_bitfield(cls, the_int):
102
        """ Take an integer and interpret it as a set of enum values. """
103
        if not isinstance(the_int, int):
104
            raise ValueError(f"Argument should be an int, we received {the_int} fo type {type(the_int)}")
105
        return {cls(b) for b in cls._bits(the_int)}
106
107
    @classmethod
108
    def to_bitfield(cls, collection):
109
        """ Takes some enum values and creates an integer from them. """
110
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
111
        # iterator)
112
        iter1, iter2 = itertools.tee(iter(collection))
113
        if not all(isinstance(x, cls) for x in iter1):
114
            raise TypeError(f"All elements have to be of type {cls}")
115
        return sum(x.mask for x in iter2)
116
117
    @property
118
    def mask(self):
119
        return 1 << self.value
120
121
    @staticmethod
122
    def _bits(n):
123
        """ Iterate over the bits in n.
124
125
            e.g. bits(44) yields at 2, 3, 5
126
        """
127
        if not n >= 0:  # avoid infinite recursion
128
            raise ValueError()
129
        pos = 0
130
        while n:
131
            if n & 0x1:
132
                yield pos
133
            n = n // 2
134
            pos += 1
135
136
137
class AccessLevel(_MaskEnum):
138
    """
139
    Bit index to indicate what the access level is.
140
141
    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
142
    """
143
    CurrentRead = 0
144
    CurrentWrite = 1
145
    HistoryRead = 2
146
    HistoryWrite = 3
147
    SemanticChange = 4
148
    StatusWrite = 5
149
    TimestampWrite = 6
150
151
152
class WriteMask(_MaskEnum):
153
    """
154
    Bit index to indicate which attribute of a node is writable
155
156
    Spec Part 3, Paragraph 5.2.7 WriteMask
157
    """
158
    AccessLevel = 0
159
    ArrayDimensions = 1
160
    BrowseName = 2
161
    ContainsNoLoops = 3
162
    DataType = 4
163
    Description = 5
164
    DisplayName = 6
165
    EventNotifier = 7
166
    Executable = 8
167
    Historizing = 9
168
    InverseName = 10
169
    IsAbstract = 11
170
    MinimumSamplingInterval = 12
171
    NodeClass = 13
172
    NodeId = 14
173
    Symmetric = 15
174
    UserAccessLevel = 16
175
    UserExecutable = 17
176
    UserWriteMask = 18
177
    ValueRank = 19
178
    WriteMask = 20
179
    ValueForVariableType = 21
180
181
182
class EventNotifier(_MaskEnum):
183
    """
184
    Bit index to indicate how a node can be used for events.
185
186
    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
187
    """
188
    SubscribeToEvents = 0
189
    # Reserved        = 1
190
    HistoryRead = 2
191
    HistoryWrite = 3
192
193
194
class StatusCode(FrozenClass):
195
    """
196
    :ivar value:
197
    :vartype value: int
198
    :ivar name:
199
    :vartype name: string
200
    :ivar doc:
201
    :vartype doc: string
202
    """
203
204
    ua_types = [("value", "UInt32")]
205
206
    def __init__(self, value=0):
207
        if isinstance(value, str):
208
            self.value = getattr(status_codes.StatusCodes, value)
209
        else:
210
            self.value = value
211
        self._freeze = True
212
213
    def check(self):
214
        """
215
        Raises an exception if the status code is anything else than 0 (good).
216
217
        Use the is_good() method if you do not want an exception.
218
        """
219
        if not self.is_good():
220
            raise UaStatusCodeError(self.value)
221
222
    def is_good(self):
223
        """
224
        return True if status is Good.
225
        """
226
        mask = 3 << 30
227
        if mask & self.value:
228
            return False
229
        else:
230
            return True
231
232
    @property
233
    def name(self):
234
        name, _ = status_codes.get_name_and_doc(self.value)
235
        return name
236
237
    @property
238
    def doc(self):
239
        _, doc = status_codes.get_name_and_doc(self.value)
240
        return doc
241
242
    def __str__(self):
243
        return f'StatusCode({self.name})'
244
245
    __repr__ = __str__
246
247
    def __eq__(self, other):
248
        return self.value == other.value
249
250
    def __ne__(self, other):
251
        return not self.__eq__(other)
252
253
254
class NodeIdType(IntEnum):
255
    TwoByte = 0
256
    FourByte = 1
257
    Numeric = 2
258
    String = 3
259
    Guid = 4
260
    ByteString = 5
261
262
263
class NodeId(object):
264
    """
265
    NodeId Object
266
267
    Args:
268
        identifier: The identifier might be an int, a string, bytes or a Guid
269
        namespaceidx(int): The index of the namespace
270
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guess or you want something
271
        special like twobyte nodeid or fourbytenodeid
272
273
274
    :ivar Identifier:
275
    :vartype Identifier: NodeId
276
    :ivar NamespaceIndex:
277
    :vartype NamespaceIndex: Int
278
    :ivar NamespaceUri:
279
    :vartype NamespaceUri: String
280
    :ivar ServerIndex:
281
    :vartype ServerIndex: Int
282
    """
283
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
284
285
        self.Identifier = identifier
286
        self.NamespaceIndex = namespaceidx
287
        self.NodeIdType = nodeidtype
288
        self.NamespaceUri = ""
289
        self.ServerIndex = 0
290
        self._freeze = True
291
        if self.Identifier is None:
292
            self.Identifier = 0
293
            if namespaceidx == 0:
294
                self.NodeIdType = NodeIdType.TwoByte
295
            else:  # TwoByte NodeId does not encode namespace.
296
                self.NodeIdType = NodeIdType.Numeric
297
            return
298
        if self.NodeIdType is None:
299
            if isinstance(self.Identifier, int):
300
                self.NodeIdType = NodeIdType.Numeric
301
            elif isinstance(self.Identifier, str):
302
                self.NodeIdType = NodeIdType.String
303
            elif isinstance(self.Identifier, bytes):
304
                self.NodeIdType = NodeIdType.ByteString
305
            elif isinstance(self.Identifier, uuid.UUID):
306
                self.NodeIdType = NodeIdType.Guid
307
            else:
308
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
309
        else:
310
            self.check_identifier_type_compatibility()
311
312
    def check_identifier_type_compatibility(self):
313
        '''
314
        Check whether the given identifier can be interpreted as the given node identifier type.
315
        '''
316
        valid_type_combinations = [
317
            (int, [NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte]),
318
            (str, [NodeIdType.String, NodeIdType.ByteString]),
319
            (bytes, [NodeIdType.ByteString, NodeIdType.TwoByte, NodeIdType.FourByte]),
320
            (uuid.UUID, [NodeIdType.Guid])
321
        ]
322
        for identifier, valid_node_types in valid_type_combinations:
323
            if isinstance(self.Identifier, identifier) and self.NodeIdType in valid_node_types:
324
                return
325
        raise UaError(f"NodeId of type {self.NodeIdType} has an incompatible identifier {self.Identifier} of type {type(self.Identifier)}")
326
327
    def __eq__(self, node):
328
        return isinstance(node, NodeId) and self.NamespaceIndex == node.NamespaceIndex and self.Identifier == node.Identifier
329
330
    def __ne__(self, other):
331
        return not self.__eq__(other)
332
333
    def __hash__(self):
334
        return hash((self.NamespaceIndex, self.Identifier))
335
336
    def __lt__(self, other):
337
        if not isinstance(other, NodeId):
338
            raise AttributeError("Can only compare to NodeId")
339
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier) < (other.NodeIdType, other.NamespaceIndex, other.Identifier)
340
341
    def is_null(self):
342
        if self.NamespaceIndex != 0:
343
            return False
344
        return self.has_null_identifier()
345
346
    def has_null_identifier(self):
347
        if not self.Identifier:
348
            return True
349
        if self.NodeIdType is NodeIdType.Guid and self.Identifier.int == 0:
350
            return True
351
        return False
352
353
    @staticmethod
354
    def from_string(string):
355
        try:
356
            return NodeId._from_string(string)
357
        except ValueError as ex:
358
            raise UaStringParsingError(f"Error parsing string {string}", ex)
359
360
    @staticmethod
361
    def _from_string(string):
362
        l = string.split(";")
363
        identifier = None
364
        namespace = 0
365
        ntype = None
366
        srv = None
367
        nsu = None
368
        for el in l:
369
            if not el:
370
                continue
371
            k, v = el.split("=", 1)
372
            k = k.strip()
373
            v = v.strip()
374
            if k == "ns":
375
                namespace = int(v)
376
            elif k == "i":
377
                ntype = NodeIdType.Numeric
378
                identifier = int(v)
379
            elif k == "s":
380
                ntype = NodeIdType.String
381
                identifier = v
382
            elif k == "g":
383
                ntype = NodeIdType.Guid
384
                identifier = uuid.UUID(f"urn:uuid:{v}")
385
            elif k == "b":
386
                ntype = NodeIdType.ByteString
387
                identifier = v
388
            elif k == "srv":
389
                srv = v
390
            elif k == "nsu":
391
                nsu = v
392
        if identifier is None:
393
            raise UaStringParsingError(f"Could not find identifier in string: {string}")
394
        nodeid = NodeId(identifier, namespace, ntype)
395
        nodeid.NamespaceUri = nsu
396
        nodeid.ServerIndex = srv
397
        return nodeid
398
399
    def to_string(self):
400
        string = []
401
        if self.NamespaceIndex != 0:
402
            string.append(f"ns={self.NamespaceIndex}")
403
        ntype = None
404
        if self.NodeIdType == NodeIdType.Numeric:
405
            ntype = "i"
406
        elif self.NodeIdType == NodeIdType.String:
407
            ntype = "s"
408
        elif self.NodeIdType == NodeIdType.TwoByte:
409
            ntype = "i"
410
        elif self.NodeIdType == NodeIdType.FourByte:
411
            ntype = "i"
412
        elif self.NodeIdType == NodeIdType.Guid:
413
            ntype = "g"
414
        elif self.NodeIdType == NodeIdType.ByteString:
415
            ntype = "b"
416
        string.append(f"{ntype}={self.Identifier}")
417
        if self.ServerIndex:
418
            string.append(f"srv={self.ServerIndex}")
419
        if self.NamespaceUri:
420
            string.append(f"nsu={self.NamespaceUri}")
421
        return ";".join(string)
422
423
    def __str__(self):
424
        return self.to_string()
425
426
    def __repr__(self):
427
        return f"{self.NodeIdType.name}NodeId({self.to_string()})"
428
429
    def to_binary(self):
430
        import asyncua
431
        return asyncua.ua.ua_binary.nodeid_to_binary(self)
432
433
434
class TwoByteNodeId(NodeId):
435
    def __init__(self, identifier):
436
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
437
438
439
class FourByteNodeId(NodeId):
440
    def __init__(self, identifier, namespace=0):
441
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
442
443
444
class NumericNodeId(NodeId):
445
    def __init__(self, identifier, namespace=0):
446
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
447
448
449
class ByteStringNodeId(NodeId):
450
    def __init__(self, identifier, namespace=0):
451
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
452
453
454
class GuidNodeId(NodeId):
455
    def __init__(self, identifier, namespace=0):
456
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
457
458
459
class StringNodeId(NodeId):
460
    def __init__(self, identifier, namespace=0):
461
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
462
463
464
ExpandedNodeId = NodeId
465
466
467
class QualifiedName(FrozenClass):
468
    """
469
    A string qualified with a namespace index.
470
    """
471
472
    ua_types = [
473
        ('NamespaceIndex', 'UInt16'),
474
        ('Name', 'String'),
475
    ]
476
477
    def __init__(self, name=None, namespaceidx=0):
478
        if not isinstance(namespaceidx, int):
479
            raise UaError(f"namespaceidx must be an int not {namespaceidx} of type {type(namespaceidx)}")
480
        self.NamespaceIndex = namespaceidx
481
        self.Name = name
482
        self._freeze = True
483
484
    def to_string(self):
485
        return f"{self.NamespaceIndex}:{self.Name}"
486
487
    @staticmethod
488
    def from_string(string):
489
        if ":" in string:
490
            try:
491
                idx, name = string.split(":", 1)
492
                idx = int(idx)
493
            except (TypeError, ValueError) as ex:
494
                raise UaStringParsingError(f"Error parsing string {string}", ex)
495
        else:
496
            idx = 0
497
            name = string
498
        return QualifiedName(name, idx)
499
500
    def __eq__(self, bname):
501
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
502
503
    def __ne__(self, other):
504
        return not self.__eq__(other)
505
506
    def __lt__(self, other):
507
        if not isinstance(other, QualifiedName):
508
            raise TypeError(f"Cannot compare QualifiedName and {other}")
509
        if self.NamespaceIndex == other.NamespaceIndex:
510
            return self.Name < other.Name
511
        else:
512
            return self.NamespaceIndex < other.NamespaceIndex
513
514
    def __str__(self):
515
        return f'QualifiedName({self.NamespaceIndex}:{self.Name})'
516
517
    __repr__ = __str__
518
519
520
class LocalizedText(FrozenClass):
521
    """
522
    A string qualified with a namespace index.
523
    """
524
525
    ua_switches = {
526
        'Locale': ('Encoding', 0),
527
        'Text': ('Encoding', 1),
528
    }
529
530
    ua_types = (
531
        ('Encoding', 'Byte'),
532
        ('Locale', 'String'),
533
        ('Text', 'String'),
534
    )
535
536
    def __init__(self, text=None, locale=None):
537
        self.Encoding = 0
538
        self._text = None
539
        self._locale = None
540
        if text:
541
            self.Text = text
542
        if locale:
543
            self.Locale = locale
544
        self._freeze = True
545
546
    @property
547
    def Text(self):
548
        return self._text
549
550
    @property
551
    def Locale(self):
552
        return self._locale
553
554
    @Text.setter
555
    def Text(self, text):
556
        if not isinstance(text, str):
557
            raise ValueError(f"A LocalizedText object takes a string as argument \"text\", not a {type(text)}, {text}")
558
        self._text = text
559
        if self._text:
560
            self.Encoding |= (1 << 1)
561
562
    @Locale.setter
563
    def Locale(self, locale):
564
        if not isinstance(locale, str):
565
            raise ValueError(f"A LocalizedText object takes a string as argument \"locale\"," f" not a {type(locale)}, {locale}")
566
        self._locale = locale
567
        if self._locale:
568
            self.Encoding |= (1)
569
570
    def to_string(self):
571
        if self.Text is None:
572
            return ""
573
        if self.Locale is None:
574
            return self.Text
575
        return self.__str__()
576
577
    @staticmethod
578
    def from_string(string):
579
        m = re.match(r"^LocalizedText\(Encoding:(.*), Locale:(.*), Text:(.*)\)$", string)
580
        if m:
581
            text = m.group(3) if m.group(3) != str(None) else None
582
            locale = m.group(2) if m.group(2) != str(None) else None
583
            return LocalizedText(text=text, locale=locale)
584
        else:
585
            return LocalizedText(string)
586
587
    def __str__(self):
588
        return f'LocalizedText(Encoding:{self.Encoding}, Locale:{self.Locale}, Text:{self.Text})'
589
590
    __repr__ = __str__
591
592
    def __eq__(self, other):
593
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
594
            return True
595
        return False
596
597
    def __ne__(self, other):
598
        return not self.__eq__(other)
599
600
601
class ExtensionObject(FrozenClass):
602
    """
603
    Any UA object packed as an ExtensionObject
604
605
    :ivar TypeId:
606
    :vartype TypeId: NodeId
607
    :ivar Body:
608
    :vartype Body: bytes
609
    """
610
    ua_switches = {
611
        'Body': ('Encoding', 0),
612
    }
613
614
    ua_types = (
615
        ("TypeId", "NodeId"),
616
        ("Encoding", "Byte"),
617
        ("Body", "ByteString"),
618
    )
619
620
    def __init__(self):
621
        self.TypeId = NodeId()
622
        self.Encoding = 0
623
        self.Body = None
624
        self._freeze = True
625
626
    def __bool__(self):
627
        return self.Body is not None
628
629
    __nonzero__ = __bool__  # Python2 compatibilty
630
631
    def __str__(self):
632
        size = len(self.Body) if self.Body is not None else None
633
        return f'ExtensionObject(TypeId:{self.TypeId}, Encoding:{self.Encoding}, {size} bytes)'
634
635
    __repr__ = __str__
636
637
638
class VariantType(Enum):
639
    """
640
    The possible types of a variant.
641
642
    :ivar Null:
643
    :ivar Boolean:
644
    :ivar SByte:
645
    :ivar Byte:
646
    :ivar Int16:
647
    :ivar UInt16:
648
    :ivar Int32:
649
    :ivar UInt32:
650
    :ivar Int64:
651
    :ivar UInt64:
652
    :ivar Float:
653
    :ivar Double:
654
    :ivar String:
655
    :ivar DateTime:
656
    :ivar Guid:
657
    :ivar ByteString:
658
    :ivar XmlElement:
659
    :ivar NodeId:
660
    :ivar ExpandedNodeId:
661
    :ivar StatusCode:
662
    :ivar QualifiedName:
663
    :ivar LocalizedText:
664
    :ivar ExtensionObject:
665
    :ivar DataValue:
666
    :ivar Variant:
667
    :ivar DiagnosticInfo:
668
    """
669
670
    Null = 0
671
    Boolean = 1
672
    SByte = 2
673
    Byte = 3
674
    Int16 = 4
675
    UInt16 = 5
676
    Int32 = 6
677
    UInt32 = 7
678
    Int64 = 8
679
    UInt64 = 9
680
    Float = 10
681
    Double = 11
682
    String = 12
683
    DateTime = 13
684
    Guid = 14
685
    ByteString = 15
686
    XmlElement = 16
687
    NodeId = 17
688
    ExpandedNodeId = 18
689
    StatusCode = 19
690
    QualifiedName = 20
691
    LocalizedText = 21
692
    ExtensionObject = 22
693
    DataValue = 23
694
    Variant = 24
695
    DiagnosticInfo = 25
696
697
698
class VariantTypeCustom(object):
699
    """
700
    Looks like sometime we get variant with other values than those
701
    defined in VariantType.
702
    FIXME: We should not need this class, as far as I iunderstand the spec
703
    variants can only be of VariantType
704
    """
705
    def __init__(self, val):
706
        self.name = "Custom"
707
        self.value = val
708
        if self.value > 0b00111111:
709
            raise UaError(f"Cannot create VariantType. VariantType must be {0b111111} > x > {25}, received {val}")
710
711
    def __str__(self):
712
        return f"VariantType.Custom:{self.value}"
713
714
    __repr__ = __str__
715
716
    def __eq__(self, other):
717
        return self.value == other.value
718
719
720
class Variant(FrozenClass):
721
    """
722
    Create an OPC-UA Variant object.
723
    if no argument a Null Variant is created.
724
    if not variant type is given, attemps to guess type from python type
725
    if a variant is given as value, the new objects becomes a copy of the argument
726
727
    :ivar Value:
728
    :vartype Value: Any supported type
729
    :ivar VariantType:
730
    :vartype VariantType: VariantType
731
    :ivar Dimension:
732
    :vartype Dimensions: The length of each dimensions. Usually guessed from value.
733
    :ivar is_array:
734
    :vartype is_array: If the variant is an array. Usually guessed from value.
735
    """
736
    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
737
        self.Value = value
738
        self.VariantType = varianttype
739
        self.Dimensions = dimensions
740
        self.is_array = is_array
741
        if self.is_array is None:
742
            if isinstance(value, (list, tuple)):
743
                self.is_array = True
744
            else:
745
                self.is_array = False
746
        self._freeze = True
747
        if isinstance(value, Variant):
748
            self.Value = value.Value
749
            self.VariantType = value.VariantType
750
        if self.VariantType is None:
751
            self.VariantType = self._guess_type(self.Value)
752
        if self.Value is None and not self.is_array and self.VariantType not in (VariantType.Null, VariantType.String, VariantType.DateTime, VariantType.ExtensionObject):
753
            if self.Value == None and self.VariantType == VariantType.NodeId:
754
                self.Value = NodeId(0,0)
755
            else:
756
                raise UaError(f"Non array Variant of type {self.VariantType} cannot have value None")
757
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
758
            dims = get_shape(self.Value)
759
            if len(dims) > 1:
760
                self.Dimensions = dims
761
762
    def __eq__(self, other):
763
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
764
            return True
765
        return False
766
767
    def __ne__(self, other):
768
        return not self.__eq__(other)
769
770
    def _guess_type(self, val):
771
        if isinstance(val, (list, tuple)):
772
            error_val = val
773
        while isinstance(val, (list, tuple)):
774
            if len(val) == 0:
775
                raise UaError(f"could not guess UA type of variable {error_val}")
776
            val = val[0]
777
        if val is None:
778
            return VariantType.Null
779
        elif isinstance(val, bool):
780
            return VariantType.Boolean
781
        elif isinstance(val, float):
782
            return VariantType.Double
783
        elif isinstance(val, IntEnum):
784
            return VariantType.Int32
785
        elif isinstance(val, int):
786
            return VariantType.Int64
787
        elif isinstance(val, str):
788
            return VariantType.String
789
        elif isinstance(val, bytes):
790
            return VariantType.ByteString
791
        elif isinstance(val, datetime):
792
            return VariantType.DateTime
793
        elif isinstance(val, uuid.UUID):
794
            return VariantType.Guid
795
        else:
796
            if isinstance(val, object):
797
                try:
798
                    return getattr(VariantType, val.__class__.__name__)
799
                except AttributeError:
800
                    return VariantType.ExtensionObject
801
            else:
802
                raise UaError(f"Could not guess UA type of {val} with type {type(val)}, specify UA type")
803
804
    def __str__(self):
805
        return f"Variant(val:{self.Value!s},type:{self.VariantType})"
806
807
    __repr__ = __str__
808
809
810
def _split_list(l, n):
811
    n = max(1, n)
812
    return [l[i:i + n] for i in range(0, len(l), n)]
813
814
815
def flatten_and_get_shape(mylist):
816
    dims = [len(mylist)]
817
    while isinstance(mylist[0], (list, tuple)):
818
        dims.append(len(mylist[0]))
819
        mylist = [item for sublist in mylist for item in sublist]
820
        if len(mylist) == 0:
821
            break
822
    return mylist, dims
823
824
825
def flatten(mylist):
826
    if mylist is None:
827
        return None
828
    elif len(mylist) == 0:
829
        return mylist
830
    while isinstance(mylist[0], (list, tuple)):
831
        mylist = [item for sublist in mylist for item in sublist]
832
        if len(mylist) == 0:
833
            break
834
    return mylist
835
836
837
def get_shape(mylist):
838
    dims = []
839
    while isinstance(mylist, (list, tuple)):
840
        dims.append(len(mylist))
841
        if len(mylist) == 0:
842
            break
843
        mylist = mylist[0]
844
    return dims
845
846
847
class DataValue(FrozenClass):
848
    """
849
    A value with an associated timestamp, and quality.
850
    Automatically generated from xml , copied and modified here to fix errors in xml spec
851
852
    :ivar Value:
853
    :vartype Value: Variant
854
    :ivar StatusCode:
855
    :vartype StatusCode: StatusCode
856
    :ivar SourceTimestamp:
857
    :vartype SourceTimestamp: datetime
858
    :ivar SourcePicoSeconds:
859
    :vartype SourcePicoSeconds: int
860
    :ivar ServerTimestamp:
861
    :vartype ServerTimestamp: datetime
862
    :ivar ServerPicoseconds:
863
    :vartype ServerPicoseconds: int
864
    """
865
866
    ua_switches = {
867
        'Value': ('Encoding', 0),
868
        'StatusCode': ('Encoding', 1),
869
        'SourceTimestamp': ('Encoding', 2),
870
        'ServerTimestamp': ('Encoding', 3),
871
        'SourcePicoseconds': ('Encoding', 4),
872
        'ServerPicoseconds': ('Encoding', 5),
873
    }
874
875
    ua_types = (
876
        ('Encoding', 'Byte'),
877
        ('Value', 'Variant'),
878
        ('StatusCode', 'StatusCode'),
879
        ('SourceTimestamp', 'DateTime'),
880
        ('SourcePicoseconds', 'UInt16'),
881
        ('ServerTimestamp', 'DateTime'),
882
        ('ServerPicoseconds', 'UInt16'),
883
    )
884
885
    def __init__(self, variant=None, status=None, sourceTimestamp=None, sourcePicoseconds=None, serverTimestamp=None, serverPicoseconds=None):
886
        self.Encoding = 0
887
        if not isinstance(variant, Variant):
888
            variant = Variant(variant)
889
        self.Value = variant
890
        if status is None:
891
            self.StatusCode = StatusCode()
892
        else:
893
            self.StatusCode = status
894
        self.SourceTimestamp = sourceTimestamp
895
        self.SourcePicoseconds = sourcePicoseconds
896
        self.ServerTimestamp = serverTimestamp
897
        self.ServerPicoseconds = serverPicoseconds
898
        self._freeze = True
899
900
    def __str__(self):
901
        s = []
902
        if self.StatusCode is not None:
903
            s.append(f', StatusCode:{self.StatusCode}')
904
        if self.SourceTimestamp is not None:
905
            s.append(f', SourceTimestamp:{self.SourceTimestamp}')
906
        if self.ServerTimestamp is not None:
907
            s.append(f', ServerTimestamp:{self.ServerTimestamp}')
908
        if self.SourcePicoseconds is not None:
909
            s.append(f', SourcePicoseconds:{self.SourcePicoseconds}')
910
        if self.ServerPicoseconds is not None:
911
            s.append(f', ServerPicoseconds:{self.ServerPicoseconds}')
912
        return f'DataValue(Value:{self.Value}{"".join(s)})'
913
914
    __repr__ = __str__
915
916
917
def datatype_to_varianttype(int_type):
918
    """
919
    Takes a NodeId or int and return a VariantType
920
    This is only supported if int_type < 63 due to VariantType encoding
921
    At low level we do not have access to address space thus decoding is limited
922
    a better version of this method can be find in ua_utils.py
923
    """
924
    if isinstance(int_type, NodeId):
925
        int_type = int_type.Identifier
926
927
    if int_type <= 25:
928
        return VariantType(int_type)
929
    else:
930
        return VariantTypeCustom(int_type)
931
932
933
def get_default_value(vtype):
934
    """
935
    Given a variant type return default value for this type
936
    """
937
    if vtype == VariantType.Null:
938
        return None
939
    elif vtype == VariantType.Boolean:
940
        return False
941
    elif vtype in (VariantType.SByte, VariantType.Byte):
942
        return 0
943
    elif vtype == VariantType.ByteString:
944
        return b""
945
    elif 4 <= vtype.value <= 9:
946
        return 0
947
    elif vtype in (VariantType.Float, VariantType.Double):
948
        return 0.0
949
    elif vtype == VariantType.String:
950
        return None  # a string can be null
951
    elif vtype == VariantType.DateTime:
952
        return datetime.utcnow()
953
    elif vtype == VariantType.Guid:
954
        return uuid.uuid4()
955
    elif vtype == VariantType.XmlElement:
956
        return None  # Not sure this is correct
957
    elif vtype == VariantType.NodeId:
958
        return NodeId()
959
    elif vtype == VariantType.ExpandedNodeId:
960
        return NodeId()
961
    elif vtype == VariantType.StatusCode:
962
        return StatusCode()
963
    elif vtype == VariantType.QualifiedName:
964
        return QualifiedName()
965
    elif vtype == VariantType.LocalizedText:
966
        return LocalizedText()
967
    elif vtype == VariantType.ExtensionObject:
968
        return ExtensionObject()
969
    elif vtype == VariantType.DataValue:
970
        return DataValue()
971
    elif vtype == VariantType.Variant:
972
        return Variant()
973
    else:
974
        raise RuntimeError(f"function take a uatype as argument, got: {vtype}")
975
976
977
# register of custom enums (Those loaded with load_enums())
978
enums_by_datatype = {}
979
enums_datatypes = {}
980
981
982
def register_enum(name, nodeid, class_type):
983
    """
984
    Register a new enum for automatic decoding and make them available in ua module
985
    """
986
    logger.info("registring new enum: %s %s %s", name, nodeid, class_type)
987
    enums_by_datatype[nodeid] = class_type
988
    enums_datatypes[class_type] = nodeid
989
    import asyncua.ua
990
    setattr(asyncua.ua, name, class_type)
991
992
993
# These dictionnaries are used to register extensions classes for automatic
994
# decoding and encoding
995
extension_objects_by_datatype = {}  #Dict[Datatype, type]
996
extension_objects_by_typeid = {}  #Dict[EncodingId, type]
997
extension_object_typeids = {}
998
999
1000
def register_extension_object(name, encoding_nodeid, class_type, datatype_nodeid=None):
1001
    """
1002
    Register a new extension object for automatic decoding and make them available in ua module
1003
    """
1004
    logger.info("registring new extension object: %s %s %s %s", name, encoding_nodeid, class_type, datatype_nodeid)
1005
    if datatype_nodeid:
1006
        extension_objects_by_datatype[datatype_nodeid] = class_type
1007
    extension_objects_by_typeid[encoding_nodeid] = class_type
1008
    extension_object_typeids[name] = encoding_nodeid
1009
    # FIXME: Next line is not exactly a Python best practices, so feel free to propose something else
1010
    # add new extensions objects to ua modules to automate decoding
1011
    import asyncua.ua
1012
    setattr(asyncua.ua, name, class_type)
1013
1014
1015
def get_extensionobject_class_type(typeid):
1016
    """
1017
    Returns the registered class type for typid of an extension object
1018
    """
1019
    if typeid in extension_objects_by_typeid:
1020
        return extension_objects_by_typeid[typeid]
1021
    else:
1022
        return None
1023
1024
1025
class SecurityPolicyType(Enum):
1026
    """
1027
    The supported types of SecurityPolicy.
1028
1029
    "None"
1030
    "Basic128Rsa15_Sign"
1031
    "Basic128Rsa15_SignAndEncrypt"
1032
    "Basic256_Sign"
1033
    "Basic256_SignAndEncrypt"
1034
    "Basic256Sha256_Sign"
1035
    "Basic256Sha256_SignAndEncrypt"
1036
1037
    """
1038
1039
    NoSecurity = 0
1040
    Basic128Rsa15_Sign = 1
1041
    Basic128Rsa15_SignAndEncrypt = 2
1042
    Basic256_Sign = 3
1043
    Basic256_SignAndEncrypt = 4
1044
    Basic256Sha256_Sign = 5
1045
    Basic256Sha256_SignAndEncrypt = 6
1046