Test Failed
Pull Request — master (#490)
by Olivier
06:42
created

NodeId.from_string()   A

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 6
ccs 0
cts 5
cp 0
crap 6
rs 9.4285
1
"""
2
implement ua datatypes
3
"""
4
5
import logging
6
from enum import Enum, IntEnum
7
from datetime import datetime
8
import sys
9
import os
10
import uuid
11
import re
12
import itertools
13
14
from opcua.ua import ua_binary as uabin
15
from opcua.ua import status_codes
16
from opcua.ua import ObjectIds
17
from opcua.ua.uaerrors import UaError
18
from opcua.ua.uaerrors import UaStatusCodeError
19
from opcua.ua.uaerrors import UaStringParsingError
20
21
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Python 3 has no ``unicode`` builtin: alias it to ``str`` so that the
# isinstance checks further down work on both major versions.
if sys.version_info[0] > 2:
    unicode = str
25
26
27
def get_win_epoch():
    """
    Return the datetime obtained by converting the tick value 0 with
    ``uabin.win_epoch_to_datetime`` (presumably the start of the Windows epoch).
    """
    epoch_ticks = 0
    return uabin.win_epoch_to_datetime(epoch_ticks)
29
30
31
class _FrozenClass(object):
32
    """
33
    Make it impossible to add members to a class.
34
    Not pythonic at all but we found out it prevents many many
35
    bugs in use of protocol structures
36
    """
37
    _freeze = False
38
39
    def __setattr__(self, key, value):
40
        if self._freeze and not hasattr(self, key):
41
            raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
42
                key, self.__class__.__name__, self.__dict__.keys()))
43
        object.__setattr__(self, key, value)
44
45
46
# The typo check is CPU consuming, but it makes debugging easier.
# If it is not needed (e.g. in production), set the environment variable
# PYOPCUA_NO_TYPO_CHECK: every ua type class then inherits from plain
# ``object`` instead of ``_FrozenClass`` and the check is skipped.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
54
55
56
class ValueRank(IntEnum):
    """
    Defines dimensions of a variable.

    This enum does not cover every case, since a ValueRank may be any n > 0,
    but because it is an IntEnum a member can be replaced by a plain int.
    """
    # values defined by the spec
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1

    # the next names are not in the spec but are so common we express them here
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
71
72
73
class _MaskEnum(IntEnum):
74
    @classmethod
75
    def parse_bitfield(cls, the_int):
76
        """ Take an integer and interpret it as a set of enum values. """
77
        assert isinstance(the_int, int)
78
79
        return {cls(b) for b in cls._bits(the_int)}
80
81
    @classmethod
82
    def to_bitfield(cls, collection):
83
        """ Takes some enum values and creates an integer from them. """
84
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
85
        # iterator)
86
        iter1, iter2 = itertools.tee(iter(collection))
87
        assert all(isinstance(x, cls) for x in iter1)
88
89
        return sum(x.mask for x in iter2)
90
91
    @property
92
    def mask(self):
93
        return 1 << self.value
94
95
    @staticmethod
96
    def _bits(n):
97
        """ Iterate over the bits in n.
98
99
            e.g. bits(44) yields at 2, 3, 5
100
        """
101
        assert n >= 0  # avoid infinite recursion
102
103
        pos = 0
104
        while n:
105
            if n & 0x1:
106
                yield pos
107
            n = n // 2
108
            pos += 1
109
110
111
class AccessLevel(_MaskEnum):
    """
    Bit index to indicate what the access level is.

    Each value is a bit position, not a mask; use ``.mask`` or
    ``to_bitfield`` to obtain the integer bit field.

    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
    """
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
124
125
126
class WriteMask(_MaskEnum):
    """
    Bit index to indicate which attribute of a node is writable.

    Each value is a bit position, not a mask; use ``.mask`` or
    ``to_bitfield`` to obtain the integer bit field.

    Spec Part 3, Paragraph 5.2.7 WriteMask
    """
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
154
155
156
class EventNotifier(_MaskEnum):
    """
    Bit index to indicate how a node can be used for events.

    Each value is a bit position, not a mask; bit 1 is reserved and has no
    member.

    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
    """
    SubscribeToEvents = 0
    # Reserved        = 1
    HistoryRead = 2
    HistoryWrite = 3
