Completed
Pull Request — master (#406)
by Denis
03:42
created

LocalizedText.__str__()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 4
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
1
"""
2
implement ua datatypes
3
"""
4 1
import struct
5 1
from enum import Enum, IntEnum, EnumMeta
6 1
from datetime import datetime
7 1
import sys
8 1
import os
9 1
import uuid
10 1
import re
11 1
import itertools
12
13 1
from opcua.ua import ua_binary as uabin
14 1
from opcua.ua import status_codes
15 1
from opcua.ua import ObjectIds
16 1
from opcua.ua.uaerrors import UaError
17 1
from opcua.ua.uaerrors import UaStatusCodeError
18 1
from opcua.ua.uaerrors import UaStringParsingError
19
20
21 1
if sys.version_info.major > 2:
22
    unicode = str
23 1
def get_win_epoch():
24 1
    return uabin.win_epoch_to_datetime(0)
25
26
27 1
class _FrozenClass(object):
28
29
    """
30
    Make it impossible to add members to a class.
31
    Not pythonic at all but we found out it prevents many many
32
    bugs in use of protocol structures
33
    """
34 1
    _freeze = False
35
36 1
    def __setattr__(self, key, value):
37 1
        if self._freeze and not hasattr(self, key):
38
            raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
39
                key, self.__class__.__name__, self.__dict__.keys()))
40 1
        object.__setattr__(self, key, value)
41
42
43 1
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
44
    # typo check is cpu consuming, but it will make debug easy.
45
    # if typo check is not need (in production), please set env PYOPCUA_NO_TYPO_CHECK.
46
    # this will make all uatype class inherit from object intead of _FrozenClass
47
    # and skip the typo check.
48
    FrozenClass = object
49
else:
50 1
    FrozenClass = _FrozenClass
51
52
53 1
class ValueRank(IntEnum):
54
    """
55
    Defines dimensions of a variable.
56
    This enum does not support all cases since ValueRank support any n>0
57
    but since it is an IntEnum it can be replace by a normal int
58
    """
59 1
    ScalarOrOneDimension = -3
60 1
    Any = -2
61 1
    Scalar = -1
62 1
    OneOrMoreDimensions = 0
63 1
    OneDimension = 1
64
    # the next names are not in spec but so common we express them here
65 1
    TwoDimensions = 2
66 1
    ThreeDimensions = 3
67 1
    FourDimensions = 4
68
69
70 1
class _MaskEnum(IntEnum):
71
72 1
    @classmethod
73
    def parse_bitfield(cls, the_int):
74
        """ Take an integer and interpret it as a set of enum values. """
75 1
        assert isinstance(the_int, int)
76
77 1
        return {cls(b) for b in cls._bits(the_int)}
78
79 1
    @classmethod
80
    def to_bitfield(cls, collection):
81
        """ Takes some enum values and creates an integer from them. """
82
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
83
        # iterator)
84 1
        iter1, iter2 = itertools.tee(iter(collection))
85 1
        assert all(isinstance(x, cls) for x in iter1)
86
87 1
        return sum(x.mask for x in iter2)
88
89 1
    @property
90
    def mask(self):
91 1
        return 1 << self.value
92
93 1
    @staticmethod
94
    def _bits(n):
95
        """ Iterate over the bits in n.
96
97
            e.g. bits(44) yields at 2, 3, 5
98
        """
99 1
        assert n >= 0  # avoid infinite recursion
100
101 1
        pos = 0
102 1
        while n:
103 1
            if n & 0x1:
104 1
                yield pos
105 1
            n = n // 2
106 1
            pos += 1
107
108
109 1
class AccessLevel(_MaskEnum):
110
    """
111
    Bit index to indicate what the access level is.
112
113
    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
114
    """
115 1
    CurrentRead = 0
116 1
    CurrentWrite = 1
117 1
    HistoryRead = 2
118 1
    HistoryWrite = 3
119 1
    SemanticChange = 4
120 1
    StatusWrite = 5
121 1
    TimestampWrite = 6
122
123
124 1
class WriteMask(_MaskEnum):
125
    """
126
    Bit index to indicate which attribute of a node is writable
127
128
    Spec Part 3, Paragraph 5.2.7 WriteMask
129
    """
130 1
    AccessLevel = 0
131 1
    ArrayDimensions = 1
132 1
    BrowseName = 2
133 1
    ContainsNoLoops = 3
134 1
    DataType = 4
135 1
    Description = 5
136 1
    DisplayName = 6
137 1
    EventNotifier = 7
138 1
    Executable = 8
139 1
    Historizing = 9
140 1
    InverseName = 10
141 1
    IsAbstract = 11
142 1
    MinimumSamplingInterval = 12
143 1
    NodeClass = 13
