Passed
Push — master ( af65d7...b924a2 )
by Olivier
02:29
created

asyncua.ua.uatypes.StatusCode.doc()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4
5
import logging
6
from enum import Enum, IntEnum
7
from calendar import timegm
8
import os
9
import uuid
10
import re
11
import itertools
12
from datetime import datetime, timedelta, MAXYEAR, tzinfo
13
14
from asyncua.ua import status_codes
15
from .uaerrors import UaError, UaStatusCodeError, UaStringParsingError
16
17
logger = logging.getLogger(__name__)
18
19
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
20
HUNDREDS_OF_NANOSECONDS = 10000000
21
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
22
23
24
class UTC(tzinfo):
25
    """
26
    UTC
27
    """
28
29
    def utcoffset(self, dt):
30
        return timedelta(0)
31
32
    def tzname(self, dt):
33
        return "UTC"
34
35
    def dst(self, dt):
36
        return timedelta(0)
37
38
39
def datetime_to_win_epoch(dt):
40
    """method copied from David Buxton <[email protected]> sample code"""
41
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
42
        dt = dt.replace(tzinfo=UTC())
43
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
44
    return ft + (dt.microsecond * 10)
45
46
47
def get_win_epoch():
48
    return win_epoch_to_datetime(0)
49
50
51
def win_epoch_to_datetime(epch):
52
    try:
53
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
54
    except OverflowError:
55
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
56
        logger.warning('datetime overflow: %s', epch)
57
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
58
59
60
class _FrozenClass(object):
61
    """
62
    Make it impossible to add members to a class.
63
    Not pythonic at all but we found out it prevents many many
64
    bugs in use of protocol structures
65
    """
66
    _freeze = False
67
68
    def __setattr__(self, key, value):
69
        if self._freeze and not hasattr(self, key):
70
            raise TypeError(
71
                f"Error adding member '{key}' to class '{self.__class__.__name__}', class is frozen, members are {self.__dict__.keys()}"
72
            )
73
        object.__setattr__(self, key, value)
74
75
76
if "PYOPCUA_TYPO_CHECK" in os.environ:
77
    # typo check is cpu consuming, but it will make debug easy.
78
    # set PYOPCUA_TYPO_CHECK will make all uatype classes inherit from _FrozenClass
79
    logger.warning('uaypes typo checking is active')
80
    FrozenClass = _FrozenClass
81
else:
82
    FrozenClass = object
83
84
85
class ValueRank(IntEnum):
86
    """
87
    Defines dimensions of a variable.
88
    This enum does not support all cases since ValueRank support any n>0
89
    but since it is an IntEnum it can be replace by a normal int
90
    """
91
    ScalarOrOneDimension = -3
92
    Any = -2
93
    Scalar = -1
94
    OneOrMoreDimensions = 0
95
    OneDimension = 1
96
    # the next names are not in spec but so common we express them here
97
    TwoDimensions = 2
98
    ThreeDimensions = 3
99
    FourDimensions = 4
100
101
102
class _MaskEnum(IntEnum):
103
    @classmethod
104
    def parse_bitfield(cls, the_int):
105
        """ Take an integer and interpret it as a set of enum values. """
106
        if not isinstance(the_int, int):
107
            raise ValueError(f"Argument should be an int, we received {the_int} fo type {type(the_int)}")
108
        return {cls(b) for b in cls._bits(the_int)}
109
110
    @classmethod
111
    def to_bitfield(cls, collection):
112
        """ Takes some enum values and creates an integer from them. """
113
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
114
        # iterator)
115
        iter1, iter2 = itertools.tee(iter(collection))
116
        if not all(isinstance(x, cls) for x in iter1):
117
            raise TypeError(f"All elements have to be of type {cls}")
118
        return sum(x.mask for x in iter2)
119
120
    @property
121
    def mask(self):
122
        return 1 << self.value
123
124
    @staticmethod
125
    def _bits(n):
