Test Failed
Pull Request — master (#490)
by Olivier
02:59
created

NodeId   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 151
Duplicated Lines 0 %

Test Coverage

Coverage 87.29%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 151
ccs 103
cts 118
cp 0.8729
rs 8.3673
c 1
b 0
f 0
wmc 45

12 Methods

Rating   Name   Duplication   Size   Complexity  
D to_string() 0 23 10
D __init__() 0 25 8
A __eq__() 0 2 1
F _from_string() 0 38 11
A is_null() 0 4 2
A __hash__() 0 2 1
A __lt__() 0 4 2
A __ne__() 0 2 1
A __str__() 0 2 1
A from_string() 0 6 2
A _key() 0 5 2
A has_null_identifier() 0 6 4

How to fix   Complexity   

Complex Class

Complex classes like NodeId often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
implement ua datatypes
3
"""
4
5 1
import logging
6 1
import struct
7 1
from enum import Enum, IntEnum
8 1
from datetime import datetime
9 1
import sys
10 1
import os
11 1
import uuid
12 1
import re
13 1
import itertools
14
15 1
from opcua.ua import ua_binary as uabin
16 1
from opcua.ua import status_codes
17 1
from opcua.ua import ObjectIds
18 1
from opcua.ua.uaerrors import UaError
19 1
from opcua.ua.uaerrors import UaStatusCodeError
20 1
from opcua.ua.uaerrors import UaStringParsingError
21 1
22
23
logger = logging.getLogger(__name__)
24 1
25
26
# Python 3 has no separate ``unicode`` type; alias it to ``str`` so code in this
# module that refers to ``unicode`` works on both major versions.
if sys.version_info.major > 2:
    unicode = str
28
def get_win_epoch():
    """Return what ``uabin.win_epoch_to_datetime`` yields for an epoch value of 0."""
    return uabin.win_epoch_to_datetime(0)
30 1
31
32
class _FrozenClass(object):
    """
    Make it impossible to add members to a class.

    Once an instance has set ``_freeze`` to True, assigning to an attribute
    that does not exist yet raises TypeError; existing attributes can still be
    changed. Not pythonic at all but we found out it prevents many many
    bugs in use of protocol structures.
    """

    # class-level default: instances stay open until they set _freeze = True
    _freeze = False

    def __setattr__(self, key, value):
        """Set ``key`` to ``value``, refusing to create new members once frozen."""
        if self._freeze and not hasattr(self, key):
            raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
                key, self.__class__.__name__, self.__dict__.keys()))
        object.__setattr__(self, key, value)
46 1
47
48
# The typo check done by _FrozenClass is cpu consuming, but it makes debugging easy.
# If the check is not needed (in production), set the PYOPCUA_NO_TYPO_CHECK
# environment variable: all uatype classes then inherit from object instead of
# _FrozenClass and the typo check is skipped.
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
    FrozenClass = object
else:
    FrozenClass = _FrozenClass
56 1
57
58
class ValueRank(IntEnum):
    """
    Defines dimensions of a variable.

    This enum does not cover all cases, since a ValueRank may be any n > 0,
    but since it is an IntEnum it can be replaced by a normal int.
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # the next names are not in spec but so common we express them here
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
73 1
74
75
class _MaskEnum(IntEnum):
    """
    Base class for enums whose member values are bit positions in a bitfield.

    A member's ``value`` is the index of a bit; its ``mask`` is the integer
    that has only that bit set.
    """

    @classmethod
    def parse_bitfield(cls, the_int):
        """ Take an integer and interpret it as a set of enum values. """
        assert isinstance(the_int, int)

        return {cls(b) for b in cls._bits(the_int)}

    @classmethod
    def to_bitfield(cls, collection):
        """ Takes some enum values and creates an integer from them. """
        # materialise once so that a one-shot iterator can be both checked and combined
        members = list(collection)
        assert all(isinstance(x, cls) for x in members)

        # OR the masks together: adding them would carry into the next bit
        # whenever the same member is passed more than once
        bitfield = 0
        for member in members:
            bitfield |= member.mask
        return bitfield

    @property
    def mask(self):
        """ The integer with only this member's bit set. """
        return 1 << self.value

    @staticmethod
    def _bits(n):
        """ Iterate over the bits in n.

            e.g. bits(44) yields at 2, 3, 5
        """
        assert n >= 0  # a negative int has no finite set of bits

        pos = 0
        # "n > 0" rather than "n": still terminates on a negative n when the
        # assert above has been stripped by python -O
        while n > 0:
            if n & 0x1:
                yield pos
            n >>= 1
            pos += 1
112 1
113
114
class AccessLevel(_MaskEnum):
    """
    Bit index to indicate what the access level is.

    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
    """
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
127 1
128
129
class WriteMask(_MaskEnum):
    """
    Bit index to indicate which attribute of a node is writable

    Spec Part 3, Paragraph 5.2.7 WriteMask
    """
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
157 1
158
159
class EventNotifier(_MaskEnum):
    """
    Bit index to indicate how a node can be used for events.

    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
    """
    SubscribeToEvents = 0
    # Reserved        = 1
    HistoryRead = 2
    HistoryWrite = 3
169 1
170
171
class StatusCode(FrozenClass):
    """
    A status code, built either from its numeric value or from its name.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    ua_types = [
            ("value", "UInt32")
            ]

    def __init__(self, value=0):
        """
        :param value: the numeric code, or the name of an attribute of
            ``status_codes.StatusCodes`` to look the code up by name
        """
        if isinstance(value, str):
            self.name = value
            self.value = getattr(status_codes.StatusCodes, value)
            # doc must be created before the object is frozen, otherwise it can
            # neither be read nor set afterwards; the name stays as given
            _, self.doc = status_codes.get_name_and_doc(self.value)
        else:
            self.value = value
            self.name, self.doc = status_codes.get_name_and_doc(value)
        self._freeze = True

    def to_binary(self):
        """Pack the numeric value as a UInt32."""
        return uabin.Primitives.UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Unpack a UInt32 from ``data`` and wrap it in a StatusCode."""
        val = uabin.Primitives.UInt32.unpack(data)
        return StatusCode(val)

    def check(self):
        """
        Raises an exception if the status code is anything else than 0 (good).

        Use the is_good() method if you do not want an exception.
        """
        if not self.is_good():
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        return True if status is Good.
        """
        # Good means that neither of the two highest bits (30 and 31) is set
        return not (self.value & (3 << 30))

    def __str__(self):
        return 'StatusCode({0})'.format(self.name)
    __repr__ = __str__

    def __eq__(self, other):
        # anything that is not a StatusCode is simply unequal instead of
        # raising AttributeError on the missing .value
        return isinstance(other, StatusCode) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)
232 1
233 1
234 1
class NodeIdType(IntEnum):
    """The kinds of NodeId identifier distinguished in this module."""
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
241
242
243
class NodeId(FrozenClass):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or if you
            want something special like a twobyte nodeid or a fourbyte nodeid


    :ivar Identifier:
    :vartype Identifier: int, str, bytes or uuid.UUID
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):

        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # no identifier: identifier 0 as a twobyte nodeid, whatever nodeidtype was given
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the type from the python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, uuid.UUID):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def _key(self):
        """Return the tuple used for equality, hashing and ordering."""
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):
            # twobyte, fourbyte and numeric may represent the same node
            return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier)

    def __eq__(self, node):
        return isinstance(node, NodeId) and self._key() == node._key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self._key())

    def __lt__(self, other):
        # AttributeError (not TypeError) is what this method has always raised; kept for callers
        if not isinstance(other, NodeId):
            raise AttributeError("Can only compare to NodeId")
        return self._key() < other._key()

    def is_null(self):
        """Return True if the namespace index is 0 and the identifier is null."""
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """Return True if the identifier is empty, zero or, for a Guid, the all-zero UUID."""
        if not self.Identifier:
            return True
        if self.NodeIdType == NodeIdType.Guid:
            guid = self.Identifier
            if isinstance(guid, str):
                # a Guid may also be held as its textual form
                try:
                    guid = uuid.UUID(guid)
                except ValueError:
                    return False
            # a uuid.UUID object is always truthy, so the nil UUID has to be tested explicitly
            if isinstance(guid, uuid.UUID):
                return guid.int == 0
        return False

    @staticmethod
    def from_string(string):
        """
        Parse a string such as "ns=2;i=3" into a NodeId.

        Raises UaStringParsingError if the string cannot be parsed.
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)

    @staticmethod
    def _from_string(string):
        """
        Parse ';'-separated "key=value" elements.

        Recognised keys: ns (namespace index), i / s / g / b (numeric, string,
        guid, bytestring identifier), srv (server index), nsu (namespace uri).
        Unknown keys are ignored. Malformed elements raise ValueError, which
        from_string() converts to UaStringParsingError.
        """
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                # same python type as the one __init__ recognises as a Guid
                identifier = uuid.UUID(v)
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is an Int, see class docstring
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # only override the constructor defaults ("" and 0) with values really present
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the "key=value;..." form understood by from_string().

        Order is srv, ns, identifier, nsu; srv, ns and nsu are left out when
        they are 0 / empty.
        """
        ntype = None
        if self.NodeIdType in (NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte):
            ntype = "i"
        elif self.NodeIdType == NodeIdType.String:
            ntype = "s"
        elif self.NodeIdType == NodeIdType.Guid:
            ntype = "g"
        elif self.NodeIdType == NodeIdType.ByteString:
            ntype = "b"
        parts = []
        if self.ServerIndex:
            parts.append("srv={0}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={0}".format(self.NamespaceIndex))
        parts.append("{0}={1}".format(ntype, self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={0}".format(self.NamespaceUri))
        # every element is ';'-separated, which is what _from_string() splits on
        return ";".join(parts)

    def __str__(self):
        return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())
    __repr__ = __str__
394 1
395 1
396 1
class TwoByteNodeId(NodeId):
    """NodeId of type TwoByte; the namespace index is always 0."""

    def __init__(self, identifier):
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
400
401 1
402 1
class FourByteNodeId(NodeId):
    """NodeId of type FourByte."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
