Completed
Push — master ( 5318bd...777e81 )
by Olivier
04:55
created

NodeId.__init__()   C

Complexity

Conditions 7

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 7.0283

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 23
ccs 11
cts 12
cp 0.9167
crap 7.0283
rs 5.5
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4 1
import struct
5 1
from enum import Enum, IntEnum, EnumMeta
6 1
from datetime import datetime
7 1
import sys
8 1
import os
9 1
import uuid
10 1
import re
11 1
import itertools
12 1
if sys.version_info.major > 2:
13 1
    unicode = str
14 1
15
from opcua.ua import ua_binary as uabin
16 1
from opcua.ua import status_codes
17 1
from opcua.ua import ObjectIds
18 1
from opcua.ua.uaerrors import UaError
19 1
from opcua.ua.uaerrors import UaStatusCodeError
20
from opcua.ua.uaerrors import UaStringParsingError
21
22 1
23
def get_win_epoch():
    """
    Return the datetime that uabin.win_epoch_to_datetime() yields for tick 0.
    """
    epoch_ticks = 0
    return uabin.win_epoch_to_datetime(epoch_ticks)
25
26 1
27
class _FrozenClass(object):
28
29 1
    """
30 1
    make it impossible to add members to a class.
31 1
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
32
    """
33
    _freeze = False
34 1
35
    def __setattr__(self, key, value):
36
        if self._freeze and not hasattr(self, key):
37
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(
38 1
                key, self.__class__.__name__, self.__dict__.keys()))
39
        object.__setattr__(self, key, value)
40
41 1
# The typo check costs cpu time on every attribute assignment, but it makes
# debugging easy. When it is not needed (in production), define the
# environment variable PYOPCUA_NO_TYPO_CHECK: all ua type classes then inherit
# from plain object instead of _FrozenClass and the check is skipped.
FrozenClass = object if "PYOPCUA_NO_TYPO_CHECK" in os.environ else _FrozenClass
49 1
50 1
51 1
class ValueRank(IntEnum):
    """
    Number of dimensions of a variable value.

    Only the most common ranks are named here; since this is an IntEnum a
    plain int can be used wherever a rank without a name is needed.
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # the names below are not in the spec but common enough to be spelled out
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
66 1
67
68
class _MaskEnum(IntEnum):
69 1
70
    @classmethod
71
    def parse_bitfield(cls, the_int):
72
        """ Take an integer and interpret it as a set of enum values. """
73 1
        assert isinstance(the_int, int)
74 1
75
        return {cls(b) for b in cls._bits(the_int)}
76
77 1
    @classmethod
78
    def to_bitfield(cls, collection):
79
        """ Takes some enum values and creates an integer from them. """
80 1
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
81
        # iterator)
82
        iter1, iter2 = itertools.tee(iter(collection))
83 1
        assert all(isinstance(x, cls) for x in iter1)
84 1
85 1
        return sum(x.mask for x in iter2)
86
87 1
    @property
88
    def mask(self):
89
        return 1 << self.value
90 1
91 1
    @staticmethod
92
    def _bits(n):
93 1
        """ Iterate over the bits in n.
94 1
95 1
            e.g. bits(44) yields at 2, 3, 5
96 1
        """
97 1
        assert n >= 0  # avoid infinite recursion
98 1
99 1
        pos = 0
100 1
        while n:
101 1
            if n & 0x1:
102 1
                yield pos
103
            n = n // 2
104
            pos += 1
105 1
106 1
107 1
class AccessLevel(_MaskEnum):
    """
    Bit positions of the access level mask of a variable.

    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
    """
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
120
121
122 1
class WriteMask(_MaskEnum):
    """
    Bit positions telling which attributes of a node are writable.

    Spec Part 3, Paragraph 5.2.7 WriteMask
    """
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
150
151
152
class EventNotifier(_MaskEnum):
    """
    Bit positions describing how a node can be used for events.

    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
    """
    SubscribeToEvents = 0
    # bit 1 is reserved
    HistoryRead = 2
    HistoryWrite = 3
162 1
163 1
164 1
class StatusCode(FrozenClass):
    """
    An OPC-UA status code.

    Args:
        value: either the numeric code (int) or the attribute name of a code
            in status_codes.StatusCodes (str)

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    def __init__(self, value=0):
        if isinstance(value, str):
            self.name = value
            self.value = getattr(status_codes.StatusCodes, value)
            # doc must be assigned before the object is frozen, otherwise it
            # would be missing on instances created from a name
            _, self.doc = status_codes.get_name_and_doc(self.value)
        else:
            self.value = value
            self.name, self.doc = status_codes.get_name_and_doc(value)
        self._freeze = True

    def to_binary(self):
        """Pack the numeric code as UInt32."""
        return uabin.Primitives.UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Read a UInt32 from data and wrap it in a StatusCode."""
        return StatusCode(uabin.Primitives.UInt32.unpack(data))

    def check(self):
        """
        Raises an exception if the status code is anything else than 0 (good).

        Use the is_good() method if you do not want an exception.
        """
        if not self.is_good():
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        return True if status is Good.
        """
        # the two most significant bits of the 32 bit code hold the severity
        severity_mask = 3 << 30
        return not (severity_mask & self.value)

    def __str__(self):
        return 'StatusCode({})'.format(self.name)
    __repr__ = __str__

    def __eq__(self, other):
        # comparing with a foreign object is "not equal", not an AttributeError
        return isinstance(other, StatusCode) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)