126
        """ Iterate over the bits in n.
127
128
            e.g. bits(44) yields at 2, 3, 5
129
        """
130
        if not n >= 0:  # avoid infinite recursion
131
            raise ValueError()
132
        pos = 0
133
        while n:
134
            if n & 0x1:
135
                yield pos
136
            n = n // 2
137
            pos += 1
138
139
140
class AccessLevel(_MaskEnum):
141
    """
142
    Bit index to indicate what the access level is.
143
144
    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
145
    """
146
    CurrentRead = 0
147
    CurrentWrite = 1
148
    HistoryRead = 2
149
    HistoryWrite = 3
150
    SemanticChange = 4
151
    StatusWrite = 5
152
    TimestampWrite = 6
153
154
155
class WriteMask(_MaskEnum):
156
    """
157
    Bit index to indicate which attribute of a node is writable
158
159
    Spec Part 3, Paragraph 5.2.7 WriteMask
160
    """
161
    AccessLevel = 0
162
    ArrayDimensions = 1
163
    BrowseName = 2
164
    ContainsNoLoops = 3
165
    DataType = 4
166
    Description = 5
167
    DisplayName = 6
168
    EventNotifier = 7
169
    Executable = 8
170
    Historizing = 9
171
    InverseName = 10
172
    IsAbstract = 11
173
    MinimumSamplingInterval = 12
174
    NodeClass = 13
175
    NodeId = 14
176
    Symmetric = 15
177
    UserAccessLevel = 16
178
    UserExecutable = 17
179
    UserWriteMask = 18
180
    ValueRank = 19
181
    WriteMask = 20
182
    ValueForVariableType = 21
183
184
185
class EventNotifier(_MaskEnum):
186
    """
187
    Bit index to indicate how a node can be used for events.
188
189
    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
190
    """
191
    SubscribeToEvents = 0
192
    # Reserved        = 1
193
    HistoryRead = 2
194
    HistoryWrite = 3
195
196
197
class StatusCode(FrozenClass):
198
    """
199
    :ivar value:
200
    :vartype value: int
201
    :ivar name:
202
    :vartype name: string
203
    :ivar doc:
204
    :vartype doc: string
205
    """
206
207
    ua_types = [("value", "UInt32")]
208
209
    def __init__(self, value=0):
210
        if isinstance(value, str):
211
            self.value = getattr(status_codes.StatusCodes, value)
212
        else:
213
            self.value = value
214
        self._freeze = True
215
216
    def check(self):
217
        """
218
        Raises an exception if the status code is anything else than 0 (good).
219
220
        Use the is_good() method if you do not want an exception.
221
        """
222
        if not self.is_good():
223
            raise UaStatusCodeError(self.value)
224
225
    def is_good(self):
226
        """
227
        return True if status is Good.
228
        """
229
        mask = 3 << 30
230
        if mask & self.value:
231
            return False
232
        else:
233
            return True
234
235
    @property
236
    def name(self):
237
        name, _ = status_codes.get_name_and_doc(self.value)
238
        return name
239
240
    @property
241
    def doc(self):
242
        _, doc = status_codes.get_name_and_doc(self.value)
243
        return doc
244
245
    def __str__(self):
246
        return f'StatusCode({self.name})'
247
248
    __repr__ = __str__
249
250
    def __eq__(self, other):
251
        return self.value == other.value
252
253
    def __ne__(self, other):
254
        return not self.__eq__(other)
255
256
257
class NodeIdType(IntEnum):
258
    TwoByte = 0
259
    FourByte = 1
260
    Numeric = 2
261
    String = 3
262
    Guid = 4
263
    ByteString = 5
