Completed
Pull Request — master (#340)
by Olivier
03:37
created

get_default_value()   F

Complexity

Conditions 18

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 83.9355

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 18
dl 0
loc 40
ccs 7
cts 17
cp 0.4118
crap 83.9355
rs 2.7087
c 1
b 0
f 1

How to fix   Complexity   

Complexity

Complex classes like get_default_value() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

"""
Implement UA datatypes.
"""
4 1
import struct
5 1
from enum import Enum, IntEnum, EnumMeta
6 1
from datetime import datetime
7 1
import sys
8 1
import os
9 1
import uuid
10 1
import re
11 1
import itertools
12 1
13 1
from opcua.ua import ua_binary as uabin
14 1
from opcua.ua import status_codes
15
from opcua.ua import ObjectIds
16 1
from opcua.ua.uaerrors import UaError
17 1
from opcua.ua.uaerrors import UaStatusCodeError
18 1
from opcua.ua.uaerrors import UaStringParsingError
19 1
20
21
# Python 3 has no ``unicode`` builtin; alias it to ``str`` so the type checks
# further down in this module can use the same name on both major versions.
if sys.version_info[0] >= 3:
    unicode = str
23
def get_win_epoch():
    """Return the result of ``uabin.win_epoch_to_datetime`` for tick count 0."""
    epoch = uabin.win_epoch_to_datetime(0)
    return epoch
25
26 1
27
class _FrozenClass(object):
    """
    Base class that refuses new attributes once an instance is frozen.

    While ``_freeze`` is False any attribute can be assigned. Once it is True,
    assigning a name the instance does not have yet raises TypeError, while
    existing attributes stay writable. Not pythonic at all, but it prevents
    many bugs (typos in member names) when using protocol structures.
    """
    _freeze = False

    def __setattr__(self, key, value):
        frozen = self._freeze
        if frozen and not hasattr(self, key):
            owner = self.__class__.__name__
            known = self.__dict__.keys()
            raise TypeError(
                "Error adding member '{}' to class '{}', class is frozen, members are {}".format(key, owner, known))
        object.__setattr__(self, key, value)
41 1
42
43
# The attribute typo check costs CPU time but makes debugging easier.
# When it is not needed (e.g. in production), set the PYOPCUA_NO_TYPO_CHECK
# environment variable: every ua type class then inherits from plain ``object``
# instead of _FrozenClass and the check is skipped.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
51 1
52 1
53 1
class ValueRank(IntEnum):
    """
    Number of dimensions of a variable.

    Not every case can be listed because any n > 0 is a valid ValueRank;
    since this is an IntEnum a plain int can be used in place of a member.
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # The following names are not in the spec, but common enough to be listed here.
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
68
69 1
70
class _MaskEnum(IntEnum):
    """
    Base class for enums whose member values are bit positions in an integer bit field.
    """

    @classmethod
    def parse_bitfield(cls, the_int):
        """
        Take an integer and interpret it as a set of enum values.

        :param the_int: non-negative integer bit field
        :returns: set holding the member of ``cls`` for each bit set in ``the_int``
        :raises TypeError: if ``the_int`` is not an int
        :raises ValueError: if ``the_int`` is negative, or a set bit has no member in ``cls``
        """
        # Explicit raise instead of ``assert``: asserts are stripped under ``python -O``.
        if not isinstance(the_int, int):
            raise TypeError("Bit field must be an int, received {!r}".format(the_int))
        return {cls(b) for b in cls._bits(the_int)}

    @classmethod
    def to_bitfield(cls, collection):
        """
        Take some enum values and create an integer from them.

        :param collection: iterable (may be a one-shot iterator) of members of ``cls``
        :returns: sum of the masks of all elements
        :raises TypeError: if an element is not a member of ``cls``
        """
        # Materialise once so that an iterator can be both checked and summed.
        members = list(collection)
        for member in members:
            if not isinstance(member, cls):
                raise TypeError("Expected {} members, received {!r}".format(cls.__name__, member))
        return sum(member.mask for member in members)

    @property
    def mask(self):
        """Integer with only the bit at position ``self.value`` set."""
        return 1 << self.value

    @staticmethod
    def _bits(n):
        """
        Iterate over the positions of the bits set in n.

        e.g. bits(44) yields 2, 3, 5

        :raises ValueError: if ``n`` is negative (raised when iteration starts)
        """
        # A negative n never reaches 0 when shifted right, so the loop below
        # would not terminate; reject it explicitly (an assert would vanish under -O).
        if n < 0:
            raise ValueError("Bit field must not be negative, received {}".format(n))
        pos = 0
        while n:
            if n & 0x1:
                yield pos
            n >>= 1
            pos += 1
107 1
108
109 1
class AccessLevel(_MaskEnum):
    """
    Bit positions describing the access level.

    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
    """
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
122 1
123 1
124
class WriteMask(_MaskEnum):
    """
    Bit positions telling which attributes of a node are writable.

    Spec Part 3, Paragraph 5.2.7 WriteMask
    """
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
152
153
154
class EventNotifier(_MaskEnum):
    """
    Bit positions telling how a node can be used for events.

    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
    """
    SubscribeToEvents = 0
    # Bit 1 is reserved.
    HistoryRead = 2
    HistoryWrite = 3
164 1
165 1
166 1
class StatusCode(FrozenClass):
    """
    A status code, with its name and documentation string.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    def __init__(self, value=0):
        """
        :param value: either the numeric code, or the name of an attribute of
            ``status_codes.StatusCodes`` whose value is used as the code
        """
        if isinstance(value, str):
            self.name = value
            self.value = getattr(status_codes.StatusCodes, value)
            # ``doc`` must be set before freezing, otherwise instances built
            # from a name would lack the attribute and could never get it.
            _, self.doc = status_codes.get_name_and_doc(self.value)
        else:
            self.value = value
            self.name, self.doc = status_codes.get_name_and_doc(value)
        self._freeze = True

    def to_binary(self):
        """Pack ``value`` with ``uabin.Primitives.UInt32``."""
        return uabin.Primitives.UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Unpack a UInt32 from ``data`` and build a StatusCode from it."""
        val = uabin.Primitives.UInt32.unpack(data)
        sc = StatusCode(val)
        return sc

    def check(self):
        """
        Raise UaStatusCodeError unless the status is good (see is_good()).

        Use the is_good() method if you do not want an exception.
        """
        if not self.is_good():
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        Return True if status is Good, i.e. neither of bits 30 and 31 of the value is set.
        """
        mask = 3 << 30
        return not (mask & self.value)

    def __str__(self):
        return 'StatusCode({})'.format(self.name)
    __repr__ = __str__

    def __eq__(self, other):
        # Comparing with an object that has no ``value`` (None, int, ...) is
        # simply "not equal" instead of an AttributeError.
        try:
            return self.value == other.value
        except AttributeError:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)
