Completed
Pull Request — master (#388)
by Olivier
04:23
created

ExtensionObject.from_binary()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2.0116

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 8
ccs 6
cts 7
cp 0.8571
crap 2.0116
rs 9.4285
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4 1
import struct
5 1
from enum import Enum, IntEnum, EnumMeta
6 1
from datetime import datetime
7 1
import sys
8 1
import os
9 1
import uuid
10 1
import re
11 1
import itertools
12
13 1
from opcua.ua import ua_binary as uabin
14 1
from opcua.ua import status_codes
15 1
from opcua.ua import ObjectIds
16 1
from opcua.ua.uaerrors import UaError
17 1
from opcua.ua.uaerrors import UaStatusCodeError
18 1
from opcua.ua.uaerrors import UaStringParsingError
19 1
from opcua.common.utils import Buffer
20
21
22 1
if sys.version_info.major > 2:
23 1
    unicode = str
24 1
def get_win_epoch():
25 1
    return uabin.win_epoch_to_datetime(0)
26
27
28 1
class _FrozenClass(object):
29
30
    """
31
    Make it impossible to add members to a class.
32
    Not pythonic at all but we found out it prevents many many
33
    bugs in use of protocol structures
34
    """
35 1
    _freeze = False
36
37 1
    def __setattr__(self, key, value):
38 1
        if self._freeze and not hasattr(self, key):
39
            raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
40
                key, self.__class__.__name__, self.__dict__.keys()))
41 1
        object.__setattr__(self, key, value)
42
43
44 1
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
45
    # typo check is cpu consuming, but it will make debug easy.
46
    # if typo check is not need (in production), please set env PYOPCUA_NO_TYPO_CHECK.
47
    # this will make all uatype class inherit from object intead of _FrozenClass
48
    # and skip the typo check.
49
    FrozenClass = object
50
else:
51 1
    FrozenClass = _FrozenClass
52
53
54 1
class ValueRank(IntEnum):
55
    """
56
    Defines dimensions of a variable.
57
    This enum does not support all cases since ValueRank support any n>0
58
    but since it is an IntEnum it can be replace by a normal int
59
    """
60 1
    ScalarOrOneDimension = -3
61 1
    Any = -2
62 1
    Scalar = -1
63 1
    OneOrMoreDimensions = 0
64 1
    OneDimension = 1
65
    # the next names are not in spec but so common we express them here
66 1
    TwoDimensions = 2
67 1
    ThreeDimensions = 3
68 1
    FourDimensions = 4
69
70
71 1
class _MaskEnum(IntEnum):
72
73 1
    @classmethod
74
    def parse_bitfield(cls, the_int):
75
        """ Take an integer and interpret it as a set of enum values. """
76 1
        assert isinstance(the_int, int)
77
78 1
        return {cls(b) for b in cls._bits(the_int)}
79
80 1
    @classmethod
81
    def to_bitfield(cls, collection):
82
        """ Takes some enum values and creates an integer from them. """
83
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
84
        # iterator)
85 1
        iter1, iter2 = itertools.tee(iter(collection))
86 1
        assert all(isinstance(x, cls) for x in iter1)
87
88 1
        return sum(x.mask for x in iter2)
89
90 1
    @property
91
    def mask(self):
92 1
        return 1 << self.value
93
94 1
    @staticmethod
95
    def _bits(n):
96
        """ Iterate over the bits in n.
97
98
            e.g. bits(44) yields at 2, 3, 5
99
        """
100 1
        assert n >= 0  # avoid infinite recursion
101
102 1
        pos = 0
103 1
        while n:
104 1
            if n & 0x1:
105 1
                yield pos
106 1
            n = n // 2
107 1
            pos += 1
108
109
110 1
class AccessLevel(_MaskEnum):
111
    """
112
    Bit index to indicate what the access level is.
113
114
    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
115
    """
116 1
    CurrentRead = 0
117 1
    CurrentWrite = 1
118 1
    HistoryRead = 2
119 1
    HistoryWrite = 3
120 1
    SemanticChange = 4
121 1
    StatusWrite = 5
122 1
    TimestampWrite = 6
123
124
125 1
class WriteMask(_MaskEnum):
126
    """
127
    Bit index to indicate which attribute of a node is writable
128
129
    Spec Part 3, Paragraph 5.2.7 WriteMask
130
    """
131 1
    AccessLevel = 0
132 1
    ArrayDimensions = 1
133 1
    BrowseName = 2
134 1
    ContainsNoLoops = 3
135 1
    DataType = 4
136 1
    Description = 5
137 1
    DisplayName = 6
138 1
    EventNotifier = 7
139 1
    Executable = 8
140 1
    Historizing = 9
141 1
    InverseName = 10
142 1
    IsAbstract = 11
143 1
    MinimumSamplingInterval = 12
144 1
    NodeClass = 13
145 1
    NodeId = 14
146 1
    Symmetric = 15
147 1
    UserAccessLevel = 16
148 1
    UserExecutable = 17
149 1
    UserWriteMask = 18
150 1
    ValueRank = 19