264
265
266
class NodeId(object):
267
    """
268
    NodeId Object
269
270
    Args:
271
        identifier: The identifier might be an int, a string, bytes or a Guid
272
        namespaceidx(int): The index of the namespace
273
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
274
275
276
    :ivar Identifier:
277
    :vartype Identifier: NodeId
278
    :ivar NamespaceIndex:
279
    :vartype NamespaceIndex: Int
280
    :ivar NamespaceUri:
281
    :vartype NamespaceUri: String
282
    :ivar ServerIndex:
283
    :vartype ServerIndex: Int
284
    """
285
286
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
287
288
        self.Identifier = identifier
289
        self.NamespaceIndex = namespaceidx
290
        self.NodeIdType = nodeidtype
291
        self.NamespaceUri = ""
292
        self.ServerIndex = 0
293
        self._freeze = True
294
        if self.Identifier is None:
295
            self.Identifier = 0
296
            self.NodeIdType = NodeIdType.TwoByte
297
            return
298
        if self.NodeIdType is None:
299
            if isinstance(self.Identifier, int):
300
                self.NodeIdType = NodeIdType.Numeric
301
            elif isinstance(self.Identifier, str):
302
                self.NodeIdType = NodeIdType.String
303
            elif isinstance(self.Identifier, bytes):
304
                self.NodeIdType = NodeIdType.ByteString
305
            elif isinstance(self.Identifier, uuid.UUID):
306
                self.NodeIdType = NodeIdType.Guid
307
            else:
308
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
309
310
    def __eq__(self, node):
311
        return isinstance(node,
312
                          NodeId) and self.NamespaceIndex == node.NamespaceIndex and self.Identifier == node.Identifier
313
314
    def __ne__(self, other):
315
        return not self.__eq__(other)
316
317
    def __hash__(self):
318
        return hash((self.NamespaceIndex, self.Identifier))
319
320
    def __lt__(self, other):
321
        if not isinstance(other, NodeId):
322
            raise AttributeError("Can only compare to NodeId")
323
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier) < (other.NodeIdType, other.NamespaceIndex,
324
                                                                          other.Identifier)
325
326
    def is_null(self):
327
        if self.NamespaceIndex != 0:
328
            return False
329
        return self.has_null_identifier()
330
331
    def has_null_identifier(self):
332
        if not self.Identifier:
333
            return True
334
        if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
335
            return True
336
        return False
337
338
    @staticmethod
339
    def from_string(string):
340
        try:
341
            return NodeId._from_string(string)
342
        except ValueError as ex:
343
            raise UaStringParsingError(f"Error parsing string {string}", ex)
344
345
    @staticmethod
346
    def _from_string(string):
347
        l = string.split(";")
348
        identifier = None
349
        namespace = 0
350
        ntype = None
351
        srv = None
352
        nsu = None
353
        for el in l:
354
            if not el:
355
                continue
356
            k, v = el.split("=", 1)
357
            k = k.strip()
358
            v = v.strip()
359
            if k == "ns":
360
                namespace = int(v)
361
            elif k == "i":
362
                ntype = NodeIdType.Numeric
363
                identifier = int(v)
364
            elif k == "s":
365
                ntype = NodeIdType.String
366
                identifier = v
367
            elif k == "g":
368
                ntype = NodeIdType.Guid
369
                identifier = uuid.UUID(f"urn:uuid:{v}")
370
            elif k == "b":
371
                ntype = NodeIdType.ByteString
372
                identifier = v
373
            elif k == "srv":
374
                srv = v
375
            elif k == "nsu":
376
                nsu = v
377
        if identifier is None:
378
            raise UaStringParsingError(f"Could not find identifier in string: {string}")
379
        nodeid = NodeId(identifier, namespace, ntype)
380
        nodeid.NamespaceUri = nsu
381
        nodeid.ServerIndex = srv
382
        return nodeid
383
384
    def to_string(self):
385
        string = []
386
        if self.NamespaceIndex != 0:
387
            string.append("ns={0}".format(self.NamespaceIndex))
388
        ntype = None
389
        if self.NodeIdType == NodeIdType.Numeric:
390
            ntype = "i"
391
        elif self.NodeIdType == NodeIdType.String:
392
            ntype = "s"
393
        elif self.NodeIdType == NodeIdType.TwoByte:
394
            ntype = "i"
395
        elif self.NodeIdType == NodeIdType.FourByte:
396
            ntype = "i"
397
        elif self.NodeIdType == NodeIdType.Guid:
398
            ntype = "g"
399
        elif self.NodeIdType == NodeIdType.ByteString:
400
            ntype = "b"
401
        string.append("{0}={1}".format(ntype, self.Identifier))
402
        if self.ServerIndex:
403
            string.append("srv={}".format(self.ServerIndex))
404
        if self.NamespaceUri:
405
            string.append("nsu={0}".format(self.NamespaceUri))
406
        return ";".join(string)
407
408
    def __str__(self):
409
        return f"{self.NodeIdType.name}NodeId({self.to_string()})"
410
411
    __repr__ = __str__
412
413
    def to_binary(self):
414
        import asyncua
415
        return asyncua.ua.ua_binary.nodeid_to_binary(self)
416
417
418
class TwoByteNodeId(NodeId):
419
    def __init__(self, identifier):
420
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
421
422
423
class FourByteNodeId(NodeId):
424
    def __init__(self, identifier, namespace=0):
425
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
426
427
428
class NumericNodeId(NodeId):
429
    def __init__(self, identifier, namespace=0):
430
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
431
432
433
class ByteStringNodeId(NodeId):
434
    def __init__(self, identifier, namespace=0):
435
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
436
437
438
class GuidNodeId(NodeId):
439
    def __init__(self, identifier, namespace=0):
440
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
441
442
443
class StringNodeId(NodeId):
444
    def __init__(self, identifier, namespace=0):
445
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
446
447
448
ExpandedNodeId = NodeId
449
450
451
class QualifiedName(FrozenClass):
452
    """
453
    A string qualified with a namespace index.
454
    """
455
456
    ua_types = [
457
        ('NamespaceIndex', 'UInt16'),
458
        ('Name', 'String'),
459
    ]
460
461
    def __init__(self, name=None, namespaceidx=0):
462
        if not isinstance(namespaceidx, int):
463
            raise UaError(f"namespaceidx must be an int not {namespaceidx} of type {type(namespaceidx)}")
464
        self.NamespaceIndex = namespaceidx
465
        self.Name = name
466
        self._freeze = True
467
468
    def to_string(self):
469
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)
470
471
    @staticmethod
472
    def from_string(string):
473
        if ":" in string:
474
            try:
475
                idx, name = string.split(":", 1)
476
                idx = int(idx)
477
            except (TypeError, ValueError) as ex:
478
                raise UaStringParsingError(f"Error parsing string {string}", ex)
479
        else:
480
            idx = 0
481
            name = string
482
        return QualifiedName(name, idx)
483
484
    def __eq__(self, bname):