222
223 1
224
class NodeIdType(Enum):
    """The kinds of NodeId; the member value is the number stored in the binary encoding byte."""
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
231
232
233 1
class NodeId(FrozenClass):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want
            something special like a twobyte or fourbyte nodeid

    :ivar Identifier:
    :vartype Identifier: int, string, bytes or Guid
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        """
        :raises UaError: if ``namespaceidx`` is not an int, or no ``nodeidtype`` is
            given and it cannot be guessed from the type of ``identifier``
        """
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # No identifier at all: identifier 0 as a TwoByte nodeid.
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, uuid.UUID):
                # The docstring promises that a Guid identifier is accepted.
                self.NodeIdType = NodeIdType.Guid
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        # twobyte, fourbyte and numeric may represent the same node, so the
        # type is left out of the key for them
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):
            return self.NamespaceIndex, self.Identifier
        else:
            return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.__key())

    def __lt__(self, other):
        # Ordering compares hash values, so it is consistent but carries no meaning.
        if not isinstance(other, NodeId):
            raise AttributeError("Can only compare to NodeId")
        return self.__hash__() < other.__hash__()

    def is_null(self):
        """Return True if the namespace index is 0 and the identifier is null."""
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """Return True if the identifier is falsy, or a Guid identifier matching the pattern below."""
        if not self.Identifier:
            return True
        # NOTE(review): the bytes pattern only works when a Guid identifier is
        # bytes; confirm the identifier type used for Guid nodeids.
        if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
            return True
        return False

    @staticmethod
    def from_string(string):
        """
        Parse a string such as ``ns=2;i=42`` into a NodeId.

        :raises UaStringParsingError: if the string cannot be parsed
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {}".format(string), ex)

    @staticmethod
    def _from_string(string):
        """
        Parse ``;``-separated ``key=value`` elements.

        Recognised keys: ``ns``, ``i``, ``s``, ``g``, ``b``, ``srv`` and ``nsu``;
        unknown keys are ignored. ValueError from malformed elements is left
        to the caller (from_string) to convert.
        """
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is an integer everywhere else (default 0, UInt32 in from_binary)
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # Only override the constructor defaults ("" and 0) when the string
        # really carried a value; do not replace them with None.
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the string form, e.g. ``ns=2;i=42``.

        Elements are joined with ``;`` so that the result can be parsed back
        by from_string(), also when ServerIndex or NamespaceUri are set.
        """
        parts = []
        if self.ServerIndex:
            parts.append("srv={}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={}".format(self.NamespaceIndex))
        ntype = None
        if self.NodeIdType in (NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte):
            ntype = "i"
        elif self.NodeIdType == NodeIdType.String:
            ntype = "s"
        elif self.NodeIdType == NodeIdType.Guid:
            ntype = "g"
        elif self.NodeIdType == NodeIdType.ByteString:
            ntype = "b"
        parts.append("{}={}".format(ntype, self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={}".format(self.NamespaceUri))
        return ";".join(parts)

    def __str__(self):
        return "{}NodeId({})".format(self.NodeIdType.name, self.to_string())
    __repr__ = __str__

    def to_binary(self):
        """Return the binary encoding: the type byte followed by namespace index and identifier."""
        if self.NodeIdType == NodeIdType.TwoByte:
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
        elif self.NodeIdType == NodeIdType.FourByte:
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.Numeric:
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.String:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                uabin.Primitives.String.pack(self.Identifier)
        elif self.NodeIdType == NodeIdType.ByteString:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                uabin.Primitives.Bytes.pack(self.Identifier)
        elif self.NodeIdType == NodeIdType.Guid:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                uabin.Primitives.Guid.pack(self.Identifier)
        else:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                self.Identifier.to_binary()
        # FIXME: Missing NamespaceUri and ServerIndex

    @staticmethod
    def from_binary(data):
        """
        Read a NodeId from ``data``.

        The low six bits of the first byte select the NodeIdType; bit 7
        announces a NamespaceUri and bit 6 a ServerIndex after the identifier.
        """
        nid = NodeId()
        encoding = ord(data.read(1))
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = ord(data.read(1))
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = uabin.Primitives.String.unpack(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = uabin.Primitives.Bytes.unpack(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = uabin.Primitives.Guid.unpack(data)
        else:
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

        if uabin.test_bit(encoding, 7):
            nid.NamespaceUri = uabin.Primitives.String.unpack(data)
        if uabin.test_bit(encoding, 6):
            nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)

        return nid
434 1
435 1
436 1
class TwoByteNodeId(NodeId):
    """NodeId of type TwoByte; the namespace index is always 0."""

    def __init__(self, identifier):
        NodeId.__init__(self, identifier, namespaceidx=0, nodeidtype=NodeIdType.TwoByte)
440 1
441 1
442 1
class FourByteNodeId(NodeId):
    """NodeId of type FourByte."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.FourByte)
446
447
448
class NumericNodeId(NodeId):
    """NodeId of type Numeric."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Numeric)
452
453 1
454
class ByteStringNodeId(NodeId):
    """NodeId of type ByteString."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.ByteString)
458 1
459 1
460
class GuidNodeId(NodeId):
    """NodeId of type Guid."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Guid)
464 1
465 1
466 1
class StringNodeId(NodeId):
    """NodeId of type String."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespaceidx=namespace, nodeidtype=NodeIdType.String)
470 1
471 1
472 1
# ExpandedNodeId is an alias: both names refer to the very same NodeId class.
ExpandedNodeId = NodeId
473 1
474 1
475
class QualifiedName(FrozenClass):
    """
    A name string together with the index of its namespace.
    """

    def __init__(self, name=None, namespaceidx=0):
        """
        :raises UaError: if ``namespaceidx`` is not an int
        """
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return ``<NamespaceIndex>:<Name>``."""
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse ``<index>:<name>``; a string without ``:`` is a name in namespace 0.

        :raises UaStringParsingError: if the part before the first ``:`` is not an integer
        """
        if ":" not in string:
            return QualifiedName(string, 0)
        prefix, _, name = string.partition(":")
        try:
            idx = int(prefix)
        except (TypeError, ValueError) as ex:
            raise UaStringParsingError("Error parsing string {}".format(string), ex)
        return QualifiedName(name, idx)

    def to_binary(self):
        """Pack the namespace index as UInt16, followed by the name as String."""
        index_bytes = uabin.Primitives.UInt16.pack(self.NamespaceIndex)
        name_bytes = uabin.Primitives.String.pack(self.Name)
        return b''.join((index_bytes, name_bytes))

    @staticmethod
    def from_binary(data):
        """Read a UInt16 namespace index and then a String name from ``data``."""
        qname = QualifiedName()
        qname.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
        qname.Name = uabin.Primitives.String.unpack(data)
        return qname

    def __eq__(self, bname):
        if not isinstance(bname, QualifiedName):
            return False
        return self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        """Order by namespace index first; names are compared only within the same namespace."""
        if not isinstance(other, QualifiedName):
            raise TypeError("Cannot compare QualifiedName and {}".format(other))
        if self.NamespaceIndex != other.NamespaceIndex:
            return self.NamespaceIndex < other.NamespaceIndex
        return self.Name < other.Name

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
534 1
535
536
class LocalizedText(FrozenClass):
    """
    A text with an optional locale.

    A unicode text given to the constructor is stored UTF-8 encoded. In
    ``Encoding``, bit 0 announces the locale and bit 1 the text.
    """

    ua_types = {
        "Text": "ByteString",
        "Locale": "ByteString"
    }

    def __init__(self, text=None):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = None
        self._freeze = True

    def to_binary(self):
        """
        Return the encoding byte followed by the locale and the text, each only if set.

        ``Encoding`` is recomputed from the current Locale and Text so that it
        always describes exactly the fields that follow; a bit left over from
        an earlier state would otherwise announce a field that is not written.
        """
        encoding = 0
        if self.Locale:
            encoding |= (1 << 0)
        if self.Text:
            encoding |= (1 << 1)
        self.Encoding = encoding
        packet = [uabin.Primitives.UInt8.pack(self.Encoding)]
        if self.Locale:
            packet.append(uabin.Primitives.Bytes.pack(self.Locale))
        if self.Text:
            packet.append(uabin.Primitives.Bytes.pack(self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the encoding byte, then the locale and/or the text it announces."""
        obj = LocalizedText()
        obj.Encoding = ord(data.read(1))
        if obj.Encoding & (1 << 0):
            obj.Locale = uabin.Primitives.Bytes.unpack(data)
        if obj.Encoding & (1 << 1):
            obj.Text = uabin.Primitives.Bytes.unpack(data)
        return obj

    def to_string(self):
        """Return the text decoded as UTF-8, or an empty string if there is no text."""
        # FIXME: use locale
        if self.Text is None:
            return ""
        return self.Text.decode('utf-8')

    def __str__(self):
        return 'LocalizedText(Encoding:{!s}, Locale:{!s}, Text:{!s})'.format(
            self.Encoding, self.Locale, self.Text)
    __repr__ = __str__

    def __eq__(self, other):
        """Equal when ``other`` is a LocalizedText with the same Locale and Text."""
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)
598
599
600 1
class ExtensionObject(FrozenClass):
    """
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Body:
    :vartype Body: bytes
    """

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = b''
        self._freeze = True

    def to_binary(self):
        """Return TypeId, the encoding byte and, if not empty, the body as ByteString."""
        has_body = bool(self.Body)
        if has_body:
            # bit 0 announces a body
            self.Encoding |= (1 << 0)
        packet = [self.TypeId.to_binary(), uabin.Primitives.UInt8.pack(self.Encoding)]
        if has_body:
            packet.append(uabin.Primitives.ByteString.pack(self.Body))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read TypeId and encoding byte; the body is read only when bit 0 of the encoding is set."""
        ext = ExtensionObject()
        ext.TypeId = NodeId.from_binary(data)
        ext.Encoding = uabin.Primitives.UInt8.unpack(data)
        if ext.Encoding & (1 << 0):
            ext.Body = uabin.Primitives.ByteString.unpack(data)
        return ext

    @staticmethod
    def from_object(obj):
        """
        Wrap ``obj``: TypeId is looked up in ObjectIds as
        ``<class name>_Encoding_DefaultBinary`` and Body is ``obj.to_binary()``.
        """
        type_name = obj.__class__.__name__
        oid = getattr(ObjectIds, "{}_Encoding_DefaultBinary".format(type_name))
        ext = ExtensionObject()
        ext.TypeId = FourByteNodeId(oid)
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        return 'ExtensionObject(TypeId:{!s}, Encoding:{!s}, {!s} bytes)'.format(
            self.TypeId, self.Encoding, len(self.Body))

    __repr__ = __str__
648 1
649 1
650 1
class VariantType(Enum):
    """
    The possible types of a variant.

    Members, in value order 0..25: Null, Boolean, SByte, Byte, Int16, UInt16,
    Int32, UInt32, Int64, UInt64, Float, Double, String, DateTime, Guid,
    ByteString, XmlElement, NodeId, ExpandedNodeId, StatusCode, QualifiedName,
    LocalizedText, ExtensionObject, DataValue, Variant, DiagnosticInfo.
    """

    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
708
709 1
710 1
class VariantTypeCustom(object):
    """
    Looks like sometimes we get variants with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType

    Instances expose ``name`` (always "Custom") and ``value``, like enum members.
    """

    def __init__(self, val):
        """
        :param val: numeric type value
        :raises UaError: if ``val`` is greater than 0b00111111
        """
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            raise UaError("Cannot create VariantType. VariantType must be {} > x > {}, received {}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{}".format(self.value)
    __repr__ = __str__

    def __eq__(self, other):
        # Anything exposing the same ``value`` is equal; objects without a
        # ``value`` attribute (None, str, ...) are unequal rather than an error.
        try:
            return self.value == other.value
        except AttributeError:
            return False

    def __ne__(self, other):
        # Explicit for Python 2, where != is not derived from __eq__.
        return not self.__eq__(other)
730 1
731
732 1
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    If no argument is given a Null Variant is created.
    If no variant type is given, attempts to guess the type from the python type.
    If a variant is given as value, the new object becomes a copy of the argument.

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions:
    :vartype Dimensions: list of int, or None
    """

    def __init__(self, value=None, varianttype=None, dimensions=None):
        """
        :raises UaError: if the value is None for a type other than Null, String
            or DateTime, or if the type cannot be guessed
        """
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self._freeze = True
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
            # A copy keeps the dimensions of the source too, unless the caller
            # passed its own.
            if self.Dimensions is None:
                self.Dimensions = value.Dimensions
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Value is None and self.VariantType not in (
                VariantType.Null,
                VariantType.String,
                VariantType.DateTime):
            raise UaError("Variant of type {} cannot have value None".format(self.VariantType))
        if self.Dimensions is None and type(self.Value) in (list, tuple):
            # Dimensions are only recorded for nested (multi-dimensional) values.
            dims = get_shape(self.Value)
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        """Equal when ``other`` is a Variant with the same VariantType and Value."""
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType from the python type of ``val``.

        For (nested) lists and tuples the first innermost element decides.

        :raises UaError: if an empty list or tuple is met on the way down
        """
        error_val = val
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA type of variable {}".format(error_val))
            val = val[0]
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # must be tested before int: bool is a subclass of int
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, int):
            return VariantType.Int64
        elif type(val) in (str, unicode):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        elif isinstance(val, uuid.UUID):
            return VariantType.Guid
        else:
            # Any other object: use the VariantType member named like its class,
            # if there is one, else treat it as an ExtensionObject. (The former
            # ``isinstance(val, object)`` test was always true, so its error
            # branch could never run and has been removed.)
            try:
                return getattr(VariantType, val.__class__.__name__)
            except AttributeError:
                return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """
        Return the binary encoding of the variant.

        The low six bits of the first byte hold the type value; bit 7 marks an
        array value and bit 6 marks that the dimensions follow the array.
        """
        b = []
        encoding = self.VariantType.value & 0b111111
        if type(self.Value) in (list, tuple):
            if self.Dimensions is not None:
                encoding = uabin.set_bit(encoding, 6)
            encoding = uabin.set_bit(encoding, 7)
            b.append(uabin.Primitives.UInt8.pack(encoding))
            b.append(uabin.pack_uatype_array(self.VariantType, flatten(self.Value)))
            if self.Dimensions is not None:
                b.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
        else:
            b.append(uabin.Primitives.UInt8.pack(encoding))
            b.append(uabin.pack_uatype(self.VariantType, self.Value))

        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """Read a Variant from ``data``; see to_binary() for the meaning of the encoding byte."""
        dimensions = None
        encoding = ord(data.read(1))
        int_type = encoding & 0b00111111
        vtype = datatype_to_varianttype(int_type)
        if vtype == VariantType.Null:
            # A Null variant has neither value nor dimensions; the encoding
            # byte must not be passed as the ``dimensions`` argument.
            return Variant(None, vtype)
        if uabin.test_bit(encoding, 7):
            value = uabin.unpack_uatype_array(vtype, data)
        else:
            value = uabin.unpack_uatype(vtype, data)
        if uabin.test_bit(encoding, 6):
            dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
            value = reshape(value, dimensions)

        return Variant(value, vtype, dimensions)
843 1
844 1
845 1
def reshape(flat, dims):
    """
    Turn the flat list ``flat`` into nested lists with the shape ``dims``.

    A dimension of 0 among the inner dimensions counts as 1 when sizing the
    chunks. If ``flat`` holds fewer than ``dims[0]`` chunks it is padded, in
    place, with empty lists. With no inner dimension left (or only ``[0]``)
    ``flat`` itself is returned.
    """
    inner_dims = dims[1:]
    chunk = 1
    for extent in inner_dims:
        chunk *= extent if extent != 0 else 1
    missing = dims[0] * chunk - len(flat)
    if missing > 0:
        flat.extend([] for _ in range(missing))
    if not inner_dims or inner_dims == [0]:
        return flat
    starts = range(0, len(flat), chunk)
    return [reshape(flat[start:start + chunk], inner_dims) for start in starts]
857 1
858 1
859 1
def _split_list(l, n):
    """Cut ``l`` into consecutive slices of ``n`` elements (at least 1); the last may be shorter."""
    step = n if n > 1 else 1
    chunks = []
    for start in range(0, len(l), step):
        chunks.append(l[start:start + step])
    return chunks
862 1
863 1
864 1
def flatten_and_get_shape(mylist):
    """
    Flatten nested lists/tuples and report their shape.

    The length of each nesting level is taken from its first element.

    :param mylist: a (possibly nested, possibly empty) list or tuple
    :returns: tuple ``(flat, dims)`` with the flattened elements and the
        length of every nesting level; an empty input gives ``(mylist, [0])``
    """
    dims = [len(mylist)]
    # Checking the length first keeps an empty input (or a level that became
    # empty after flattening) from raising IndexError on ``mylist[0]``.
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
873 1
874
875 1
def flatten(mylist):
    """
    Return the elements of nested lists/tuples as one flat list.

    One nesting level is removed per pass, as long as the first element is a
    list or tuple. An input that needs no flattening is returned as is.
    """
    result = mylist
    while len(result) > 0 and isinstance(result[0], (list, tuple)):
        result = [leaf for branch in result for leaf in branch]
    return result
883 1
884
885
def get_shape(mylist):
    """
    Return the shape of a nested list.

    The length of each nesting level is taken by following the first
    element. Descent stops at the first empty level or the first element
    that is not a list or tuple.

    :param mylist: any object; a non-list/tuple yields an empty shape
    :return: list of lengths, outermost first
    """
    dims = []
    while isinstance(mylist, (list, tuple)):
        dims.append(len(mylist))
        if not mylist:
            break
        mylist = mylist[0]
    return dims
893
894
895
class XmlElement(FrozenClass):
    """
    An XML element encoded as an UTF-8 string.

    :ivar Value: the element content; unpacked with
        ``uabin.Primitives.String`` when built from binary data
    """

    def __init__(self, binary=None):
        if binary is not None:
            self._binary_init(binary)
        else:
            # NOTE(review): the default is a list although to_binary() packs
            # Value as a String -- confirm this default is intended.
            self.Value = []
        self._freeze = True

    def to_binary(self):
        """Return Value packed with ``uabin.Primitives.String``."""
        return uabin.Primitives.String.pack(self.Value)

    @staticmethod
    def from_binary(data):
        """Build an XmlElement by unpacking a String from ``data``."""
        return XmlElement(data)

    def _binary_init(self, data):
        self.Value = uabin.Primitives.String.unpack(data)

    def __str__(self):
        return 'XmlElement(Value:' + str(self.Value) + ')'

    __repr__ = __str__
922 1
923 1
924 1
class DataValue(FrozenClass):
    """
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    :ivar Encoding: bit mask telling which optional fields are present
    :vartype Encoding: int
    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    """

    def __init__(self, variant=None, status=None):
        self.Encoding = 0
        # Anything that is not already a Variant is wrapped in one.
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        self.StatusCode = StatusCode() if status is None else status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        """
        Serialize to bytes: the encoding mask followed by every field that
        is set (truthy).

        The mask is rebuilt from the current field values on every call and
        stored in ``self.Encoding``. Bits left over from an earlier call or
        from ``from_binary`` therefore never announce a field that is not
        actually written.
        """
        date_time = uabin.Primitives.DateTime
        uint16 = uabin.Primitives.UInt16
        # (field value, mask bit, packer) in the order fields are written.
        # NOTE(review): OPC UA Part 6 appears to list SourcePicoseconds
        # before ServerTimestamp -- confirm this order against the spec
        # (from_binary reads in the same order as written here).
        fields = (
            (self.Value, 0, lambda val: val.to_binary()),
            (self.StatusCode, 1, lambda val: val.to_binary()),
            (self.SourceTimestamp, 2, date_time.pack),
            (self.ServerTimestamp, 3, date_time.pack),
            (self.SourcePicoseconds, 4, uint16.pack),
            (self.ServerPicoseconds, 5, uint16.pack),
        )
        encoding = 0
        payload = []
        for val, bit, pack in fields:
            if val:
                encoding |= (1 << bit)
                payload.append(pack(val))
        self.Encoding = encoding
        return uabin.Primitives.UInt8.pack(encoding) + b''.join(payload)

    @staticmethod
    def from_binary(data):
        """
        Decode a DataValue from a binary stream.

        The first byte is the encoding mask; each set bit (0 to 5) causes
        the matching field to be read from ``data``.
        """
        encoding = ord(data.read(1))
        value = Variant.from_binary(data) if encoding & (1 << 0) else None
        status = StatusCode.from_binary(data) if encoding & (1 << 1) else None
        obj = DataValue(value, status)
        obj.Encoding = encoding
        if encoding & (1 << 2):
            obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)
        if encoding & (1 << 3):
            obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)
        if encoding & (1 << 4):
            obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
        if encoding & (1 << 5):
            obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
        return obj

    def __str__(self):
        parts = ['DataValue(Value:{}'.format(self.Value)]
        optional = ('StatusCode', 'SourceTimestamp', 'ServerTimestamp',
                    'SourcePicoseconds', 'ServerPicoseconds')
        for name in optional:
            val = getattr(self, name)
            if val is not None:
                parts.append(', {}:{}'.format(name, val))
        parts.append(')')
        return ''.join(parts)

    __repr__ = __str__
1026 1
1027 1
1028 1
def datatype_to_varianttype(int_type):
    """
    Take a NodeId or an int and return a VariantType.

    This is only supported if int_type < 63 due to VariantType encoding.
    At low level we do not have access to the address space, thus decoding
    is limited; a better version of this method can be found in ua_utils.py.

    :param int_type: an int, or a NodeId whose ``Identifier`` is used
    :return: ``VariantType(int_type)`` for ids up to 25, otherwise
        ``VariantTypeCustom(int_type)``
    """
    if isinstance(int_type, NodeId):
        int_type = int_type.Identifier

    if int_type <= 25:
        return VariantType(int_type)
    return VariantTypeCustom(int_type)
1042
1043 1
1044
def get_default_value(vtype):
    """
    Given a variant type return default value for this type

    :param vtype: a VariantType member
    :return: a default value for that type
    :raises RuntimeError: when ``vtype`` matches none of the handled types
    """
    if vtype == VariantType.Null:
        return None
    if vtype == VariantType.Boolean:
        return False
    if vtype in (VariantType.SByte, VariantType.Byte):
        # 8-bit integers, not byte strings: default to the number 0.
        return 0
    if vtype == VariantType.ByteString:
        return b""
    if 4 <= vtype.value <= 9:
        return 0
    if vtype in (VariantType.Float, VariantType.Double):
        return 0.0
    if vtype == VariantType.String:
        return None  # a string can be null
    if vtype == VariantType.DateTime:
        # NOTE(review): naive local time -- confirm whether UTC is expected.
        return datetime.now()
    if vtype == VariantType.Guid:
        return uuid.uuid4()
    if vtype == VariantType.XmlElement:
        return None  # Not sure this is correct
    if vtype == VariantType.NodeId:
        return NodeId()
    if vtype == VariantType.ExpandedNodeId:
        return NodeId()
    if vtype == VariantType.StatusCode:
        return StatusCode()
    if vtype == VariantType.QualifiedName:
        return QualifiedName()
    if vtype == VariantType.LocalizedText:
        return LocalizedText()
    if vtype == VariantType.ExtensionObject:
        return ExtensionObject()
    if vtype == VariantType.DataValue:
        return DataValue()
    if vtype == VariantType.Variant:
        return Variant()
    # Format the message; passing vtype as a second exception argument made
    # the error text render as a tuple.
    raise RuntimeError("function take a uatype as argument, got: {}".format(vtype))
1084
1085
1086