406
407
408
class NumericNodeId(NodeId):
    """NodeId of type Numeric."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
412 1
413
414 1
class ByteStringNodeId(NodeId):
    """NodeId of type ByteString."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
418 1
419 1
420 1
class GuidNodeId(NodeId):
    """NodeId of type Guid."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
424 1
425 1
426 1
class StringNodeId(NodeId):
    """NodeId of type String."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
430 1
431 1
432 1
ExpandedNodeId = NodeId
433
434
435
class QualifiedName(FrozenClass):
    """
    A string qualified with a namespace index.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    """

    def __init__(self, name=None, namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return "<namespace index>:<name>"."""
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse "<namespace index>:<name>"; a string without ':' is a name in namespace 0.

        Raises UaStringParsingError if the part before the first ':' is not an integer.
        """
        if ":" in string:
            try:
                idx, name = string.split(":", 1)
                idx = int(idx)
            except (TypeError, ValueError) as ex:
                raise UaStringParsingError("Error parsing string {0}".format(string), ex)
        else:
            idx = 0
            name = string
        return QualifiedName(name, idx)

    def to_binary(self):
        """Pack NamespaceIndex as UInt16 followed by Name as String."""
        packet = [
            uabin.Primitives.UInt16.pack(self.NamespaceIndex),
            uabin.Primitives.String.pack(self.Name),
        ]
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Unpack a UInt16 namespace index and a String name from ``data``."""
        obj = QualifiedName()
        obj.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
        obj.Name = uabin.Primitives.String.unpack(data)
        return obj

    def __eq__(self, bname):
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        """Order by namespace index first, then by name."""
        if not isinstance(other, QualifiedName):
            raise TypeError("Cannot compare QualifiedName and {0}".format(other))
        if self.NamespaceIndex == other.NamespaceIndex:
            return self.Name < other.Name
        return self.NamespaceIndex < other.NamespaceIndex

    def __str__(self):
        return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
494
495 1
496 1
class LocalizedText(FrozenClass):
    """
    A text with an optional locale.

    :ivar Encoding: bit 0 is set when Locale is present, bit 1 when Text is present
    :vartype Encoding: int
    :ivar Locale:
    :vartype Locale: string
    :ivar Text:
    :vartype Text: string
    """

    ua_switches = {
        'Locale': ('Encoding', 0),
        'Text': ('Encoding', 1),
               }

    ua_types = (
        ('Encoding', 'UInt8'),
        ('Locale', 'CharArray'),
        ('Text', 'CharArray'),
               )

    def __init__(self, text=None):
        self.Encoding = 0
        if text is not None and not isinstance(text, str):
            raise ValueError("A LocalizedText object takes a string as argument, not a {}, {}".format(text, type(text)))
        self.Text = text
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = None
        self._freeze = True

    def to_binary(self):
        """Pack the Encoding byte, then Locale and Text if they are non-empty."""
        # Rebuild the mask from the current fields. OR-ing into the previous
        # value would keep a bit set after Locale or Text has been cleared, and
        # the stream would then announce a string that is never written.
        self.Encoding = 0
        if self.Locale:
            self.Encoding |= (1 << 0)
        if self.Text:
            self.Encoding |= (1 << 1)
        packet = [uabin.Primitives.UInt8.pack(self.Encoding)]
        if self.Locale:
            packet.append(uabin.Primitives.String.pack(self.Locale))
        if self.Text:
            packet.append(uabin.Primitives.String.pack(self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the Encoding byte from ``data``, then the strings whose bits are set."""
        obj = LocalizedText()
        obj.Encoding = ord(data.read(1))
        if obj.Encoding & (1 << 0):
            obj.Locale = uabin.Primitives.String.unpack(data)
        if obj.Encoding & (1 << 1):
            obj.Text = uabin.Primitives.String.unpack(data)
        return obj

    def to_string(self):
        """Return Text, or an empty string when Text is None."""
        # FIXME: use locale
        if self.Text is None:
            return ""
        return self.Text

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) + ')'
    __repr__ = __str__

    def __eq__(self, other):
        # Encoding is derived from the two fields, so it is not compared
        return isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text

    def __ne__(self, other):
        return not self.__eq__(other)
