Passed
Push — master ( dd687a...50432a )
by Olivier
02:34
created

NodeId.check_identifier_type_compatibility()   A

Complexity

Conditions 4

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 10
nop 1
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4
5
import logging
6
from enum import Enum, IntEnum
7
from calendar import timegm
8
import os
9
import uuid
10
import re
11
import itertools
12
from datetime import datetime, timedelta, MAXYEAR, tzinfo
13
14
from asyncua.ua import status_codes
15
from .uaerrors import UaError, UaStatusCodeError, UaStringParsingError
16
17
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# January 1, 1970 expressed as MS file time (count of 100 ns ticks).
EPOCH_AS_FILETIME = 116_444_736_000_000_000
# Number of 100-nanosecond ticks in one second.
HUNDREDS_OF_NANOSECONDS = 10_000_000
# Naive datetime used as the zero point when converting file times.
FILETIME_EPOCH_AS_DATETIME = datetime(year=1601, month=1, day=1)
22
23
24
class UTC(tzinfo):
    """
    Minimal tzinfo implementation describing UTC: zero offset, no DST.
    """
    def utcoffset(self, dt):
        # UTC has no offset from itself, whatever dt is
        return timedelta(seconds=0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        # no daylight saving adjustment
        return timedelta(seconds=0)
36
37
38
def datetime_to_win_epoch(dt: datetime):
    """
    Convert a datetime to MS file time (100 ns ticks).

    Method copied from David Buxton <[email protected]> sample code.

    Naive datetimes (or datetimes whose tzinfo gives no offset) are taken as UTC.
    Aware datetimes are converted to UTC before the tick count is computed.

    :param dt: datetime to convert
    :return: int, EPOCH_AS_FILETIME plus the elapsed 100 ns ticks
    """
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
        dt = dt.replace(tzinfo=UTC())
    # utctimetuple() applies the UTC offset; timetuple() would hand the local
    # wall-clock fields to timegm(), which always interprets them as UTC.
    ft = EPOCH_AS_FILETIME + (timegm(dt.utctimetuple()) * HUNDREDS_OF_NANOSECONDS)
    # one microsecond is ten 100 ns ticks
    return ft + (dt.microsecond * 10)
44
45
46
def get_win_epoch():
    """
    Return the datetime that corresponds to file time 0.
    """
    filetime_zero = 0
    return win_epoch_to_datetime(filetime_zero)
48
49
50
def win_epoch_to_datetime(epch):
    """
    Convert an MS file time (100 ns ticks) to a datetime.

    Values that overflow datetime are logged and replaced by the last
    representable instant of MAXYEAR.
    """
    try:
        # ten 100 ns ticks make one microsecond
        elapsed = timedelta(microseconds=epch // 10)
        converted = FILETIME_EPOCH_AS_DATETIME + elapsed
    except OverflowError:
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
        logger.warning('datetime overflow: %s', epch)
        converted = datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
    return converted
57
58
59
class _FrozenClass(object):
    """
    Base class that refuses new attributes once ``_freeze`` is set.
    Not pythonic at all, but it was found to prevent many bugs
    in the use of protocol structures.
    """
    _freeze = False

    def __setattr__(self, key, value):
        # Allowed while not frozen, or when the attribute already exists.
        if not self._freeze or hasattr(self, key):
            object.__setattr__(self, key, value)
            return
        raise TypeError(f"Error adding member '{key}' to class '{self.__class__.__name__}'," f" class is frozen, members are {self.__dict__.keys()}")
71
72
73
# Typo checking is cpu consuming, but it makes debugging easy.
# Setting PYOPCUA_TYPO_CHECK makes all uatype classes inherit from _FrozenClass.
FrozenClass = object
if "PYOPCUA_TYPO_CHECK" in os.environ:
    logger.warning('uaypes typo checking is active')
    FrozenClass = _FrozenClass
80
81
82
class ValueRank(IntEnum):
    """
    Defines dimensions of a variable.
    Not every case has a name here, since ValueRank supports any n > 0;
    as this is an IntEnum, a normal int can be used in place of a member.
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # The following names are not in the spec, but are common enough to be listed.
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
97
98
99
class _MaskEnum(IntEnum):
    """
    IntEnum whose member values are bit positions inside an integer bitfield.
    """

    @classmethod
    def parse_bitfield(cls, the_int):
        """ Interpret an integer as the set of enum members whose bits are set. """
        if not isinstance(the_int, int):
            raise ValueError(f"Argument should be an int, we received {the_int} fo type {type(the_int)}")
        return set(map(cls, cls._bits(the_int)))

    @classmethod
    def to_bitfield(cls, collection):
        """ Build the integer bitfield for a collection of enum members. """
        # A single pass validates and accumulates, so one-shot iterators work too.
        bitfield = 0
        for member in collection:
            if not isinstance(member, cls):
                raise TypeError(f"All elements have to be of type {cls}")
            bitfield += member.mask
        return bitfield

    @property
    def mask(self):
        """ Integer with only this member's bit set. """
        return 1 << self.value

    @staticmethod
    def _bits(n):
        """ Yield the positions of the set bits of n, lowest first.

            e.g. bits(44) yields 2, 3, 5
        """
        if not (n >= 0):  # a negative n would never reach zero below
            raise ValueError()
        position = 0
        while n:
            if n & 0x1:
                yield position
            n >>= 1
            position += 1
135
136
137
class AccessLevel(_MaskEnum):
    """
    Bit index to indicate what the access level is.

    Appears multiple times in Spec Part 3, e.g. paragraph 5.6.2 Variable NodeClass
    """
    # each value is a bit position, not a mask
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
150
151
152
class WriteMask(_MaskEnum):
    """
    Bit index to indicate which attribute of a node is writable

    See Spec Part 3, Paragraph 5.2.7 WriteMask
    """
    # each value is a bit position, not a mask
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
180
181
182
class EventNotifier(_MaskEnum):
    """
    Bit index to indicate how a node can be used for events.

    Appears multiple times in Spec Part 3, e.g. Paragraph 5.4 View NodeClass
    """
    SubscribeToEvents = 0
    # bit 1 is reserved
    HistoryRead = 2
    HistoryWrite = 3
192
193
194
class StatusCode(FrozenClass):
    """
    A status code, stored as its numeric value.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    ua_types = [("value", "UInt32")]

    def __init__(self, value=0):
        """
        :param value: either the numeric code, or the name of an attribute of
            status_codes.StatusCodes that holds the code
        """
        if isinstance(value, str):
            self.value = getattr(status_codes.StatusCodes, value)
        else:
            self.value = value
        self._freeze = True

    def check(self):
        """
        Raises an exception if the status code is anything else than 0 (good).

        Use the is_good() method if you do not want an exception.
        """
        if not self.is_good():
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        return True if status is Good.
        """
        # Good means bits 30 and 31 of the code are both clear.
        return not (self.value & (3 << 30))

    @property
    def name(self):
        name, _ = status_codes.get_name_and_doc(self.value)
        return name

    @property
    def doc(self):
        _, doc = status_codes.get_name_and_doc(self.value)
        return doc

    def __str__(self):
        return f'StatusCode({self.name})'

    __repr__ = __str__

    def __eq__(self, other):
        # Like the other ua types in this module, comparing with a foreign
        # object is simply unequal instead of raising AttributeError.
        return isinstance(other, StatusCode) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ removes the inherited __hash__; restore one that is
        # consistent with equality.
        return hash(self.value)
252
253
254
class NodeIdType(IntEnum):
    """
    The kinds of NodeId handled by this module.
    """
    # integer identifiers
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    # non-integer identifiers
    String = 3
    Guid = 4
    ByteString = 5
261
262
263
class NodeId(object):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want
            something special like a twobyte nodeid or a fourbyte nodeid


    :ivar Identifier:
    :vartype Identifier: NodeId
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):

        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if self.Identifier is None:
            # No identifier given: build the null NodeId of the namespace.
            self.Identifier = 0
            if namespaceidx == 0:
                self.NodeIdType = NodeIdType.TwoByte
            else:  # TwoByte NodeId does not encode namespace.
                self.NodeIdType = NodeIdType.Numeric
            return
        if self.NodeIdType is None:
            # Guess the NodeIdType from the python type of the identifier.
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, uuid.UUID):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
        else:
            self.check_identifier_type_compatibility()

    def check_identifier_type_compatibility(self):
        '''
        Check whether the given identifier can be interpreted as the given node identifier type.

        Raises UaError when the combination is not one of the accepted ones.
        '''
        valid_type_combinations = [
            (int, [NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte]),
            (str, [NodeIdType.String, NodeIdType.ByteString]),
            (bytes, [NodeIdType.ByteString, NodeIdType.TwoByte, NodeIdType.FourByte]),
            (uuid.UUID, [NodeIdType.Guid])
        ]
        for identifier, valid_node_types in valid_type_combinations:
            if isinstance(self.Identifier, identifier) and self.NodeIdType in valid_node_types:
                return
        raise UaError(f"NodeId of type {self.NodeIdType} has an incompatible identifier {self.Identifier} of type {type(self.Identifier)}")

    def __eq__(self, node):
        # Only namespace index and identifier take part in equality.
        return isinstance(node, NodeId) and self.NamespaceIndex == node.NamespaceIndex and self.Identifier == node.Identifier

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Hash the same fields that __eq__ compares.
        return hash((self.NamespaceIndex, self.Identifier))

    def __lt__(self, other):
        if not isinstance(other, NodeId):
            # AttributeError is kept for compatibility with existing callers.
            raise AttributeError("Can only compare to NodeId")
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier) < (other.NodeIdType, other.NamespaceIndex, other.Identifier)

    def is_null(self):
        """
        Return True for a null identifier in namespace 0.
        """
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """
        Return True if the identifier is falsy or is an all-zero Guid.
        """
        if not self.Identifier:
            return True
        if self.NodeIdType is NodeIdType.Guid and self.Identifier.int == 0:
            return True
        return False

    @staticmethod
    def from_string(string):
        """
        Parse a string such as "ns=2;i=42" into a NodeId.

        Raises UaStringParsingError when the string cannot be parsed.
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            # chain the original error so the failing element stays visible
            raise UaStringParsingError(f"Error parsing string {string}", ex) from ex

    @staticmethod
    def _from_string(string):
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            # an element without "=" raises ValueError, handled by from_string
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = uuid.UUID(f"urn:uuid:{v}")
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is an integer everywhere else in this class
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError(f"Could not find identifier in string: {string}")
        nodeid = NodeId(identifier, namespace, ntype)
        # Only override the constructor defaults ("" and 0) when the string
        # actually carried a value, so parsed nodes never hold None here.
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the string form of the NodeId, e.g. "ns=2;i=42".
        """
        string = []
        if self.NamespaceIndex != 0:
            string.append(f"ns={self.NamespaceIndex}")
        ntype = None
        # the three integer flavours share the "i" prefix
        if self.NodeIdType in (NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte):
            ntype = "i"
        elif self.NodeIdType == NodeIdType.String:
            ntype = "s"
        elif self.NodeIdType == NodeIdType.Guid:
            ntype = "g"
        elif self.NodeIdType == NodeIdType.ByteString:
            ntype = "b"
        string.append(f"{ntype}={self.Identifier}")
        if self.ServerIndex:
            string.append(f"srv={self.ServerIndex}")
        if self.NamespaceUri:
            string.append(f"nsu={self.NamespaceUri}")
        return ";".join(string)

    def __str__(self):
        return self.to_string()

    def __repr__(self):
        return f"{self.NodeIdType.name}NodeId({self.to_string()})"

    def to_binary(self):
        # imported here rather than at module level
        import asyncua
        return asyncua.ua.ua_binary.nodeid_to_binary(self)