151 1
    WriteMask = 20
152 1
    ValueForVariableType = 21
153
154
155 1
class EventNotifier(_MaskEnum):
156
    """
157
    Bit index to indicate how a node can be used for events.
158
159
    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
160
    """
161 1
    SubscribeToEvents = 0
162
    # Reserved        = 1
163 1
    HistoryRead = 2
164 1
    HistoryWrite = 3
165
166
167 1
class StatusCode(FrozenClass):
168
    """
169
    :ivar value:
170
    :vartype value: int
171
    :ivar name:
172
    :vartype name: string
173
    :ivar doc:
174
    :vartype doc: string
175
    """
176
177 1
    def __init__(self, value=0):
178 1
        if isinstance(value, str):
179 1
            self.name = value
180 1
            self.value = getattr(status_codes.StatusCodes, value)
181
        else:
182 1
            self.value = value
183 1
            self.name, self.doc = status_codes.get_name_and_doc(value)
184 1
        self._freeze = True
185
186 1
    def to_binary(self):
187 1
        return uabin.Primitives.UInt32.pack(self.value)
188
189 1
    @staticmethod
190
    def from_binary(data):
191 1
        val = uabin.Primitives.UInt32.unpack(data)
192 1
        sc = StatusCode(val)
193 1
        return sc
194
195 1
    def check(self):
196
        """
197
        Raises an exception if the status code is anything else than 0 (good).
198
199
        Use the is_good() method if you do not want an exception.
200
        """
201 1
        if not self.is_good():
202 1
            raise UaStatusCodeError(self.value)
203
204 1
    def is_good(self):
205
        """
206
        return True if status is Good.
207
        """
208 1
        mask = 3 << 30
209 1
        if mask & self.value:
210 1
            return False
211
        else:
212 1
            return True
213
214 1
    def __str__(self):
215
        return 'StatusCode({0})'.format(self.name)
216 1
    __repr__ = __str__
217
218 1
    def __eq__(self, other):
219 1
        return self.value == other.value
220
221 1
    def __ne__(self, other):
222
        return not self.__eq__(other)
223
224
225 1
class NodeIdType(IntEnum):
226 1
    TwoByte = 0
227 1
    FourByte = 1
228 1
    Numeric = 2
229 1
    String = 3
230 1
    Guid = 4
231 1
    ByteString = 5
232
233
234 1
class NodeId(FrozenClass):
235
    """
236
    NodeId Object
237
238
    Args:
239
        identifier: The identifier might be an int, a string, bytes or a Guid
240
        namespaceidx(int): The index of the namespace
241
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
242
243
244
    :ivar Identifier:
245
    :vartype Identifier: NodeId
246
    :ivar NamespaceIndex:
247
    :vartype NamespaceIndex: Int
248
    :ivar NamespaceUri:
249
    :vartype NamespaceUri: String
250
    :ivar ServerIndex:
251
    :vartype ServerIndex: Int
252
    """
253
    
254 1
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
255
256 1
        self.Identifier = identifier
257 1
        self.NamespaceIndex = namespaceidx
258 1
        self.NodeIdType = nodeidtype
259 1
        self.NamespaceUri = ""
260 1
        self.ServerIndex = 0
261 1
        self._freeze = True
262 1
        if not isinstance(self.NamespaceIndex, int):
263 1
            raise UaError("NamespaceIndex must be an int")
264 1
        if self.Identifier is None:
265 1
            self.Identifier = 0
266 1
            self.NodeIdType = NodeIdType.TwoByte
267 1
            return
268 1
        if self.NodeIdType is None:
269 1
            if isinstance(self.Identifier, int):
270 1
                self.NodeIdType = NodeIdType.Numeric
271 1
            elif isinstance(self.Identifier, str):
272 1
                self.NodeIdType = NodeIdType.String
273 1
            elif isinstance(self.Identifier, bytes):
274
                self.NodeIdType = NodeIdType.ByteString
275 1
            elif isinstance(self.Identifier, uuid.UUID):
276 1
                self.NodeIdType = NodeIdType.Guid
277
            else:
278
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
279
280 1
    def _key(self):
281 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric): 
282
            # twobyte, fourbyte and numeric may represent the same node
283 1
            return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
284 1
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier)
285
286 1
    def __eq__(self, node):
287 1
        return isinstance(node, NodeId) and self._key() == node._key()
288
289 1
    def __ne__(self, other):
290 1
        return not self.__eq__(other)
291
292 1
    def __hash__(self):
293 1
        return hash(self._key())
294
295 1
    def __lt__(self, other):
296 1
        if not isinstance(other, NodeId):
297
            raise AttributeError("Can only compare to NodeId")
298 1
        return self._key() < other._key()
299
300 1
    def is_null(self):
301 1
        if self.NamespaceIndex != 0:
302 1
            return False
303 1
        return self.has_null_identifier()
304
305 1
    def has_null_identifier(self):
306 1
        if not self.Identifier:
307 1
            return True
308 1
        if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
309 1
            return True
310 1
        return False
