Passed
Pull Request — master (#52)
by
unknown
03:26
created

asyncua.ua.uatypes.LocalizedText.Locale()   A

Complexity

Conditions 3

Size

Total Lines 3
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4
5
import logging
6
from enum import Enum, IntEnum
7
from calendar import timegm
8
import os
9
import uuid
10
import re
11
import itertools
12
from datetime import datetime, timedelta, MAXYEAR, tzinfo
13
14
from asyncua.ua import status_codes
15
from .uaerrors import UaError, UaStatusCodeError, UaStringParsingError
16
17
logger = logging.getLogger(__name__)
18
19
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
20
HUNDREDS_OF_NANOSECONDS = 10000000
21
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
22
23
24
class UTC(tzinfo):
25
    """
26
    UTC
27
    """
28
29
    def utcoffset(self, dt):
30
        return timedelta(0)
31
32
    def tzname(self, dt):
33
        return "UTC"
34
35
    def dst(self, dt):
36
        return timedelta(0)
37
38
39
def datetime_to_win_epoch(dt):
40
    """method copied from David Buxton <[email protected]> sample code"""
41
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
42
        dt = dt.replace(tzinfo=UTC())
43
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
44
    return ft + (dt.microsecond * 10)
45
46
47
def get_win_epoch():
48
    return win_epoch_to_datetime(0)
49
50
51
def win_epoch_to_datetime(epch):
52
    try:
53
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
54
    except OverflowError:
55
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
56
        logger.warning('datetime overflow: %s', epch)
57
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
58
59
60
class _FrozenClass(object):
61
    """
62
    Make it impossible to add members to a class.
63
    Not pythonic at all but we found out it prevents many many
64
    bugs in use of protocol structures
65
    """
66
    _freeze = False
67
68
    def __setattr__(self, key, value):
69
        if self._freeze and not hasattr(self, key):
70
            raise TypeError(
71
                f"Error adding member '{key}' to class '{self.__class__.__name__}', class is frozen, members are {self.__dict__.keys()}"
72
            )
73
        object.__setattr__(self, key, value)
74
75
76
if "PYOPCUA_TYPO_CHECK" in os.environ:
77
    # typo check is cpu consuming, but it will make debug easy.
78
    # set PYOPCUA_TYPO_CHECK will make all uatype classes inherit from _FrozenClass
79
    logger.warning('uaypes typo checking is active')
80
    FrozenClass = _FrozenClass
81
else:
82
    FrozenClass = object
83
84
85
class ValueRank(IntEnum):
86
    """
87
    Defines dimensions of a variable.
88
    This enum does not support all cases since ValueRank support any n>0
89
    but since it is an IntEnum it can be replace by a normal int
90
    """
91
    ScalarOrOneDimension = -3
92
    Any = -2
93
    Scalar = -1
94
    OneOrMoreDimensions = 0
95
    OneDimension = 1
96
    # the next names are not in spec but so common we express them here
97
    TwoDimensions = 2
98
    ThreeDimensions = 3
99
    FourDimensions = 4
100
101
102
class _MaskEnum(IntEnum):
103
    @classmethod
104
    def parse_bitfield(cls, the_int):
105
        """ Take an integer and interpret it as a set of enum values. """
106
        if not isinstance(the_int, int):
107
            raise ValueError(f"Argument should be an int, we received {the_int} fo type {type(the_int)}")
108
        return {cls(b) for b in cls._bits(the_int)}
109
110
    @classmethod
111
    def to_bitfield(cls, collection):
112
        """ Takes some enum values and creates an integer from them. """
113
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
114
        # iterator)
115
        iter1, iter2 = itertools.tee(iter(collection))
116
        if not all(isinstance(x, cls) for x in iter1):
117
            raise TypeError(f"All elements have to be of type {cls}")
118
        return sum(x.mask for x in iter2)
119
120
    @property
121
    def mask(self):
122
        return 1 << self.value
123
124
    @staticmethod
125
    def _bits(n):
126
        """ Iterate over the bits in n.
127
128
            e.g. bits(44) yields at 2, 3, 5
129
        """
130
        if not n >= 0:  # avoid infinite recursion
131
            raise ValueError()
132
        pos = 0
133
        while n:
134
            if n & 0x1:
135
                yield pos
136
            n = n // 2
137
            pos += 1
138
139
140
class AccessLevel(_MaskEnum):
141
    """
142
    Bit index to indicate what the access level is.
143
144
    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
145
    """
146
    CurrentRead = 0
147
    CurrentWrite = 1
148
    HistoryRead = 2
149
    HistoryWrite = 3
150
    SemanticChange = 4
151
    StatusWrite = 5
152
    TimestampWrite = 6
153
154
155
class WriteMask(_MaskEnum):
156
    """
157
    Bit index to indicate which attribute of a node is writable
158
159
    Spec Part 3, Paragraph 5.2.7 WriteMask
160
    """
161
    AccessLevel = 0
162
    ArrayDimensions = 1
163
    BrowseName = 2
164
    ContainsNoLoops = 3
165
    DataType = 4
166
    Description = 5
167
    DisplayName = 6
168
    EventNotifier = 7
169
    Executable = 8
170
    Historizing = 9
171
    InverseName = 10
172
    IsAbstract = 11
173
    MinimumSamplingInterval = 12
174
    NodeClass = 13
175
    NodeId = 14
176
    Symmetric = 15
177
    UserAccessLevel = 16
178
    UserExecutable = 17
179
    UserWriteMask = 18
180
    ValueRank = 19
181
    WriteMask = 20
182
    ValueForVariableType = 21
183
184
185
class EventNotifier(_MaskEnum):
186
    """
187
    Bit index to indicate how a node can be used for events.
188
189
    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
190
    """
191
    SubscribeToEvents = 0
192
    # Reserved        = 1
193
    HistoryRead = 2
194
    HistoryWrite = 3
195
196
197
class StatusCode(FrozenClass):
198
    """
199
    :ivar value:
200
    :vartype value: int
201
    :ivar name:
202
    :vartype name: string
203
    :ivar doc:
204
    :vartype doc: string
205
    """
206
207
    ua_types = [("value", "UInt32")]
208
209
    def __init__(self, value=0):
210
        if isinstance(value, str):
211
            self.value = getattr(status_codes.StatusCodes, value)
212
        else:
213
            self.value = value
214
        self._freeze = True
215
216
    def check(self):
217
        """
218
        Raises an exception if the status code is anything else than 0 (good).
219
220
        Use the is_good() method if you do not want an exception.
221
        """
222
        if not self.is_good():
223
            raise UaStatusCodeError(self.value)
224
225
    def is_good(self):
226
        """
227
        return True if status is Good.
228
        """
229
        mask = 3 << 30
230
        if mask & self.value:
231
            return False
232
        else:
233
            return True
234
235
    @property
236
    def name(self):
237
        name, _ = status_codes.get_name_and_doc(self.value)
238
        return name
239
240
    @property
241
    def doc(self):
242
        _, doc = status_codes.get_name_and_doc(self.value)
243
        return doc
244
245
    def __str__(self):
246
        return f'StatusCode({self.name})'
247
248
    __repr__ = __str__
249
250
    def __eq__(self, other):
251
        return self.value == other.value
252
253
    def __ne__(self, other):
254
        return not self.__eq__(other)
255
256
257
class NodeIdType(IntEnum):
258
    TwoByte = 0
259
    FourByte = 1
260
    Numeric = 2
261
    String = 3
262
    Guid = 4
263
    ByteString = 5
264
265
266
class NodeId(object):
267
    """
268
    NodeId Object
269
270
    Args:
271
        identifier: The identifier might be an int, a string, bytes or a Guid
272
        namespaceidx(int): The index of the namespace
273
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
274
275
276
    :ivar Identifier:
277
    :vartype Identifier: NodeId
278
    :ivar NamespaceIndex:
279
    :vartype NamespaceIndex: Int
280
    :ivar NamespaceUri:
281
    :vartype NamespaceUri: String
282
    :ivar ServerIndex:
283
    :vartype ServerIndex: Int
284
    """
285
286
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
287
288
        self.Identifier = identifier
289
        self.NamespaceIndex = namespaceidx
290
        self.NodeIdType = nodeidtype
291
        self.NamespaceUri = ""
292
        self.ServerIndex = 0
293
        self._freeze = True
294
        if self.Identifier is None:
295
            self.Identifier = 0
296
            self.NodeIdType = NodeIdType.TwoByte
297
            return
298
        if self.NodeIdType is None:
299
            if isinstance(self.Identifier, int):
300
                self.NodeIdType = NodeIdType.Numeric
301
            elif isinstance(self.Identifier, str):
302
                self.NodeIdType = NodeIdType.String
303
            elif isinstance(self.Identifier, bytes):
304
                self.NodeIdType = NodeIdType.ByteString
305
            elif isinstance(self.Identifier, uuid.UUID):
306
                self.NodeIdType = NodeIdType.Guid
307
            else:
308
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
309
310
    def __eq__(self, node):
311
        return isinstance(node,
312
                          NodeId) and self.NamespaceIndex == node.NamespaceIndex and self.Identifier == node.Identifier
313
314
    def __ne__(self, other):
315
        return not self.__eq__(other)
316
317
    def __hash__(self):
318
        return hash((self.NamespaceIndex, self.Identifier))
319
320
    def __lt__(self, other):
321
        if not isinstance(other, NodeId):
322
            raise AttributeError("Can only compare to NodeId")
323
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier) < (other.NodeIdType, other.NamespaceIndex,
324
                                                                          other.Identifier)