166
167
168
class StatusCode(FrozenClass):
    """
    A status code, built either from its numeric value or from its name.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    ua_types = [("value", "UInt32")]

    def __init__(self, value=0):
        """
        :param value: the numeric code, or the attribute name of a code in
            ``status_codes.StatusCodes`` (AttributeError if the name is unknown).
        """
        if isinstance(value, str):
            self.name = value
            self.value = getattr(status_codes.StatusCodes, value)
            # ``doc`` used to be left unset on this path, and could not be
            # added later because the instance is frozen below.
            _, self.doc = status_codes.get_name_and_doc(self.value)
        else:
            self.value = value
            self.name, self.doc = status_codes.get_name_and_doc(value)
        self._freeze = True

    def check(self):
        """
        Raises an exception if the status code is anything else than 0 (good).

        Use the is_good() method if you do not want an exception.
        """
        if not self.is_good():
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        return True if status is Good.
        """
        # the two highest bits of the 32-bit value must both be clear
        mask = 3 << 30
        return not (mask & self.value)

    def __str__(self):
        return 'StatusCode({0})'.format(self.name)

    __repr__ = __str__

    def __eq__(self, other):
        # comparing with an unrelated object is "not equal", not an error
        return isinstance(other, StatusCode) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # defined together with __eq__ so instances stay hashable on Python 3
        return hash(self.value)
218
219
220
class NodeIdType(IntEnum):
    """
    The kinds of NodeId identifier handled by this module.
    """
    # numeric identifiers (compared as the same kind by NodeId._key)
    TwoByte = 0
    FourByte = 1
    Numeric = 2

    # non-numeric identifiers
    String = 3
    Guid = 4
    ByteString = 5
227
228
229
class NodeId(FrozenClass):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed
            or you want something special like a twobyte or fourbyte nodeid

    :ivar Identifier:
    :vartype Identifier: int, str, bytes or uuid.UUID
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NodeIdType:
    :vartype NodeIdType: NodeIdType
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):

        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        # all members exist now; the assignments below only update them
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # no identifier: numeric 0, whatever nodeidtype was passed
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the type from the Python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, uuid.UUID):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def _key(self):
        """Tuple used for equality, hashing and ordering."""
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):
            # twobyte, fourbyte and numeric may represent the same node
            return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier)

    def __eq__(self, node):
        return isinstance(node, NodeId) and self._key() == node._key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._key())

    def __lt__(self, other):
        if not isinstance(other, NodeId):
            raise AttributeError("Can only compare to NodeId")
        return self._key() < other._key()

    def is_null(self):
        """Return True if the namespace index is 0 and the identifier is null."""
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """Return True if the identifier is empty/zero, whatever the namespace."""
        if not self.Identifier:
            return True
        if self.NodeIdType == NodeIdType.Guid:
            guid = self.Identifier
            if isinstance(guid, uuid.UUID):
                # a UUID object is always truthy, and running the bytes regex
                # on it raised TypeError: compare its integer value instead
                return guid.int == 0
            if isinstance(guid, bytes):
                # NOTE(review): legacy check kept as-is for bytes identifiers;
                # it matches any value starting with an ASCII "0" - confirm intent
                return re.match(b'0.', guid) is not None
        return False

    @staticmethod
    def from_string(string):
        """
        Parse a string such as ``"ns=2;i=42"`` into a NodeId.

        Raises UaStringParsingError if the string cannot be parsed.
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)

    @staticmethod
    def _from_string(string):
        """
        Parse ``;``-separated ``key=value`` elements. Recognised keys: ``ns``,
        ``i``, ``s``, ``g``, ``b``, ``srv`` and ``nsu``; other keys are ignored.
        Malformed elements or values raise ValueError (wrapped by from_string).
        """
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el.strip():
                continue
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                # store a real UUID, the type the constructor associates with Guid
                identifier = uuid.UUID(v)
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is an int everywhere else
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # keep the constructor defaults ("" and 0) when the string has no nsu/srv
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the string form, e.g. ``"ns=2;i=42"``.

        All elements are joined with ``;`` so that the result can be parsed
        back by ``from_string``.
        """
        parts = []
        if self.NamespaceIndex != 0:
            parts.append("ns={0}".format(self.NamespaceIndex))
        ntype = None
        if self.NodeIdType in (NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte):
            ntype = "i"
        elif self.NodeIdType == NodeIdType.String:
            ntype = "s"
        elif self.NodeIdType == NodeIdType.Guid:
            ntype = "g"
        elif self.NodeIdType == NodeIdType.ByteString:
            ntype = "b"
        parts.append("{0}={1}".format(ntype, self.Identifier))
        if self.ServerIndex:
            # was glued to the front of the string with no separator after it
            parts.append("srv={0}".format(self.ServerIndex))
        if self.NamespaceUri:
            parts.append("nsu={0}".format(self.NamespaceUri))
        return ";".join(parts)

    def __str__(self):
        return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())

    __repr__ = __str__

    def to_binary(self):
        """Encode this NodeId with ``opcua.ua.ua_binary.nodeid_to_binary``."""
        # imported at call time, as in the original code
        import opcua
        return opcua.ua.ua_binary.nodeid_to_binary(self)