144 1
    NodeId = 14
145 1
    Symmetric = 15
146 1
    UserAccessLevel = 16
147 1
    UserExecutable = 17
148 1
    UserWriteMask = 18
149 1
    ValueRank = 19
150 1
    WriteMask = 20
151 1
    ValueForVariableType = 21
152
153
154 1
class EventNotifier(_MaskEnum):
155
    """
156
    Bit index to indicate how a node can be used for events.
157
158
    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
159
    """
160 1
    SubscribeToEvents = 0
161
    # Reserved        = 1
162 1
    HistoryRead = 2
163 1
    HistoryWrite = 3
164
165
166 1
class StatusCode(FrozenClass):
167
    """
168
    :ivar value:
169
    :vartype value: int
170
    :ivar name:
171
    :vartype name: string
172
    :ivar doc:
173
    :vartype doc: string
174
    """
175
176 1
    def __init__(self, value=0):
177 1
        if isinstance(value, str):
178 1
            self.name = value
179 1
            self.value = getattr(status_codes.StatusCodes, value)
180
        else:
181 1
            self.value = value
182 1
            self.name, self.doc = status_codes.get_name_and_doc(value)
183 1
        self._freeze = True
184
185 1
    def to_binary(self):
186 1
        return uabin.Primitives.UInt32.pack(self.value)
187
188 1
    @staticmethod
189
    def from_binary(data):
190 1
        val = uabin.Primitives.UInt32.unpack(data)
191 1
        sc = StatusCode(val)
192 1
        return sc
193
194 1
    def check(self):
195
        """
196
        Raises an exception if the status code is anything else than 0 (good).
197
198
        Use the is_good() method if you do not want an exception.
199
        """
200 1
        if not self.is_good():
201 1
            raise UaStatusCodeError(self.value)
202
203 1
    def is_good(self):
204
        """
205
        return True if status is Good.
206
        """
207 1
        mask = 3 << 30
208 1
        if mask & self.value:
209 1
            return False
210
        else:
211 1
            return True
212
213 1
    def __str__(self):
214
        return 'StatusCode({0})'.format(self.name)
215 1
    __repr__ = __str__
216
217 1
    def __eq__(self, other):
218 1
        return self.value == other.value
219
220 1
    def __ne__(self, other):
221
        return not self.__eq__(other)
222
223
224 1
class NodeIdType(IntEnum):
225 1
    TwoByte = 0
226 1
    FourByte = 1
227 1
    Numeric = 2
228 1
    String = 3
229 1
    Guid = 4
230 1
    ByteString = 5
231
232
233 1
class NodeId(FrozenClass):
234
    """
235
    NodeId Object
236
237
    Args:
238
        identifier: The identifier might be an int, a string, bytes or a Guid
239
        namespaceidx(int): The index of the namespace
240
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
241
242
243
    :ivar Identifier:
244
    :vartype Identifier: NodeId
245
    :ivar NamespaceIndex:
246
    :vartype NamespaceIndex: Int
247
    :ivar NamespaceUri:
248
    :vartype NamespaceUri: String
249
    :ivar ServerIndex:
250
    :vartype ServerIndex: Int
251
    """
252
    
253 1
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
254
255 1
        self.Identifier = identifier
256 1
        self.NamespaceIndex = namespaceidx
257 1
        self.NodeIdType = nodeidtype
258 1
        self.NamespaceUri = ""
259 1
        self.ServerIndex = 0
260 1
        self._freeze = True
261 1
        if not isinstance(self.NamespaceIndex, int):
262 1
            raise UaError("NamespaceIndex must be an int")
263 1
        if self.Identifier is None:
264 1
            self.Identifier = 0
265 1
            self.NodeIdType = NodeIdType.TwoByte
266 1
            return
267 1
        if self.NodeIdType is None:
268 1
            if isinstance(self.Identifier, int):
269 1
                self.NodeIdType = NodeIdType.Numeric
270 1
            elif isinstance(self.Identifier, str):
271 1
                self.NodeIdType = NodeIdType.String
272 1
            elif isinstance(self.Identifier, bytes):
273
                self.NodeIdType = NodeIdType.ByteString
274 1
            elif isinstance(self.Identifier, uuid.UUID):
275 1
                self.NodeIdType = NodeIdType.Guid
276
            else:
277
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
278
279 1
    def _key(self):
280 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric): 
281
            # twobyte, fourbyte and numeric may represent the same node
282 1
            return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
283 1
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier)
284
285 1
    def __eq__(self, node):
286 1
        return isinstance(node, NodeId) and self._key() == node._key()
287
288 1
    def __ne__(self, other):
289 1
        return not self.__eq__(other)
290
291 1
    def __hash__(self):
292 1
        return hash(self._key())