432
433
434
class TwoByteNodeId(NodeId):
    """NodeId forced to NodeIdType.TwoByte, always in namespace 0."""

    def __init__(self, identifier):
        super().__init__(identifier, 0, NodeIdType.TwoByte)
437
438
439
class FourByteNodeId(NodeId):
    """NodeId forced to NodeIdType.FourByte."""

    def __init__(self, identifier, namespace=0):
        super().__init__(identifier, namespace, NodeIdType.FourByte)
442
443
444
class NumericNodeId(NodeId):
    """NodeId forced to NodeIdType.Numeric."""

    def __init__(self, identifier, namespace=0):
        super().__init__(identifier, namespace, NodeIdType.Numeric)
447
448
449
class ByteStringNodeId(NodeId):
    """NodeId forced to NodeIdType.ByteString."""

    def __init__(self, identifier, namespace=0):
        super().__init__(identifier, namespace, NodeIdType.ByteString)
452
453
454
class GuidNodeId(NodeId):
    """NodeId forced to NodeIdType.Guid."""

    def __init__(self, identifier, namespace=0):
        super().__init__(identifier, namespace, NodeIdType.Guid)
457
458
459
class StringNodeId(NodeId):
    """NodeId forced to NodeIdType.String."""

    def __init__(self, identifier, namespace=0):
        super().__init__(identifier, namespace, NodeIdType.String)