485
        return isinstance(bname,
486
                          QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
487
488
    def __ne__(self, other):
489
        return not self.__eq__(other)
490
491
    def __lt__(self, other):
492
        if not isinstance(other, QualifiedName):
493
            raise TypeError(f"Cannot compare QualifiedName and {other}")
494
        if self.NamespaceIndex == other.NamespaceIndex:
495
            return self.Name < other.Name
496
        else:
497
            return self.NamespaceIndex < other.NamespaceIndex
498
499
    def __str__(self):
500
        return f'QualifiedName({self.NamespaceIndex}:{self.Name})'
501
502
    __repr__ = __str__
503
504
505
class LocalizedText(FrozenClass):
506
    """
507
    A string qualified with a namespace index.
508
    """
509
510
    ua_switches = {
511
        'Locale': ('Encoding', 0),
512
        'Text': ('Encoding', 1),
513
    }
514
515
    ua_types = (
516
        ('Encoding', 'Byte'),
517
        ('Locale', 'String'),
518
        ('Text', 'String'),
519
    )
520
521
    def __init__(self, text=None):
522
        self.Encoding = 0
523
        self._text = None
524
        if text:
525
            self.Text = text
526
        self.Locale = None
527
        self._freeze = True
528
529
    @property
530
    def Text(self):
531
        return self._text
532
533
    @Text.setter
534
    def Text(self, text):
535
        if not isinstance(text, str):
536
            raise ValueError("A LocalizedText object takes a string as argument, not a {}, {}".format(type(text), text))
537
        self._text = text
538
        if self._text:
539
            self.Encoding |= (1 << 1)
540
541
    def to_string(self):
542
        # FIXME: use local
543
        if self.Text is None:
544
            return ""
545
        return self.Text
546
547
    def __str__(self):
548
        return f'LocalizedText(Encoding:{self.Encoding}, Locale:{self.Locale}, Text:{self.Text})'
549
550
    __repr__ = __str__
551
552
    def __eq__(self, other):
553
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
554
            return True
555
        return False
556
557
    def __ne__(self, other):
558
        return not self.__eq__(other)
559
560
561
class ExtensionObject(FrozenClass):
562
    """
563
    Any UA object packed as an ExtensionObject
564
565
    :ivar TypeId:
566
    :vartype TypeId: NodeId
567
    :ivar Body:
568
    :vartype Body: bytes
569
    """
570
    ua_switches = {
571
        'Body': ('Encoding', 0),
572
    }
573
574
    ua_types = (
575
        ("TypeId", "NodeId"),
576
        ("Encoding", "Byte"),
577
        ("Body", "ByteString"),
578
    )
579
580
    def __init__(self):
581
        self.TypeId = NodeId()
582
        self.Encoding = 0
583
        self.Body = None
584
        self._freeze = True
585
586
    def __bool__(self):
587
        return self.Body is not None
588
589
    __nonzero__ = __bool__  # Python2 compatibilty
590
591
    def __str__(self):
592
        size = len(self.Body) if self.Body is not None else None
593
        return f'ExtensionObject(TypeId:{self.TypeId}, Encoding:{self.Encoding}, {size} bytes)'
594
595
    __repr__ = __str__
596
597
598
class VariantType(Enum):
599
    """
600
    The possible types of a variant.
601
602
    :ivar Null:
603
    :ivar Boolean:
604
    :ivar SByte:
605
    :ivar Byte:
606
    :ivar Int16:
607
    :ivar UInt16:
608
    :ivar Int32:
609
    :ivar UInt32:
610
    :ivar Int64:
611
    :ivar UInt64:
612
    :ivar Float:
613
    :ivar Double:
614
    :ivar String:
615
    :ivar DateTime:
616
    :ivar Guid:
617
    :ivar ByteString:
618
    :ivar XmlElement:
619
    :ivar NodeId:
620
    :ivar ExpandedNodeId:
621
    :ivar StatusCode:
622
    :ivar QualifiedName:
623
    :ivar LocalizedText:
624
    :ivar ExtensionObject:
625
    :ivar DataValue:
626
    :ivar Variant:
627
    :ivar DiagnosticInfo:
628
    """
629
630
    Null = 0
631
    Boolean = 1
632
    SByte = 2
633
    Byte = 3
634
    Int16 = 4
635
    UInt16 = 5
636
    Int32 = 6
637
    UInt32 = 7
638
    Int64 = 8
639
    UInt64 = 9
640
    Float = 10
641
    Double = 11
642
    String = 12
643
    DateTime = 13
644
    Guid = 14
645
    ByteString = 15
646
    XmlElement = 16
647
    NodeId = 17
648
    ExpandedNodeId = 18
649
    StatusCode = 19
650
    QualifiedName = 20
651
    LocalizedText = 21
652
    ExtensionObject = 22
653
    DataValue = 23
654
    Variant = 24
655
    DiagnosticInfo = 25
656
657
658
class VariantTypeCustom(object):
659
    """
660
    Looks like sometime we get variant with other values than those
661
    defined in VariantType.
662
    FIXME: We should not need this class, as far as I iunderstand the spec
663
    variants can only be of VariantType
664
    """
665
666
    def __init__(self, val):
667
        self.name = "Custom"
668
        self.value = val
669
        if self.value > 0b00111111:
670
            raise UaError(f"Cannot create VariantType. VariantType must be {0b111111} > x > {25}, received {val}")
671
672
    def __str__(self):
673
        return f"VariantType.Custom:{self.value}"
674
675
    __repr__ = __str__
676
677
    def __eq__(self, other):
678
        return self.value == other.value
679
680
681
class Variant(FrozenClass):
682
    """
683
    Create an OPC-UA Variant object.
684
    if no argument a Null Variant is created.
685
    if not variant type is given, attemps to guess type from python type
686
    if a variant is given as value, the new objects becomes a copy of the argument
687
688
    :ivar Value:
689
    :vartype Value: Any supported type
690
    :ivar VariantType:
691
    :vartype VariantType: VariantType
692
    :ivar Dimension:
693
    :vartype Dimensions: The length of each dimensions. Usually guessed from value.
694
    :ivar is_array:
695
    :vartype is_array: If the variant is an array. Usually guessed from value.
696
    """
697
698
    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
699
        self.Value = value
700
        self.VariantType = varianttype
701
        self.Dimensions = dimensions
702
        self.is_array = is_array
703
        if self.is_array is None:
704
            if isinstance(value, (list, tuple)):
705
                self.is_array = True
706
            else:
707
                self.is_array = False
708
        self._freeze = True
709
        if isinstance(value, Variant):
710
            self.Value = value.Value
711
            self.VariantType = value.VariantType
712
        if self.VariantType is None:
713
            self.VariantType = self._guess_type(self.Value)
714
        if self.Value is None and not self.is_array and self.VariantType not in (VariantType.Null, VariantType.String,
715
                                                                                 VariantType.DateTime):
716
            raise UaError(f"Non array Variant of type {self.VariantType} cannot have value None")
717
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
718
            dims = get_shape(self.Value)
719
            if len(dims) > 1:
720
                self.Dimensions = dims
721
722
    def __eq__(self, other):
723
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
724
            return True
725
        return False
726
727
    def __ne__(self, other):
728
        return not self.__eq__(other)
729
730
    def _guess_type(self, val):
731
        if isinstance(val, (list, tuple)):
732
            error_val = val
733
        while isinstance(val, (list, tuple)):
734
            if len(val) == 0:
735
                raise UaError(f"could not guess UA type of variable {error_val}")
736
            val = val[0]
737
        if val is None:
738
            return VariantType.Null
739
        elif isinstance(val, bool):
740
            return VariantType.Boolean
741
        elif isinstance(val, float):
742
            return VariantType.Double
743
        elif isinstance(val, IntEnum):
744
            return VariantType.Int32
745
        elif isinstance(val, int):
746
            return VariantType.Int64
747
        elif isinstance(val, str):
748
            return VariantType.String
749
        elif isinstance(val, bytes):
750
            return VariantType.ByteString
751
        elif isinstance(val, datetime):
752
            return VariantType.DateTime
753
        elif isinstance(val, uuid.UUID):
754
            return VariantType.Guid
755
        else:
756
            if isinstance(val, object):
757
                try:
758
                    return getattr(VariantType, val.__class__.__name__)
759
                except AttributeError:
760
                    return VariantType.ExtensionObject
761
            else:
762
                raise UaError(f"Could not guess UA type of {val} with type {type(val)}, specify UA type")
763
764
    def __str__(self):
765
        return f"Variant(val:{self.Value!s},type:{self.VariantType})"
766
767
    __repr__ = __str__
768
769
770
def _split_list(l, n):
771
    n = max(1, n)
772
    return [l[i:i + n] for i in range(0, len(l), n)]
773
774
775
def flatten_and_get_shape(mylist):
776
    dims = []
777
    dims.append(len(mylist))
778
    while isinstance(mylist[0], (list, tuple)):
779
        dims.append(len(mylist[0]))
780
        mylist = [item for sublist in mylist for item in sublist]
781
        if len(mylist) == 0:
782
            break
783
    return mylist, dims
784
785
786
def flatten(mylist):
787
    if mylist is None:
788
        return None
789
    elif len(mylist) == 0:
790
        return mylist
791
    while isinstance(mylist[0], (list, tuple)):
792
        mylist = [item for sublist in mylist for item in sublist]
793
        if len(mylist) == 0:
794
            break
795
    return mylist
796
797
798
def get_shape(mylist):
799
    dims = []
800
    while isinstance(mylist, (list, tuple)):
801
        dims.append(len(mylist))
802
        if len(mylist) == 0:
803
            break
804
        mylist = mylist[0]
805
    return dims
806
807
808
class DataValue(FrozenClass):
809
    """
810
    A value with an associated timestamp, and quality.
811
    Automatically generated from xml , copied and modified here to fix errors in xml spec
812
813
    :ivar Value:
814
    :vartype Value: Variant
815
    :ivar StatusCode:
816
    :vartype StatusCode: StatusCode
817
    :ivar SourceTimestamp:
818
    :vartype SourceTimestamp: datetime
819
    :ivar SourcePicoSeconds:
820
    :vartype SourcePicoSeconds: int
821
    :ivar ServerTimestamp:
822
    :vartype ServerTimestamp: datetime
823
    :ivar ServerPicoseconds:
824
    :vartype ServerPicoseconds: int
825
    """
826
827
    ua_switches = {
828
        'Value': ('Encoding', 0),
829
        'StatusCode': ('Encoding', 1),
830
        'SourceTimestamp': ('Encoding', 2),
831
        'ServerTimestamp': ('Encoding', 3),
832
        'SourcePicoseconds': ('Encoding', 4),
833
        'ServerPicoseconds': ('Encoding', 5),
834
    }
835
836
    ua_types = (
837
        ('Encoding', 'Byte'),
838
        ('Value', 'Variant'),
839
        ('StatusCode', 'StatusCode'),
840
        ('SourceTimestamp', 'DateTime'),
841
        ('SourcePicoseconds', 'UInt16'),
842
        ('ServerTimestamp', 'DateTime'),
843
        ('ServerPicoseconds', 'UInt16'),
844
    )
845
846
    def __init__(self, variant=None, status=None):
847
        self.Encoding = 0
848
        if not isinstance(variant, Variant):
849
            variant = Variant(variant)
850
        self.Value = variant
851
        if status is None:
852
            self.StatusCode = StatusCode()
853
        else:
854
            self.StatusCode = status
855
        self.SourceTimestamp = None  # DateTime()
856
        self.SourcePicoseconds = None
857
        self.ServerTimestamp = None  # DateTime()
858
        self.ServerPicoseconds = None
859
        self._freeze = True
860
861
    def __str__(self):
862
        s = []
863
        if self.StatusCode is not None:
864
            s.append(f', StatusCode:{self.StatusCode}')
865
        if self.SourceTimestamp is not None:
866
            s.append(f', SourceTimestamp:{self.SourceTimestamp}')
867
        if self.ServerTimestamp is not None:
868
            s.append(f', ServerTimestamp:{self.ServerTimestamp}')
869
        if self.SourcePicoseconds is not None:
870
            s.append(f', SourcePicoseconds:{self.SourcePicoseconds}')
871
        if self.ServerPicoseconds is not None:
872
            s.append(f', ServerPicoseconds:{self.ServerPicoseconds}')
873
        return f'DataValue(Value:{self.Value}{"".join(s)})'
874
875
    __repr__ = __str__
876
877
878
def datatype_to_varianttype(int_type):
879
    """
880
    Takes a NodeId or int and return a VariantType
881
    This is only supported if int_type < 63 due to VariantType encoding
882
    At low level we do not have access to address space thus decoding is limited
883
    a better version of this method can be find in ua_utils.py
884
    """
885
    if isinstance(int_type, NodeId):
886
        int_type = int_type.Identifier
887
888
    if int_type <= 25:
889
        return VariantType(int_type)
890
    else:
891
        return VariantTypeCustom(int_type)
892
893
894
def get_default_value(vtype):
895
    """
896
    Given a variant type return default value for this type
897
    """
898
    if vtype == VariantType.Null:
899
        return None
900
    elif vtype == VariantType.Boolean:
901
        return False
902
    elif vtype in (VariantType.SByte, VariantType.Byte):
903
        return 0
904
    elif vtype == VariantType.ByteString:
905
        return b""
906
    elif 4 <= vtype.value <= 9:
907
        return 0
908
    elif vtype in (VariantType.Float, VariantType.Double):
909
        return 0.0
910
    elif vtype == VariantType.String:
911
        return None  # a string can be null
912
    elif vtype == VariantType.DateTime:
913
        return datetime.utcnow()
914
    elif vtype == VariantType.Guid:
915
        return uuid.uuid4()
916
    elif vtype == VariantType.XmlElement:
917
        return None  #Not sure this is correct
918
    elif vtype == VariantType.NodeId:
919
        return NodeId()
920
    elif vtype == VariantType.ExpandedNodeId:
921
        return NodeId()
922
    elif vtype == VariantType.StatusCode:
923
        return StatusCode()
924
    elif vtype == VariantType.QualifiedName:
925
        return QualifiedName()
926
    elif vtype == VariantType.LocalizedText:
927
        return LocalizedText()
928
    elif vtype == VariantType.ExtensionObject:
929
        return ExtensionObject()
930
    elif vtype == VariantType.DataValue:
931
        return DataValue()
932
    elif vtype == VariantType.Variant:
933
        return Variant()
934
    else:
935
        raise RuntimeError(f"function take a uatype as argument, got: {vtype}")
936
937
938
# These dictionnaries are used to register extensions classes for automatic
939
# decoding and encoding
940
extension_object_classes = {}
941
extension_object_ids = {}
942
943
944
def register_extension_object(name, nodeid, class_type):
945
    """
946
    Register a new extension object for automatic decoding and make them available in ua module
947
    """
948
    logger.info("registring new extension object: %s %s %s", name, nodeid, class_type)
949
    extension_object_classes[nodeid] = class_type
950
    extension_object_ids[name] = nodeid
951
    # FIXME: Next line is not exactly a Python best practices, so feel free to propose something else
952
    # add new extensions objects to ua modules to automate decoding
953
    import asyncua.ua
954
    setattr(asyncua.ua, name, class_type)
955
956
957
def get_extensionobject_class_type(typeid):
958
    """
959
    Returns the registered class type for typid of an extension object
960
    """
961
    if typeid in extension_object_classes:
962
        return extension_object_classes[typeid]
963
    else:
964
        return None
965
966
967
class SecurityPolicyType(Enum):
968
    """
969
    The supported types of SecurityPolicy.
970
971
    "None"
972
    "Basic128Rsa15_Sign"
973
    "Basic128Rsa15_SignAndEncrypt"
974
    "Basic256_Sign"
975
    "Basic256_SignAndEncrypt"
976
    "Basic256Sha256_Sign"
977
    "Basic256Sha256_SignAndEncrypt"
978
979
    """
980
981
    NoSecurity = 0
982
    Basic128Rsa15_Sign = 1
983
    Basic128Rsa15_SignAndEncrypt = 2
984
    Basic256_Sign = 3
985
    Basic256_SignAndEncrypt = 4
986
    Basic256Sha256_Sign = 5
987
    Basic256Sha256_SignAndEncrypt = 6
988