Completed
Pull Request — master (#490)
by Olivier
03:32
created

Variant   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 91
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 3
Bugs 1 Features 0
Metric Value
c 3
b 1
f 0
dl 0
loc 91
ccs 0
cts 65
cp 0
rs 9.3999
wmc 33

6 Methods

Rating   Name   Duplication   Size   Complexity  
A to_binary() 0 3 1
A __ne__() 0 2 1
F __init__() 0 23 11
F _guess_type() 0 33 15
A __str__() 0 2 1
A __eq__() 0 4 4
1
"""
2
implement ua datatypes
3
"""
4
5
import logging
6
from enum import Enum, IntEnum
7
from datetime import datetime
8
import sys
9
import os
10
import uuid
11
import re
12
import itertools
13
14
from opcua.ua import ua_binary as uabin
15
from opcua.ua import status_codes
16
from opcua.ua import ObjectIds
17
from opcua.ua.uaerrors import UaError
18
from opcua.ua.uaerrors import UaStatusCodeError
19
from opcua.ua.uaerrors import UaStringParsingError
20
21
logger = logging.getLogger(__name__)
22
23
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so that
# isinstance checks written against ``unicode`` keep working.
if sys.version_info[0] > 2:
    unicode = str
25
26
27
def get_win_epoch():
    """
    Return the result of converting the Windows-epoch timestamp 0
    with ``uabin.win_epoch_to_datetime``.
    """
    epoch = uabin.win_epoch_to_datetime(0)
    return epoch
29
30
31
class _FrozenClass(object):
32
    """
33
    Make it impossible to add members to a class.
34
    Not pythonic at all but we found out it prevents many many
35
    bugs in use of protocol structures
36
    """
37
    _freeze = False
38
39
    def __setattr__(self, key, value):
40
        if self._freeze and not hasattr(self, key):
41
            raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
42
                key, self.__class__.__name__, self.__dict__.keys()))
43
        object.__setattr__(self, key, value)
44
45
46
# The typo check is CPU consuming, but it makes debugging easier.
# If the typo check is not needed (e.g. in production), set the environment
# variable PYOPCUA_NO_TYPO_CHECK: every ua type class then inherits from
# plain ``object`` instead of _FrozenClass and the check is skipped.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
54
55
56
class ValueRank(IntEnum):
    """
    Named values for the number of dimensions of a variable.

    The enum cannot list every case, since any n > 0 is a valid ValueRank.
    Because it is an IntEnum, a plain int can be used in its place.
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # The names below are not in the spec, but are common enough to be
    # worth spelling out here.
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
71
72
73
class _MaskEnum(IntEnum):
74
    @classmethod
75
    def parse_bitfield(cls, the_int):
76
        """ Take an integer and interpret it as a set of enum values. """
77
        assert isinstance(the_int, int)
78
79
        return {cls(b) for b in cls._bits(the_int)}
80
81
    @classmethod
82
    def to_bitfield(cls, collection):
83
        """ Takes some enum values and creates an integer from them. """
84
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
85
        # iterator)
86
        iter1, iter2 = itertools.tee(iter(collection))
87
        assert all(isinstance(x, cls) for x in iter1)
88
89
        return sum(x.mask for x in iter2)
90
91
    @property
92
    def mask(self):
93
        return 1 << self.value
94
95
    @staticmethod
96
    def _bits(n):
97
        """ Iterate over the bits in n.
98
99
            e.g. bits(44) yields at 2, 3, 5