462
463
464
# ExpandedNodeId is an alias: both names refer to the same NodeId class.
ExpandedNodeId = NodeId
465
466
467
class QualifiedName(FrozenClass):
    """
    A string qualified with a namespace index.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    """

    ua_types = [
        ('NamespaceIndex', 'UInt16'),
        ('Name', 'String'),
    ]

    def __init__(self, name=None, namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError(f"namespaceidx must be an int not {namespaceidx} of type {type(namespaceidx)}")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """
        Return "<NamespaceIndex>:<Name>".
        """
        return f"{self.NamespaceIndex}:{self.Name}"

    @staticmethod
    def from_string(string):
        """
        Parse "<index>:<name>", or a bare name which gets namespace index 0.

        Raises UaStringParsingError if the part before the first ":" is not an int.
        """
        if ":" in string:
            try:
                idx, name = string.split(":", 1)
                idx = int(idx)
            except (TypeError, ValueError) as ex:
                # chain the cause so the original conversion error is not lost
                raise UaStringParsingError(f"Error parsing string {string}", ex) from ex
        else:
            idx = 0
            name = string
        return QualifiedName(name, idx)

    def __eq__(self, bname):
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ removes the inherited __hash__; restore one that
        # uses the same fields as equality.
        return hash((self.NamespaceIndex, self.Name))

    def __lt__(self, other):
        if not isinstance(other, QualifiedName):
            raise TypeError(f"Cannot compare QualifiedName and {other}")
        # order by namespace index first, then by name
        if self.NamespaceIndex == other.NamespaceIndex:
            return self.Name < other.Name
        else:
            return self.NamespaceIndex < other.NamespaceIndex

    def __str__(self):
        return f'QualifiedName({self.NamespaceIndex}:{self.Name})'

    __repr__ = __str__
518
519
520
class LocalizedText(FrozenClass):
    """
    A text with an optional locale.

    Encoding is a bit mask: bit 0 is set once a non-empty Locale has been
    assigned, bit 1 once a non-empty Text has been assigned.
    """

    ua_switches = {
        'Locale': ('Encoding', 0),
        'Text': ('Encoding', 1),
    }

    ua_types = (
        ('Encoding', 'Byte'),
        ('Locale', 'String'),
        ('Text', 'String'),
    )

    def __init__(self, text=None, locale=None):
        self.Encoding = 0
        self._text = None
        self._locale = None
        # falsy arguments are ignored and leave the fields at None
        if text:
            self.Text = text
        if locale:
            self.Locale = locale
        self._freeze = True

    @property
    def Text(self):
        return self._text

    @Text.setter
    def Text(self, text):
        if not isinstance(text, str):
            raise ValueError(f"A LocalizedText object takes a string as argument \"text\", not a {type(text)}, {text}")
        self._text = text
        if self._text:
            self.Encoding = self.Encoding | (1 << 1)

    @property
    def Locale(self):
        return self._locale

    @Locale.setter
    def Locale(self, locale):
        if not isinstance(locale, str):
            raise ValueError(f"A LocalizedText object takes a string as argument \"locale\"," f" not a {type(locale)}, {locale}")
        self._locale = locale
        if self._locale:
            self.Encoding = self.Encoding | 1

    def to_string(self):
        """
        Return "" without text, the bare text without locale, else the str() form.
        """
        text = self.Text
        if text is None:
            return ""
        return text if self.Locale is None else self.__str__()

    @staticmethod
    def from_string(string):
        """
        Rebuild a LocalizedText from its str() form; any other string becomes the text.
        """
        m = re.match(r"^LocalizedText\(Encoding:(.*), Locale:(.*), Text:(.*)\)$", string)
        if not m:
            return LocalizedText(string)
        # a field printed as "None" was unset in the original object
        locale, text = (None if part == str(None) else part for part in m.group(2, 3))
        return LocalizedText(text=text, locale=locale)

    def __str__(self):
        return f'LocalizedText(Encoding:{self.Encoding}, Locale:{self.Locale}, Text:{self.Text})'

    __repr__ = __str__

    def __eq__(self, other):
        # Encoding does not take part in equality
        if not isinstance(other, LocalizedText):
            return False
        return bool(self.Locale == other.Locale and self.Text == other.Text)

    def __ne__(self, other):
        return not self.__eq__(other)
599
600
601
class ExtensionObject(FrozenClass):
    """
    Any UA object packed as an ExtensionObject

    An instance is falsy while Body is None.

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Body:
    :vartype Body: bytes
    """
    ua_switches = {
        'Body': ('Encoding', 0),
    }

    ua_types = (
        ("TypeId", "NodeId"),
        ("Encoding", "Byte"),
        ("Body", "ByteString"),
    )

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = None
        self._freeze = True

    def __bool__(self):
        return self.Body is not None

    __nonzero__ = __bool__  # Python2 compatibility

    def __str__(self):
        body = self.Body
        size = None if body is None else len(body)
        return f'ExtensionObject(TypeId:{self.TypeId}, Encoding:{self.Encoding}, {size} bytes)'

    __repr__ = __str__
636
637
638
class VariantType(Enum):
    """
    The possible types of a variant.

    Members, in value order (0 to 25): Null, Boolean, SByte, Byte, Int16,
    UInt16, Int32, UInt32, Int64, UInt64, Float, Double, String, DateTime,
    Guid, ByteString, XmlElement, NodeId, ExpandedNodeId, StatusCode,
    QualifiedName, LocalizedText, ExtensionObject, DataValue, Variant,
    DiagnosticInfo.
    """

    Null = 0
    Boolean = 1
    # integers
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    # floating point
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    # structured types
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
696
697
698
class VariantTypeCustom(object):
    """
    Looks like sometimes we get variants with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType

    :ivar name: always "Custom"
    :ivar value: the numeric type value
    """
    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        # values above 0b00111111 (63) are rejected
        if self.value > 0b00111111:
            raise UaError(f"Cannot create VariantType. VariantType must be {0b111111} > x > {25}, received {val}")

    def __str__(self):
        return f"VariantType.Custom:{self.value}"

    __repr__ = __str__

    def __eq__(self, other):
        # Compare by value with anything exposing one (including VariantType
        # members); other objects are unequal instead of raising
        # AttributeError.
        if not hasattr(other, "value"):
            return NotImplemented
        return self.value == other.value
718
719
720
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    if no argument is given, a Null Variant is created.
    if no variant type is given, attempts to guess the type from the python type
    if a variant is given as value, the new object becomes a copy of the argument

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions:
    :vartype Dimensions: The length of each dimension. Usually guessed from value.
    :ivar is_array:
    :vartype is_array: If the variant is an array. Usually guessed from value.
    """
    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
        if isinstance(value, Variant):
            # Copy construction: unwrap first so that the array flag and the
            # dimensions describe the wrapped value rather than the wrapper.
            # Explicit dimensions/is_array arguments still take precedence.
            source = value
            value = source.Value
            varianttype = source.VariantType
            if dimensions is None:
                dimensions = source.Dimensions
            if is_array is None:
                is_array = source.is_array
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self.is_array = is_array
        if self.is_array is None:
            self.is_array = isinstance(value, (list, tuple))
        self._freeze = True
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        # None is only accepted for arrays and for the types listed here
        if self.Value is None and not self.is_array and self.VariantType not in (VariantType.Null, VariantType.String, VariantType.DateTime, VariantType.ExtensionObject):
            raise UaError(f"Non array Variant of type {self.VariantType} cannot have value None")
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
            dims = get_shape(self.Value)
            # Dimensions are only recorded for nested (multi-dimensional) values
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType of a python value; for (nested) lists the first element decides.

        Raises UaError for an empty list, since there is no element to look at.
        """
        error_val = val
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError(f"could not guess UA type of variable {error_val}")
            val = val[0]
        if val is None:
            return VariantType.Null
        # bool must be tested before int, and IntEnum before int
        elif isinstance(val, bool):
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, IntEnum):
            return VariantType.Int32
        elif isinstance(val, int):
            return VariantType.Int64
        elif isinstance(val, str):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        elif isinstance(val, uuid.UUID):
            return VariantType.Guid
        # Other objects map to the member named like their class, or to
        # ExtensionObject. Looking up __members__ only matches real members,
        # never other attributes of the enum class.
        return VariantType.__members__.get(val.__class__.__name__, VariantType.ExtensionObject)

    def __str__(self):
        return f"Variant(val:{self.Value!s},type:{self.VariantType})"

    __repr__ = __str__