293
294 1
    def __lt__(self, other):
295 1
        if not isinstance(other, NodeId):
296
            raise AttributeError("Can only compare to NodeId")
297 1
        return self._key() < other._key()
298
299 1
    def is_null(self):
300 1
        if self.NamespaceIndex != 0:
301 1
            return False
302 1
        return self.has_null_identifier()
303
304 1
    def has_null_identifier(self):
305 1
        if not self.Identifier:
306 1
            return True
307 1
        if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
308 1
            return True
309 1
        return False
310
311 1
    @staticmethod
312
    def from_string(string):
313 1
        try:
314 1
            return NodeId._from_string(string)
315 1
        except ValueError as ex:
316 1
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)
317
318 1
    @staticmethod
319
    def _from_string(string):
320 1
        l = string.split(";")
321 1
        identifier = None
322 1
        namespace = 0
323 1
        ntype = None
324 1
        srv = None
325 1
        nsu = None
326 1
        for el in l:
327 1
            if not el:
328 1
                continue
329 1
            k, v = el.split("=", 1)
330 1
            k = k.strip()
331 1
            v = v.strip()
332 1
            if k == "ns":
333 1
                namespace = int(v)
334 1
            elif k == "i":
335 1
                ntype = NodeIdType.Numeric
336 1
                identifier = int(v)
337 1
            elif k == "s":
338 1
                ntype = NodeIdType.String
339 1
                identifier = v
340 1
            elif k == "g":
341
                ntype = NodeIdType.Guid
342
                identifier = v
343 1
            elif k == "b":
344
                ntype = NodeIdType.ByteString
345
                identifier = v
346 1
            elif k == "srv":
347 1
                srv = v
348
            elif k == "nsu":
349
                nsu = v
350 1
        if identifier is None:
351 1
            raise UaStringParsingError("Could not find identifier in string: " + string)
352 1
        nodeid = NodeId(identifier, namespace, ntype)
353 1
        nodeid.NamespaceUri = nsu
354 1
        nodeid.ServerIndex = srv
355 1
        return nodeid
356
357 1
    def to_string(self):
358 1
        string = ""
359 1
        if self.NamespaceIndex != 0:
360 1
            string += "ns={0};".format(self.NamespaceIndex)
361 1
        ntype = None
362 1
        if self.NodeIdType == NodeIdType.Numeric:
363 1
            ntype = "i"
364 1
        elif self.NodeIdType == NodeIdType.String:
365 1
            ntype = "s"
366 1
        elif self.NodeIdType == NodeIdType.TwoByte:
367 1
            ntype = "i"
368 1
        elif self.NodeIdType == NodeIdType.FourByte:
369 1
            ntype = "i"
370
        elif self.NodeIdType == NodeIdType.Guid:
371
            ntype = "g"
372
        elif self.NodeIdType == NodeIdType.ByteString:
373
            ntype = "b"
374 1
        string += "{0}={1}".format(ntype, self.Identifier)
375 1
        if self.ServerIndex:
376
            string = "srv=" + str(self.ServerIndex) + string
377 1
        if self.NamespaceUri:
378
            string += "nsu={0}".format(self.NamespaceUri)
379 1
        return string
380
381 1
    def __str__(self):
382 1
        return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())
383 1
    __repr__ = __str__
384
385 1
    def to_binary(self):
386 1
        if self.NodeIdType == NodeIdType.TwoByte:
387 1
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
388 1
        elif self.NodeIdType == NodeIdType.FourByte:
389 1
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
390 1
        elif self.NodeIdType == NodeIdType.Numeric:
391 1
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
392 1
        elif self.NodeIdType == NodeIdType.String:
393 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
394
                uabin.Primitives.String.pack(self.Identifier)
395 1
        elif self.NodeIdType == NodeIdType.ByteString:
396 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
397
                uabin.Primitives.Bytes.pack(self.Identifier)
398 1
        elif self.NodeIdType == NodeIdType.Guid:
399 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
400
                   uabin.Primitives.Guid.pack(self.Identifier)
401
        else:
402
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
403
                self.Identifier.to_binary()
404
        # FIXME: Missing NNamespaceURI and ServerIndex
405
406 1
    @staticmethod
407
    def from_binary(data):
408 1
        nid = NodeId()
409 1
        encoding = ord(data.read(1))
410 1
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)
411
412 1
        if nid.NodeIdType == NodeIdType.TwoByte:
413 1
            nid.Identifier = ord(data.read(1))
414 1
        elif nid.NodeIdType == NodeIdType.FourByte:
415 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
416 1
        elif nid.NodeIdType == NodeIdType.Numeric:
417 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
418 1
        elif nid.NodeIdType == NodeIdType.String:
419 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
420 1
            nid.Identifier = uabin.Primitives.String.unpack(data)
421 1
        elif nid.NodeIdType == NodeIdType.ByteString:
422 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
423 1
            nid.Identifier = uabin.Primitives.Bytes.unpack(data)
424 1
        elif nid.NodeIdType == NodeIdType.Guid:
425 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
426 1
            nid.Identifier = uabin.Primitives.Guid.unpack(data)
427
        else:
428
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
429
430 1
        if uabin.test_bit(encoding, 7):
431
            nid.NamespaceUri = uabin.Primitives.String.unpack(data)
432 1
        if uabin.test_bit(encoding, 6):
433
            nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)
434
435 1
        return nid
436
437
438 1
class TwoByteNodeId(NodeId):
439
440 1
    def __init__(self, identifier):
441 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
442
443
444 1
class FourByteNodeId(NodeId):
445
446 1
    def __init__(self, identifier, namespace=0):
447 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
448
449
450 1
class NumericNodeId(NodeId):
451
452 1
    def __init__(self, identifier, namespace=0):
453 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
454
455
456 1
class ByteStringNodeId(NodeId):
457
458 1
    def __init__(self, identifier, namespace=0):
459 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
460
461
462 1
class GuidNodeId(NodeId):
463
464 1
    def __init__(self, identifier, namespace=0):
465 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
466
467
468 1
class StringNodeId(NodeId):
469
470 1
    def __init__(self, identifier, namespace=0):
471 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
472
473
474 1
ExpandedNodeId = NodeId
475
476
477 1
class QualifiedName(FrozenClass):
478
    """
479
    A string qualified with a namespace index.
480
    """
481
482 1
    def __init__(self, name=None, namespaceidx=0):
483 1
        if not isinstance(namespaceidx, int):
484
            raise UaError("namespaceidx must be an int")
485 1
        self.NamespaceIndex = namespaceidx
486 1
        self.Name = name
487 1
        self._freeze = True
488
489 1
    def to_string(self):
490 1
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)
491
492 1
    @staticmethod
493
    def from_string(string):
494 1
        if ":" in string:
495 1
            try:
496 1
                idx, name = string.split(":", 1)
497 1
                idx = int(idx)
498 1
            except (TypeError, ValueError) as ex:
499 1
                raise UaStringParsingError("Error parsing string {0}".format(string), ex)
500
        else:
501 1
            idx = 0
502 1
            name = string
503 1
        return QualifiedName(name, idx)
504
505 1
    def to_binary(self):
506 1
        packet = []
507 1
        packet.append(uabin.Primitives.UInt16.pack(self.NamespaceIndex))
508 1
        packet.append(uabin.Primitives.String.pack(self.Name))
509 1
        return b''.join(packet)
510
511 1
    @staticmethod
512
    def from_binary(data):
513 1
        obj = QualifiedName()
514 1
        obj.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
515 1
        obj.Name = uabin.Primitives.String.unpack(data)
516 1
        return obj
517
518 1
    def __eq__(self, bname):
519 1
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
520
521 1
    def __ne__(self, other):
522
        return not self.__eq__(other)
523
524 1
    def __lt__(self, other):
525
        if not isinstance(other, QualifiedName):
526
            raise TypeError("Cannot compare QualifiedName and {0}".format(other))
527
        if self.NamespaceIndex == other.NamespaceIndex:
528
            return self.Name < other.Name
529
        else:
530
            return self.NamespaceIndex < other.NamespaceIndex
531
532 1
    def __str__(self):
533 1
        return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)
534
535 1
    __repr__ = __str__
536
537
538 1
class LocalizedText(FrozenClass):
539
    """
540
    A string qualified with a namespace index.
541
    """
542
543 1
    ua_types = {
544
        "Text": "ByteString",
545
        "Locale": "ByteString"
546
    }
547
548 1
    def __init__(self, text=None):
549 1
        self.Encoding = 0
550 1
        self.Text = text
551 1
        if isinstance(self.Text, unicode):
552
            self.Text = self.Text.encode('utf-8')
553 1
        if self.Text:
554 1
            self.Encoding |= (1 << 1)
555 1
        self.Locale = None
556 1
        self._freeze = True
557
558 1
    def to_binary(self):
559 1
        packet = []
560 1
        if self.Locale:
561
            self.Encoding |= (1 << 0)
562 1
        if self.Text:
563 1
            self.Encoding |= (1 << 1)
564 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
565 1
        if self.Locale:
566
            packet.append(uabin.Primitives.Bytes.pack(self.Locale))
567 1
        if self.Text:
568 1
            packet.append(uabin.Primitives.Bytes.pack(self.Text))
569 1
        return b''.join(packet)
570
571 1
    @staticmethod
572
    def from_binary(data):