220 1
221 1
222
class NodeIdType(Enum):
    """
    Encoding type of a NodeId; the value is the type written by NodeId.to_binary().
    """
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
229 1
230 1
231
class NodeId(FrozenClass):
    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid (uuid.UUID)
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want
            something special like a twobyte nodeid or a fourbyte nodeid

    :ivar Identifier:
    :vartype Identifier: int, string, bytes or uuid.UUID
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # no identifier at all: the null NodeId
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            # guess the encoding from the python type of the identifier
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, uuid.UUID):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
            return self.NamespaceIndex, self.Identifier
        else:
            return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.__key())

    def is_null(self):
        """Return True if the namespace index is 0 and the identifier is null."""
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """Return True if the identifier is empty/zero, whatever the namespace."""
        if not self.Identifier:
            return True
        if self.NodeIdType == NodeIdType.Guid:
            if isinstance(self.Identifier, uuid.UUID):
                # a UUID object is always truthy and cannot be fed to re.match;
                # it is null when all of its bits are zero
                return self.Identifier.int == 0
            if re.match(b'0.', self.Identifier):
                return True
        return False

    @staticmethod
    def from_string(string):
        """
        Parse a string like "ns=2;i=42" into a NodeId.

        Raises UaStringParsingError if the string cannot be parsed.
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {}".format(string), ex)

    @staticmethod
    def _from_string(string):
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in string.split(";"):
            if not el:
                continue
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is an integer everywhere else in this class
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # keep the constructor defaults ("" and 0) when the string has no nsu/srv
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the string form of the NodeId, e.g. "ns=2;i=42".

        All elements are separated by ";" so that the result can be parsed
        back with from_string().
        """
        type_letters = {
            NodeIdType.Numeric: "i",
            NodeIdType.String: "s",
            NodeIdType.TwoByte: "i",
            NodeIdType.FourByte: "i",
            NodeIdType.Guid: "g",
            NodeIdType.ByteString: "b",
        }
        parts = []
        if self.ServerIndex:
            parts.append("srv={}".format(self.ServerIndex))
        if self.NamespaceIndex != 0:
            parts.append("ns={}".format(self.NamespaceIndex))
        parts.append("{}={}".format(type_letters.get(self.NodeIdType), self.Identifier))
        if self.NamespaceUri:
            parts.append("nsu={}".format(self.NamespaceUri))
        return ";".join(parts)

    def __str__(self):
        return "{}NodeId({})".format(self.NodeIdType.name, self.to_string())
    __repr__ = __str__

    def to_binary(self):
        """Pack the type byte, the namespace index and the identifier."""
        # FIXME: Missing NamespaceUri and ServerIndex
        if self.NodeIdType == NodeIdType.TwoByte:
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
        elif self.NodeIdType == NodeIdType.FourByte:
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.Numeric:
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.String:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                uabin.Primitives.String.pack(self.Identifier)
        elif self.NodeIdType == NodeIdType.ByteString:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                uabin.Primitives.Bytes.pack(self.Identifier)
        elif self.NodeIdType == NodeIdType.Guid:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                uabin.Primitives.Guid.pack(self.Identifier)
        else:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                self.Identifier.to_binary()

    @staticmethod
    def from_binary(data):
        """Read a NodeId from data, an object with a read(n) method."""
        nid = NodeId()
        encoding = ord(data.read(1))
        # the low six bits hold the type, bits 6 and 7 are flags (see below)
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = ord(data.read(1))
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = uabin.Primitives.String.unpack(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = uabin.Primitives.Bytes.unpack(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = uabin.Primitives.Guid.unpack(data)
        else:
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

        # bit 7 announces a namespace uri, bit 6 a server index
        if uabin.test_bit(encoding, 7):
            nid.NamespaceUri = uabin.Primitives.String.unpack(data)
        if uabin.test_bit(encoding, 6):
            nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)

        return nid
427 1
428 1
429 1
class TwoByteNodeId(NodeId):
    """NodeId forced to the TwoByte encoding; the namespace index is always 0."""

    def __init__(self, identifier):
        NodeId.__init__(self, identifier=identifier, namespaceidx=0, nodeidtype=NodeIdType.TwoByte)
433 1
434 1
435 1
class FourByteNodeId(NodeId):
    """NodeId forced to the FourByte encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.FourByte)
439 1
440 1
441 1
class NumericNodeId(NodeId):
    """NodeId forced to the Numeric encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Numeric)