564 1
565 1
566 1
class ExtensionObject(FrozenClass):
    """
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Body:
    :vartype Body: bytes
    """
    # NOTE(review): to_binary()/from_binary() below call TypeId.to_binary() and
    # NodeId.from_binary(); presumably those are provided elsewhere -- confirm.

    ua_types = {
        "TypeId": "NodeId",
        "Encoding": "Byte",
        "Body": "ByteString"
    }

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = b''
        self._freeze = True

    def to_binary(self):
        """Pack TypeId, the Encoding byte and, if non-empty, Body as a ByteString."""
        # Keep bit 0 of Encoding in step with what is written: from_binary()
        # reads a ByteString exactly when that bit is set, so a stale bit with
        # an empty Body would make the reader consume bytes that are not ours.
        if self.Body:
            self.Encoding = 0x01
        else:
            self.Encoding &= ~0x01
        packet = [
            self.TypeId.to_binary(),
            uabin.Primitives.UInt8.pack(self.Encoding),
        ]
        if self.Body:
            packet.append(uabin.Primitives.ByteString.pack(self.Body))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Unpack TypeId and Encoding from ``data``, and Body when bit 0 of Encoding is set."""
        obj = ExtensionObject()
        obj.TypeId = NodeId.from_binary(data)
        obj.Encoding = uabin.Primitives.UInt8.unpack(data)
        if obj.Encoding & (1 << 0):
            obj.Body = uabin.Primitives.ByteString.unpack(data)
        return obj

    @staticmethod
    def from_object(obj):
        """
        Wrap ``obj`` in an ExtensionObject.

        TypeId is the FourByteNodeId of ObjectIds.<class name>_Encoding_DefaultBinary
        and Body is ``obj.to_binary()``.
        """
        ext = ExtensionObject()
        oid = getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__))
        ext.TypeId = FourByteNodeId(oid)
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        # Body may have been set to None; report it as 0 bytes instead of failing in len()
        size = len(self.Body) if self.Body is not None else 0
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
            'Encoding:' + str(self.Encoding) + ', ' + str(size) + ' bytes)'

    __repr__ = __str__
620
621
622
class VariantType(Enum):
    """
    The possible types of a variant.

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:
    """

    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
680
681
682
class VariantTypeCustom(object):
    """
    Looks like sometime we get variant with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType

    Exposes the same ``name`` / ``value`` attributes as a VariantType member.
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            raise UaError("Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{0}".format(self.value)
    __repr__ = __str__

    def __eq__(self, other):
        # compares by .value, so it also works against a VariantType member;
        # an operand without .value is unequal rather than an AttributeError
        try:
            return self.value == other.value
        except AttributeError:
            return False

    def __ne__(self, other):
        # explicit so that != mirrors == on python 2 as well
        return not self.__eq__(other)
702 1
703 1
704 1
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    If no argument is given, a Null Variant is created.
    If no variant type is given, attempts to guess the type from the python type.
    If a variant is given as value, the new object becomes a copy of the argument.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions:
    :vartype Dimensions: The length of each dimensions. Usually guessed from value.
    :ivar is_array:
    :vartype is_array: If the variant is an array. Usually guessed from value.
    """

    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self.is_array = is_array
        if isinstance(value, Variant):
            # copy: the type always comes from the source variant; Dimensions and
            # is_array come from it too unless the caller gave them explicitly
            self.Value = value.Value
            self.VariantType = value.VariantType
            if self.Dimensions is None:
                self.Dimensions = value.Dimensions
            if self.is_array is None:
                self.is_array = value.is_array
        if self.is_array is None:
            # decided on the unwrapped value, not on a Variant wrapper
            self.is_array = isinstance(self.Value, (list, tuple))
        self._freeze = True
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Value is None and not self.is_array and self.VariantType not in (
                VariantType.Null,
                VariantType.String,
                VariantType.DateTime):
            raise UaError("Non array Variant of type {0} cannot have value None".format(self.VariantType))
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
            dims = get_shape(self.Value)
            # Dimensions is only filled in for nested (multi-dimensional) values
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        return isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType of ``val`` from its python type.

        For (nested) lists and tuples the first innermost element decides.
        Raises UaError when an empty list or tuple is met on the way down.
        """
        error_val = val
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA type of variable {0}".format(error_val))
            val = val[0]
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # bool before int: bool is a subclass of int
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, IntEnum):
            # IntEnum before int for the same reason
            return VariantType.Int32
        elif isinstance(val, int):
            return VariantType.Int64
        elif type(val) in (str, unicode):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        elif isinstance(val, uuid.UUID):
            return VariantType.Guid
        # any other object: use the VariantType attribute named like its class,
        # otherwise treat it as an extension object
        try:
            return getattr(VariantType, val.__class__.__name__)
        except AttributeError:
            return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """Serialise through ``opcua.ua.ua_binary.variant_to_binary`` (imported here, at call time)."""
        from opcua.ua.ua_binary import variant_to_binary
        return variant_to_binary(self)