573 1
        obj = LocalizedText()
574 1
        obj.Encoding = ord(data.read(1))
575 1
        if obj.Encoding & (1 << 0):
576
            obj.Locale = uabin.Primitives.Bytes.unpack(data)
577 1
        if obj.Encoding & (1 << 1):
578 1
            obj.Text = uabin.Primitives.Bytes.unpack(data)
579 1
        return obj
580
581 1
    def to_string(self):
582
        # FIXME: use local
583 1
        if self.Text is None:
584
            return ""
585 1
        return self.Text.decode('utf-8')
586
587 1
    def __str__(self):
588
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
589
            'Locale:' + str(self.Locale) + ', ' + \
590
            'Text:' + str(self.Text) + ')'
591 1
    __repr__ = __str__
592
593 1
    def __eq__(self, other):
594 1
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
595 1
            return True
596 1
        return False
597
598 1
    def __ne__(self, other):
599 1
        return not self.__eq__(other)
600
601
602 1
class ExtensionObject(FrozenClass):
603
    """
604
    Any UA object packed as an ExtensionObject
605
606
    :ivar TypeId:
607
    :vartype TypeId: NodeId
608
    :ivar Body:
609
    :vartype Body: bytes
610
    """
611
612 1
    def __init__(self):
613 1
        self.TypeId = NodeId()
614 1
        self.Encoding = 0
615 1
        self.Body = b''
616 1
        self._freeze = True
617
618 1
    def to_binary(self):
619 1
        packet = []
620 1
        if self.Body:
621 1
            self.Encoding |= (1 << 0)
622 1
        packet.append(self.TypeId.to_binary())
623 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
624 1
        if self.Body:
625 1
            packet.append(uabin.Primitives.ByteString.pack(self.Body))
626 1
        return b''.join(packet)
627
628 1
    @staticmethod
629
    def from_binary(data):
630
        obj = ExtensionObject()
631
        obj.TypeId = NodeId.from_binary(data)
632
        obj.Encoding = uabin.Primitives.UInt8.unpack(data)
633
        if obj.Encoding & (1 << 0):
634
            obj.Body = uabin.Primitives.ByteString.unpack(data)
635
        return obj
636
637 1
    @staticmethod
638
    def from_object(obj):
639
        ext = ExtensionObject()
640
        oid = getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__))
641
        ext.TypeId = FourByteNodeId(oid)
642
        ext.Body = obj.to_binary()
643
        return ext
644
645 1
    def __str__(self):
646
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
647
            'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'
648
649 1
    __repr__ = __str__
650
651
652 1
class VariantType(Enum):
653
    """
654
    The possible types of a variant.
655
656
    :ivar Null:
657
    :ivar Boolean:
658
    :ivar SByte:
659
    :ivar Byte:
660
    :ivar Int16:
661
    :ivar UInt16:
662
    :ivar Int32:
663
    :ivar UInt32:
664
    :ivar Int64:
665
    :ivar UInt64:
666
    :ivar Float:
667
    :ivar Double:
668
    :ivar String:
669
    :ivar DateTime:
670
    :ivar Guid:
671
    :ivar ByteString:
672
    :ivar XmlElement:
673
    :ivar NodeId:
674
    :ivar ExpandedNodeId:
675
    :ivar StatusCode:
676
    :ivar QualifiedName:
677
    :ivar LocalizedText:
678
    :ivar ExtensionObject:
679
    :ivar DataValue:
680
    :ivar Variant:
681
    :ivar DiagnosticInfo:
682
    """
683
684 1
    Null = 0
685 1
    Boolean = 1
686 1
    SByte = 2
687 1
    Byte = 3
688 1
    Int16 = 4
689 1
    UInt16 = 5
690 1
    Int32 = 6
691 1
    UInt32 = 7
692 1
    Int64 = 8
693 1
    UInt64 = 9
694 1
    Float = 10
695 1
    Double = 11
696 1
    String = 12
697 1
    DateTime = 13
698 1
    Guid = 14
699 1
    ByteString = 15
700 1
    XmlElement = 16
701 1
    NodeId = 17
702 1
    ExpandedNodeId = 18
703 1
    StatusCode = 19
704 1
    QualifiedName = 20
705 1
    LocalizedText = 21
706 1
    ExtensionObject = 22
707 1
    DataValue = 23
708 1
    Variant = 24
709 1
    DiagnosticInfo = 25
710
711
712 1
class VariantTypeCustom(object):
713
    """
714
    Looks like sometime we get variant with other values than those
715
    defined in VariantType.
716
    FIXME: We should not need this class, as far as I iunderstand the spec
717
    variants can only be of VariantType
718
    """
719
720 1
    def __init__(self, val):
721 1
        self.name = "Custom"
722 1
        self.value = val