385
386
387
class TwoByteNodeId(NodeId):
    """NodeId in namespace 0 whose type is forced to NodeIdType.TwoByte."""

    def __init__(self, identifier):
        super(TwoByteNodeId, self).__init__(identifier, 0, NodeIdType.TwoByte)
390
391
392
class FourByteNodeId(NodeId):
    """NodeId whose type is forced to NodeIdType.FourByte."""

    def __init__(self, identifier, namespace=0):
        super(FourByteNodeId, self).__init__(identifier, namespace, NodeIdType.FourByte)
395
396
397
class NumericNodeId(NodeId):
    """NodeId whose type is forced to NodeIdType.Numeric."""

    def __init__(self, identifier, namespace=0):
        super(NumericNodeId, self).__init__(identifier, namespace, NodeIdType.Numeric)
400
401
402
class ByteStringNodeId(NodeId):
    """NodeId whose type is forced to NodeIdType.ByteString."""

    def __init__(self, identifier, namespace=0):
        super(ByteStringNodeId, self).__init__(identifier, namespace, NodeIdType.ByteString)
405
406
407
class GuidNodeId(NodeId):
    """NodeId whose type is forced to NodeIdType.Guid."""

    def __init__(self, identifier, namespace=0):
        super(GuidNodeId, self).__init__(identifier, namespace, NodeIdType.Guid)
410
411
412
class StringNodeId(NodeId):
    """NodeId whose type is forced to NodeIdType.String."""

    def __init__(self, identifier, namespace=0):
        super(StringNodeId, self).__init__(identifier, namespace, NodeIdType.String)