311
312 1
    @staticmethod
313
    def from_string(string):
314 1
        try:
315 1
            return NodeId._from_string(string)
316 1
        except ValueError as ex:
317 1
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)
318
319 1
    @staticmethod
320
    def _from_string(string):
321 1
        l = string.split(";")
322 1
        identifier = None
323 1
        namespace = 0
324 1
        ntype = None
325 1
        srv = None
326 1
        nsu = None
327 1
        for el in l:
328 1
            if not el:
329 1
                continue
330 1
            k, v = el.split("=", 1)
331 1
            k = k.strip()
332 1
            v = v.strip()
333 1
            if k == "ns":
334 1
                namespace = int(v)
335 1
            elif k == "i":
336 1
                ntype = NodeIdType.Numeric
337 1
                identifier = int(v)
338 1
            elif k == "s":
339 1
                ntype = NodeIdType.String
340 1
                identifier = v
341 1
            elif k == "g":
342
                ntype = NodeIdType.Guid
343
                identifier = v
344 1
            elif k == "b":
345
                ntype = NodeIdType.ByteString
346
                identifier = v
347 1
            elif k == "srv":
348 1
                srv = v
349
            elif k == "nsu":
350
                nsu = v
351 1
        if identifier is None:
352 1
            raise UaStringParsingError("Could not find identifier in string: " + string)
353 1
        nodeid = NodeId(identifier, namespace, ntype)
354 1
        nodeid.NamespaceUri = nsu
355 1
        nodeid.ServerIndex = srv
356 1
        return nodeid
357
358 1
    def to_string(self):
359 1
        string = ""
360 1
        if self.NamespaceIndex != 0:
361 1
            string += "ns={0};".format(self.NamespaceIndex)
362 1
        ntype = None
363 1
        if self.NodeIdType == NodeIdType.Numeric:
364 1
            ntype = "i"
365 1
        elif self.NodeIdType == NodeIdType.String:
366 1
            ntype = "s"
367 1
        elif self.NodeIdType == NodeIdType.TwoByte:
368 1
            ntype = "i"
369 1
        elif self.NodeIdType == NodeIdType.FourByte:
370 1
            ntype = "i"
371
        elif self.NodeIdType == NodeIdType.Guid:
372
            ntype = "g"
373
        elif self.NodeIdType == NodeIdType.ByteString:
374
            ntype = "b"
375 1
        string += "{0}={1}".format(ntype, self.Identifier)
376 1
        if self.ServerIndex:
377
            string = "srv=" + str(self.ServerIndex) + string
378 1
        if self.NamespaceUri:
379
            string += "nsu={0}".format(self.NamespaceUri)
380 1
        return string
381
382 1
    def __str__(self):
383 1
        return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())
384 1
    __repr__ = __str__
385
386 1
    def to_binary(self):
387 1
        if self.NodeIdType == NodeIdType.TwoByte:
388 1
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
389 1
        elif self.NodeIdType == NodeIdType.FourByte:
390 1
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
391 1
        elif self.NodeIdType == NodeIdType.Numeric:
392 1
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
393 1
        elif self.NodeIdType == NodeIdType.String:
394 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
395
                uabin.Primitives.String.pack(self.Identifier)
396 1
        elif self.NodeIdType == NodeIdType.ByteString:
397 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
398
                uabin.Primitives.Bytes.pack(self.Identifier)
399 1
        elif self.NodeIdType == NodeIdType.Guid:
400 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
401
                   uabin.Primitives.Guid.pack(self.Identifier)
402
        else:
403
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
404
                self.Identifier.to_binary()
405
        # FIXME: Missing NNamespaceURI and ServerIndex
406
407 1
    @staticmethod
408
    def from_binary(data):
409 1
        nid = NodeId()
410 1
        encoding = ord(data.read(1))
411 1
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)
412
413 1
        if nid.NodeIdType == NodeIdType.TwoByte:
414 1
            nid.Identifier = ord(data.read(1))
415 1
        elif nid.NodeIdType == NodeIdType.FourByte:
416 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
417 1
        elif nid.NodeIdType == NodeIdType.Numeric:
418 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
419 1
        elif nid.NodeIdType == NodeIdType.String:
420 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
421 1
            nid.Identifier = uabin.Primitives.String.unpack(data)
422 1
        elif nid.NodeIdType == NodeIdType.ByteString:
423 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
424 1
            nid.Identifier = uabin.Primitives.Bytes.unpack(data)
425 1
        elif nid.NodeIdType == NodeIdType.Guid:
426 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
427 1
            nid.Identifier = uabin.Primitives.Guid.unpack(data)
428
        else:
429
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
430
431 1
        if uabin.test_bit(encoding, 7):
432
            nid.NamespaceUri = uabin.Primitives.String.unpack(data)
433 1
        if uabin.test_bit(encoding, 6):
434
            nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)
435
436 1
        return nid
437
438
439 1
class TwoByteNodeId(NodeId):
440
441 1
    def __init__(self, identifier):
442 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
443
444
445 1
class FourByteNodeId(NodeId):
446
447 1
    def __init__(self, identifier, namespace=0):