805
806
807
def _split_list(l, n):
    """
    Split l into consecutive chunks of n items (chunk size is at least 1).
    """
    step = n if n > 1 else 1
    chunks = []
    for start in range(0, len(l), step):
        chunks.append(l[start:start + step])
    return chunks
810
811
812
def flatten_and_get_shape(mylist):
    """
    Flatten a nested list and return it together with its dimensions.

    The shape is read from the first element of each nesting level.
    An empty list yields ([], [0]) instead of raising IndexError.

    :param mylist: list or tuple, possibly nested
    :return: (flat list, list with the length of each dimension)
    """
    dims = [len(mylist)]
    # stop as soon as nothing is left to inspect
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
820
821
822
def flatten(mylist):
    """
    Flatten a nested list; None and non-nested lists are returned unchanged.
    """
    if mylist is None:
        return None
    flat = mylist
    # peel one nesting level per pass, judging by the first element
    while len(flat) > 0 and isinstance(flat[0], (list, tuple)):
        flat = [item for sublist in flat for item in sublist]
    return flat
832
833
834
def get_shape(mylist):
    """
    Return the length of each nesting level, following the first element of each level.
    """
    dims = []
    level = mylist
    while isinstance(level, (list, tuple)):
        size = len(level)
        dims.append(size)
        if size == 0:
            # nothing to descend into
            break
        level = level[0]
    return dims
