Test Failed
Pull Request — master (#490)
by Olivier
03:08
created

Variant   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 92
Duplicated Lines 4.35 %

Test Coverage

Coverage 0%

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 4
loc 92
ccs 0
cts 65
cp 0
rs 9.3999
wmc 33

6 Methods

Rating   Name   Duplication   Size   Complexity  
A to_binary() 3 3 1
A __ne__() 0 2 1
F __init__() 0 25 11
F _guess_type() 0 33 15
A __str__() 0 2 1
A __eq__() 0 4 4

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
"""
2
implement ua datatypes
3
"""
4
5
import logging
6
import struct
7
from enum import Enum, IntEnum
8
from datetime import datetime
9
import sys
10
import os
11
import uuid
12
import re
13
import itertools
14
15
from opcua.ua import ua_binary as uabin
16
from opcua.ua import status_codes
17
from opcua.ua import ObjectIds
18
from opcua.ua.uaerrors import UaError
19
from opcua.ua.uaerrors import UaStatusCodeError
20
from opcua.ua.uaerrors import UaStringParsingError
21
22
23
logger = logging.getLogger(__name__)


# Python 3 has no separate ``unicode`` type; alias it to ``str`` so the rest
# of the module can refer to ``unicode`` on either major version.
if sys.version_info[0] >= 3:
    unicode = str
28
def get_win_epoch():
    """
    Return whatever uabin.win_epoch_to_datetime() yields for a value of 0.
    """
    epoch = uabin.win_epoch_to_datetime(0)
    return epoch
30
31
32
class _FrozenClass(object):

    """
    Base class that refuses new members once an instance is frozen.

    While ``_freeze`` is False any attribute may be assigned. Once an
    instance has set ``_freeze = True``, assigning to a name the instance
    does not already have raises TypeError; existing members stay writable.
    Not pythonic at all, but it catches many typos in the use of protocol
    structures.
    """
    _freeze = False

    def __setattr__(self, key, value):
        # Allowed when not frozen, or when the member already exists.
        # hasattr() is only evaluated for frozen instances.
        if not self._freeze or hasattr(self, key):
            object.__setattr__(self, key, value)
            return
        raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
            key, self.__class__.__name__, self.__dict__.keys()))
46
47
48
# The typo check is cpu consuming, but it makes debugging easy.
# When it is not needed (in production), set the PYOPCUA_NO_TYPO_CHECK
# environment variable: the ua type classes then inherit from plain
# ``object`` instead of _FrozenClass and the check is skipped.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
56
57
58
class ValueRank(IntEnum):
    """
    Number of dimensions of a variable.

    Not every case is covered here, since a ValueRank may be any n > 0.
    As this is an IntEnum, a plain int can be used wherever a member is
    expected.
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # The names below are not in the spec, but are common enough to be
    # spelled out here.
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
73
74
75
class _MaskEnum(IntEnum):
    """
    Base class for enums whose member values are bit positions.

    Each member's value is the index of a bit; helpers convert between a
    collection of members and the integer that has those bits set.
    """

    @classmethod
    def parse_bitfield(cls, the_int):
        """
        Take an integer and interpret it as a set of enum values.

        Returns a set with one member of ``cls`` per bit set in ``the_int``.
        Looking up a bit position that has no member raises ValueError
        (from the enum lookup).
        """
        assert isinstance(the_int, int)

        return {cls(b) for b in cls._bits(the_int)}

    @classmethod
    def to_bitfield(cls, collection):
        """
        Take some enum values and create an integer from them.

        ``collection`` may be any iterable, including a one-shot iterator.
        Masks are OR-ed together, so a member that appears more than once
        still contributes its bit only once.
        """
        bitfield = 0
        for member in collection:
            # make sure every element is of the correct type
            assert isinstance(member, cls)
            bitfield |= member.mask
        return bitfield

    @property
    def mask(self):
        """Integer with only this member's bit set."""
        return 1 << self.value

    @staticmethod
    def _bits(n):
        """ Iterate over the positions of the bits set in n.

            e.g. bits(44) yields at 2, 3, 5
        """
        # A negative n never reaches 0 when shifted right, so the loop
        # below would not terminate.
        assert n >= 0

        pos = 0
        while n:
            if n & 0x1:
                yield pos
            n >>= 1
            pos += 1