415
416
417
# ExpandedNodeId is an alias of NodeId: the NodeId class itself carries the
# NamespaceUri and ServerIndex members.
ExpandedNodeId = NodeId
418
419
420
class QualifiedName(FrozenClass):
    """
    A string qualified with a namespace index.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    """

    ua_types = [
        ('NamespaceIndex', 'UInt16'),
        ('Name', 'String'),
    ]

    def __init__(self, name=None, namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return ``"<NamespaceIndex>:<Name>"``."""
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse ``"<index>:<name>"``; a string without ``:`` is taken as a name
        in namespace 0.

        Raises UaStringParsingError if the part before the first ``:`` is not
        an integer.
        """
        if ":" in string:
            try:
                idx, name = string.split(":", 1)
                idx = int(idx)
            except (TypeError, ValueError) as ex:
                raise UaStringParsingError("Error parsing string {0}".format(string), ex)
        else:
            idx = 0
            name = string
        return QualifiedName(name, idx)

    def __eq__(self, bname):
        return (isinstance(bname, QualifiedName)
                and self.Name == bname.Name
                and self.NamespaceIndex == bname.NamespaceIndex)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable on Python 3;
        # hash on the same fields __eq__ compares, as NodeId does.
        return hash((self.NamespaceIndex, self.Name))

    def __lt__(self, other):
        """Order by namespace index first, then by name."""
        if not isinstance(other, QualifiedName):
            raise TypeError("Cannot compare QualifiedName and {0}".format(other))
        if self.NamespaceIndex == other.NamespaceIndex:
            return self.Name < other.Name
        else:
            return self.NamespaceIndex < other.NamespaceIndex

    def __str__(self):
        return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
472
473
474
class LocalizedText(FrozenClass):
    """
    A text with an optional locale.

    :ivar Encoding: bit field; bit 1 is set by the constructor when a
        non-empty text is given
    :vartype Encoding: int
    :ivar Locale:
    :vartype Locale: string
    :ivar Text:
    :vartype Text: string
    """

    ua_switches = {
        'Locale': ('Encoding', 0),
        'Text': ('Encoding', 1),
    }

    ua_types = (
        ('Encoding', 'Byte'),
        ('Locale', 'String'),
        ('Text', 'String'),
    )

    def __init__(self, text=None):
        self.Encoding = 0
        text_is_valid = text is None or isinstance(text, str)
        if not text_is_valid:
            raise ValueError("A LocalizedText object takes a string as argument, not a {}, {}".format(text, type(text)))
        self.Text = text
        if self.Text:
            # flag the presence of a text (bit 1, see ua_switches)
            self.Encoding |= (1 << 1)
        self.Locale = None
        self._freeze = True

    def to_string(self):
        """Return the text, or an empty string if there is none."""
        # FIXME: use local
        return "" if self.Text is None else self.Text

    def __str__(self):
        return 'LocalizedText(Encoding:{0!s}, Locale:{1!s}, Text:{2!s})'.format(
            self.Encoding, self.Locale, self.Text)

    __repr__ = __str__

    def __eq__(self, other):
        # Encoding is not part of the comparison
        return bool(isinstance(other, LocalizedText)
                    and self.Locale == other.Locale
                    and self.Text == other.Text)

    def __ne__(self, other):
        return not self.__eq__(other)
519
520
521
class ExtensionObject(FrozenClass):
    """
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Encoding:
    :vartype Encoding: int
    :ivar Body:
    :vartype Body: bytes
    """
    ua_switches = {
        'Body': ('Encoding', 0),
    }

    ua_types = (
        ("TypeId", "NodeId"),
        ("Encoding", "Byte"),
        ("Body", "ByteString"),
    )

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = None
        self._freeze = True

    def __bool__(self):
        # an extension object is "true" as soon as it has a body
        return self.Body is not None

    @staticmethod
    def from_object(obj):
        """
        Pack ``obj`` into a new ExtensionObject.

        The type id is looked up in ``ObjectIds`` under the name
        ``<ClassName>_Encoding_DefaultBinary`` and the body is
        ``obj.to_binary()``.
        """
        type_name = "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__)
        ext = ExtensionObject()
        ext.TypeId = FourByteNodeId(getattr(ObjectIds, type_name))
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        size = None if self.Body is None else len(self.Body)
        return 'ExtensionObject(TypeId:{0!s}, Encoding:{1!s}, {2!s} bytes)'.format(
            self.TypeId, self.Encoding, size)

    __repr__ = __str__
563
564
565
class VariantType(Enum):
    """
    The possible types of a variant.

    Members, in value order (0..25): Null, Boolean, SByte, Byte, Int16,
    UInt16, Int32, UInt32, Int64, UInt64, Float, Double, String, DateTime,
    Guid, ByteString, XmlElement, NodeId, ExpandedNodeId, StatusCode,
    QualifiedName, LocalizedText, ExtensionObject, DataValue, Variant,
    DiagnosticInfo.
    """

    Null = 0
    Boolean = 1

    # integers
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9

    # floating point
    Float = 10
    Double = 11

    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16

    # structured types
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
623
624
625
class VariantTypeCustom(object):
    """
    Looks like sometimes we get variants with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType

    :ivar name: always "Custom"
    :ivar value: the numeric type value, at most 0b00111111
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            raise UaError(
                "Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{0}".format(self.value)

    __repr__ = __str__

    def __eq__(self, other):
        # Anything exposing ``.value`` (another custom type or a VariantType
        # member) is compared by value. Other operands such as None used to
        # raise AttributeError; let Python fall back to "not equal" instead.
        try:
            return self.value == other.value
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        # needed on Python 2, where != is not derived from __eq__
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
647
648
649
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    If no argument is given a Null Variant is created.
    If no variant type is given, attempts to guess the type from the python type.
    If a variant is given as value, the new object becomes a copy of the argument.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions:
    :vartype Dimensions: The length of each dimension. Usually guessed from value.
    :ivar is_array:
    :vartype is_array: If the variant is an array. Usually guessed from value.
    """

    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self.is_array = is_array
        # all members exist now; the assignments below only update them
        self._freeze = True
        if isinstance(value, Variant):
            # copy: the source variant's value and type always win
            self.Value = value.Value
            self.VariantType = value.VariantType
            # also carry over the array information unless the caller gave it
            # explicitly; it used to be guessed from the Variant object itself,
            # which made every copy a non-array
            if dimensions is None:
                self.Dimensions = value.Dimensions
            if is_array is None:
                self.is_array = value.is_array
        if self.is_array is None:
            self.is_array = isinstance(self.Value, (list, tuple))
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Value is None and not self.is_array and self.VariantType not in (VariantType.Null, VariantType.String,
                                                                                 VariantType.DateTime):
            raise UaError("Non array Variant of type {0} cannot have value None".format(self.VariantType))
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
            dims = get_shape(self.Value)
            # Dimensions is only recorded for arrays with more than one dimension
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        # Dimensions and is_array are not part of the comparison
        return bool(isinstance(other, Variant)
                    and self.VariantType == other.VariantType
                    and self.Value == other.Value)

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType of ``val`` from its Python type.

        For (nested) lists/tuples the first innermost element decides; an
        empty list raises UaError. Objects of an unknown class map to the
        VariantType member named after their class, or to ExtensionObject
        if there is no such member.
        """
        error_val = val
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA type of variable {0}".format(error_val))
            val = val[0]
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # must come before int: bool is a subclass of int
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, IntEnum):
            # must come before int: IntEnum members are ints
            return VariantType.Int32
        elif isinstance(val, int):
            return VariantType.Int64
        elif isinstance(val, (str, unicode)):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        elif isinstance(val, uuid.UUID):
            return VariantType.Guid
        # The former ``isinstance(val, object)`` test was always true, so its
        # ``else: raise`` branch could never run and has been dropped.
        try:
            return getattr(VariantType, val.__class__.__name__)
        except AttributeError:
            return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)

    __repr__ = __str__

    def to_binary(self):
        """Encode this variant with ``opcua.ua.ua_binary.variant_to_binary``."""
        # imported at call time, as in the original code
        from opcua.ua.ua_binary import variant_to_binary
        return variant_to_binary(self)
740
741
742
def _split_list(l, n):
743
    n = max(1, n)
744
    return [l[i:i + n] for i in range(0, len(l), n)]
745
746
747
def flatten_and_get_shape(mylist):
    """
    Flatten a nested list/tuple and report its shape.

    Returns ``(flat, dims)`` where ``dims`` holds the length of each nesting
    level, taken from the first element of every level. Like ``flatten`` and
    ``get_shape`` this now accepts an empty list, for which it returns
    ``([], [0])`` instead of raising IndexError.
    """
    dims = [len(mylist)]
    # stop as soon as nothing is left: indexing [0] on an empty list would fail
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
756
757
758
def flatten(mylist):
    """
    Flatten a nested list/tuple into a single list.

    ``None`` is returned as ``None``; an input that is empty or whose first
    element is not a list/tuple is returned unchanged. Nesting is detected
    from the first element of each level only.
    """
    if mylist is None:
        return None
    flat = mylist
    while len(flat) != 0 and isinstance(flat[0], (list, tuple)):
        flat = [item for sublist in flat for item in sublist]
    return flat
768
769
770
def get_shape(mylist):
    """
    Return the length of each nesting level of a list/tuple.

    Only the first element of every level is inspected. A value that is not
    a list/tuple has the shape ``[]``; an empty list ends the shape with 0.
    """
    shape = []
    current = mylist
    while isinstance(current, (list, tuple)):
        size = len(current)
        shape.append(size)
        if not size:
            break
        current = current[0]
    return shape
778
779
780
class DataValue(FrozenClass):
    """
    A value with an associated timestamp, and quality.
    Automatically generated from xml, copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    """

    ua_switches = {
        'Value': ('Encoding', 0),
        'StatusCode': ('Encoding', 1),
        'SourceTimestamp': ('Encoding', 2),
        'ServerTimestamp': ('Encoding', 3),
        'SourcePicoseconds': ('Encoding', 4),
        'ServerPicoseconds': ('Encoding', 5),
    }

    ua_types = (
        ('Encoding', 'Byte'),
        ('Value', 'Variant'),
        ('StatusCode', 'StatusCode'),
        ('SourceTimestamp', 'DateTime'),
        ('SourcePicoseconds', 'UInt16'),
        ('ServerTimestamp', 'DateTime'),
        ('ServerPicoseconds', 'UInt16'),
    )

    def __init__(self, variant=None, status=None):
        """
        :param variant: the value; anything that is not a Variant is wrapped in one
        :param status: the StatusCode; a default StatusCode() is used when None
        """
        self.Encoding = 0
        self.Value = variant if isinstance(variant, Variant) else Variant(variant)
        self.StatusCode = StatusCode() if status is None else status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def __str__(self):
        # Value is always shown; the other members only when they are set
        parts = ['Value:{0}'.format(self.Value)]
        optional_members = ('StatusCode', 'SourceTimestamp', 'ServerTimestamp',
                            'SourcePicoseconds', 'ServerPicoseconds')
        for member in optional_members:
            current = getattr(self, member)
            if current is not None:
                parts.append('{0}:{1}'.format(member, current))
        return 'DataValue(' + ', '.join(parts) + ')'

    __repr__ = __str__
849
850
851
def datatype_to_varianttype(int_type):
    """
    Takes a NodeId or int and returns a VariantType.

    Values up to 25 map to a VariantType member, larger ones to a
    VariantTypeCustom. This is only supported if int_type < 63 due to
    VariantType encoding. At low level we do not have access to the address
    space, thus decoding is limited; a better version of this method can be
    found in ua_utils.py
    """
    type_id = int_type.Identifier if isinstance(int_type, NodeId) else int_type

    if type_id <= 25:
        return VariantType(type_id)
    return VariantTypeCustom(type_id)
865
866
867
def get_default_value(vtype):
    """
    Given a variant type return the default value for this type.

    A new object is created on every call for the non-constant defaults
    (current UTC time, random Guid, empty ua objects). Raises RuntimeError
    for a type that has no default here.
    """
    if vtype == VariantType.Null:
        return None
    if vtype == VariantType.Boolean:
        return False
    if vtype in (VariantType.SByte, VariantType.Byte):
        return 0
    if vtype == VariantType.ByteString:
        return b""
    if 4 <= vtype.value <= 9:
        # Int16 up to UInt64
        return 0
    if vtype in (VariantType.Float, VariantType.Double):
        return 0.0
    if vtype == VariantType.String:
        return None  # a string can be null
    if vtype == VariantType.DateTime:
        return datetime.utcnow()
    if vtype == VariantType.Guid:
        return uuid.uuid4()
    if vtype == VariantType.XmlElement:
        return None  # Not sure this is correct
    if vtype in (VariantType.NodeId, VariantType.ExpandedNodeId):
        return NodeId()
    if vtype == VariantType.StatusCode:
        return StatusCode()
    if vtype == VariantType.QualifiedName:
        return QualifiedName()
    if vtype == VariantType.LocalizedText:
        return LocalizedText()
    if vtype == VariantType.ExtensionObject:
        return ExtensionObject()
    if vtype == VariantType.DataValue:
        return DataValue()
    if vtype == VariantType.Variant:
        return Variant()
    raise RuntimeError("function take a uatype as argument, got:", vtype)
909
910
911
# These dictionaries are used to register extension object classes for automatic
912
# decoding and encoding
913
# nodeid -> class, filled by register_extension_object()
extension_object_classes = {}
914
# name -> nodeid, filled by register_extension_object()
extension_object_ids = {}
915
916
917
def register_extension_object(name, nodeid, class_type):
    """
    Register a new extension object class for automatic decoding and make it
    available in the ua module.

    :param name: attribute name under which the class is published on ``opcua.ua``
    :param nodeid: key under which the class is stored for decoding
    :param class_type: the class to register
    """
    # FIXME: Next line is not exactly a Python best practice, so feel free to propose something else
    # add new extension objects to the ua module to automate decoding
    import opcua.ua

    logger.warning("registring new extension object: %s %s %s", name, nodeid, class_type)
    extension_object_ids[name] = nodeid
    extension_object_classes[nodeid] = class_type
    setattr(opcua.ua, name, class_type)
928
929
930
def get_extensionobject_class_type(typeid):
    """
    Return the class registered for the type id of an extension object,
    or None if nothing is registered for it.
    """
    return extension_object_classes.get(typeid)
938