842
843
844
class DataValue(FrozenClass):
    """
    A value with an associated timestamp, and quality.
    Automatically generated from xml, copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    """

    ua_switches = {
        'Value': ('Encoding', 0),
        'StatusCode': ('Encoding', 1),
        'SourceTimestamp': ('Encoding', 2),
        'ServerTimestamp': ('Encoding', 3),
        'SourcePicoseconds': ('Encoding', 4),
        'ServerPicoseconds': ('Encoding', 5),
    }

    ua_types = (
        ('Encoding', 'Byte'),
        ('Value', 'Variant'),
        ('StatusCode', 'StatusCode'),
        ('SourceTimestamp', 'DateTime'),
        ('SourcePicoseconds', 'UInt16'),
        ('ServerTimestamp', 'DateTime'),
        ('ServerPicoseconds', 'UInt16'),
    )

    def __init__(self, variant=None, status=None, sourceTimestamp=None, sourcePicoseconds=None, serverTimestamp=None, serverPicoseconds=None):
        self.Encoding = 0
        # anything that is not already a Variant gets wrapped in one
        self.Value = variant if isinstance(variant, Variant) else Variant(variant)
        # a missing status means a default StatusCode
        self.StatusCode = StatusCode() if status is None else status
        self.SourceTimestamp = sourceTimestamp
        self.SourcePicoseconds = sourcePicoseconds
        self.ServerTimestamp = serverTimestamp
        self.ServerPicoseconds = serverPicoseconds
        self._freeze = True

    def __str__(self):
        # fields that are None are left out of the representation
        optional_fields = (
            ('StatusCode', self.StatusCode),
            ('SourceTimestamp', self.SourceTimestamp),
            ('ServerTimestamp', self.ServerTimestamp),
            ('SourcePicoseconds', self.SourcePicoseconds),
            ('ServerPicoseconds', self.ServerPicoseconds),
        )
        s = "".join(f', {label}:{field}' for label, field in optional_fields if field is not None)
        return f'DataValue(Value:{self.Value}{s})'

    __repr__ = __str__