100
        """
101
        assert n >= 0  # avoid infinite recursion
102
103
        pos = 0
104
        while n:
105
            if n & 0x1:
106
                yield pos
107
            n = n // 2
108
            pos += 1
109
110
111
class AccessLevel(_MaskEnum):
    """
    Bit positions describing the access level of a variable.

    Spec Part 3; appears in several places, e.g. paragraph 5.6.2
    Variable NodeClass.
    """
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
124
125
126
class WriteMask(_MaskEnum):
    """
    Bit positions telling which attributes of a node are writable.

    Spec Part 3, paragraph 5.2.7 WriteMask.
    """
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
154
155
156
class EventNotifier(_MaskEnum):
    """
    Bit positions describing how a node can be used for events.

    Spec Part 3; appears in several places, e.g. paragraph 5.4
    View NodeClass.
    """
    SubscribeToEvents = 0
    # bit 1 is reserved
    HistoryRead = 2
    HistoryWrite = 3
166
167
168
class StatusCode(FrozenClass):
    """
    An OPC-UA status code.

    Can be built either from its numeric value or from the name of an
    attribute of ``status_codes.StatusCodes``.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    ua_types = [("value", "UInt32")]

    def __init__(self, value=0):
        if isinstance(value, str):
            # given by name: resolve the numeric code from the name
            self.name = value
            self.value = getattr(status_codes.StatusCodes, value)
            # doc must exist on every instance; it used to be left unset on
            # this path, so reading it raised AttributeError
            self.doc = status_codes.get_name_and_doc(self.value)[1]
        else:
            self.value = value
            self.name, self.doc = status_codes.get_name_and_doc(value)
        self._freeze = True

    def check(self):
        """
        Raises an exception if the status code is anything else than 0 (good).

        Use the is_good() method if you do not want an exception.

        :raises UaStatusCodeError: if is_good() is False
        """
        if not self.is_good():
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        return True if status is Good, i.e. neither of the two highest
        bits (30 and 31) of the 32-bit value is set.
        """
        mask = 3 << 30
        return not (mask & self.value)

    def __str__(self):
        return 'StatusCode({0})'.format(self.name)

    __repr__ = __str__

    def __eq__(self, other):
        # comparing with a foreign object is simply "not equal" instead of
        # failing on the missing ``value`` attribute
        return isinstance(other, StatusCode) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)
218
219
220
class NodeIdType(IntEnum):
    """The kinds of NodeId identifier handled by NodeId."""
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
227
228
229
class NodeId(FrozenClass):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want
            something special like a twobyte nodeid or fourbyte nodeid


    :ivar Identifier:
    :vartype Identifier: int, string, bytes or uuid.UUID
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):

        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # no identifier at all: fall back to the TwoByte nodeid 0
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the nodeid type from the python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, uuid.UUID):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def _key(self):
        """Return the tuple used for equality, ordering and hashing."""
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):
            # twobyte, fourbyte and numeric may represent the same node
            return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier)

    def __eq__(self, node):
        return isinstance(node, NodeId) and self._key() == node._key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._key())

    def __lt__(self, other):
        if not isinstance(other, NodeId):
            raise AttributeError("Can only compare to NodeId")
        return self._key() < other._key()

    def is_null(self):
        """Return True for a null identifier in namespace 0."""
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """Return True if the identifier is empty/zero, whatever the namespace."""
        if not self.Identifier:
            return True
        if self.NodeIdType == NodeIdType.Guid:
            if isinstance(self.Identifier, uuid.UUID):
                # UUID objects are always truthy, so test the value itself;
                # a bytes regex cannot be applied to a UUID (TypeError)
                return self.Identifier.int == 0
            if isinstance(self.Identifier, bytes):
                # keep the historical check for Guids given as raw bytes
                return bool(re.match(b'0.', self.Identifier))
        return False

    @staticmethod
    def from_string(string):
        """
        Parse a string such as "ns=2;i=42" into a NodeId.

        :raises UaStringParsingError: if the string cannot be parsed
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)

    @staticmethod
    def _from_string(string):
        """
        Do the parsing for from_string().

        Recognised keys are ns, i, s, g, b, srv and nsu; unknown keys are
        ignored. Malformed elements or values raise ValueError, which
        from_string() converts to UaStringParsingError.
        """
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                # store a real UUID, the python type __init__ associates
                # with Guid nodeids
                identifier = uuid.UUID(v)
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is an int everywhere else
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # only override the constructor defaults ("" and 0) when the string
        # actually carried a value
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the string form of the nodeid, e.g. "ns=2;i=42".

        The elements are separated by ";" in the order
        srv, ns, identifier, nsu; srv, ns and nsu are omitted when unset.
        """
        ntype = None
        if self.NodeIdType in (NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte):
            ntype = "i"
        elif self.NodeIdType == NodeIdType.String:
            ntype = "s"
        elif self.NodeIdType == NodeIdType.Guid:
            ntype = "g"
        elif self.NodeIdType == NodeIdType.ByteString:
            ntype = "b"

        parts = []
        if self.ServerIndex:
            parts.append("srv={0}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={0}".format(self.NamespaceIndex))
        parts.append("{0}={1}".format(ntype, self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={0}".format(self.NamespaceUri))
        # joining guarantees a separator between every pair of elements,
        # so the result can be split again on ";"
        return ";".join(parts)

    def __str__(self):
        return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())

    __repr__ = __str__

    def to_binary(self):
        import opcua
        return opcua.ua.ua_binary.nodeid_to_binary(self)
385
386
387
class TwoByteNodeId(NodeId):
    """NodeId of type TwoByte; always in namespace 0."""

    def __init__(self, identifier):
        NodeId.__init__(self, identifier, namespaceidx=0, nodeidtype=NodeIdType.TwoByte)
390
391
392
class FourByteNodeId(NodeId):
    """NodeId of type FourByte."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.FourByte)