445
446
447
class ByteStringNodeId(NodeId):
    """NodeId forced to the ByteString encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.ByteString)
451 1
452
453 1
class GuidNodeId(NodeId):
    """NodeId forced to the Guid encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.Guid)
457
458 1
459 1
class StringNodeId(NodeId):
    """NodeId forced to the String encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier=identifier, namespaceidx=namespace, nodeidtype=NodeIdType.String)
463
464 1
465 1
# ExpandedNodeId is only an alias: NodeId itself carries NamespaceUri and ServerIndex
ExpandedNodeId = NodeId
466 1
467 1
468
class QualifiedName(FrozenClass):
    """
    A name together with the index of the namespace it belongs to.
    """

    def __init__(self, name=None, namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return the name in the form "<namespace index>:<name>"."""
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Build a QualifiedName from "<namespace index>:<name>".

        A string without ":" is taken as a name in namespace 0.
        Raises UaStringParsingError if the part before ":" is not an integer.
        """
        if ":" not in string:
            return QualifiedName(string, 0)
        try:
            prefix, name = string.split(":", 1)
            index = int(prefix)
        except (TypeError, ValueError) as ex:
            raise UaStringParsingError("Error parsing string {}".format(string), ex)
        return QualifiedName(name, index)

    def to_binary(self):
        """Pack the namespace index as UInt16 followed by the name as String."""
        index_bytes = uabin.Primitives.UInt16.pack(self.NamespaceIndex)
        name_bytes = uabin.Primitives.String.pack(self.Name)
        return b''.join((index_bytes, name_bytes))

    @staticmethod
    def from_binary(data):
        """Read the namespace index, then the name, from data."""
        qname = QualifiedName()
        qname.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
        qname.Name = uabin.Primitives.String.unpack(data)
        return qname

    def __eq__(self, bname):
        if not isinstance(bname, QualifiedName):
            return False
        return self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        # order by namespace index first, by name inside a namespace
        if not isinstance(other, QualifiedName):
            raise TypeError("Cannot compare QualifiedName and {}".format(other))
        if self.NamespaceIndex != other.NamespaceIndex:
            return self.NamespaceIndex < other.NamespaceIndex
        return self.Name < other.Name

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
527 1
528 1
529 1
class LocalizedText(FrozenClass):
    """
    A text with an optional locale.

    :ivar Encoding: presence mask, bit 0 is set when Locale is packed, bit 1 when Text is packed
    :vartype Encoding: int
    :ivar Text: the text, stored utf-8 encoded
    :vartype Text: bytes
    :ivar Locale:
    :vartype Locale: bytes
    """

    ua_types = {
        "Text": "ByteString",
        "Locale": "ByteString"
    }

    def __init__(self, text=None):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = None
        self._freeze = True

    def to_binary(self):
        """
        Pack the presence mask followed by the fields that are present.

        The two presence bits are recomputed from the current Locale and Text,
        so a field that was cleared after construction or decoding no longer
        leaves a stale bit announcing data that is not in the packet.
        """
        encoding = self.Encoding & ~0b11
        if self.Locale:
            encoding |= (1 << 0)
        if self.Text:
            encoding |= (1 << 1)
        self.Encoding = encoding
        packet = [uabin.Primitives.UInt8.pack(self.Encoding)]
        if self.Locale:
            packet.append(uabin.Primitives.Bytes.pack(self.Locale))
        if self.Text:
            packet.append(uabin.Primitives.Bytes.pack(self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read the presence mask, then Locale and Text when their bit is set."""
        obj = LocalizedText()
        obj.Encoding = ord(data.read(1))
        if obj.Encoding & (1 << 0):
            obj.Locale = uabin.Primitives.Bytes.unpack(data)
        if obj.Encoding & (1 << 1):
            obj.Text = uabin.Primitives.Bytes.unpack(data)
        return obj

    def to_string(self):
        """Return Text decoded from utf-8, or "" when there is no text."""
        # FIXME: use local
        if self.Text is None:
            return ""
        return self.Text.decode('utf-8')

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) + ')'
    __repr__ = __str__

    def __eq__(self, other):
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)
591
592 1
593
class ExtensionObject(FrozenClass):
    """
    Any UA object packed as an ExtensionObject

    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Encoding: bit 0 is set when a Body is packed
    :vartype Encoding: int
    :ivar Body:
    :vartype Body: bytes
    """

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = b''
        self._freeze = True

    def to_binary(self):
        """
        Pack TypeId, Encoding and, when there is one, the Body.

        Bit 0 of Encoding is kept in sync with the presence of a Body, so an
        object whose Body was emptied does not announce a body that is not packed.
        """
        if self.Body:
            self.Encoding |= (1 << 0)
        else:
            self.Encoding &= ~(1 << 0)
        packet = [
            self.TypeId.to_binary(),
            uabin.Primitives.UInt8.pack(self.Encoding),
        ]
        if self.Body:
            packet.append(uabin.Primitives.ByteString.pack(self.Body))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Read TypeId and Encoding, then the Body when bit 0 of Encoding is set."""
        obj = ExtensionObject()
        obj.TypeId = NodeId.from_binary(data)
        obj.Encoding = uabin.Primitives.UInt8.unpack(data)
        if obj.Encoding & (1 << 0):
            obj.Body = uabin.Primitives.ByteString.unpack(data)
        return obj

    @staticmethod
    def from_object(obj):
        """
        Wrap obj in an ExtensionObject.

        The type id is looked up in ObjectIds under
        "<class name of obj>_Encoding_DefaultBinary" and the body is obj.to_binary().
        """
        ext = ExtensionObject()
        oid = getattr(ObjectIds, "{}_Encoding_DefaultBinary".format(obj.__class__.__name__))
        ext.TypeId = FourByteNodeId(oid)
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        # Body may be empty or None; report 0 bytes instead of failing in len()
        size = len(self.Body) if self.Body else 0
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
            'Encoding:' + str(self.Encoding) + ', ' + str(size) + ' bytes)'

    __repr__ = __str__
641
642
643
class VariantType(Enum):
    """
    The types a variant can hold; the member value is the numeric type id
    written into the encoding byte of a Variant.

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:
    """

    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
701 1
702 1
703 1
class VariantTypeCustom(object):
    """
    Looks like sometime we get variant with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType

    :ivar name: always "Custom"
    :ivar value: the numeric type id, at most 0b111111
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            raise UaError("Cannot create VariantType. VariantType must be {} > x > {}, received {}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{}".format(self.value)
    __repr__ = __str__

    def __eq__(self, other):
        # compare by numeric value; an object without a value is simply not
        # equal instead of raising AttributeError
        try:
            other_value = other.value
        except AttributeError:
            return False
        return self.value == other_value

    def __ne__(self, other):
        return not self.__eq__(other)
723
724 1
725 1
class Variant(FrozenClass):
    """
    Create an OPC-UA Variant object.
    if no argument a Null Variant is created.
    if no variant type is given, attempts to guess the type from the python type
    if a variant is given as value, the new object becomes a copy of the argument

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions: shape of a multi-dimensional Value, None otherwise
    :vartype Dimensions: list of int
    """

    def __init__(self, value=None, varianttype=None, dimensions=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self._freeze = True
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
            # a copy keeps the dimensions of its source unless new ones are given
            if self.Dimensions is None:
                self.Dimensions = value.Dimensions
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Dimensions is None and type(self.Value) in (list, tuple):
            dims = get_shape(self.Value)
            # dimensions are only recorded for more than one dimension
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
            return True
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType from the python type of val.

        For (nested) lists and tuples the first element decides.
        Raises UaError for an empty list, since there is nothing to look at.
        """
        error_val = val
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA type of variable {}".format(error_val))
            val = val[0]
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # bool must be tested before int, bool is a subclass of int
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, int):
            return VariantType.Int64
        elif type(val) in (str, unicode):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        elif isinstance(val, uuid.UUID):
            return VariantType.Guid
        # any other object: use the VariantType named like its class, and
        # fall back to ExtensionObject when there is no such member
        try:
            return getattr(VariantType, val.__class__.__name__)
        except AttributeError:
            return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """
        Pack the encoding byte followed by the value.

        The low six bits of the encoding byte are the type id, bit 7 marks an
        array and bit 6 tells that the array is followed by its dimensions.
        """
        b = []
        encoding = self.VariantType.value & 0b111111
        if type(self.Value) in (list, tuple):
            if self.Dimensions is not None:
                encoding = uabin.set_bit(encoding, 6)
            encoding = uabin.set_bit(encoding, 7)
            b.append(uabin.Primitives.UInt8.pack(encoding))
            # arrays are always sent flat, the shape travels in Dimensions
            b.append(uabin.pack_uatype_array(self.VariantType, flatten(self.Value)))
            if self.Dimensions is not None:
                b.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
        else:
            b.append(uabin.Primitives.UInt8.pack(encoding))
            b.append(uabin.pack_uatype(self.VariantType, self.Value))

        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """Read a Variant from data, an object with a read(n) method."""
        dimensions = None
        encoding = ord(data.read(1))
        int_type = encoding & 0b00111111
        vtype = datatype_to_varianttype(int_type)
        if vtype == VariantType.Null:
            # a Null variant has no value and no dimensions; the encoding byte
            # must not end up in the dimensions argument
            return Variant(None, vtype)
        if uabin.test_bit(encoding, 7):
            value = uabin.unpack_uatype_array(vtype, data)
        else:
            value = uabin.unpack_uatype(vtype, data)
        if uabin.test_bit(encoding, 6):
            dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
            value = reshape(value, dimensions)

        return Variant(value, vtype, dimensions)
831
832
833
def reshape(flat, dims):
    """
    Rebuild nested lists of shape dims from the flat list.

    A dimension of 0 counts as 1 when computing the chunk size. When flat is
    too short for the requested shape it is padded in place with empty lists.
    """
    inner_dims = dims[1:]
    chunk = 1
    for extent in inner_dims:
        chunk *= 1 if extent == 0 else extent
    # pad (in place) up to the number of elements the shape asks for
    for _ in range(dims[0] * chunk - len(flat)):
        flat.append([])
    if not inner_dims or inner_dims == [0]:
        return flat
    rows = []
    for start in range(0, len(flat), chunk):
        rows.append(reshape(flat[start: start + chunk], inner_dims))
    return rows
845 1
846 1
847 1
def _split_list(l, n):
848 1
    n = max(1, n)
849 1
    return [l[i:i + n] for i in range(0, len(l), n)]
850 1
851 1
852 1
def flatten_and_get_shape(mylist):
    """
    Flatten nested lists/tuples and return (flat list, list of dimensions).

    As in flatten() and get_shape(), only the first element of each level is
    inspected to decide whether there is another level. An empty input
    returns ([], [0]) instead of failing on mylist[0].
    """
    dims = [len(mylist)]
    # the length test guards mylist[0], both for an empty input and for a
    # level that became empty after flattening
    while len(mylist) != 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
861 1
862 1
863 1
def flatten(mylist):
    """
    Flatten nested lists/tuples into a single list.

    Only the first element is inspected to decide whether there is another
    level of nesting. An input that is empty or not nested is returned as is.
    """
    while len(mylist) != 0 and isinstance(mylist[0], (list, tuple)):
        # remove one level of nesting per round
        mylist = list(itertools.chain.from_iterable(mylist))
    return mylist
871 1
872 1
873 1
def get_shape(mylist):
    """
    Return the list of dimensions of nested lists/tuples.

    The size of each level is taken from its first element; descending stops
    at the first empty level or at the first element that is not a list/tuple.
    """
    shape = []
    level = mylist
    while isinstance(level, (list, tuple)):
        size = len(level)
        shape.append(size)
        if size == 0:
            break
        level = level[0]
    return shape
881
882
883 1
class XmlElement(FrozenClass):
    """
    An XML element, packed and unpacked as a string.
    """

    def __init__(self, binary=None):
        if binary is None:
            self.Value = []
        else:
            self._binary_init(binary)
        self._freeze = True

    def to_binary(self):
        """Pack Value with the String primitive."""
        return uabin.Primitives.String.pack(self.Value)

    @staticmethod
    def from_binary(data):
        """Build an XmlElement whose Value is read from data."""
        return XmlElement(data)

    def _binary_init(self, data):
        self.Value = uabin.Primitives.String.unpack(data)

    def __str__(self):
        return ''.join(('XmlElement(Value:', str(self.Value), ')'))

    __repr__ = __str__
910 1
911
912 1
class DataValue(FrozenClass):
    """
    A value with an associated timestamp, and quality.
    Automatically generated from xml , copied and modified here to fix errors in xml spec

    :ivar Encoding: bit mask of the fields present in the binary form
    :vartype Encoding: int
    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int
    """

    def __init__(self, variant=None, status=None):
        """
        :param variant: a Variant, or any value that is wrapped into a Variant
        :param status: a StatusCode; a default StatusCode() is used when None
        """
        self.Encoding = 0
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        if status is None:
            self.StatusCode = StatusCode()
        else:
            self.StatusCode = status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        """
        Serialize to bytes: one encoding-mask byte followed by the fields
        that are present, in mask-bit order.

        The mask is rebuilt from the current field values on every call and
        stored back in ``Encoding``. It is deliberately NOT accumulated onto
        the previous ``Encoding``: a bit left over from ``from_binary`` or an
        earlier call would otherwise announce a field that is no longer
        written, making the output undecodable.

        A field is written only when it is truthy, so e.g. a picoseconds
        value of 0 is omitted.
        """
        # (mask bit, current value, packer), in wire order
        fields = (
            (0, self.Value, lambda value: value.to_binary()),
            (1, self.StatusCode, lambda status: status.to_binary()),
            (2, self.SourceTimestamp, uabin.Primitives.DateTime.pack),
            (3, self.ServerTimestamp, uabin.Primitives.DateTime.pack),
            (4, self.SourcePicoseconds, uabin.Primitives.UInt16.pack),
            (5, self.ServerPicoseconds, uabin.Primitives.UInt16.pack),
        )
        encoding = 0
        body = []
        for bit, value, pack in fields:
            if value:
                encoding |= (1 << bit)
                body.append(pack(value))
        self.Encoding = encoding
        packet = [uabin.Primitives.UInt8.pack(encoding)]
        packet.extend(body)
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """
        Decode a DataValue from ``data``.

        :param data: object with a ``read`` method positioned at the
            encoding-mask byte
        :return: the decoded DataValue; ``Encoding`` holds the mask as read.
            Fields whose bit is not set keep the constructor defaults (an
            absent value or status is replaced by ``Variant(None)`` /
            ``StatusCode()`` by the constructor).
        """
        encoding = ord(data.read(1))
        if encoding & (1 << 0):
            value = Variant.from_binary(data)
        else:
            value = None
        if encoding & (1 << 1):
            status = StatusCode.from_binary(data)
        else:
            status = None
        obj = DataValue(value, status)
        obj.Encoding = encoding
        if encoding & (1 << 2):
            obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)
        if encoding & (1 << 3):
            obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)
        if encoding & (1 << 4):
            obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
        if encoding & (1 << 5):
            obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
        return obj

    def __str__(self):
        parts = ['DataValue(Value:{}'.format(self.Value)]
        # Optional members are listed only when they are set.
        for name in ('StatusCode', 'SourceTimestamp', 'ServerTimestamp',
                     'SourcePicoseconds', 'ServerPicoseconds'):
            member = getattr(self, name)
            if member is not None:
                parts.append(', {}:{}'.format(name, member))
        parts.append(')')
        return ''.join(parts)

    __repr__ = __str__
1014
1015
1016
def datatype_to_varianttype(int_type):
    """
    Convert a data type, given as a NodeId or an int, to a VariantType.

    Identifiers up to 25 are returned as VariantType members; anything
    above is wrapped in a VariantTypeCustom.
    This is only supported if int_type < 63 due to VariantType encoding.
    At low level we do not have access to the address space, thus decoding
    is limited; a better version of this method can be found in ua_utils.py
    """
    type_id = int_type.Identifier if isinstance(int_type, NodeId) else int_type
    if type_id <= 25:
        return VariantType(type_id)
    return VariantTypeCustom(type_id)
1030