912
913
914
def datatype_to_varianttype(int_type):
    """
    Takes a NodeId or int and returns a VariantType
    This is only supported if int_type < 63 due to VariantType encoding
    At low level we do not have access to the address space, so decoding is limited;
    a better version of this method can be found in ua_utils.py
    """
    # a NodeId is reduced to its identifier
    type_id = int_type.Identifier if isinstance(int_type, NodeId) else int_type
    # values up to 25 are VariantType members, anything above is custom
    return VariantType(type_id) if type_id <= 25 else VariantTypeCustom(type_id)
928
929
930
def get_default_value(vtype):
    """
    Return the default value for the given variant type.

    Raises RuntimeError for a type that has no default defined here.
    """
    if vtype == VariantType.Null:
        return None
    if vtype == VariantType.Boolean:
        return False
    if vtype in (VariantType.SByte, VariantType.Byte):
        return 0
    if vtype == VariantType.ByteString:
        return b""
    if 4 <= vtype.value <= 9:
        # Int16 up to UInt64
        return 0
    if vtype in (VariantType.Float, VariantType.Double):
        return 0.0
    if vtype == VariantType.String:
        return None  # a string can be null
    if vtype == VariantType.DateTime:
        return datetime.utcnow()
    if vtype == VariantType.Guid:
        return uuid.uuid4()
    if vtype == VariantType.XmlElement:
        return None  # Not sure this is correct
    if vtype == VariantType.NodeId:
        return NodeId()
    if vtype == VariantType.ExpandedNodeId:
        return NodeId()
    if vtype == VariantType.StatusCode:
        return StatusCode()
    if vtype == VariantType.QualifiedName:
        return QualifiedName()
    if vtype == VariantType.LocalizedText:
        return LocalizedText()
    if vtype == VariantType.ExtensionObject:
        return ExtensionObject()
    if vtype == VariantType.DataValue:
        return DataValue()
    if vtype == VariantType.Variant:
        return Variant()
    raise RuntimeError(f"function take a uatype as argument, got: {vtype}")