448 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
449
450
451 1
class NumericNodeId(NodeId):
452
453 1
    def __init__(self, identifier, namespace=0):
454 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
455
456
457 1
class ByteStringNodeId(NodeId):
458
459 1
    def __init__(self, identifier, namespace=0):
460 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
461
462
463 1
class GuidNodeId(NodeId):
464
465 1
    def __init__(self, identifier, namespace=0):
466 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
467
468
469 1
class StringNodeId(NodeId):
470
471 1
    def __init__(self, identifier, namespace=0):
472 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
473
474
475 1
ExpandedNodeId = NodeId
476
477
478 1
class QualifiedName(FrozenClass):
479
    """
480
    A string qualified with a namespace index.
481
    """
482
483 1
    def __init__(self, name=None, namespaceidx=0):
484 1
        if not isinstance(namespaceidx, int):
485
            raise UaError("namespaceidx must be an int")
486 1
        self.NamespaceIndex = namespaceidx
487 1
        self.Name = name
488 1
        self._freeze = True
489
490 1
    def to_string(self):
491 1
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)
492
493 1
    @staticmethod
494
    def from_string(string):
495 1
        if ":" in string:
496 1
            try:
497 1
                idx, name = string.split(":", 1)
498 1
                idx = int(idx)
499 1
            except (TypeError, ValueError) as ex:
500 1
                raise UaStringParsingError("Error parsing string {0}".format(string), ex)
501
        else:
502 1
            idx = 0
503 1
            name = string
504 1
        return QualifiedName(name, idx)
505
506 1
    def to_binary(self):
507 1
        packet = []
508 1
        packet.append(uabin.Primitives.UInt16.pack(self.NamespaceIndex))
509 1
        packet.append(uabin.Primitives.String.pack(self.Name))
510 1
        return b''.join(packet)
511
512 1
    @staticmethod
513
    def from_binary(data):
514 1
        obj = QualifiedName()
515 1
        obj.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
516 1
        obj.Name = uabin.Primitives.String.unpack(data)
517 1
        return obj
518
519 1
    def __eq__(self, bname):
520 1
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
521
522 1
    def __ne__(self, other):
523
        return not self.__eq__(other)
524
525 1
    def __lt__(self, other):
526
        if not isinstance(other, QualifiedName):
527
            raise TypeError("Cannot compare QualifiedName and {0}".format(other))
528
        if self.NamespaceIndex == other.NamespaceIndex:
529
            return self.Name < other.Name
530
        else:
531
            return self.NamespaceIndex < other.NamespaceIndex
532
533 1
    def __str__(self):
534 1
        return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)
535
536 1
    __repr__ = __str__
537
538
539 1
class LocalizedText(FrozenClass):
540
    """
541
    A string qualified with a namespace index.
542
    """
543
544 1
    ua_types = {
545
        "Text": "ByteString",
546
        "Locale": "ByteString"
547
    }
548
549 1
    def __init__(self, text=None):
550 1
        self.Encoding = 0
551 1
        self.Text = text
552 1
        if isinstance(self.Text, unicode):
553 1
            self.Text = self.Text.encode('utf-8')
554 1
        if self.Text:
555 1
            self.Encoding |= (1 << 1)
556 1
        self.Locale = None
557 1
        self._freeze = True
558
559 1
    def to_binary(self):
560 1
        packet = []
561 1
        if self.Locale:
562
            self.Encoding |= (1 << 0)
563 1
        if self.Text:
564 1
            self.Encoding |= (1 << 1)
565 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
566 1
        if self.Locale:
567
            packet.append(uabin.Primitives.Bytes.pack(self.Locale))
568 1
        if self.Text:
569 1
            packet.append(uabin.Primitives.Bytes.pack(self.Text))
570 1
        return b''.join(packet)
571
572 1
    @staticmethod
573
    def from_binary(data):
574 1
        obj = LocalizedText()
575 1
        obj.Encoding = ord(data.read(1))
576 1
        if obj.Encoding & (1 << 0):
577
            obj.Locale = uabin.Primitives.Bytes.unpack(data)
578 1
        if obj.Encoding & (1 << 1):
579 1
            obj.Text = uabin.Primitives.Bytes.unpack(data)
580 1
        return obj
581
582 1
    def to_string(self):
583
        # FIXME: use local
584 1
        if self.Text is None:
585
            return ""
586 1
        return self.Text.decode('utf-8')
587
588 1
    def __str__(self):
589
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
590
            'Locale:' + str(self.Locale) + ', ' + \
591
            'Text:' + str(self.Text) + ')'
592 1
    __repr__ = __str__
593
594 1
    def __eq__(self, other):
595 1
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
596 1
            return True
597 1
        return False
598
599 1
    def __ne__(self, other):
600 1
        return not self.__eq__(other)
601
602
603 1
class ExtensionObject(FrozenClass):
604
    """
605
    Any UA object packed as an ExtensionObject
606
607
    :ivar TypeId:
608
    :vartype TypeId: NodeId
609
    :ivar Body:
610
    :vartype Body: bytes
611
    """