796
797 1
def _split_list(l, n):
    """
    Split ``l`` into consecutive slices of ``n`` items; the last one may be shorter.

    An ``n`` below 1 is treated as 1.
    """
    n = max(1, n)
    return [l[i:i + n] for i in range(0, len(l), n)]
800 1
801 1
802
def flatten_and_get_shape(mylist):
    """
    Flatten a nested list/tuple and return ``(flat_list, dims)``.

    ``dims`` holds the length of ``mylist`` followed, for each nesting level,
    by the length of the first element at that level. An empty ``mylist``
    gives ``([], [0])``.
    """
    dims = [len(mylist)]
    # the length test comes first so that an empty list (given or reached by
    # flattening) is never indexed
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
811 1
812 1
813 1
def flatten(mylist):
    """
    Flatten a nested list/tuple into a flat list.

    None is returned as None, and a list that is empty or not nested is
    returned unchanged.
    """
    if mylist is None:
        return None
    # the length test comes first so that an empty list is never indexed
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        mylist = [item for sublist in mylist for item in sublist]
    return mylist
823 1
824 1
825 1
def get_shape(mylist):
    """
    Return the length of each nesting level of a list/tuple.

    Only the first element of each level is followed. A value that is neither
    a list nor a tuple gives ``[]``.
    """
    dims = []
    while isinstance(mylist, (list, tuple)):
        dims.append(len(mylist))
        if len(mylist) == 0:
            # nothing to descend into
            break
        mylist = mylist[0]
    return dims