325
326
    def is_null(self):
327
        if self.NamespaceIndex != 0:
328
            return False
329
        return self.has_null_identifier()
330
331
    def has_null_identifier(self):
332
        if not self.Identifier:
333
            return True
334
        if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
335
            return True
336
        return False
337
338
    @staticmethod
339
    def from_string(string):
340
        try:
341
            return NodeId._from_string(string)
342
        except ValueError as ex:
343
            raise UaStringParsingError(f"Error parsing string {string}", ex)
344
345
    @staticmethod
346
    def _from_string(string):
347
        l = string.split(";")
348
        identifier = None
349
        namespace = 0
350
        ntype = None
351
        srv = None
352
        nsu = None
353
        for el in l:
354
            if not el:
355
                continue
356
            k, v = el.split("=", 1)
357
            k = k.strip()
358
            v = v.strip()
359
            if k == "ns":
360
                namespace = int(v)
361
            elif k == "i":
362
                ntype = NodeIdType.Numeric
363
                identifier = int(v)
364
            elif k == "s":
365
                ntype = NodeIdType.String
366
                identifier = v
367
            elif k == "g":
368
                ntype = NodeIdType.Guid
369
                identifier = uuid.UUID(f"urn:uuid:{v}")
370
            elif k == "b":
371
                ntype = NodeIdType.ByteString