112
113
114
class AccessLevel(_MaskEnum):
    """
    Bit positions describing the access level.

    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
    """
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
127
128
129
class WriteMask(_MaskEnum):
    """
    Bit positions telling which attributes of a node are writable.

    Spec Part 3, Paragraph 5.2.7 WriteMask
    """
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
157
158
159
class EventNotifier(_MaskEnum):
    """
    Bit positions telling how a node can be used for events.

    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
    """
    SubscribeToEvents = 0
    # bit 1 is reserved and deliberately has no member
    HistoryRead = 2
    HistoryWrite = 3
169
170
171
class StatusCode(FrozenClass):

    """
    An OPC-UA status code.

    Built either from the numeric code or from the name of an attribute of
    ``status_codes.StatusCodes``.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    ua_types = [
            ("value", "UInt32")
            ]

    def __init__(self, value=0):
        if isinstance(value, str):
            self.name = value
            self.value = getattr(status_codes.StatusCodes, value)
            # doc has to be set before freezing; a frozen instance cannot
            # gain the member later on.
            _, self.doc = status_codes.get_name_and_doc(self.value)
        else:
            self.value = value
            self.name, self.doc = status_codes.get_name_and_doc(value)
        self._freeze = True

    def to_binary(self):
        """Pack the numeric code as a UInt32."""
        return uabin.Primitives.UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Read a UInt32 from ``data`` and wrap it in a StatusCode."""
        val = uabin.Primitives.UInt32.unpack(data)
        return StatusCode(val)

    def check(self):
        """
        Raises UaStatusCodeError if the status code is not good.

        Use the is_good() method if you do not want an exception.
        """
        if not self.is_good():
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        return True if status is Good.

        The status counts as good when bits 30 and 31 of the value are
        both clear.
        """
        mask = 3 << 30
        return not (mask & self.value)

    def __str__(self):
        return 'StatusCode({0})'.format(self.name)
    __repr__ = __str__

    def __eq__(self, other):
        # Comparing with anything that is not a StatusCode is simply False
        # instead of an AttributeError on ``other.value``.
        return isinstance(other, StatusCode) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)
232
233
234
class NodeIdType(IntEnum):
    """Kinds of NodeId; NodeId stores one of these as its NodeIdType."""
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
241
242
243
class NodeId(FrozenClass):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want something special like twobyte nodeid or fourbytenodeid


    :ivar Identifier:
    :vartype Identifier: int, str, bytes or uuid.UUID
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):

        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # no identifier given: build the null NodeId
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the type from the python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, uuid.UUID):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def _key(self):
        """Tuple used for equality, hashing and ordering."""
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):
            # twobyte, fourbyte and numeric may represent the same node
            return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier)

    def __eq__(self, node):
        return isinstance(node, NodeId) and self._key() == node._key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._key())

    def __lt__(self, other):
        if not isinstance(other, NodeId):
            # NOTE(review): TypeError would be the conventional exception
            # here; AttributeError is kept for existing callers.
            raise AttributeError("Can only compare to NodeId")
        return self._key() < other._key()

    def is_null(self):
        """Return True for a null identifier in namespace 0."""
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """
        Return True if the identifier is empty, 0, or an all-zero Guid.
        """
        if not self.Identifier:
            return True
        if self.NodeIdType != NodeIdType.Guid:
            return False
        # A uuid.UUID object is always truthy, so the null Guid has to be
        # detected through its integer value.
        guid = self.Identifier
        if not isinstance(guid, uuid.UUID):
            try:
                if isinstance(guid, bytes):
                    guid = uuid.UUID(bytes=guid)
                else:
                    guid = uuid.UUID(str(guid))
            except ValueError:
                # not something that parses as a Guid: cannot be the null Guid
                return False
        return guid.int == 0

    @staticmethod
    def from_string(string):
        """
        Parse a string such as "ns=2;i=42" into a NodeId.

        Raises UaStringParsingError if the string cannot be parsed.
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)

    @staticmethod
    def _from_string(string):
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            # an element without "=" raises ValueError, handled by from_string()
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                # NOTE(review): the identifier is kept as the raw string, not
                # converted to uuid.UUID - confirm what the binary encoder expects.
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                srv = v
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # Only override the constructor defaults ("" and 0) when the string
        # actually carried a value.
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = int(srv)
        return nodeid

    def to_string(self):
        """
        Format the NodeId as ";"-separated "key=value" elements, the same
        layout that from_string() parses.
        """
        ntype = None
        if self.NodeIdType in (NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte):
            ntype = "i"
        elif self.NodeIdType == NodeIdType.String:
            ntype = "s"
        elif self.NodeIdType == NodeIdType.Guid:
            ntype = "g"
        elif self.NodeIdType == NodeIdType.ByteString:
            ntype = "b"
        parts = []
        if self.ServerIndex:
            parts.append("srv={0}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={0}".format(self.NamespaceIndex))
        parts.append("{0}={1}".format(ntype, self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={0}".format(self.NamespaceUri))
        return ";".join(parts)

    def __str__(self):
        return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())
    __repr__ = __str__

    def to_binary(self):
        # NOTE(review): imported at call time rather than through the
        # module-level ``uabin`` alias - presumably to avoid an import
        # cycle; confirm before changing.
        import opcua
        return opcua.ua.ua_binary.nodeid_to_binary(self)
398
399
400
class TwoByteNodeId(NodeId):
    """NodeId forced to NodeIdType.TwoByte, always in namespace 0."""

    def __init__(self, identifier):
        super(TwoByteNodeId, self).__init__(
            identifier, namespaceidx=0, nodeidtype=NodeIdType.TwoByte)
404
405
406
class FourByteNodeId(NodeId):
    """NodeId forced to NodeIdType.FourByte."""

    def __init__(self, identifier, namespace=0):
        super(FourByteNodeId, self).__init__(
            identifier, namespaceidx=namespace, nodeidtype=NodeIdType.FourByte)
410
411
412
class NumericNodeId(NodeId):
    """NodeId forced to NodeIdType.Numeric."""

    def __init__(self, identifier, namespace=0):
        super(NumericNodeId, self).__init__(
            identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Numeric)
416
417
418
class ByteStringNodeId(NodeId):
    """NodeId forced to NodeIdType.ByteString."""

    def __init__(self, identifier, namespace=0):
        super(ByteStringNodeId, self).__init__(
            identifier, namespaceidx=namespace, nodeidtype=NodeIdType.ByteString)
422
423
424
class GuidNodeId(NodeId):
    """NodeId forced to NodeIdType.Guid."""

    def __init__(self, identifier, namespace=0):
        super(GuidNodeId, self).__init__(
            identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Guid)
428
429
430
class StringNodeId(NodeId):
    """NodeId forced to NodeIdType.String."""

    def __init__(self, identifier, namespace=0):
        super(StringNodeId, self).__init__(
            identifier, namespaceidx=namespace, nodeidtype=NodeIdType.String)
434
435
436
# ExpandedNodeId is an alias of NodeId: NodeId itself already carries the
# NamespaceUri and ServerIndex members.
ExpandedNodeId = NodeId
437
438
439
class QualifiedName(FrozenClass):
    """
    A name together with the index of the namespace it belongs to.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    """

    ua_types = [
        ('NamespaceIndex', 'UInt16'),
        ('Name', 'String'),
        ]

    def __init__(self, name=None, namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return "<NamespaceIndex>:<Name>"."""
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse "<index>:<name>"; a string without ":" is a name in namespace 0.

        Raises UaStringParsingError if the index part is not an integer.
        """
        if ":" not in string:
            return QualifiedName(string, 0)
        try:
            idx, name = string.split(":", 1)
            idx = int(idx)
        except (TypeError, ValueError) as ex:
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)
        return QualifiedName(name, idx)

    def to_binary(self):
        """Pack the namespace index (UInt16) followed by the name (String)."""
        index_bytes = uabin.Primitives.UInt16.pack(self.NamespaceIndex)
        name_bytes = uabin.Primitives.String.pack(self.Name)
        return b''.join([index_bytes, name_bytes])

    @staticmethod
    def from_binary(data):
        """Read the namespace index, then the name, from ``data``."""
        qname = QualifiedName()
        qname.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
        qname.Name = uabin.Primitives.String.unpack(data)
        return qname

    def __eq__(self, bname):
        if not isinstance(bname, QualifiedName):
            return False
        return self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        # order by namespace index first, then by name
        if not isinstance(other, QualifiedName):
            raise TypeError("Cannot compare QualifiedName and {0}".format(other))
        if self.NamespaceIndex == other.NamespaceIndex:
            return self.Name < other.Name
        return self.NamespaceIndex < other.NamespaceIndex

    def __str__(self):
        return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
503
504
505
class LocalizedText(FrozenClass):
    """
    A text with an optional locale.

    ``Encoding`` is a bit mask telling which fields are present in the
    binary form: bit 0 for Locale, bit 1 for Text.

    :ivar Encoding:
    :vartype Encoding: int
    :ivar Locale:
    :vartype Locale: string
    :ivar Text:
    :vartype Text: string
    """

    ua_switches = {
        'Locale': ('Encoding', 0),
        'Text': ('Encoding', 1),
               }

    ua_types = (
        ('Encoding', 'UInt8'),
        ('Locale', 'CharArray'),
        ('Text', 'CharArray'),
               )

    def __init__(self, text=None):
        self.Encoding = 0
        if text is not None and not isinstance(text, str):
            raise ValueError("A LocalizedText object takes a string as argument, not a {}, {}".format(text, type(text)))
        self.Text = text
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = None
        self._freeze = True

    def to_binary(self):
        """
        Pack the encoding mask followed by the fields that are set.

        The mask is rebuilt from the current field values, so a bit left
        over from an earlier state (a field cleared after construction or
        after decoding) never announces a field that is not written.
        """
        encoding = 0
        if self.Locale:
            encoding |= (1 << 0)
        if self.Text:
            encoding |= (1 << 1)
        self.Encoding = encoding
        packet = [uabin.Primitives.UInt8.pack(self.Encoding)]
        if self.Locale:
            packet.append(uabin.Primitives.String.pack(self.Locale))
        if self.Text:
            packet.append(uabin.Primitives.String.pack(self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the encoding byte, then the fields its bits announce."""
        obj = LocalizedText()
        obj.Encoding = ord(data.read(1))
        if obj.Encoding & (1 << 0):
            obj.Locale = uabin.Primitives.String.unpack(data)
        if obj.Encoding & (1 << 1):
            obj.Text = uabin.Primitives.String.unpack(data)
        return obj

    def to_string(self):
        """Return the text, or an empty string when there is none."""
        # FIXME: use local
        if self.Text is None:
            return ""
        return self.Text

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) +')'
    __repr__ = __str__

    def __eq__(self, other):
        # Encoding is derived from the fields, so it is not compared
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)
573
574
575
class ExtensionObject(FrozenClass):
    """
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Encoding:
    :vartype Encoding: int
    :ivar Body:
    :vartype Body: bytes
    """
    ua_switches = {
        'Body': ('Encoding', 0),
            }

    ua_types = (
        ("TypeId", "NodeId"),
        ("Encoding", "Byte"),
        ("Body", "ByteString"),
    )

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = None
        self._freeze = True

    def to_binary(self):
        """
        Pack the type id, the encoding byte and, when present, the body.

        The encoding byte is derived from the body on every call: 0x01 when
        a body is written, 0 otherwise, so it never announces a body that is
        not there.
        """
        self.Encoding = 0x01 if self.Body else 0x00
        packet = [
            self.TypeId.to_binary(),
            uabin.Primitives.UInt8.pack(self.Encoding),
        ]
        if self.Body:
            packet.append(uabin.Primitives.ByteString.pack(self.Body))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the type id, the encoding byte and, if announced, the body."""
        obj = ExtensionObject()
        # NOTE(review): NodeId.from_binary is not defined in the NodeId class
        # shown in this file - confirm it exists where this is called.
        obj.TypeId = NodeId.from_binary(data)
        obj.Encoding = uabin.Primitives.UInt8.unpack(data)
        if obj.Encoding & (1 << 0):
            obj.Body = uabin.Primitives.ByteString.unpack(data)
        return obj

    @staticmethod
    def from_object(obj):
        """
        Wrap ``obj`` in an ExtensionObject.

        The type id is looked up in ObjectIds under
        "<class name>_Encoding_DefaultBinary"; the body is obj.to_binary().
        """
        ext = ExtensionObject()
        oid = getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__))
        ext.TypeId = FourByteNodeId(oid)
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        size = len(self.Body) if self.Body is not None else None
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
            'Encoding:' + str(self.Encoding) + ', ' + str(size) + ' bytes)'

    __repr__ = __str__
633
634
635
class VariantType(Enum):
    """
    The types a variant can have, numbered 0 (Null) to 25 (DiagnosticInfo).

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:
    """

    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
693
694
695
class VariantTypeCustom(object):
    """
    Looks like sometimes we get variants with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType

    :ivar name: always "Custom"
    :ivar value: the numeric type value, at most 0b00111111
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            raise UaError("Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{0}".format(self.value)
    __repr__ = __str__

    def __eq__(self, other):
        # Anything exposing ``value`` (another custom type or a VariantType
        # member) is compared by value; everything else is simply not equal
        # instead of raising AttributeError.
        return hasattr(other, "value") and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)
715
716
717
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    If no argument is given, a Null Variant is created.
    If no variant type is given, attempts to guess the type from the python type.
    If a variant is given as value, the new object becomes a copy of the argument.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions:
    :vartype Dimensions: The length of each dimension. Usually guessed from value.
    :ivar is_array:
    :vartype is_array: If the variant is an array. Usually guessed from value.
    """

    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self.is_array = is_array
        if isinstance(value, Variant):
            # Copy construction: take value and type from the source, and
            # also its array-ness and dimensions unless given explicitly.
            self.Value = value.Value
            self.VariantType = value.VariantType
            if self.is_array is None:
                self.is_array = value.is_array
            if self.Dimensions is None:
                self.Dimensions = value.Dimensions
        if self.is_array is None:
            self.is_array = isinstance(self.Value, (list, tuple))
        self._freeze = True
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Value is None and not self.is_array and self.VariantType not in (
                VariantType.Null,
                VariantType.String,
                VariantType.DateTime):
            raise UaError("Non array Variant of type {0} cannot have value None".format(self.VariantType))
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
            # Dimensions are only recorded for nested (multi-dimensional) values
            dims = get_shape(self.Value)
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType from the python type of ``val``.

        For (nested) lists and tuples the first innermost element decides;
        an empty sequence raises UaError.
        """
        original = val
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA type of variable {0}".format(original))
            val = val[0]
        if val is None:
            return VariantType.Null
        # bool and IntEnum are tested before int because both are ints too
        elif isinstance(val, bool):
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, IntEnum):
            return VariantType.Int32
        elif isinstance(val, int):
            return VariantType.Int64
        elif type(val) in (str, unicode):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        elif isinstance(val, uuid.UUID):
            return VariantType.Guid
        # Any other object: use the VariantType member named like its class,
        # and fall back to ExtensionObject when there is none.
        return getattr(VariantType, val.__class__.__name__, VariantType.ExtensionObject)

    def __str__(self):
        return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        # imported at call time, as in the original code
        from opcua.ua.ua_binary import variant_to_binary
        return variant_to_binary(self)
809
810
def _split_list(l, n):
    """
    Cut ``l`` into consecutive slices of ``n`` items each; the last slice
    may be shorter. An ``n`` below 1 is treated as 1.
    """
    step = max(1, n)
    chunks = []
    for start in range(0, len(l), step):
        chunks.append(l[start:start + step])
    return chunks
813
814
815
def flatten_and_get_shape(mylist):
    """
    Flatten a nested list/tuple and return it together with its shape.

    Returns ``(flat, dims)`` where ``dims`` holds the length of each nesting
    level, taken from the first element of every level. An empty input
    yields ``([], [0])`` rather than an IndexError.
    """
    dims = [len(mylist)]
    # The emptiness test guards the mylist[0] lookup, both for an empty
    # input and after a level of empty sublists has been flattened away.
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
824
825
826
def flatten(mylist):
    """
    Flatten a nested list/tuple into a single list.

    None is returned as None and an empty sequence is returned unchanged.
    Nesting is detected from the first element only.
    """
    if mylist is None:
        return None
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        # remove one level of nesting per pass
        mylist = list(itertools.chain.from_iterable(mylist))
    return mylist
836
837
838
def get_shape(mylist):
    """
    Return the length of each nesting level of a list/tuple.

    Only the first element of every level is followed. A value that is not
    a list or tuple gives an empty list.
    """
    shape = []
    level = mylist
    while isinstance(level, (list, tuple)):
        size = len(level)
        shape.append(size)
        if size == 0:
            break
        level = level[0]
    return shape
846
847
848
class DataValue(FrozenClass):
    """
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    ``Encoding`` is a bit mask telling which fields are present in the
    binary form (see ua_switches for the bit of each field).

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    """

    ua_switches = {
        'Value': ('Encoding', 0),
        'StatusCode': ('Encoding', 1),
        'SourceTimestamp': ('Encoding', 2),
        'ServerTimestamp': ('Encoding', 3),
        'SourcePicoseconds': ('Encoding', 4),
        'ServerPicoseconds': ('Encoding', 5),
               }

    ua_types = (
        ('Encoding', 'UInt8'),
        ('Value', 'Variant'),
        ('StatusCode', 'StatusCode'),
        ('SourceTimestamp', 'DateTime'),
        ('SourcePicoseconds', 'UInt16'),
        ('ServerTimestamp', 'DateTime'),
        ('ServerPicoseconds', 'UInt16'),
               )

    def __init__(self, variant=None, status=None):
        self.Encoding = 0
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        if status is None:
            self.StatusCode = StatusCode()
        else:
            self.StatusCode = status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        """
        Pack the encoding mask followed by every field that is set.

        The mask is rebuilt from the current field values on each call, so
        a bit kept from from_binary() or from an earlier call never
        announces a field that has since been cleared.
        """
        encoding = 0
        if self.Value:
            encoding |= (1 << 0)
        if self.StatusCode:
            encoding |= (1 << 1)
        if self.SourceTimestamp:
            encoding |= (1 << 2)
        if self.ServerTimestamp:
            encoding |= (1 << 3)
        if self.SourcePicoseconds:
            encoding |= (1 << 4)
        if self.ServerPicoseconds:
            encoding |= (1 << 5)
        self.Encoding = encoding
        packet = [uabin.Primitives.UInt8.pack(self.Encoding)]
        if self.Value:
            packet.append(self.Value.to_binary())
        if self.StatusCode:
            packet.append(self.StatusCode.to_binary())
        if self.SourceTimestamp:
            packet.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))
        if self.ServerTimestamp:
            packet.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))
        if self.SourcePicoseconds:
            packet.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
        if self.ServerPicoseconds:
            packet.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the encoding byte, then the fields its bits announce."""
        encoding = ord(data.read(1))
        if encoding & (1 << 0):
            # NOTE(review): Variant.from_binary is not defined in the Variant
            # class shown in this file - confirm it exists where this is called.
            value = Variant.from_binary(data)
        else:
            value = None
        if encoding & (1 << 1):
            status = StatusCode.from_binary(data)
        else:
            status = None
        obj = DataValue(value, status)
        obj.Encoding = encoding
        if obj.Encoding & (1 << 2):
            obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)
        if obj.Encoding & (1 << 3):
            obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)
        if obj.Encoding & (1 << 4):
            obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
        if obj.Encoding & (1 << 5):
            obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
        return obj

    def __str__(self):
        s = 'DataValue(Value:{0}'.format(self.Value)
        if self.StatusCode is not None:
            s += ', StatusCode:{0}'.format(self.StatusCode)
        if self.SourceTimestamp is not None:
            s += ', SourceTimestamp:{0}'.format(self.SourceTimestamp)
        if self.ServerTimestamp is not None:
            s += ', ServerTimestamp:{0}'.format(self.ServerTimestamp)
        if self.SourcePicoseconds is not None:
            s += ', SourcePicoseconds:{0}'.format(self.SourcePicoseconds)
        if self.ServerPicoseconds is not None:
            s += ', ServerPicoseconds:{0}'.format(self.ServerPicoseconds)
        s += ')'
        return s

    __repr__ = __str__