395
396
397
class NumericNodeId(NodeId):
    """NodeId of type Numeric."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Numeric)
400
401
402
class ByteStringNodeId(NodeId):
    """NodeId of type ByteString."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.ByteString)
405
406
407
class GuidNodeId(NodeId):
    """NodeId of type Guid."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Guid)
410
411
412
class StringNodeId(NodeId):
    """NodeId of type String."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.String)
415
416
417
ExpandedNodeId = NodeId
418
419
420
class QualifiedName(FrozenClass):
    """
    A name together with the index of the namespace it belongs to.
    """

    ua_types = [
        ('NamespaceIndex', 'UInt16'),
        ('Name', 'String'),
    ]

    def __init__(self, name=None, namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return the "<namespace index>:<name>" form."""
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Build a QualifiedName from "<idx>:<name>", or from a bare name
        (which gets namespace index 0).

        :raises UaStringParsingError: if the part before the first ":" is
            not an integer
        """
        if ":" not in string:
            return QualifiedName(string, 0)
        try:
            prefix, name = string.split(":", 1)
            idx = int(prefix)
        except (TypeError, ValueError) as ex:
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)
        return QualifiedName(name, idx)

    def __eq__(self, bname):
        if not isinstance(bname, QualifiedName):
            return False
        return self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        if not isinstance(other, QualifiedName):
            raise TypeError("Cannot compare QualifiedName and {0}".format(other))
        # order by namespace first, by name only inside the same namespace
        if self.NamespaceIndex != other.NamespaceIndex:
            return self.NamespaceIndex < other.NamespaceIndex
        return self.Name < other.Name

    def __str__(self):
        return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
472
473
474
class LocalizedText(FrozenClass):
    """
    A text with an optional locale.

    ``Encoding`` is a bit mask telling which of ``Locale`` and ``Text``
    are present (see ``ua_switches``).
    """

    ua_switches = {
        'Locale': ('Encoding', 0),
        'Text': ('Encoding', 1),
    }

    ua_types = (
        ('Encoding', 'Byte'),
        ('Locale', 'String'),
        ('Text', 'String'),
    )

    def __init__(self, text=None):
        self.Encoding = 0
        text_is_valid = text is None or isinstance(text, str)
        if not text_is_valid:
            raise ValueError("A LocalizedText object takes a string as argument, not a {}, {}".format(text, type(text)))
        self.Text = text
        if self.Text:
            # flag the Text field (bit 1) as present
            self.Encoding |= (1 << 1)
        self.Locale = None
        self._freeze = True

    def to_string(self):
        """Return the text, or an empty string when there is none."""
        # FIXME: use locale
        return "" if self.Text is None else self.Text

    def __str__(self):
        pieces = [
            'LocalizedText(',
            'Encoding:', str(self.Encoding), ', ',
            'Locale:', str(self.Locale), ', ',
            'Text:', str(self.Text), ')',
        ]
        return ''.join(pieces)

    __repr__ = __str__

    def __eq__(self, other):
        if not isinstance(other, LocalizedText):
            return False
        if self.Locale == other.Locale and self.Text == other.Text:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)