372
                identifier = v
373
            elif k == "srv":
374
                srv = v
375
            elif k == "nsu":
376
                nsu = v
377
        if identifier is None:
378
            raise UaStringParsingError(f"Could not find identifier in string: {string}")
379
        nodeid = NodeId(identifier, namespace, ntype)
380
        nodeid.NamespaceUri = nsu
381
        nodeid.ServerIndex = srv
382
        return nodeid
383
384
    def to_string(self):
385
        string = []
386
        if self.NamespaceIndex != 0:
387
            string.append("ns={0}".format(self.NamespaceIndex))
388
        ntype = None
389
        if self.NodeIdType == NodeIdType.Numeric:
390
            ntype = "i"
391
        elif self.NodeIdType == NodeIdType.String:
392
            ntype = "s"
393
        elif self.NodeIdType == NodeIdType.TwoByte:
394
            ntype = "i"
395
        elif self.NodeIdType == NodeIdType.FourByte:
396
            ntype = "i"
397
        elif self.NodeIdType == NodeIdType.Guid:
398
            ntype = "g"
399
        elif self.NodeIdType == NodeIdType.ByteString:
400
            ntype = "b"
401
        string.append("{0}={1}".format(ntype, self.Identifier))
402
        if self.ServerIndex:
403
            string.append("srv={}".format(self.ServerIndex))