612
613 1
    def __init__(self):
614 1
        self.TypeId = NodeId()
615 1
        self.Encoding = 0
616 1
        self.Body = b''
617 1
        self._freeze = True
618
619 1
    def to_binary(self):
620 1
        packet = []
621 1
        if self.Body:
622 1
            self.Encoding |= (1 << 0)
623 1
        packet.append(self.TypeId.to_binary())
624 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
625 1
        if self.Body:
626 1
            packet.append(uabin.Primitives.ByteString.pack(self.Body))
627 1
        return b''.join(packet)
628
629 1
    @staticmethod
630
    def from_binary(data):
631 1
        obj = ExtensionObject()
632 1
        obj.TypeId = NodeId.from_binary(data)
633 1
        obj.Encoding = uabin.Primitives.UInt8.unpack(data)
634 1
        if obj.Encoding & (1 << 0):
635
            obj.Body = uabin.Primitives.ByteString.unpack(data)
636 1
        return obj
637
638 1
    @staticmethod
639
    def from_object(obj):
640
        ext = ExtensionObject()
641
        oid = getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__))
642
        ext.TypeId = FourByteNodeId(oid)
643
        ext.Body = obj.to_binary()
644
        return ext
645
646 1
    def __str__(self):
647
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
648
            'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'
649
650 1
    __repr__ = __str__
651
652
653 1
class VariantType(Enum):
654
    """
655
    The possible types of a variant.
656
657
    :ivar Null:
658
    :ivar Boolean:
659
    :ivar SByte:
660
    :ivar Byte:
661
    :ivar Int16:
662
    :ivar UInt16:
663
    :ivar Int32:
664
    :ivar UInt32:
665
    :ivar Int64:
666
    :ivar UInt64:
667
    :ivar Float:
668
    :ivar Double:
669
    :ivar String:
670
    :ivar DateTime:
671
    :ivar Guid:
672
    :ivar ByteString:
673
    :ivar XmlElement:
674
    :ivar NodeId:
675
    :ivar ExpandedNodeId:
676
    :ivar StatusCode:
677
    :ivar QualifiedName:
678
    :ivar LocalizedText:
679
    :ivar ExtensionObject:
680
    :ivar DataValue:
681
    :ivar Variant:
682
    :ivar DiagnosticInfo:
683
    """
684
685 1
    Null = 0
686 1
    Boolean = 1
687 1
    SByte = 2
688 1
    Byte = 3
689 1
    Int16 = 4
690 1
    UInt16 = 5
691 1
    Int32 = 6
692 1
    UInt32 = 7
693 1
    Int64 = 8
694 1
    UInt64 = 9
695 1
    Float = 10
696 1
    Double = 11
697 1
    String = 12
698 1
    DateTime = 13
699 1
    Guid = 14
700 1
    ByteString = 15
701 1
    XmlElement = 16
702 1
    NodeId = 17
703 1
    ExpandedNodeId = 18
704 1
    StatusCode = 19
705 1
    QualifiedName = 20
706 1
    LocalizedText = 21
707 1
    ExtensionObject = 22
708 1
    DataValue = 23
709 1
    Variant = 24
710 1
    DiagnosticInfo = 25
711
712
713 1
class VariantTypeCustom(object):
714
    """
715
    Looks like sometime we get variant with other values than those
716
    defined in VariantType.
717
    FIXME: We should not need this class, as far as I iunderstand the spec
718
    variants can only be of VariantType
719
    """
720
721 1
    def __init__(self, val):
722 1
        self.name = "Custom"
723 1
        self.value = val
724 1
        if self.value > 0b00111111:
725 1
            raise UaError("Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))
726
727 1
    def __str__(self):
728
        return "VariantType.Custom:{0}".format(self.value)
729 1
    __repr__ = __str__
730
731 1
    def __eq__(self, other):
732 1
        return self.value == other.value
733
734
735 1
class Variant(FrozenClass):
736
    """
737
    Create an OPC-UA Variant object.
738
    if no argument a Null Variant is created.
739
    if not variant type is given, attemps to guess type from python type
740
    if a variant is given as value, the new objects becomes a copy of the argument
741
742
    :ivar Value:
743
    :vartype Value: Any supported type
744
    :ivar VariantType:
745
    :vartype VariantType: VariantType
746
    :ivar Dimension:
747
    :vartype Dimensions: The length of each dimensions. Usually guessed from value.
748
    :ivar is_array:
749
    :vartype is_array: If the variant is an array. Usually guessed from value.
750
    """
751
752 1
    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
753 1
        self.Value = value
754 1
        self.VariantType = varianttype
755 1
        self.Dimensions = dimensions
756 1
        self.is_array = is_array
757 1
        if self.is_array is None:
758 1
            if isinstance(value, (list, tuple)):
759 1
                self.is_array = True
760
            else:
761 1
                self.is_array = False
762 1
        self._freeze = True
763 1
        if isinstance(value, Variant):
764 1
            self.Value = value.Value
765 1
            self.VariantType = value.VariantType
766 1
        if self.VariantType is None:
767 1
            self.VariantType = self._guess_type(self.Value)
768 1
        if self.Value is None and not self.is_array and self.VariantType not in (
769
                VariantType.Null,
770
                VariantType.String,
771
                VariantType.DateTime):
772
            raise UaError("Non array Variant of type {0} cannot have value None".format(self.VariantType))
773 1
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
774 1
            dims = get_shape(self.Value)
775 1
            if len(dims) > 1:
776 1
                self.Dimensions = dims
777
778 1
    def __eq__(self, other):
779 1
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
780 1
            return True
781 1
        return False
782
783 1
    def __ne__(self, other):
784 1
        return not self.__eq__(other)
785
786 1
    def _guess_type(self, val):
787 1
        if isinstance(val, (list, tuple)):
788 1
            error_val = val
789 1
        while isinstance(val, (list, tuple)):
790 1
            if len(val) == 0:
791
                raise UaError("could not guess UA type of variable {0}".format(error_val))
792 1
            val = val[0]
793 1
        if val is None:
794 1
            return VariantType.Null
795 1
        elif isinstance(val, bool):
796 1
            return VariantType.Boolean
797 1
        elif isinstance(val, float):
798 1
            return VariantType.Double
799 1
        elif isinstance(val, int):
800 1
            return VariantType.Int64
801 1
        elif type(val) in (str, unicode):
802 1
            return VariantType.String
803 1
        elif isinstance(val, bytes):
804
            return VariantType.ByteString
805 1 View Code Duplication
        elif isinstance(val, datetime):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
806 1
            return VariantType.DateTime
807 1
        elif isinstance(val, uuid.UUID):
808 1
            return VariantType.Guid
809
        else:
810 1
            if isinstance(val, object):
811 1
                try:
812 1
                    return getattr(VariantType, val.__class__.__name__)
813 1
                except AttributeError:
814 1
                    return VariantType.ExtensionObject
815
            else:
816
                raise UaError("Could not guess UA type of {0} with type {1}, specify UA type".format(val, type(val)))
817
818 1
    def __str__(self):
819 1
        return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)
820 1
    __repr__ = __str__
821
822 1
    def to_binary(self):
823 1
        b = []
824 1
        encoding = self.VariantType.value & 0b111111
825 1
        if self.is_array or type(self.Value) in (list, tuple):
826 1
            self.is_array = True
827 1
            encoding = uabin.set_bit(encoding, 7)
828 1
            if self.Dimensions is not None:
829 1
                encoding = uabin.set_bit(encoding, 6)
830 1
            b.append(uabin.Primitives.UInt8.pack(encoding))
831 1
            b.append(uabin.pack_uatype_array(self.VariantType, flatten(self.Value)))
832 1
            if self.Dimensions is not None:
833 1
                b.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
834
        else:
835 1
            b.append(uabin.Primitives.UInt8.pack(encoding))
836 1
            b.append(uabin.pack_uatype(self.VariantType, self.Value))
837
838 1
        return b"".join(b)
839
840 1
    @staticmethod
841
    def from_binary(data):
842 1
        dimensions = None
843 1
        array = False
844 1
        encoding = ord(data.read(1))
845 1
        int_type = encoding & 0b00111111
846 1
        vtype = datatype_to_varianttype(int_type)
847 1
        if uabin.test_bit(encoding, 7):
848 1
            value = uabin.unpack_uatype_array(vtype, data)
849 1
            array = True
850
        else:
851 1
            value = uabin.unpack_uatype(vtype, data)
852 1
        if uabin.test_bit(encoding, 6):
853 1
            dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
854 1
            value = reshape(value, dimensions)
855 1
        return Variant(value, vtype, dimensions, is_array=array)
856
857
858 1
def reshape(flat, dims):
859 1
    subdims = dims[1:]
860 1
    subsize = 1
861 1
    for i in subdims:
862 1
        if i == 0:
863 1
            i = 1
864 1
        subsize *= i
865 1
    while dims[0] * subsize > len(flat):
866 1
        flat.append([])
867 1
    if not subdims or subdims == [0]:
868 1
        return flat
869 1
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
870
871
872 1
def _split_list(l, n):
873
    n = max(1, n)
874
    return [l[i:i + n] for i in range(0, len(l), n)]
875
876
877 1
def flatten_and_get_shape(mylist):
878
    dims = []
879
    dims.append(len(mylist))
880
    while isinstance(mylist[0], (list, tuple)):
881
        dims.append(len(mylist[0]))
882
        mylist = [item for sublist in mylist for item in sublist]
883
        if len(mylist) == 0:
884
            break
885
    return mylist, dims
886
887
888 1
def flatten(mylist):
889 1
    if mylist is None:
890 1
        return None
891 1
    elif len(mylist) == 0:
892 1
        return mylist
893 1
    while isinstance(mylist[0], (list, tuple)):
894 1
        mylist = [item for sublist in mylist for item in sublist]
895 1
        if len(mylist) == 0:
896 1
            break
897 1
    return mylist