833 1
834
835 1
class XmlElement(FrozenClass):
    """
    An XML element encoded as an UTF-8 string.

    :ivar Value:
    :vartype Value: string
    """

    def __init__(self, value=None, binary=None):
        """
        :param value: the value to store
        :param binary: if not None, Value is unpacked from it and ``value`` is ignored
        """
        if binary is not None:
            self._binary_init(binary)
        else:
            self.Value = value
        self._freeze = True

    def to_binary(self):
        """Pack Value as a String."""
        return uabin.Primitives.String.pack(self.Value)

    def __eq__(self, other):
        return isinstance(other, XmlElement) and self.Value == other.Value

    def __ne__(self, other):
        # explicit so that != mirrors == on python 2 as well, as in the other ua types
        return not self.__eq__(other)

    @staticmethod
    def from_binary(data):
        """Build an XmlElement whose Value is unpacked from ``data``."""
        return XmlElement(binary=data)

    def _binary_init(self, data):
        self.Value = uabin.Primitives.String.unpack(data)

    def __str__(self):
        return 'XmlElement(Value:' + str(self.Value) + ')'

    __repr__ = __str__
865 1
866 1
867 1
class DataValue(FrozenClass):
    """
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    """

    # attribute name -> (mask attribute, bit number announcing the field)
    ua_switches = {
        'Value': ('Encoding', 0),
        'StatusCode': ('Encoding', 1),
        'SourceTimestamp': ('Encoding', 2),
        'ServerTimestamp': ('Encoding', 3),
        'SourcePicoseconds': ('Encoding', 4),
        'ServerPicoseconds': ('Encoding', 5),
    }

    # NOTE(review): this tuple lists SourcePicoseconds before ServerTimestamp,
    # whereas to_binary/from_binary below handle both timestamps before both
    # picosecond fields. Confirm which order the OPC UA binary encoding
    # requires before relying on either.
    ua_types = (
        ('Encoding', 'UInt8'),
        ('Value', 'Variant'),
        ('StatusCode', 'StatusCode'),
        ('SourceTimestamp', 'DateTime'),
        ('SourcePicoseconds', 'UInt16'),
        ('ServerTimestamp', 'DateTime'),
        ('ServerPicoseconds', 'UInt16'),
    )

    def __init__(self, variant=None, status=None):
        """
        :param variant: a Variant, or any value which is then wrapped in a Variant
        :param status: a StatusCode; a default StatusCode() is used when None
        """
        self.Encoding = 0
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        if status is None:
            self.StatusCode = StatusCode()
        else:
            self.StatusCode = status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        """
        Serialize to bytes: one mask byte followed by every field that is set.

        The mask is rebuilt from scratch on each call and stored in
        self.Encoding, so a bit left over from a previous decode or encode
        can never announce a field that is not actually written.
        """
        encoding = 0
        body = []
        if self.Value:
            encoding |= (1 << 0)
            body.append(self.Value.to_binary())
        if self.StatusCode:
            encoding |= (1 << 1)
            body.append(self.StatusCode.to_binary())
        if self.SourceTimestamp:
            encoding |= (1 << 2)
            body.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))
        if self.ServerTimestamp:
            encoding |= (1 << 3)
            body.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))
        if self.SourcePicoseconds:
            encoding |= (1 << 4)
            body.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
        if self.ServerPicoseconds:
            encoding |= (1 << 5)
            body.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
        self.Encoding = encoding
        return b''.join([uabin.Primitives.UInt8.pack(encoding)] + body)

    @staticmethod
    def from_binary(data):
        """
        Decode a DataValue from the stream `data`.

        The first byte is the mask; each set bit means the matching field
        follows in the stream. Fields whose bit is clear keep the defaults
        assigned by __init__.
        """
        encoding = ord(data.read(1))
        if encoding & (1 << 0):
            value = Variant.from_binary(data)
        else:
            value = None
        if encoding & (1 << 1):
            status = StatusCode.from_binary(data)
        else:
            status = None
        obj = DataValue(value, status)
        obj.Encoding = encoding
        if obj.Encoding & (1 << 2):
            obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
        if obj.Encoding & (1 << 3):
            obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
        if obj.Encoding & (1 << 4):
            obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
        if obj.Encoding & (1 << 5):
            obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
        return obj

    def __str__(self):
        s = 'DataValue(Value:{0}'.format(self.Value)
        if self.StatusCode is not None:
            s += ', StatusCode:{0}'.format(self.StatusCode)
        if self.SourceTimestamp is not None:
            s += ', SourceTimestamp:{0}'.format(self.SourceTimestamp)
        if self.ServerTimestamp is not None:
            s += ', ServerTimestamp:{0}'.format(self.ServerTimestamp)
        if self.SourcePicoseconds is not None:
            s += ', SourcePicoseconds:{0}'.format(self.SourcePicoseconds)
        if self.ServerPicoseconds is not None:
            s += ', ServerPicoseconds:{0}'.format(self.ServerPicoseconds)
        s += ')'
        return s

    __repr__ = __str__