404
        if self.NamespaceUri:
405
            string.append("nsu={0}".format(self.NamespaceUri))
406
        return ";".join(string)
407
408
    def __str__(self):
409
        return f"{self.NodeIdType.name}NodeId({self.to_string()})"
410
411
    __repr__ = __str__
412
413
    def to_binary(self):
414
        import asyncua
415
        return asyncua.ua.ua_binary.nodeid_to_binary(self)
416
417
418
class TwoByteNodeId(NodeId):
419
    def __init__(self, identifier):
420
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
421
422
423
class FourByteNodeId(NodeId):
424
    def __init__(self, identifier, namespace=0):
425
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
426
427
428
class NumericNodeId(NodeId):
429
    def __init__(self, identifier, namespace=0):
430
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
431
432
433
class ByteStringNodeId(NodeId):
434
    def __init__(self, identifier, namespace=0):
435
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
436
437
438
class GuidNodeId(NodeId):
439
    def __init__(self, identifier, namespace=0):
440
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
441
442
443
class StringNodeId(NodeId):
444
    def __init__(self, identifier, namespace=0):
445
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
446
447
448
ExpandedNodeId = NodeId
449
450
451
class QualifiedName(FrozenClass):
452
    """
453
    A string qualified with a namespace index.
454
    """
455
456
    ua_types = [
457
        ('NamespaceIndex', 'UInt16'),
458
        ('Name', 'String'),
459
    ]
460
461
    def __init__(self, name=None, namespaceidx=0):
462
        if not isinstance(namespaceidx, int):
463
            raise UaError(f"namespaceidx must be an int not {namespaceidx} of type {type(namespaceidx)}")
464
        self.NamespaceIndex = namespaceidx
465
        self.Name = name
466
        self._freeze = True
467
468
    def to_string(self):
469
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)
470
471
    @staticmethod
472
    def from_string(string):
473
        if ":" in string:
474
            try:
475
                idx, name = string.split(":", 1)
476
                idx = int(idx)
477
            except (TypeError, ValueError) as ex:
478
                raise UaStringParsingError(f"Error parsing string {string}", ex)
479
        else:
480
            idx = 0
481
            name = string
482
        return QualifiedName(name, idx)
483
484
    def __eq__(self, bname):