519
520
521
class ExtensionObject(FrozenClass):
    """
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Body:
    :vartype Body: bytes
    """
    ua_switches = {
        'Body': ('Encoding', 0),
    }

    ua_types = (
        ("TypeId", "NodeId"),
        ("Encoding", "Byte"),
        ("Body", "ByteString"),
    )

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = None
        self._freeze = True

    @staticmethod
    def from_object(obj):
        """
        Pack ``obj`` into a new ExtensionObject.

        The type id is looked up in ObjectIds under
        "<class name>_Encoding_DefaultBinary" and the body is
        ``obj.to_binary()``.
        """
        type_key = "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__)
        packed = ExtensionObject()
        packed.TypeId = FourByteNodeId(getattr(ObjectIds, type_key))
        packed.Body = obj.to_binary()
        return packed

    def __str__(self):
        size = None if self.Body is None else len(self.Body)
        pieces = [
            'ExtensionObject(',
            'TypeId:', str(self.TypeId), ', ',
            'Encoding:', str(self.Encoding), ', ',
            str(size), ' bytes)',
        ]
        return ''.join(pieces)

    __repr__ = __str__
560
561
562
class VariantType(Enum):
    """
    The possible types of a variant.

    Members, in value order (0 to 25): Null, Boolean, SByte, Byte, Int16,
    UInt16, Int32, UInt32, Int64, UInt64, Float, Double, String, DateTime,
    Guid, ByteString, XmlElement, NodeId, ExpandedNodeId, StatusCode,
    QualifiedName, LocalizedText, ExtensionObject, DataValue, Variant,
    DiagnosticInfo.
    """

    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