969
970
971
def datatype_to_varianttype(int_type):
    """
    Convert a NodeId or an int into a VariantType.

    Only supported for identifiers below 63 because of the VariantType encoding.
    At this low level the address space is not available, so decoding is limited;
    a more complete version of this function can be found in ua_utils.py.
    """
    # a NodeId is reduced to its identifier, anything else is used as given
    type_id = int_type.Identifier if isinstance(int_type, NodeId) else int_type
    if type_id <= 25:
        return VariantType(type_id)
    # identifiers above 25 are wrapped in a custom variant type
    return VariantTypeCustom(type_id)
985
986
987
def get_default_value(vtype):
    """
    Return the default value for the given variant type.

    Raises RuntimeError when vtype matches none of the handled variant types.
    """
    # checks are kept in their original order; each one returns as soon as it matches
    if vtype == VariantType.Null:
        return None
    if vtype == VariantType.Boolean:
        return False
    if vtype in (VariantType.SByte, VariantType.Byte):
        return 0
    if vtype == VariantType.ByteString:
        return b""
    if 4 <= vtype.value <= 9:
        # numeric type codes 4 to 9; presumably the remaining integer types,
        # to be confirmed against the VariantType definition
        return 0
    if vtype in (VariantType.Float, VariantType.Double):
        return 0.0
    if vtype == VariantType.String:
        return None  # a string can be null
    if vtype == VariantType.DateTime:
        return datetime.utcnow()
    if vtype == VariantType.Guid:
        return uuid.uuid4()
    if vtype == VariantType.XmlElement:
        return None  # not sure this is correct
    if vtype in (VariantType.NodeId, VariantType.ExpandedNodeId):
        # both node id flavours default to an empty NodeId
        return NodeId()
    if vtype == VariantType.StatusCode:
        return StatusCode()
    if vtype == VariantType.QualifiedName:
        return QualifiedName()
    if vtype == VariantType.LocalizedText:
        return LocalizedText()
    if vtype == VariantType.ExtensionObject:
        return ExtensionObject()
    if vtype == VariantType.DataValue:
        return DataValue()
    if vtype == VariantType.Variant:
        return Variant()
    raise RuntimeError("function take a uatype as argument, got:", vtype)
1029
1030
1031
# Registries of extension classes, used for automatic decoding and encoding.
# Both start empty and are distinct dictionaries.
extension_object_classes, extension_object_ids = {}, {}
1035
1036
1037
def register_extension_object(name, nodeid, class_type):
    """
    Register a new extension object for automatic decoding and make it available in the ua module.

    :param name: attribute name under which class_type is published on opcua.ua,
        also the key stored in extension_object_ids
    :param nodeid: key stored in extension_object_classes, used to find class_type again
    :param class_type: the class to register
    """
    # Registering a class is normal operation, not an anomaly: log at INFO so
    # that it does not show up as a warning with the default logging setup.
    logger.info("registring new extension object: %s %s %s", name, nodeid, class_type)
    extension_object_classes[nodeid] = class_type
    extension_object_ids[name] = nodeid
    # FIXME: Next line is not exactly a Python best practices, so feel free to propose something else
    # imported here, at call time, rather than at module level
    import opcua.ua
    setattr(opcua.ua, name, class_type)
1047
1048
def get_extensionobject_class_type(typeid):
    """
    Look up the class registered for the type id of an extension object.

    Returns None when no class is registered under typeid.
    """
    return extension_object_classes.get(typeid)
1056
1057