485
        return isinstance(bname,
486
                          QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
487
488
    def __ne__(self, other):
489
        return not self.__eq__(other)
490
491
    def __lt__(self, other):
492
        if not isinstance(other, QualifiedName):
493
            raise TypeError(f"Cannot compare QualifiedName and {other}")
494
        if self.NamespaceIndex == other.NamespaceIndex:
495
            return self.Name < other.Name
496
        else:
497
            return self.NamespaceIndex < other.NamespaceIndex
498
499
    def __str__(self):
500
        return f'QualifiedName({self.NamespaceIndex}:{self.Name})'
501
502
    __repr__ = __str__
503
504
505
class LocalizedText(FrozenClass):
506
    """
507
    A string qualified with a namespace index.
508
    """
509
510
    ua_switches = {
511
        'Locale': ('Encoding', 0),
512
        'Text': ('Encoding', 1),
513
    }
514
515
    ua_types = (
516
        ('Encoding', 'Byte'),
517
        ('Locale', 'String'),
518
        ('Text', 'String'),
519
    )
520
521
    def __init__(self, text=None, locale=None):
522
        self.Encoding = 0
523
        self._text = None
524
        self._locale = None
525
        if text:
526
            self.Text = text
527
        if locale:
528
            self.Locale = locale
529
        self._freeze = True
530
531
    @property
532
    def Text(self):
533
        return self._text
534
535
    @property
536
    def Locale(self):
537
        return self._locale
538
539
    @Text.setter
540
    def Text(self, text):
541
        if not isinstance(text, str):
542
            raise ValueError("A LocalizedText object takes a string as argument \"text\", not a {}, {}".format(type(text), text))
543
        self._text = text
544
        if self._text:
545
            self.Encoding |= (1 << 1)
546
547
    @Locale.setter
548
    def Locale(self, locale):
549
        if not isinstance(locale, str):
550
            raise ValueError("A LocalizedText object takes a string as argument \"locale\", not a {}, {}".format(type(locale), locale))
551
        self._locale = locale
552
        if self._locale:
553
            self.Encoding |= (1)
554
555
    def to_string(self):
556
        if self.Text is None:
557
            return ""
558
        if self.Locale is None:
559
            return self.Text
560
        return self.__str__()
561
562
    @staticmethod
563
    def from_string(string):
564
        m = re.match("^LocalizedText\(Encoding:(.*), Locale:(.*), Text:(.*)\)$", string)
565
        if m:
566
            text = m.group(3) if m.group(3)!=str(None) else None
567
            locale = m.group(2) if m.group(2)!=str(None) else None
568
            return LocalizedText(text=text,locale=locale)            
569
        else:
570
            return LocalizedText(string)
571
572
    def __str__(self):
573
        return f'LocalizedText(Encoding:{self.Encoding}, Locale:{self.Locale}, Text:{self.Text})'
574
575
    __repr__ = __str__
576
577
    def __eq__(self, other):
578
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
579
            return True
580
        return False
581
582
    def __ne__(self, other):
583
        return not self.__eq__(other)
584
585
586
class ExtensionObject(FrozenClass):
587
    """
588
    Any UA object packed as an ExtensionObject
589
590
    :ivar TypeId:
591
    :vartype TypeId: NodeId
592
    :ivar Body:
593
    :vartype Body: bytes
594
    """
595
    ua_switches = {
596
        'Body': ('Encoding', 0),
597
    }
598
599
    ua_types = (
600
        ("TypeId", "NodeId"),
601
        ("Encoding", "Byte"),
602
        ("Body", "ByteString"),
603
    )
604
605
    def __init__(self):
606
        self.TypeId = NodeId()
607
        self.Encoding = 0
608
        self.Body = None
609
        self._freeze = True
610
611
    def __bool__(self):
612
        return self.Body is not None
613
614
    __nonzero__ = __bool__  # Python2 compatibilty
615
616
    def __str__(self):
617
        size = len(self.Body) if self.Body is not None else None
618
        return f'ExtensionObject(TypeId:{self.TypeId}, Encoding:{self.Encoding}, {size} bytes)'
619
620
    __repr__ = __str__
621
622
623
class VariantType(Enum):
624
    """
625
    The possible types of a variant.
626
627
    :ivar Null:
628
    :ivar Boolean:
629
    :ivar SByte:
630
    :ivar Byte:
631
    :ivar Int16:
632
    :ivar UInt16:
633
    :ivar Int32:
634
    :ivar UInt32:
635
    :ivar Int64:
636
    :ivar UInt64:
637
    :ivar Float:
638
    :ivar Double:
639
    :ivar String:
640
    :ivar DateTime:
641
    :ivar Guid:
642
    :ivar ByteString:
643
    :ivar XmlElement:
644
    :ivar NodeId:
645
    :ivar ExpandedNodeId:
646
    :ivar StatusCode:
647
    :ivar QualifiedName:
648
    :ivar LocalizedText:
649
    :ivar ExtensionObject:
650
    :ivar DataValue:
651
    :ivar Variant:
652
    :ivar DiagnosticInfo:
653
    """
654
655
    Null = 0
656
    Boolean = 1
657
    SByte = 2
658
    Byte = 3
659
    Int16 = 4
660
    UInt16 = 5
661
    Int32 = 6
662
    UInt32 = 7
663
    Int64 = 8
664
    UInt64 = 9
665
    Float = 10
666
    Double = 11
667
    String = 12
668
    DateTime = 13
669
    Guid = 14
670
    ByteString = 15
671
    XmlElement = 16
672
    NodeId = 17
673
    ExpandedNodeId = 18
674
    StatusCode = 19
675
    QualifiedName = 20
676
    LocalizedText = 21
677
    ExtensionObject = 22
678
    DataValue = 23
679
    Variant = 24
680
    DiagnosticInfo = 25
681
682
683
class VariantTypeCustom(object):
684
    """
685
    Looks like sometime we get variant with other values than those
686
    defined in VariantType.
687
    FIXME: We should not need this class, as far as I iunderstand the spec
688
    variants can only be of VariantType
689
    """
690
691
    def __init__(self, val):
692
        self.name = "Custom"
693
        self.value = val
694
        if self.value > 0b00111111:
695
            raise UaError(f"Cannot create VariantType. VariantType must be {0b111111} > x > {25}, received {val}")
696
697
    def __str__(self):
698
        return f"VariantType.Custom:{self.value}"
699
700
    __repr__ = __str__
701
702
    def __eq__(self, other):
703
        return self.value == other.value
704
705
706
class Variant(FrozenClass):
707
    """
708
    Create an OPC-UA Variant object.
709
    if no argument a Null Variant is created.
710
    if not variant type is given, attemps to guess type from python type
711
    if a variant is given as value, the new objects becomes a copy of the argument
712
713
    :ivar Value:
714
    :vartype Value: Any supported type
715
    :ivar VariantType:
716
    :vartype VariantType: VariantType
717
    :ivar Dimension:
718
    :vartype Dimensions: The length of each dimensions. Usually guessed from value.
719
    :ivar is_array:
720
    :vartype is_array: If the variant is an array. Usually guessed from value.
721
    """
722
723
    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
724
        self.Value = value
725
        self.VariantType = varianttype
726
        self.Dimensions = dimensions
727
        self.is_array = is_array
728
        if self.is_array is None:
729
            if isinstance(value, (list, tuple)):
730
                self.is_array = True
731
            else:
732
                self.is_array = False
733
        self._freeze = True
734
        if isinstance(value, Variant):
735
            self.Value = value.Value
736
            self.VariantType = value.VariantType
737
        if self.VariantType is None:
738
            self.VariantType = self._guess_type(self.Value)
739
        if self.Value is None and not self.is_array and self.VariantType not in (VariantType.Null, VariantType.String,
740
                                                                                 VariantType.DateTime):
741
            raise UaError(f"Non array Variant of type {self.VariantType} cannot have value None")
742
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
743
            dims = get_shape(self.Value)
744
            if len(dims) > 1:
745
                self.Dimensions = dims
746
747
    def __eq__(self, other):
748
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
749
            return True
750
        return False
751
752
    def __ne__(self, other):
753
        return not self.__eq__(other)
754
755
    def _guess_type(self, val):
756
        if isinstance(val, (list, tuple)):
757
            error_val = val
758
        while isinstance(val, (list, tuple)):
759
            if len(val) == 0:
760
                raise UaError(f"could not guess UA type of variable {error_val}")
761
            val = val[0]
762
        if val is None:
763
            return VariantType.Null
764
        elif isinstance(val, bool):
765
            return VariantType.Boolean
766
        elif isinstance(val, float):
767
            return VariantType.Double
768
        elif isinstance(val, IntEnum):
769
            return VariantType.Int32
770
        elif isinstance(val, int):
771
            return VariantType.Int64
772
        elif isinstance(val, str):
773
            return VariantType.String
774
        elif isinstance(val, bytes):
775
            return VariantType.ByteString
776
        elif isinstance(val, datetime):
777
            return VariantType.DateTime
778
        elif isinstance(val, uuid.UUID):
779
            return VariantType.Guid
780
        else:
781
            if isinstance(val, object):
782
                try:
783
                    return getattr(VariantType, val.__class__.__name__)
784
                except AttributeError:
785
                    return VariantType.ExtensionObject
786
            else:
787
                raise UaError(f"Could not guess UA type of {val} with type {type(val)}, specify UA type")
788
789
    def __str__(self):
790
        return f"Variant(val:{self.Value!s},type:{self.VariantType})"
791
792
    __repr__ = __str__
793
794
795
def _split_list(l, n):
796
    n = max(1, n)
797
    return [l[i:i + n] for i in range(0, len(l), n)]
798
799
800
def flatten_and_get_shape(mylist):
801
    dims = []
802
    dims.append(len(mylist))
803
    while isinstance(mylist[0], (list, tuple)):
804
        dims.append(len(mylist[0]))
805
        mylist = [item for sublist in mylist for item in sublist]
806
        if len(mylist) == 0:
807
            break
808
    return mylist, dims
809
810
811
def flatten(mylist):
812
    if mylist is None:
813
        return None
814
    elif len(mylist) == 0:
815
        return mylist
816
    while isinstance(mylist[0], (list, tuple)):
817
        mylist = [item for sublist in mylist for item in sublist]
818
        if len(mylist) == 0:
819
            break
820
    return mylist
821
822
823
def get_shape(mylist):
824
    dims = []
825
    while isinstance(mylist, (list, tuple)):
826
        dims.append(len(mylist))
827
        if len(mylist) == 0:
828
            break
829
        mylist = mylist[0]
830
    return dims
831
832
833
class DataValue(FrozenClass):
834
    """
835
    A value with an associated timestamp, and quality.
836
    Automatically generated from xml , copied and modified here to fix errors in xml spec
837
838
    :ivar Value:
839
    :vartype Value: Variant
840
    :ivar StatusCode:
841
    :vartype StatusCode: StatusCode
842
    :ivar SourceTimestamp:
843
    :vartype SourceTimestamp: datetime
844
    :ivar SourcePicoSeconds:
845
    :vartype SourcePicoSeconds: int
846
    :ivar ServerTimestamp:
847
    :vartype ServerTimestamp: datetime
848
    :ivar ServerPicoseconds:
849
    :vartype ServerPicoseconds: int
850
    """
851
852
    ua_switches = {
853
        'Value': ('Encoding', 0),
854
        'StatusCode': ('Encoding', 1),
855
        'SourceTimestamp': ('Encoding', 2),
856
        'ServerTimestamp': ('Encoding', 3),
857
        'SourcePicoseconds': ('Encoding', 4),
858
        'ServerPicoseconds': ('Encoding', 5),
859
    }
860
861
    ua_types = (
862
        ('Encoding', 'Byte'),
863
        ('Value', 'Variant'),
864
        ('StatusCode', 'StatusCode'),
865
        ('SourceTimestamp', 'DateTime'),
866
        ('SourcePicoseconds', 'UInt16'),
867
        ('ServerTimestamp', 'DateTime'),
868
        ('ServerPicoseconds', 'UInt16'),
869
    )
870
871
    def __init__(self, variant=None, status=None):
872
        self.Encoding = 0
873
        if not isinstance(variant, Variant):
874
            variant = Variant(variant)
875
        self.Value = variant
876
        if status is None:
877
            self.StatusCode = StatusCode()
878
        else:
879
            self.StatusCode = status
880
        self.SourceTimestamp = None  # DateTime()
881
        self.SourcePicoseconds = None
882
        self.ServerTimestamp = None  # DateTime()
883
        self.ServerPicoseconds = None
884
        self._freeze = True
885
886
    def __str__(self):
887
        s = []
888
        if self.StatusCode is not None:
889
            s.append(f', StatusCode:{self.StatusCode}')
890
        if self.SourceTimestamp is not None:
891
            s.append(f', SourceTimestamp:{self.SourceTimestamp}')
892
        if self.ServerTimestamp is not None:
893
            s.append(f', ServerTimestamp:{self.ServerTimestamp}')
894
        if self.SourcePicoseconds is not None:
895
            s.append(f', SourcePicoseconds:{self.SourcePicoseconds}')
896
        if self.ServerPicoseconds is not None:
897
            s.append(f', ServerPicoseconds:{self.ServerPicoseconds}')
898
        return f'DataValue(Value:{self.Value}{"".join(s)})'
899
900
    __repr__ = __str__
901
902
903
def datatype_to_varianttype(int_type):
904
    """
905
    Takes a NodeId or int and return a VariantType
906
    This is only supported if int_type < 63 due to VariantType encoding
907
    At low level we do not have access to address space thus decoding is limited
908
    a better version of this method can be find in ua_utils.py
909
    """
910
    if isinstance(int_type, NodeId):
911
        int_type = int_type.Identifier
912
913
    if int_type <= 25:
914
        return VariantType(int_type)
915
    else:
916
        return VariantTypeCustom(int_type)
917
918
919
def get_default_value(vtype):
920
    """
921
    Given a variant type return default value for this type
922
    """
923
    if vtype == VariantType.Null:
924
        return None
925
    elif vtype == VariantType.Boolean:
926
        return False
927
    elif vtype in (VariantType.SByte, VariantType.Byte):
928
        return 0
929
    elif vtype == VariantType.ByteString:
930
        return b""
931
    elif 4 <= vtype.value <= 9:
932
        return 0
933
    elif vtype in (VariantType.Float, VariantType.Double):
934
        return 0.0
935
    elif vtype == VariantType.String:
936
        return None  # a string can be null
937
    elif vtype == VariantType.DateTime:
938
        return datetime.utcnow()
939
    elif vtype == VariantType.Guid:
940
        return uuid.uuid4()
941
    elif vtype == VariantType.XmlElement:
942
        return None  #Not sure this is correct
943
    elif vtype == VariantType.NodeId:
944
        return NodeId()
945
    elif vtype == VariantType.ExpandedNodeId:
946
        return NodeId()
947
    elif vtype == VariantType.StatusCode:
948
        return StatusCode()
949
    elif vtype == VariantType.QualifiedName:
950
        return QualifiedName()
951
    elif vtype == VariantType.LocalizedText:
952
        return LocalizedText()
953
    elif vtype == VariantType.ExtensionObject:
954
        return ExtensionObject()
955
    elif vtype == VariantType.DataValue:
956
        return DataValue()
957
    elif vtype == VariantType.Variant:
958
        return Variant()
959
    else:
960
        raise RuntimeError(f"function take a uatype as argument, got: {vtype}")
961
962
963
# These dictionnaries are used to register extensions classes for automatic
964
# decoding and encoding
965
extension_object_classes = {}
966
extension_object_ids = {}
967
968
969
def register_extension_object(name, nodeid, class_type):
970
    """
971
    Register a new extension object for automatic decoding and make them available in ua module
972
    """
973
    logger.info("registring new extension object: %s %s %s", name, nodeid, class_type)
974
    extension_object_classes[nodeid] = class_type
975
    extension_object_ids[name] = nodeid
976
    # FIXME: Next line is not exactly a Python best practices, so feel free to propose something else
977
    # add new extensions objects to ua modules to automate decoding
978
    import asyncua.ua
979
    setattr(asyncua.ua, name, class_type)
980
981
982
def get_extensionobject_class_type(typeid):
983
    """
984
    Returns the registered class type for typid of an extension object
985
    """
986
    if typeid in extension_object_classes:
987
        return extension_object_classes[typeid]
988
    else:
989
        return None
990
991
992
class SecurityPolicyType(Enum):
993
    """
994
    The supported types of SecurityPolicy.
995
996
    "None"
997
    "Basic128Rsa15_Sign"
998
    "Basic128Rsa15_SignAndEncrypt"
999
    "Basic256_Sign"
1000
    "Basic256_SignAndEncrypt"
1001
    "Basic256Sha256_Sign"
1002
    "Basic256Sha256_SignAndEncrypt"
1003
1004
    """
1005
1006
    NoSecurity = 0
1007
    Basic128Rsa15_Sign = 1
1008
    Basic128Rsa15_SignAndEncrypt = 2
1009
    Basic256_Sign = 3
1010
    Basic256_SignAndEncrypt = 4
1011
    Basic256Sha256_Sign = 5
1012
    Basic256Sha256_SignAndEncrypt = 6
1013