723 1
        if self.value > 0b00111111:
724 1
            raise UaError("Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))
725
726 1
    def __str__(self):
727
        return "VariantType.Custom:{0}".format(self.value)
728 1
    __repr__ = __str__
729
730 1
    def __eq__(self, other):
731 1
        return self.value == other.value
732
733
734 1
class Variant(FrozenClass):
735
    """
736
    Create an OPC-UA Variant object.
737
    if no argument a Null Variant is created.
738
    if not variant type is given, attemps to guess type from python type
739
    if a variant is given as value, the new objects becomes a copy of the argument
740
741
    :ivar Value:
742
    :vartype Value: Any supported type
743
    :ivar VariantType:
744
    :vartype VariantType: VariantType
745
    :ivar Dimension:
746
    :vartype Dimensions: The length of each dimensions. Usually guessed from value.
747
    :ivar is_array:
748
    :vartype is_array: If the variant is an array. Usually guessed from value.
749
    """
750
751 1
    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
752 1
        self.Value = value
753 1
        self.VariantType = varianttype
754 1
        self.Dimensions = dimensions
755 1
        self.is_array = is_array
756 1
        if self.is_array is None:
757 1
            if isinstance(value, (list, tuple)):
758 1
                self.is_array = True
759
            else:
760 1
                self.is_array = False
761 1
        self._freeze = True
762 1
        if isinstance(value, Variant):
763 1
            self.Value = value.Value
764 1
            self.VariantType = value.VariantType
765 1
        if self.VariantType is None:
766 1
            self.VariantType = self._guess_type(self.Value)
767 1
        if self.Value is None and not self.is_array and self.VariantType not in (
768
                VariantType.Null,
769
                VariantType.String,
770
                VariantType.DateTime):
771
            raise UaError("Non array Variant of type {0} cannot have value None".format(self.VariantType))
772 1
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
773 1
            dims = get_shape(self.Value)
774 1
            if len(dims) > 1:
775 1
                self.Dimensions = dims
776
777 1
    def __eq__(self, other):
778 1
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
779 1
            return True
780 1
        return False
781
782 1
    def __ne__(self, other):
783 1
        return not self.__eq__(other)
784
785 1
    def _guess_type(self, val):
786 1
        if isinstance(val, (list, tuple)):
787 1
            error_val = val
788 1
        while isinstance(val, (list, tuple)):
789 1
            if len(val) == 0:
790
                raise UaError("could not guess UA type of variable {0}".format(error_val))
791 1
            val = val[0]
792 1
        if val is None:
793 1
            return VariantType.Null
794 1
        elif isinstance(val, bool):
795 1
            return VariantType.Boolean
796 1
        elif isinstance(val, float):
797 1
            return VariantType.Double
798 1
        elif isinstance(val, int):
799 1
            return VariantType.Int64
800 1
        elif type(val) in (str, unicode):
801 1
            return VariantType.String
802 1
        elif isinstance(val, bytes):
803
            return VariantType.ByteString
804 1
        elif isinstance(val, datetime):
805 1 View Code Duplication
            return VariantType.DateTime
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
806 1
        elif isinstance(val, uuid.UUID):
807 1
            return VariantType.Guid
808
        else:
809 1
            if isinstance(val, object):
810 1
                try:
811 1
                    return getattr(VariantType, val.__class__.__name__)
812 1
                except AttributeError:
813 1
                    return VariantType.ExtensionObject
814
            else:
815
                raise UaError("Could not guess UA type of {0} with type {1}, specify UA type".format(val, type(val)))
816
817 1
    def __str__(self):
818 1
        return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)
819 1
    __repr__ = __str__
820
821 1
    def to_binary(self):
822 1
        b = []
823 1
        encoding = self.VariantType.value & 0b111111
824 1
        if self.is_array or type(self.Value) in (list, tuple):
825 1
            self.is_array = True
826 1
            encoding = uabin.set_bit(encoding, 7)
827 1
            if self.Dimensions is not None:
828 1
                encoding = uabin.set_bit(encoding, 6)
829 1
            b.append(uabin.Primitives.UInt8.pack(encoding))
830 1
            b.append(uabin.pack_uatype_array(self.VariantType, flatten(self.Value)))
831 1
            if self.Dimensions is not None:
832 1
                b.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
833
        else:
834 1
            b.append(uabin.Primitives.UInt8.pack(encoding))
835 1
            b.append(uabin.pack_uatype(self.VariantType, self.Value))
836
837 1
        return b"".join(b)
838
839 1
    @staticmethod
840
    def from_binary(data):
841 1
        dimensions = None
842 1
        array = False
843 1
        encoding = ord(data.read(1))
844 1
        int_type = encoding & 0b00111111