972
973
974
# Registry of custom enums (those loaded with load_enums()).
# enums_by_datatype maps a datatype nodeid to its enum class,
# enums_datatypes maps an enum class back to its nodeid.
enums_by_datatype = dict()
enums_datatypes = dict()
977
978
979
def register_enum(name, nodeid, class_type):
    """
    Register a new enum for automatic decoding and expose it in the ua module

    The class is stored in both lookup directions and set as attribute
    `name` on asyncua.ua.
    """
    logger.info("registring new enum: %s %s %s", name, nodeid, class_type)
    enums_by_datatype[nodeid] = class_type
    enums_datatypes[class_type] = nodeid
    import asyncua.ua
    ua_module = asyncua.ua
    setattr(ua_module, name, class_type)
988
989
990
# These dictionaries are used to register extension classes for automatic
# decoding and encoding
extension_objects_by_datatype = dict()  # Dict[Datatype, type]
extension_objects_by_typeid = dict()  # Dict[EncodingId, type]
extension_object_typeids = dict()  # name -> encoding nodeid
995
996
997
def register_extension_object(name, encoding_nodeid, class_type, datatype_nodeid=None):
    """
    Register a new extension object for automatic decoding and expose it in the ua module

    The datatype mapping is only recorded when datatype_nodeid is truthy.
    """
    logger.info("registring new extension object: %s %s %s %s", name, encoding_nodeid, class_type, datatype_nodeid)
    if datatype_nodeid:
        extension_objects_by_datatype[datatype_nodeid] = class_type
    extension_objects_by_typeid[encoding_nodeid] = class_type
    extension_object_typeids[name] = encoding_nodeid
    # FIXME: Next line is not exactly a Python best practice, so feel free to propose something else
    # new extension objects are added to the ua module to automate decoding
    import asyncua.ua
    ua_module = asyncua.ua
    setattr(ua_module, name, class_type)
1010
1011
1012
def get_extensionobject_class_type(typeid):
    """
    Return the class registered for the typeid of an extension object, or None
    """
    return extension_objects_by_typeid.get(typeid)
1020
1021
1022
class SecurityPolicyType(Enum):
    """
    The supported types of SecurityPolicy.

    "None" (member NoSecurity)
    "Basic128Rsa15_Sign"
    "Basic128Rsa15_SignAndEncrypt"
    "Basic256_Sign"
    "Basic256_SignAndEncrypt"
    "Basic256Sha256_Sign"
    "Basic256Sha256_SignAndEncrypt"

    """

    NoSecurity = 0
    # Basic128Rsa15
    Basic128Rsa15_Sign = 1
    Basic128Rsa15_SignAndEncrypt = 2
    # Basic256
    Basic256_Sign = 3
    Basic256_SignAndEncrypt = 4
    # Basic256Sha256
    Basic256Sha256_Sign = 5
    Basic256Sha256_SignAndEncrypt = 6
1043