620
621
622
class VariantTypeCustom(object):
    """
    Looks like sometimes we get variants with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType

    Exposes the same ``name``/``value`` attributes as a VariantType member.
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        # values above 0b00111111 (63) are rejected
        if self.value > 0b00111111:
            raise UaError(
                "Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{0}".format(self.value)

    __repr__ = __str__

    def __eq__(self, other):
        # Compare by value with anything that has one (including VariantType
        # members); objects without ``value`` are not comparable rather than
        # an AttributeError.
        try:
            return self.value == other.value
        except AttributeError:
            return NotImplemented

    def __ne__(self, other):
        # explicit, like the sibling classes: Python 2 does not derive it
        result = self.__eq__(other)
        return result if result is NotImplemented else not result
644
645
646
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    if no argument a Null Variant is created.
    if no variant type is given, attempts to guess type from python type
    if a variant is given as value, the new object becomes a copy of the argument

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions:
    :vartype Dimensions: The length of each dimension. Usually guessed from value.
    :ivar is_array:
    :vartype is_array: If the variant is an array. Usually guessed from value.
    """

    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self.is_array = is_array
        self._freeze = True
        if isinstance(value, Variant):
            # copy construction: take value and type from the source ...
            self.Value = value.Value
            self.VariantType = value.VariantType
            # ... and, unless given explicitly, its array-ness and
            # dimensions, so that copying an array gives an array
            if self.is_array is None:
                self.is_array = value.is_array
            if self.Dimensions is None:
                self.Dimensions = value.Dimensions
        if self.is_array is None:
            self.is_array = isinstance(self.Value, (list, tuple))
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Value is None and not self.is_array and self.VariantType not in (VariantType.Null, VariantType.String,
                                                                                 VariantType.DateTime):
            raise UaError("Non array Variant of type {0} cannot have value None".format(self.VariantType))
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
            dims = get_shape(self.Value)
            # dimensions are only recorded for multi-dimensional values
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType of ``val`` from its python type.

        For (nested) lists and tuples the first innermost element decides.

        :raises UaError: if an empty list/tuple is met while descending
        """
        whole_value = val  # kept for the error message only
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA type of variable {0}".format(whole_value))
            val = val[0]
        if val is None:
            return VariantType.Null
        # bool must be tested before int, and IntEnum before int, because
        # both are int subclasses
        if isinstance(val, bool):
            return VariantType.Boolean
        if isinstance(val, float):
            return VariantType.Double
        if isinstance(val, IntEnum):
            return VariantType.Int32
        if isinstance(val, int):
            return VariantType.Int64
        if isinstance(val, (str, unicode)):
            return VariantType.String
        if isinstance(val, bytes):
            return VariantType.ByteString
        if isinstance(val, datetime):
            return VariantType.DateTime
        if isinstance(val, uuid.UUID):
            return VariantType.Guid
        # any other object: use the VariantType named like its class if
        # there is one, otherwise treat it as an extension object
        return getattr(VariantType, val.__class__.__name__, VariantType.ExtensionObject)

    def __str__(self):
        return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)

    __repr__ = __str__

    def to_binary(self):
        from opcua.ua.ua_binary import variant_to_binary
        return variant_to_binary(self)
737
738
739
def _split_list(l, n):
740
    n = max(1, n)
741
    return [l[i:i + n] for i in range(0, len(l), n)]
742
743
744
def flatten_and_get_shape(mylist):
    """
    Flatten a nested list/tuple and return ``(flat_list, dims)``.

    ``dims`` holds the length of each nesting level, measured on the first
    element of every level. An empty input gives ``([], [0])`` rather than
    an IndexError.
    """
    dims = [len(mylist)]
    # ``mylist and`` guards the [0] access, both for an empty input and for
    # a level that turns out empty after flattening
    while mylist and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
753
754
755
def flatten(mylist):
    """
    Return the elements of a nested list/tuple as one flat list.

    ``None`` gives ``None``; an input that is already flat (or empty) is
    returned as is.
    """
    if mylist is None:
        return None
    flat = mylist
    # peel one nesting level per pass, for as long as the first element is
    # itself a list or tuple
    while len(flat) != 0 and isinstance(flat[0], (list, tuple)):
        merged = []
        for sublist in flat:
            for item in sublist:
                merged.append(item)
        flat = merged
    return flat
765
766
767
def get_shape(mylist):
    """
    Return the length of each nesting level of a nested list/tuple,
    following the first element of every level. A value that is not a
    list or tuple gives an empty list.
    """
    shape = []
    level = mylist
    while isinstance(level, (list, tuple)):
        size = len(level)
        shape.append(size)
        if size == 0:
            # nothing to descend into
            break
        level = level[0]
    return shape
775
776
777
class DataValue(FrozenClass):
    """
    A value with an associated timestamp, and quality.
    Automatically generated from xml, copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    """

    ua_switches = {
        'Value': ('Encoding', 0),
        'StatusCode': ('Encoding', 1),
        'SourceTimestamp': ('Encoding', 2),
        'ServerTimestamp': ('Encoding', 3),
        'SourcePicoseconds': ('Encoding', 4),
        'ServerPicoseconds': ('Encoding', 5),
    }

    ua_types = (
        ('Encoding', 'Byte'),
        ('Value', 'Variant'),
        ('StatusCode', 'StatusCode'),
        ('SourceTimestamp', 'DateTime'),
        ('SourcePicoseconds', 'UInt16'),
        ('ServerTimestamp', 'DateTime'),
        ('ServerPicoseconds', 'UInt16'),
    )

    def __init__(self, variant=None, status=None):
        self.Encoding = 0
        # anything that is not already a Variant gets wrapped in one
        self.Value = variant if isinstance(variant, Variant) else Variant(variant)
        self.StatusCode = StatusCode() if status is None else status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def __str__(self):
        # the value is always shown; the other fields only when they are set
        optional_fields = (
            (', StatusCode:{0}', self.StatusCode),
            (', SourceTimestamp:{0}', self.SourceTimestamp),
            (', ServerTimestamp:{0}', self.ServerTimestamp),
            (', SourcePicoseconds:{0}', self.SourcePicoseconds),
            (', ServerPicoseconds:{0}', self.ServerPicoseconds),
        )
        s = 'DataValue(Value:{0}'.format(self.Value)
        for template, field in optional_fields:
            if field is not None:
                s += template.format(field)
        s += ')'
        return s

    __repr__ = __str__
846
847
848
def datatype_to_varianttype(int_type):
    """
    Take a NodeId or an int and return a VariantType.

    Values up to 25 map to VariantType members; anything above becomes a
    VariantTypeCustom. This is only supported if int_type < 63 due to
    VariantType encoding. At low level we do not have access to the address
    space, so decoding is limited; a better version of this function can be
    found in ua_utils.py
    """
    type_id = int_type.Identifier if isinstance(int_type, NodeId) else int_type

    if type_id <= 25:
        return VariantType(type_id)
    return VariantTypeCustom(type_id)
862
863
864
def get_default_value(vtype):
    """
    Return the default value for the given variant type.

    :raises RuntimeError: if no default is defined for ``vtype``
    """
    if vtype == VariantType.Null:
        return None
    if vtype == VariantType.Boolean:
        return False
    if vtype in (VariantType.SByte, VariantType.Byte):
        return 0
    if vtype == VariantType.ByteString:
        return b""
    if 4 <= vtype.value <= 9:
        # Int16 up to UInt64
        return 0
    if vtype in (VariantType.Float, VariantType.Double):
        return 0.0
    if vtype == VariantType.String:
        return None  # a string can be null
    if vtype == VariantType.DateTime:
        return datetime.utcnow()
    if vtype == VariantType.Guid:
        return uuid.uuid4()
    if vtype == VariantType.XmlElement:
        return None  # Not sure this is correct
    if vtype == VariantType.NodeId:
        return NodeId()
    if vtype == VariantType.ExpandedNodeId:
        return NodeId()
    if vtype == VariantType.StatusCode:
        return StatusCode()
    if vtype == VariantType.QualifiedName:
        return QualifiedName()
    if vtype == VariantType.LocalizedText:
        return LocalizedText()
    if vtype == VariantType.ExtensionObject:
        return ExtensionObject()
    if vtype == VariantType.DataValue:
        return DataValue()
    if vtype == VariantType.Variant:
        return Variant()
    raise RuntimeError("function take a uatype as argument, got:", vtype)
906
907
908
# These dictionaries are used to register extension classes for automatic
909
# decoding and encoding
910
extension_object_classes = {}
911
extension_object_ids = {}
912
913
914
def register_extension_object(name, nodeid, class_type):
    """
    Register a new extension object class for automatic decoding and make
    it available in the ua module.

    ``class_type`` is stored under ``nodeid`` and ``nodeid`` under ``name``
    in the module-level registries; ``class_type`` is also set as attribute
    ``name`` of ``opcua.ua``.
    """
    logger.warning("registring new extension object: %s %s %s", name, nodeid, class_type)
    extension_object_classes[nodeid] = class_type
    extension_object_ids[name] = nodeid
    # FIXME: patching the package at runtime is not exactly a Python best
    # practice, so feel free to propose something else.
    # New extension objects are added to the ua module to automate decoding.
    import opcua.ua as ua_module
    setattr(ua_module, name, class_type)
925
926
927
def get_extensionobject_class_type(typeid):
    """
    Return the class registered for the type id of an extension object,
    or None if nothing is registered for it.
    """
    return extension_object_classes.get(typeid)
935