898
899
900 1
def get_shape(mylist):
901 1
    dims = []
902 1
    while isinstance(mylist, (list, tuple)):
903 1
        dims.append(len(mylist))
904 1
        if len(mylist) == 0:
905 1
            break
906 1
        mylist = mylist[0]
907 1
    return dims
908
909
910 1
class XmlElement(FrozenClass):
911
    """
912
    An XML element encoded as an UTF-8 string.
913
    """
914
915 1
    def __init__(self, value=None, binary=None):
916 1
        if binary is not None:
917 1
            self._binary_init(binary)
918 1
            self._freeze = True
919 1
            return
920 1
        self.Value = value
921 1
        self._freeze = True
922
923 1
    def to_binary(self):
924 1
        return uabin.Primitives.String.pack(self.Value)
925
926 1
    @staticmethod
927
    def from_binary(data):
928 1
        return XmlElement(binary=data)
929
930 1
    def _binary_init(self, data):
931 1
        self.Value = uabin.Primitives.String.unpack(data)
932
933 1
    def __str__(self):
934
        return 'XmlElement(Value:' + str(self.Value) + ')'
935
936 1
    __repr__ = __str__
937
938
939 1
class DataValue(FrozenClass):
940
    """
941
    A value with an associated timestamp, and quality.
942
    Automatically generated from xml , copied and modified here to fix errors in xml spec
943
944
    :ivar Value:
945
    :vartype Value: Variant
946
    :ivar StatusCode:
947
    :vartype StatusCode: StatusCode
948
    :ivar SourceTimestamp:
949
    :vartype SourceTimestamp: datetime
950
    :ivar SourcePicoSeconds:
951
    :vartype SourcePicoSeconds: int
952
    :ivar ServerTimestamp:
953
    :vartype ServerTimestamp: datetime
954
    :ivar ServerPicoseconds:
955
    :vartype ServerPicoseconds: int
956
    """
957
958 1
    def __init__(self, variant=None, status=None):
959 1
        self.Encoding = 0
960 1
        if not isinstance(variant, Variant):
961 1
            variant = Variant(variant)
962 1
        self.Value = variant
963 1
        if status is None:
964 1
            self.StatusCode = StatusCode()
965
        else:
966 1
            self.StatusCode = status
967 1
        self.SourceTimestamp = None  # DateTime()
968 1
        self.SourcePicoseconds = None
969 1
        self.ServerTimestamp = None  # DateTime()
970 1
        self.ServerPicoseconds = None
971 1
        self._freeze = True
972
973 1
    def to_binary(self):
974 1
        packet = []
975 1
        if self.Value:
976 1
            self.Encoding |= (1 << 0)
977 1
        if self.StatusCode:
978 1
            self.Encoding |= (1 << 1)
979 1
        if self.SourceTimestamp:
980 1
            self.Encoding |= (1 << 2)
981 1
        if self.ServerTimestamp:
982 1
            self.Encoding |= (1 << 3)
983 1
        if self.SourcePicoseconds:
984
            self.Encoding |= (1 << 4)
985 1
        if self.ServerPicoseconds:
986
            self.Encoding |= (1 << 5)
987 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
988 1
        if self.Value:
989 1
            packet.append(self.Value.to_binary())
990 1
        if self.StatusCode:
991 1
            packet.append(self.StatusCode.to_binary())
992 1
        if self.SourceTimestamp:
993 1
            packet.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
994 1
        if self.ServerTimestamp:
995 1
            packet.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
996 1
        if self.SourcePicoseconds:
997
            packet.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
998 1
        if self.ServerPicoseconds:
999
            packet.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
1000 1
        return b''.join(packet)
1001
1002 1
    @staticmethod
1003
    def from_binary(data):
1004 1
        encoding = ord(data.read(1))
1005 1
        if encoding & (1 << 0):
1006 1
            value = Variant.from_binary(data)
1007
        else:
1008
            value = None
1009 1
        if encoding & (1 << 1):
1010 1
            status = StatusCode.from_binary(data)
1011
        else:
1012
            status = None
1013 1
        obj = DataValue(value, status)
1014 1
        obj.Encoding = encoding
1015 1
        if obj.Encoding & (1 << 2):
1016 1
            obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
1017 1
        if obj.Encoding & (1 << 3):
1018 1
            obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
1019 1
        if obj.Encoding & (1 << 4):
1020
            obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
1021 1
        if obj.Encoding & (1 << 5):
1022
            obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
1023 1
        return obj
1024
1025 1
    def __str__(self):
1026
        s = 'DataValue(Value:{0}'.format(self.Value)
1027
        if self.StatusCode is not None:
1028
            s += ', StatusCode:{0}'.format(self.StatusCode)
1029
        if self.SourceTimestamp is not None:
1030
            s += ', SourceTimestamp:{0}'.format(self.SourceTimestamp)
1031
        if self.ServerTimestamp is not None:
1032
            s += ', ServerTimestamp:{0}'.format(self.ServerTimestamp)
1033
        if self.SourcePicoseconds is not None:
1034
            s += ', SourcePicoseconds:{0}'.format(self.SourcePicoseconds)
1035
        if self.ServerPicoseconds is not None:
1036
            s += ', ServerPicoseconds:{0}'.format(self.ServerPicoseconds)
1037
        s += ')'
1038
        return s
1039
1040 1
    __repr__ = __str__
1041
1042
1043 1
def datatype_to_varianttype(int_type):
1044
    """
1045
    Takes a NodeId or int and return a VariantType
1046
    This is only supported if int_type < 63 due to VariantType encoding
1047
    At low level we do not have access to address space thus decoding is limited
1048
    a better version of this method can be find in ua_utils.py
1049
    """
1050 1
    if isinstance(int_type, NodeId):
1051
        int_type = int_type.Identifier
1052
1053 1
    if int_type <= 25:
1054 1
        return VariantType(int_type)
1055
    else:
1056 1
        return VariantTypeCustom(int_type)
1057
1058
1059 1
def get_default_value(vtype):
1060
    """
1061
    Given a variant type return default value for this type
1062
    """
1063 1
    if vtype == VariantType.Null:
1064
        return None
1065 1
    elif vtype == VariantType.Boolean:
1066 1
        return False
1067 1
    elif vtype in (VariantType.SByte, VariantType.Byte):
1068
        return 0
1069 1
    elif vtype == VariantType.ByteString:
1070
        return b""
1071 1
    elif 4 <= vtype.value <= 9:
1072 1
        return 0
1073 1
    elif vtype in (VariantType.Float, VariantType.Double):
1074 1
        return 0.0
1075 1
    elif vtype == VariantType.String:
1076 1
        return None  # a string can be null
1077
    elif vtype == VariantType.DateTime:
1078
        return datetime.utcnow()
1079
    elif vtype == VariantType.Guid:
1080
        return uuid.uuid4()
1081
    elif vtype == VariantType.XmlElement:
1082
        return None  #Not sure this is correct
1083
    elif vtype == VariantType.NodeId:
1084
        return NodeId()
1085
    elif vtype == VariantType.ExpandedNodeId:
1086
        return NodeId()
1087
    elif vtype == VariantType.StatusCode:
1088
        return StatusCode()
1089
    elif vtype == VariantType.QualifiedName:
1090
        return QualifiedName()
1091
    elif vtype == VariantType.LocalizedText:
1092
        return LocalizedText()
1093
    elif vtype == VariantType.ExtensionObject:
1094
        return ExtensionObject()
1095
    elif vtype == VariantType.DataValue:
1096
        return DataValue()
1097
    elif vtype == VariantType.Variant:
1098
        return Variant()
1099
    else:
1100
        raise RuntimeError("function take a uatype as argument, got:", vtype)
1101
1102
1103
# These dictionnaries are used to register extensions classes for automatic
1104
# decoding and encoding
1105 1
extension_object_classes = {}
1106 1
extension_object_ids = {}
1107
1108
1109 1
def extensionobject_from_binary(data):
1110
    """
1111
    Convert binary-coded ExtensionObject to a Python object.
1112
    Returns an object, or None if TypeId is zero
1113
    """
1114 1
    TypeId = NodeId.from_binary(data)
1115 1
    Encoding = ord(data.read(1))
1116 1
    body = None
1117 1
    if Encoding & (1 << 0):
1118 1
        length = uabin.Primitives.Int32.unpack(data)
1119 1
        if length < 1:
1120
            body = Buffer(b"")
1121
        else:
1122 1
            body = data.copy(length)
1123 1
            data.skip(length)
1124 1
    if TypeId.Identifier == 0:
1125 1
        return None
1126 1
    elif TypeId in extension_object_classes:
1127 1
        klass = extension_object_classes[TypeId]
1128 1
        if body is None:
1129
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
1130 1
        return klass.from_binary(body)
1131
    else:
1132 1
        e = ExtensionObject()
1133 1
        e.TypeId = TypeId
1134 1
        e.Encoding = Encoding
1135 1
        if body is not None:
1136 1
            e.Body = body.read(len(body))
1137 1
        return e
1138
1139
1140 1
def extensionobject_to_binary(obj):
1141
    """
1142
    Convert Python object to binary-coded ExtensionObject.
1143
    If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body).
1144
    Returns a binary string
1145
    """
1146 1
    if isinstance(obj, ExtensionObject):
1147 1
        return obj.to_binary()
1148 1
    if obj is None:
1149 1
        TypeId = NodeId()
1150 1
        Encoding = 0
1151 1
        Body = None
1152
    else:
1153 1
        TypeId = extension_object_ids[obj.__class__.__name__]
1154 1
        Encoding = 0x01
1155 1
        Body = obj.to_binary()
1156 1
    packet = []
1157 1
    packet.append(TypeId.to_binary())
1158 1
    packet.append(uabin.Primitives.UInt8.pack(Encoding))
1159 1
    if Body:
1160 1
        packet.append(uabin.Primitives.Bytes.pack(Body))
1161
    return b''.join(packet)
1162