845 1
        vtype = datatype_to_varianttype(int_type)
846 1
        if uabin.test_bit(encoding, 7):
847 1
            value = uabin.unpack_uatype_array(vtype, data)
848 1
            array = True
849
        else:
850 1
            value = uabin.unpack_uatype(vtype, data)
851 1
        if uabin.test_bit(encoding, 6):
852 1
            dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
853 1
            value = reshape(value, dimensions)
854 1
        return Variant(value, vtype, dimensions, is_array=array)
855
856
857 1
def reshape(flat, dims):
858 1
    subdims = dims[1:]
859 1
    subsize = 1
860 1
    for i in subdims:
861 1
        if i == 0:
862 1
            i = 1
863 1
        subsize *= i
864 1
    while dims[0] * subsize > len(flat):
865 1
        flat.append([])
866 1
    if not subdims or subdims == [0]:
867 1
        return flat
868 1
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
869
870
871 1
def _split_list(l, n):
872
    n = max(1, n)
873
    return [l[i:i + n] for i in range(0, len(l), n)]
874
875
876 1
def flatten_and_get_shape(mylist):
877
    dims = []
878
    dims.append(len(mylist))
879
    while isinstance(mylist[0], (list, tuple)):
880
        dims.append(len(mylist[0]))
881
        mylist = [item for sublist in mylist for item in sublist]
882
        if len(mylist) == 0:
883
            break
884
    return mylist, dims
885
886
887 1
def flatten(mylist):
888 1
    if mylist is None:
889 1
        return None
890 1
    elif len(mylist) == 0:
891 1
        return mylist
892 1
    while isinstance(mylist[0], (list, tuple)):
893 1
        mylist = [item for sublist in mylist for item in sublist]
894 1
        if len(mylist) == 0:
895 1
            break
896 1
    return mylist
897
898
899 1
def get_shape(mylist):
900 1
    dims = []
901 1
    while isinstance(mylist, (list, tuple)):
902 1
        dims.append(len(mylist))
903 1
        if len(mylist) == 0:
904 1
            break
905 1
        mylist = mylist[0]
906 1
    return dims
907
908
909 1
class XmlElement(FrozenClass):
910
    """
911
    An XML element encoded as an UTF-8 string.
912
    """
913
914 1
    def __init__(self, binary=None):
915
        if binary is not None:
916
            self._binary_init(binary)
917
            self._freeze = True
918
            return
919
        self.Value = []
920
        self._freeze = True
921
922 1
    def to_binary(self):
923
        return uabin.Primitives.String.pack(self.Value)
924
925 1
    @staticmethod
926
    def from_binary(data):
927
        return XmlElement(data)
928
929 1
    def _binary_init(self, data):
930
        self.Value = uabin.Primitives.String.unpack(data)
931
932 1
    def __str__(self):
933
        return 'XmlElement(Value:' + str(self.Value) + ')'
934
935 1
    __repr__ = __str__
936
937
938 1
class DataValue(FrozenClass):
939
    """
940
    A value with an associated timestamp, and quality.
941
    Automatically generated from xml , copied and modified here to fix errors in xml spec
942
943
    :ivar Value:
944
    :vartype Value: Variant
945
    :ivar StatusCode:
946
    :vartype StatusCode: StatusCode
947
    :ivar SourceTimestamp:
948
    :vartype SourceTimestamp: datetime
949
    :ivar SourcePicoSeconds:
950
    :vartype SourcePicoSeconds: int
951
    :ivar ServerTimestamp:
952
    :vartype ServerTimestamp: datetime
953
    :ivar ServerPicoseconds:
954
    :vartype ServerPicoseconds: int
955
    """
956
957 1
    def __init__(self, variant=None, status=None):
958 1
        self.Encoding = 0
959 1
        if not isinstance(variant, Variant):
960 1
            variant = Variant(variant)
961 1
        self.Value = variant
962 1
        if status is None:
963 1
            self.StatusCode = StatusCode()
964
        else:
965 1
            self.StatusCode = status
966 1
        self.SourceTimestamp = None  # DateTime()
967 1
        self.SourcePicoseconds = None
968 1
        self.ServerTimestamp = None  # DateTime()
969 1
        self.ServerPicoseconds = None
970 1
        self._freeze = True
971
972 1
    def to_binary(self):
973 1
        packet = []
974 1
        if self.Value:
975 1
            self.Encoding |= (1 << 0)
976 1
        if self.StatusCode:
977 1
            self.Encoding |= (1 << 1)
978 1
        if self.SourceTimestamp:
979 1
            self.Encoding |= (1 << 2)
980 1
        if self.ServerTimestamp:
981 1
            self.Encoding |= (1 << 3)
982 1
        if self.SourcePicoseconds:
983
            self.Encoding |= (1 << 4)