988
989 1
990 1
def datatype_to_varianttype(int_type):
    """
    Convert a NodeId or an int into a VariantType.

    Identifiers up to 25 map onto VariantType, anything above is wrapped
    in a VariantTypeCustom.
    This is only supported if int_type < 63 due to VariantType encoding.
    At this low level the address space is not available, so decoding is
    limited; a better version of this method can be found in ua_utils.py
    """
    identifier = int_type.Identifier if isinstance(int_type, NodeId) else int_type
    variant_class = VariantType if identifier <= 25 else VariantTypeCustom
    return variant_class(identifier)
1004 1
1005 1
1006 1
def get_default_value(vtype):
    """
    Return the default value associated with a variant type.

    Simple types get a constant (None, False, 0, 0.0 or b""), DateTime and
    Guid get a freshly generated value, and the ua class types get a new
    default-constructed instance.
    Raises RuntimeError when vtype matches none of the handled types.
    """
    if vtype == VariantType.Null:
        return None
    if vtype == VariantType.Boolean:
        return False
    if vtype in (VariantType.SByte, VariantType.Byte):
        return 0
    if vtype == VariantType.ByteString:
        return b""
    # numeric codes 4..9; presumably the remaining integer types,
    # verify against the VariantType definition
    if 4 <= vtype.value <= 9:
        return 0
    if vtype in (VariantType.Float, VariantType.Double):
        return 0.0
    if vtype == VariantType.String:
        return None  # a string can be null

    # remaining types: a callable is used so every call yields a new object
    factories = (
        (VariantType.DateTime, datetime.utcnow),
        (VariantType.Guid, uuid.uuid4),
        (VariantType.XmlElement, lambda: None),  # Not sure this is correct
        (VariantType.NodeId, NodeId),
        (VariantType.ExpandedNodeId, NodeId),
        (VariantType.StatusCode, StatusCode),
        (VariantType.QualifiedName, QualifiedName),
        (VariantType.LocalizedText, LocalizedText),
        (VariantType.ExtensionObject, ExtensionObject),
        (VariantType.DataValue, DataValue),
        (VariantType.Variant, Variant),
    )
    for candidate, make_default in factories:
        if vtype == candidate:
            return make_default()
    raise RuntimeError("function take a uatype as argument, got:", vtype)
1048
1049
1050
# These dictionnaries are used to register extensions classes for automatic
1051
# decoding and encoding
1052
extension_object_classes = {}
1053
extension_object_ids = {}
1054
1055
1056 1
def register_extension_object(name, nodeid, class_type):
    """
    Register a new extension object class for automatic decoding.

    class_type is stored under nodeid in extension_object_classes, nodeid is
    stored under name in extension_object_ids, and class_type is also set
    as attribute `name` on the opcua.ua package.
    """
    logger.warning("registring new extension object: %s %s %s", name, nodeid, class_type)
    extension_object_classes[nodeid] = class_type
    extension_object_ids[name] = nodeid
    # FIXME: patching the package namespace at runtime is not exactly a
    # Python best practice, so feel free to propose something else
    import opcua.ua
    ua_package = opcua.ua
    setattr(ua_package, name, class_type)
1066 1
1067
def get_extensionobject_class_type(typeid):
    """
    Look up the class registered for the extension object type id `typeid`.

    Returns None when nothing is registered under that id.
    """
    return extension_object_classes.get(typeid)
1075
1076