984 1
        if self.ServerPicoseconds:
985
            self.Encoding |= (1 << 5)
986 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
987 1
        if self.Value:
988 1
            packet.append(self.Value.to_binary())
989 1
        if self.StatusCode:
990 1
            packet.append(self.StatusCode.to_binary())
991 1
        if self.SourceTimestamp:
992 1
            packet.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
993 1
        if self.ServerTimestamp:
994 1
            packet.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
995 1
        if self.SourcePicoseconds:
996
            packet.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
997 1
        if self.ServerPicoseconds:
998
            packet.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
999 1
        return b''.join(packet)
1000
1001 1
    @staticmethod
1002
    def from_binary(data):
1003 1
        encoding = ord(data.read(1))
1004 1
        if encoding & (1 << 0):
1005 1
            value = Variant.from_binary(data)
1006
        else:
1007
            value = None
1008 1
        if encoding & (1 << 1):
1009 1
            status = StatusCode.from_binary(data)
1010
        else:
1011
            status = None
1012 1
        obj = DataValue(value, status)
1013 1
        obj.Encoding = encoding
1014 1
        if obj.Encoding & (1 << 2):
1015 1
            obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
1016 1
        if obj.Encoding & (1 << 3):
1017 1
            obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
1018 1
        if obj.Encoding & (1 << 4):
1019
            obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
1020 1
        if obj.Encoding & (1 << 5):
1021
            obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
1022 1
        return obj
1023
1024 1
    def __str__(self):
1025
        s = 'DataValue(Value:{0}'.format(self.Value)
1026
        if self.StatusCode is not None:
1027
            s += ', StatusCode:{0}'.format(self.StatusCode)
1028
        if self.SourceTimestamp is not None:
1029
            s += ', SourceTimestamp:{0}'.format(self.SourceTimestamp)
1030
        if self.ServerTimestamp is not None:
1031
            s += ', ServerTimestamp:{0}'.format(self.ServerTimestamp)
1032
        if self.SourcePicoseconds is not None:
1033
            s += ', SourcePicoseconds:{0}'.format(self.SourcePicoseconds)
1034
        if self.ServerPicoseconds is not None:
1035
            s += ', ServerPicoseconds:{0}'.format(self.ServerPicoseconds)
1036
        s += ')'
1037
        return s
1038
1039 1
    __repr__ = __str__
1040
1041
1042 1
def datatype_to_varianttype(int_type):
1043
    """
1044
    Takes a NodeId or int and return a VariantType
1045
    This is only supported if int_type < 63 due to VariantType encoding
1046
    At low level we do not have access to address space thus decoding is limited
1047
    a better version of this method can be find in ua_utils.py
1048
    """
1049 1
    if isinstance(int_type, NodeId):
1050
        int_type = int_type.Identifier
1051
1052 1
    if int_type <= 25:
1053 1
        return VariantType(int_type)
1054
    else:
1055 1
        return VariantTypeCustom(int_type)
1056
1057
1058 1
def get_default_value(vtype):
1059
    """
1060
    Given a variant type return default value for this type
1061
    """
1062 1
    if vtype == VariantType.Null:
1063
        return None
1064 1
    elif vtype == VariantType.Boolean:
1065 1
        return False
1066 1
    elif vtype in (VariantType.SByte, VariantType.Byte, VariantType.ByteString):
1067
        return b""
1068 1
    elif 4 <= vtype.value <= 9:
1069 1
        return 0
1070 1
    elif vtype in (VariantType.Float, VariantType.Double):
1071 1
        return 0.0
1072 1
    elif vtype == VariantType.String:
1073 1
        return None  # a string can be null
1074
    elif vtype == VariantType.DateTime:
1075
        return datetime.utcnow()
1076
    elif vtype == VariantType.Guid:
1077
        return uuid.uuid4()
1078
    elif vtype == VariantType.XmlElement:
1079
        return None  #Not sure this is correct
1080
    elif vtype == VariantType.NodeId:
1081
        return NodeId()
1082
    elif vtype == VariantType.ExpandedNodeId:
1083
        return NodeId()
1084
    elif vtype == VariantType.StatusCode:
1085
        return StatusCode()
1086
    elif vtype == VariantType.QualifiedName:
1087
        return QualifiedName()
1088
    elif vtype == VariantType.LocalizedText:
1089
        return LocalizedText()
1090
    elif vtype == VariantType.ExtensionObject:
1091
        return ExtensionObject()
1092
    elif vtype == VariantType.DataValue:
1093
        return DataValue()
1094
    elif vtype == VariantType.Variant:
1095
        return Variant()
1096
    else:
1097
        raise RuntimeError("function take a uatype as argument, got:", vtype)
1098
1099
1100