Test Failed
Pull Request — master (#494)
by Olivier
03:40
created

StatusCode.check()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 8
ccs 3
cts 3
cp 1
crap 2
rs 9.4285
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4
5 1
import logging
6 1
import struct
7 1
from enum import Enum, IntEnum
8 1
from datetime import datetime
9 1
import sys
10 1
import os
11 1
import uuid
12 1
import re
13 1
import itertools
14
15 1
from opcua.ua import ua_binary as uabin
16 1
from opcua.ua import status_codes
17 1
from opcua.ua import ObjectIds
18 1
from opcua.ua.uaerrors import UaError
19 1
from opcua.ua.uaerrors import UaStatusCodeError
20 1
from opcua.ua.uaerrors import UaStringParsingError
21 1
from opcua.common.utils import Buffer
22
23
24 1
logger = logging.getLogger(__name__)
25
26
27 1
if sys.version_info.major > 2:
28
    unicode = str
29 1
def get_win_epoch():
30 1
    return uabin.win_epoch_to_datetime(0)
31
32
33 1
class _FrozenClass(object):
34
35
    """
36
    Make it impossible to add members to a class.
37
    Not pythonic at all but we found out it prevents many many
38
    bugs in use of protocol structures
39
    """
40 1
    _freeze = False
41
42 1
    def __setattr__(self, key, value):
43 1
        if self._freeze and not hasattr(self, key):
44
            raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
45
                key, self.__class__.__name__, self.__dict__.keys()))
46 1
        object.__setattr__(self, key, value)
47
48
49 1
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
50
    # typo check is cpu consuming, but it will make debug easy.
51
    # if typo check is not need (in production), please set env PYOPCUA_NO_TYPO_CHECK.
52
    # this will make all uatype class inherit from object intead of _FrozenClass
53
    # and skip the typo check.
54
    FrozenClass = object
55
else:
56 1
    FrozenClass = _FrozenClass
57
58
59 1
class ValueRank(IntEnum):
60
    """
61
    Defines dimensions of a variable.
62
    This enum does not support all cases since ValueRank support any n>0
63
    but since it is an IntEnum it can be replace by a normal int
64
    """
65 1
    ScalarOrOneDimension = -3
66 1
    Any = -2
67 1
    Scalar = -1
68 1
    OneOrMoreDimensions = 0
69 1
    OneDimension = 1
70
    # the next names are not in spec but so common we express them here
71 1
    TwoDimensions = 2
72 1
    ThreeDimensions = 3
73 1
    FourDimensions = 4
74
75
76 1
class _MaskEnum(IntEnum):
77
78 1
    @classmethod
79
    def parse_bitfield(cls, the_int):
80
        """ Take an integer and interpret it as a set of enum values. """
81 1
        assert isinstance(the_int, int)
82
83 1
        return {cls(b) for b in cls._bits(the_int)}
84
85 1
    @classmethod
86
    def to_bitfield(cls, collection):
87
        """ Takes some enum values and creates an integer from them. """
88
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
89
        # iterator)
90 1
        iter1, iter2 = itertools.tee(iter(collection))
91 1
        assert all(isinstance(x, cls) for x in iter1)
92
93 1
        return sum(x.mask for x in iter2)
94
95 1
    @property
96
    def mask(self):
97 1
        return 1 << self.value
98
99 1
    @staticmethod
100
    def _bits(n):
101
        """ Iterate over the bits in n.
102
103
            e.g. bits(44) yields at 2, 3, 5
104
        """
105 1
        assert n >= 0  # avoid infinite recursion
106
107 1
        pos = 0
108 1
        while n:
109 1
            if n & 0x1:
110 1
                yield pos
111 1
            n = n // 2
112 1
            pos += 1
113
114
115 1
class AccessLevel(_MaskEnum):
116
    """
117
    Bit index to indicate what the access level is.
118
119
    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
120
    """
121 1
    CurrentRead = 0
122 1
    CurrentWrite = 1
123 1
    HistoryRead = 2
124 1
    HistoryWrite = 3
125 1
    SemanticChange = 4
126 1
    StatusWrite = 5
127 1
    TimestampWrite = 6
128
129
130 1
class WriteMask(_MaskEnum):
131
    """
132
    Bit index to indicate which attribute of a node is writable
133
134
    Spec Part 3, Paragraph 5.2.7 WriteMask
135
    """
136 1
    AccessLevel = 0
137 1
    ArrayDimensions = 1
138 1
    BrowseName = 2
139 1
    ContainsNoLoops = 3
140 1
    DataType = 4
141 1
    Description = 5
142 1
    DisplayName = 6
143 1
    EventNotifier = 7
144 1
    Executable = 8
145 1
    Historizing = 9
146 1
    InverseName = 10
147 1
    IsAbstract = 11
148 1
    MinimumSamplingInterval = 12
149 1
    NodeClass = 13
150 1
    NodeId = 14
151 1
    Symmetric = 15
152 1
    UserAccessLevel = 16
153 1
    UserExecutable = 17
154 1
    UserWriteMask = 18
155 1
    ValueRank = 19
156 1
    WriteMask = 20
157 1
    ValueForVariableType = 21
158
159
160 1
class EventNotifier(_MaskEnum):
161
    """
162
    Bit index to indicate how a node can be used for events.
163
164
    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
165
    """
166 1
    SubscribeToEvents = 0
167
    # Reserved        = 1
168 1
    HistoryRead = 2
169 1
    HistoryWrite = 3
170
171
172 1
class StatusCode(FrozenClass):
173
    """
174
    :ivar value:
175
    :vartype value: int
176
    :ivar name:
177
    :vartype name: string
178
    :ivar doc:
179
    :vartype doc: string
180
    """
181
182 1
    def __init__(self, value=0):
183 1
        if isinstance(value, str):
184 1
            self.name = value
185 1
            self.value = getattr(status_codes.StatusCodes, value)
186
        else:
187 1
            self.value = value
188 1
            self.name, self.doc = status_codes.get_name_and_doc(value)
189 1
        self._freeze = True
190
191 1
    def to_binary(self):
192 1
        return uabin.Primitives.UInt32.pack(self.value)
193
194 1
    @staticmethod
195
    def from_binary(data):
196 1
        val = uabin.Primitives.UInt32.unpack(data)
197 1
        sc = StatusCode(val)
198 1
        return sc
199
200 1
    def check(self):
201
        """
202
        Raises an exception if the status code is anything else than 0 (good).
203
204
        Use the is_good() method if you do not want an exception.
205
        """
206 1
        if not self.is_good():
207 1
            raise UaStatusCodeError(self.value)
208
209 1
    def is_good(self):
210
        """
211
        return True if status is Good.
212
        """
213 1
        mask = 3 << 30
214 1
        if mask & self.value:
215 1
            return False
216
        else:
217 1
            return True
218
219 1
    def __str__(self):
220 1
        return 'StatusCode({0})'.format(self.name)
221 1
    __repr__ = __str__
222
223 1
    def __eq__(self, other):
224 1
        return self.value == other.value
225
226 1
    def __ne__(self, other):
227
        return not self.__eq__(other)
228
229
230 1
class NodeIdType(IntEnum):
231 1
    TwoByte = 0
232 1
    FourByte = 1
233 1
    Numeric = 2
234 1
    String = 3
235 1
    Guid = 4
236 1
    ByteString = 5
237
238
239 1
class NodeId(FrozenClass):
240
    """
241
    NodeId Object
242
243
    Args:
244
        identifier: The identifier might be an int, a string, bytes or a Guid
245
        namespaceidx(int): The index of the namespace
246
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
247
248
249
    :ivar Identifier:
250
    :vartype Identifier: NodeId
251
    :ivar NamespaceIndex:
252
    :vartype NamespaceIndex: Int
253
    :ivar NamespaceUri:
254
    :vartype NamespaceUri: String
255
    :ivar ServerIndex:
256
    :vartype ServerIndex: Int
257
    """
258
    
259 1
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
260
261 1
        self.Identifier = identifier
262 1
        self.NamespaceIndex = namespaceidx
263 1
        self.NodeIdType = nodeidtype
264 1
        self.NamespaceUri = ""
265 1
        self.ServerIndex = 0
266 1
        self._freeze = True
267 1
        if not isinstance(self.NamespaceIndex, int):
268 1
            raise UaError("NamespaceIndex must be an int")
269 1
        if self.Identifier is None:
270 1
            self.Identifier = 0
271 1
            self.NodeIdType = NodeIdType.TwoByte
272 1
            return
273 1
        if self.NodeIdType is None:
274 1
            if isinstance(self.Identifier, int):
275 1
                self.NodeIdType = NodeIdType.Numeric
276 1
            elif isinstance(self.Identifier, str):
277 1
                self.NodeIdType = NodeIdType.String
278 1
            elif isinstance(self.Identifier, bytes):
279
                self.NodeIdType = NodeIdType.ByteString
280 1
            elif isinstance(self.Identifier, uuid.UUID):
281 1
                self.NodeIdType = NodeIdType.Guid
282
            else:
283
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
284
285 1
    def _key(self):
286 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):
287
            # twobyte, fourbyte and numeric may represent the same node
288 1
            return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
289 1
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier)
290
291 1
    def __eq__(self, node):
292 1
        return isinstance(node, NodeId) and self._key() == node._key()
293
294 1
    def __ne__(self, other):
295 1
        return not self.__eq__(other)
296
297 1
    def __hash__(self):
298 1
        return hash(self._key())
299
300 1
    def __lt__(self, other):
301 1
        if not isinstance(other, NodeId):
302
            raise AttributeError("Can only compare to NodeId")
303 1
        return self._key() < other._key()
304
305 1
    def is_null(self):
306 1
        if self.NamespaceIndex != 0:
307 1
            return False
308 1
        return self.has_null_identifier()
309
310 1
    def has_null_identifier(self):
311 1
        if not self.Identifier:
312 1
            return True
313 1
        if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
314 1
            return True
315 1
        return False
316
317 1
    @staticmethod
318
    def from_string(string):
319 1
        try:
320 1
            return NodeId._from_string(string)
321 1
        except ValueError as ex:
322 1
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)
323
324 1
    @staticmethod
325
    def _from_string(string):
326 1
        l = string.split(";")
327 1
        identifier = None
328 1
        namespace = 0
329 1
        ntype = None
330 1
        srv = None
331 1
        nsu = None
332 1
        for el in l:
333 1
            if not el:
334 1
                continue
335 1
            k, v = el.split("=", 1)
336 1
            k = k.strip()
337 1
            v = v.strip()
338 1
            if k == "ns":
339 1
                namespace = int(v)
340 1
            elif k == "i":
341 1
                ntype = NodeIdType.Numeric
342 1
                identifier = int(v)
343 1
            elif k == "s":
344 1
                ntype = NodeIdType.String
345 1
                identifier = v
346 1
            elif k == "g":
347
                ntype = NodeIdType.Guid
348
                identifier = v
349 1
            elif k == "b":
350
                ntype = NodeIdType.ByteString
351
                identifier = v
352 1
            elif k == "srv":
353 1
                srv = v
354
            elif k == "nsu":
355
                nsu = v
356 1
        if identifier is None:
357 1
            raise UaStringParsingError("Could not find identifier in string: " + string)
358 1
        nodeid = NodeId(identifier, namespace, ntype)
359 1
        nodeid.NamespaceUri = nsu
360 1
        nodeid.ServerIndex = srv
361 1
        return nodeid
362
363 1
    def to_string(self):
364 1
        string = ""
365 1
        if self.NamespaceIndex != 0:
366 1
            string += "ns={0};".format(self.NamespaceIndex)
367 1
        ntype = None
368 1
        if self.NodeIdType == NodeIdType.Numeric:
369 1
            ntype = "i"
370 1
        elif self.NodeIdType == NodeIdType.String:
371 1
            ntype = "s"
372 1
        elif self.NodeIdType == NodeIdType.TwoByte:
373 1
            ntype = "i"
374 1
        elif self.NodeIdType == NodeIdType.FourByte:
375 1
            ntype = "i"
376
        elif self.NodeIdType == NodeIdType.Guid:
377
            ntype = "g"
378
        elif self.NodeIdType == NodeIdType.ByteString:
379
            ntype = "b"
380 1
        string += "{0}={1}".format(ntype, self.Identifier)
381 1
        if self.ServerIndex:
382
            string = "srv=" + str(self.ServerIndex) + string
383 1
        if self.NamespaceUri:
384
            string += "nsu={0}".format(self.NamespaceUri)
385 1
        return string
386
387 1
    def __str__(self):
388 1
        return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())
389 1
    __repr__ = __str__
390
391 1
    def to_binary(self):
392 1
        if self.NodeIdType == NodeIdType.TwoByte:
393 1
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
394 1
        elif self.NodeIdType == NodeIdType.FourByte:
395 1
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
396 1
        elif self.NodeIdType == NodeIdType.Numeric:
397 1
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
398 1
        elif self.NodeIdType == NodeIdType.String:
399 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
400
                uabin.Primitives.String.pack(self.Identifier)
401 1
        elif self.NodeIdType == NodeIdType.ByteString:
402 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
403
                uabin.Primitives.Bytes.pack(self.Identifier)
404 1
        elif self.NodeIdType == NodeIdType.Guid:
405 1
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
406
                   uabin.Primitives.Guid.pack(self.Identifier)
407
        else:
408
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
409
                self.Identifier.to_binary()
410
        # FIXME: Missing NNamespaceURI and ServerIndex
411
412 1
    @staticmethod
413
    def from_binary(data):
414 1
        nid = NodeId()
415 1
        encoding = ord(data.read(1))
416 1
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)
417
418 1
        if nid.NodeIdType == NodeIdType.TwoByte:
419 1
            nid.Identifier = ord(data.read(1))
420 1
        elif nid.NodeIdType == NodeIdType.FourByte:
421 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
422 1
        elif nid.NodeIdType == NodeIdType.Numeric:
423 1
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
424 1
        elif nid.NodeIdType == NodeIdType.String:
425 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
426 1
            nid.Identifier = uabin.Primitives.String.unpack(data)
427 1
        elif nid.NodeIdType == NodeIdType.ByteString:
428 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
429 1
            nid.Identifier = uabin.Primitives.Bytes.unpack(data)
430 1
        elif nid.NodeIdType == NodeIdType.Guid:
431 1
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
432 1
            nid.Identifier = uabin.Primitives.Guid.unpack(data)
433
        else:
434
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
435
436 1
        if uabin.test_bit(encoding, 7):
437
            nid.NamespaceUri = uabin.Primitives.String.unpack(data)
438 1
        if uabin.test_bit(encoding, 6):
439
            nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)
440
441 1
        return nid
442
443
444 1
class TwoByteNodeId(NodeId):
445
446 1
    def __init__(self, identifier):
447 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
448
449
450 1
class FourByteNodeId(NodeId):
451
452 1
    def __init__(self, identifier, namespace=0):
453 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
454
455
456 1
class NumericNodeId(NodeId):
457
458 1
    def __init__(self, identifier, namespace=0):
459 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
460
461
462 1
class ByteStringNodeId(NodeId):
463
464 1
    def __init__(self, identifier, namespace=0):
465 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
466
467
468 1
class GuidNodeId(NodeId):
469
470 1
    def __init__(self, identifier, namespace=0):
471 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
472
473
474 1
class StringNodeId(NodeId):
475
476 1
    def __init__(self, identifier, namespace=0):
477 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
478
479
480 1
ExpandedNodeId = NodeId
481
482
483 1
class QualifiedName(FrozenClass):
484
    """
485
    A string qualified with a namespace index.
486
    """
487
488 1
    def __init__(self, name=None, namespaceidx=0):
489 1
        if not isinstance(namespaceidx, int):
490
            raise UaError("namespaceidx must be an int")
491 1
        self.NamespaceIndex = namespaceidx
492 1
        self.Name = name
493 1
        self._freeze = True
494
495 1
    def to_string(self):
496 1
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)
497
498 1
    @staticmethod
499
    def from_string(string):
500 1
        if ":" in string:
501 1
            try:
502 1
                idx, name = string.split(":", 1)
503 1
                idx = int(idx)
504 1
            except (TypeError, ValueError) as ex:
505 1
                raise UaStringParsingError("Error parsing string {0}".format(string), ex)
506
        else:
507 1
            idx = 0
508 1
            name = string
509 1
        return QualifiedName(name, idx)
510
511 1
    def to_binary(self):
512 1
        packet = []
513 1
        packet.append(uabin.Primitives.UInt16.pack(self.NamespaceIndex))
514 1
        packet.append(uabin.Primitives.String.pack(self.Name))
515 1
        return b''.join(packet)
516
517 1
    @staticmethod
518
    def from_binary(data):
519 1
        obj = QualifiedName()
520 1
        obj.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
521 1
        obj.Name = uabin.Primitives.String.unpack(data)
522 1
        return obj
523
524 1
    def __eq__(self, bname):
525 1
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
526
527 1
    def __ne__(self, other):
528
        return not self.__eq__(other)
529
530 1
    def __lt__(self, other):
531
        if not isinstance(other, QualifiedName):
532
            raise TypeError("Cannot compare QualifiedName and {0}".format(other))
533
        if self.NamespaceIndex == other.NamespaceIndex:
534
            return self.Name < other.Name
535
        else:
536
            return self.NamespaceIndex < other.NamespaceIndex
537
538 1
    def __str__(self):
539 1
        return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)
540
541 1
    __repr__ = __str__
542
543
544 1
class LocalizedText(FrozenClass):
545
    """
546
    A string qualified with a namespace index.
547
    """
548
549 1
    ua_types = {
550
        "Text": "String",
551
        "Locale": "String"
552
    }
553
554 1
    def __init__(self, text=None):
555 1
        self.Encoding = 0
556 1
        if text is not None and not isinstance(text, str):
557
            raise ValueError("A LocalizedText object takes a string as argument, not a {}, {}".format(text, type(text)))
558 1
        self.Text = text
559 1
        if self.Text:
560 1
            self.Encoding |= (1 << 1)
561 1
        self.Locale = None
562 1
        self._freeze = True
563
564 1
    def to_binary(self):
565 1
        packet = []
566 1
        if self.Locale:
567
            self.Encoding |= (1 << 0)
568 1
        if self.Text:
569 1
            self.Encoding |= (1 << 1)
570 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
571 1
        if self.Locale:
572
            packet.append(uabin.Primitives.String.pack(self.Locale))
573 1
        if self.Text:
574 1
            packet.append(uabin.Primitives.String.pack(self.Text))
575 1
        return b''.join(packet)
576
577 1
    @staticmethod
578
    def from_binary(data):
579 1
        obj = LocalizedText()
580 1
        obj.Encoding = ord(data.read(1))
581 1
        if obj.Encoding & (1 << 0):
582
            obj.Locale = uabin.Primitives.String.unpack(data)
583 1
        if obj.Encoding & (1 << 1):
584 1
            obj.Text = uabin.Primitives.String.unpack(data)
585 1
        return obj
586
587 1
    def to_string(self):
588
        # FIXME: use local
589 1
        if self.Text is None:
590
            return ""
591 1
        return self.Text
592
593 1
    def __str__(self):
594 1
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
595
            'Locale:' + str(self.Locale) + ', ' + \
596
            'Text:' + str(self.Text) +')'
597 1
    __repr__ = __str__
598
599 1
    def __eq__(self, other):
600 1
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
601 1
            return True
602 1
        return False
603
604 1
    def __ne__(self, other):
605 1
        return not self.__eq__(other)
606
607
608 1
class ExtensionObject(FrozenClass):
609
    """
610
    Any UA object packed as an ExtensionObject
611
612
    :ivar TypeId:
613
    :vartype TypeId: NodeId
614
    :ivar Body:
615
    :vartype Body: bytes
616
    """
617
618 1
    ua_types = {
619
        "TypeId": "NodeId",
620
        "Encoding": "Byte",
621
        "Body": "ByteString"
622
    }
623
624 1
    def __init__(self):
625 1
        self.TypeId = NodeId()
626 1
        self.Encoding = 0
627 1
        self.Body = b''
628 1
        self._freeze = True
629
630 1
    def to_binary(self):
631 1
        packet = []
632 1
        if self.Body:
633 1
            self.Encoding = 0x01
634 1
        packet.append(self.TypeId.to_binary())
635 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
636 1
        if self.Body:
637 1
            packet.append(uabin.Primitives.ByteString.pack(self.Body))
638 1
        return b''.join(packet)
639
640 1
    @staticmethod
641
    def from_binary(data):
642 1
        obj = ExtensionObject()
643 1
        obj.TypeId = NodeId.from_binary(data)
644 1
        obj.Encoding = uabin.Primitives.UInt8.unpack(data)
645 1
        if obj.Encoding & (1 << 0):
646
            obj.Body = uabin.Primitives.ByteString.unpack(data)
647 1
        return obj
648
649 1
    @staticmethod
650
    def from_object(obj):
651
        ext = ExtensionObject()
652
        oid = getattr(ObjectIds, "{0}_Encoding_DefaultBinary".format(obj.__class__.__name__))
653
        ext.TypeId = FourByteNodeId(oid)
654
        ext.Body = obj.to_binary()
655
        return ext
656
657 1
    def __str__(self):
658
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
659
            'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'
660
661 1
    __repr__ = __str__
662
663
664 1
class VariantType(Enum):
665
    """
666
    The possible types of a variant.
667
668
    :ivar Null:
669
    :ivar Boolean:
670
    :ivar SByte:
671
    :ivar Byte:
672
    :ivar Int16:
673
    :ivar UInt16:
674
    :ivar Int32:
675
    :ivar UInt32:
676
    :ivar Int64:
677
    :ivar UInt64:
678
    :ivar Float:
679
    :ivar Double:
680
    :ivar String:
681
    :ivar DateTime:
682
    :ivar Guid:
683
    :ivar ByteString:
684
    :ivar XmlElement:
685
    :ivar NodeId:
686
    :ivar ExpandedNodeId:
687
    :ivar StatusCode:
688
    :ivar QualifiedName:
689
    :ivar LocalizedText:
690
    :ivar ExtensionObject:
691
    :ivar DataValue:
692
    :ivar Variant:
693
    :ivar DiagnosticInfo:
694
    """
695
696 1
    Null = 0
697 1
    Boolean = 1
698 1
    SByte = 2
699 1
    Byte = 3
700 1
    Int16 = 4
701 1
    UInt16 = 5
702 1
    Int32 = 6
703 1
    UInt32 = 7
704 1
    Int64 = 8
705 1
    UInt64 = 9
706 1
    Float = 10
707 1
    Double = 11
708 1
    String = 12
709 1
    DateTime = 13
710 1
    Guid = 14
711 1
    ByteString = 15
712 1
    XmlElement = 16
713 1
    NodeId = 17
714 1
    ExpandedNodeId = 18
715 1
    StatusCode = 19
716 1
    QualifiedName = 20
717 1
    LocalizedText = 21
718 1
    ExtensionObject = 22
719 1
    DataValue = 23
720 1
    Variant = 24
721 1
    DiagnosticInfo = 25
722
723
724 1
class VariantTypeCustom(object):
725
    """
726
    Looks like sometime we get variant with other values than those
727
    defined in VariantType.
728
    FIXME: We should not need this class, as far as I iunderstand the spec
729
    variants can only be of VariantType
730
    """
731
732 1
    def __init__(self, val):
733 1
        self.name = "Custom"
734 1
        self.value = val
735 1
        if self.value > 0b00111111:
736 1
            raise UaError("Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))
737
738 1
    def __str__(self):
739
        return "VariantType.Custom:{0}".format(self.value)
740 1
    __repr__ = __str__
741
742 1
    def __eq__(self, other):
743 1
        return self.value == other.value
744
745
746 1
class Variant(FrozenClass):
747
    """
748
    Create an OPC-UA Variant object.
749
    if no argument a Null Variant is created.
750
    if not variant type is given, attemps to guess type from python type
751
    if a variant is given as value, the new objects becomes a copy of the argument
752
753
    :ivar Value:
754
    :vartype Value: Any supported type
755
    :ivar VariantType:
756
    :vartype VariantType: VariantType
757
    :ivar Dimension:
758
    :vartype Dimensions: The length of each dimensions. Usually guessed from value.
759
    :ivar is_array:
760
    :vartype is_array: If the variant is an array. Usually guessed from value.
761
    """
762
763 1
    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
764 1
        self.Value = value
765 1
        self.VariantType = varianttype
766 1
        self.Dimensions = dimensions
767 1
        self.is_array = is_array
768 1
        if self.is_array is None:
769 1
            if isinstance(value, (list, tuple)):
770 1
                self.is_array = True
771
            else:
772 1
                self.is_array = False
773 1
        self._freeze = True
774 1
        if isinstance(value, Variant):
775 1
            self.Value = value.Value
776 1
            self.VariantType = value.VariantType
777 1
        if self.VariantType is None:
778 1
            self.VariantType = self._guess_type(self.Value)
779 1
        if self.Value is None and not self.is_array and self.VariantType not in (
780
                VariantType.Null,
781
                VariantType.String,
782
                VariantType.DateTime):
783
            raise UaError("Non array Variant of type {0} cannot have value None".format(self.VariantType))
784 1
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
785 1
            dims = get_shape(self.Value)
786 1
            if len(dims) > 1:
787 1
                self.Dimensions = dims
788
789 1
    def __eq__(self, other):
790 1
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
791 1
            return True
792 1
        return False
793
794 1
    def __ne__(self, other):
795 1
        return not self.__eq__(other)
796
797 1
    def _guess_type(self, val):
798 1
        if isinstance(val, (list, tuple)):
799 1
            error_val = val
800 1
        while isinstance(val, (list, tuple)):
801 1
            if len(val) == 0:
802
                raise UaError("could not guess UA type of variable {0}".format(error_val))
803 1
            val = val[0]
804 1
        if val is None:
805 1 View Code Duplication
            return VariantType.Null
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
806 1
        elif isinstance(val, bool):
807 1
            return VariantType.Boolean
808 1
        elif isinstance(val, float):
809 1
            return VariantType.Double
810 1
        elif isinstance(val, IntEnum):
811 1
            return VariantType.Int32
812 1
        elif isinstance(val, int):
813 1
            return VariantType.Int64
814 1
        elif type(val) in (str, unicode):
815 1
            return VariantType.String
816 1
        elif isinstance(val, bytes):
817
            return VariantType.ByteString
818 1
        elif isinstance(val, datetime):
819 1
            return VariantType.DateTime
820 1
        elif isinstance(val, uuid.UUID):
821 1
            return VariantType.Guid
822
        else:
823 1
            if isinstance(val, object):
824 1
                try:
825 1
                    return getattr(VariantType, val.__class__.__name__)
826 1
                except AttributeError:
827 1
                    return VariantType.ExtensionObject
828
            else:
829
                raise UaError("Could not guess UA type of {0} with type {1}, specify UA type".format(val, type(val)))
830
831 1
    def __str__(self):
832 1
        return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)
833 1
    __repr__ = __str__
834
835 1
    def to_binary(self):
836 1
        b = []
837 1
        encoding = self.VariantType.value & 0b111111
838 1
        if self.is_array or isinstance(self.Value, (list, tuple)):
839 1
            self.is_array = True
840 1
            encoding = uabin.set_bit(encoding, 7)
841 1
            if self.Dimensions is not None:
842 1
                encoding = uabin.set_bit(encoding, 6)
843 1
            b.append(uabin.Primitives.UInt8.pack(encoding))
844 1
            b.append(uabin.pack_uatype_array(self.VariantType, flatten(self.Value)))
845 1
            if self.Dimensions is not None:
846 1
                b.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
847
        else:
848 1
            b.append(uabin.Primitives.UInt8.pack(encoding))
849 1
            b.append(uabin.pack_uatype(self.VariantType, self.Value))
850
851 1
        return b"".join(b)
852
853 1
    @staticmethod
854
    def from_binary(data):
855 1
        dimensions = None
856 1
        array = False
857 1
        encoding = ord(data.read(1))
858 1
        int_type = encoding & 0b00111111
859 1
        vtype = datatype_to_varianttype(int_type)
860 1
        if uabin.test_bit(encoding, 7):
861 1
            value = uabin.unpack_uatype_array(vtype, data)
862 1
            array = True
863
        else:
864 1
            value = uabin.unpack_uatype(vtype, data)
865 1
        if uabin.test_bit(encoding, 6):
866 1
            dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
867 1
            value = reshape(value, dimensions)
868 1
        return Variant(value, vtype, dimensions, is_array=array)
869
870
871 1
def reshape(flat, dims):
872 1
    subdims = dims[1:]
873 1
    subsize = 1
874 1
    for i in subdims:
875 1
        if i == 0:
876 1
            i = 1
877 1
        subsize *= i
878 1
    while dims[0] * subsize > len(flat):
879 1
        flat.append([])
880 1
    if not subdims or subdims == [0]:
881 1
        return flat
882 1
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
883
884
885 1
def _split_list(l, n):
886
    n = max(1, n)
887
    return [l[i:i + n] for i in range(0, len(l), n)]
888
889
890 1
def flatten_and_get_shape(mylist):
891
    dims = []
892
    dims.append(len(mylist))
893
    while isinstance(mylist[0], (list, tuple)):
894
        dims.append(len(mylist[0]))
895
        mylist = [item for sublist in mylist for item in sublist]
896
        if len(mylist) == 0:
897
            break
898
    return mylist, dims
899
900
901 1
def flatten(mylist):
902 1
    if mylist is None:
903 1
        return None
904 1
    elif len(mylist) == 0:
905 1
        return mylist
906 1
    while isinstance(mylist[0], (list, tuple)):
907 1
        mylist = [item for sublist in mylist for item in sublist]
908 1
        if len(mylist) == 0:
909 1
            break
910 1
    return mylist
911
912
913 1
def get_shape(mylist):
914 1
    dims = []
915 1
    while isinstance(mylist, (list, tuple)):
916 1
        dims.append(len(mylist))
917 1
        if len(mylist) == 0:
918 1
            break
919 1
        mylist = mylist[0]
920 1
    return dims
921
922
923 1
class XmlElement(FrozenClass):
924
    """
925
    An XML element encoded as an UTF-8 string.
926
    """
927
928 1
    def __init__(self, value=None, binary=None):
929 1
        if binary is not None:
930 1
            self._binary_init(binary)
931 1
            self._freeze = True
932 1
            return
933 1
        self.Value = value
934 1
        self._freeze = True
935
936 1
    def to_binary(self):
937 1
        return uabin.Primitives.String.pack(self.Value)
938
939 1
    def __eq__(self, other):
940 1
        return isinstance(other, XmlElement) and self.Value == other.Value
941
942 1
    @staticmethod
943
    def from_binary(data):
944 1
        return XmlElement(binary=data)
945
946 1
    def _binary_init(self, data):
947 1
        self.Value = uabin.Primitives.String.unpack(data)
948
949 1
    def __str__(self):
950
        return 'XmlElement(Value:' + str(self.Value) + ')'
951
952 1
    __repr__ = __str__
953
954
955 1
class DataValue(FrozenClass):
956
    """
957
    A value with an associated timestamp, and quality.
958
    Automatically generated from xml , copied and modified here to fix errors in xml spec
959
960
    :ivar Value:
961
    :vartype Value: Variant
962
    :ivar StatusCode:
963
    :vartype StatusCode: StatusCode
964
    :ivar SourceTimestamp:
965
    :vartype SourceTimestamp: datetime
966
    :ivar SourcePicoSeconds:
967
    :vartype SourcePicoSeconds: int
968
    :ivar ServerTimestamp:
969
    :vartype ServerTimestamp: datetime
970
    :ivar ServerPicoseconds:
971
    :vartype ServerPicoseconds: int
972
    """
973
974 1
    def __init__(self, variant=None, status=None):
975 1
        self.Encoding = 0
976 1
        if not isinstance(variant, Variant):
977 1
            variant = Variant(variant)
978 1
        self.Value = variant
979 1
        if status is None:
980 1
            self.StatusCode = StatusCode()
981
        else:
982 1
            self.StatusCode = status
983 1
        self.SourceTimestamp = None  # DateTime()
984 1
        self.SourcePicoseconds = None
985 1
        self.ServerTimestamp = None  # DateTime()
986 1
        self.ServerPicoseconds = None
987 1
        self._freeze = True
988
989 1
    def to_binary(self):
990 1
        packet = []
991 1
        if self.Value:
992 1
            self.Encoding |= (1 << 0)
993 1
        if self.StatusCode:
994 1
            self.Encoding |= (1 << 1)
995 1
        if self.SourceTimestamp:
996 1
            self.Encoding |= (1 << 2)
997 1
        if self.ServerTimestamp:
998 1
            self.Encoding |= (1 << 3)
999 1
        if self.SourcePicoseconds:
1000
            self.Encoding |= (1 << 4)
1001 1
        if self.ServerPicoseconds:
1002
            self.Encoding |= (1 << 5)
1003 1
        packet.append(uabin.Primitives.UInt8.pack(self.Encoding))
1004 1
        if self.Value:
1005 1
            packet.append(self.Value.to_binary())
1006 1
        if self.StatusCode:
1007 1
            packet.append(self.StatusCode.to_binary())
1008 1
        if self.SourceTimestamp:
1009 1
            packet.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))  # self.SourceTimestamp.to_binary())
1010 1
        if self.ServerTimestamp:
1011 1
            packet.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))  # self.ServerTimestamp.to_binary())
1012 1
        if self.SourcePicoseconds:
1013
            packet.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
1014 1
        if self.ServerPicoseconds:
1015
            packet.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
1016 1
        return b''.join(packet)
1017
1018 1
    @staticmethod
1019
    def from_binary(data):
1020 1
        encoding = ord(data.read(1))
1021 1
        if encoding & (1 << 0):
1022 1
            value = Variant.from_binary(data)
1023
        else:
1024
            value = None
1025 1
        if encoding & (1 << 1):
1026 1
            status = StatusCode.from_binary(data)
1027
        else:
1028
            status = None
1029 1
        obj = DataValue(value, status)
1030 1
        obj.Encoding = encoding
1031 1
        if obj.Encoding & (1 << 2):
1032 1
            obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
1033 1
        if obj.Encoding & (1 << 3):
1034 1
            obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)  # DateTime.from_binary(data)
1035 1
        if obj.Encoding & (1 << 4):
1036
            obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
1037 1
        if obj.Encoding & (1 << 5):
1038
            obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
1039 1
        return obj
1040
1041 1
    def __str__(self):
1042 1
        s = 'DataValue(Value:{0}'.format(self.Value)
1043 1
        if self.StatusCode is not None:
1044 1
            s += ', StatusCode:{0}'.format(self.StatusCode)
1045 1
        if self.SourceTimestamp is not None:
1046 1
            s += ', SourceTimestamp:{0}'.format(self.SourceTimestamp)
1047 1
        if self.ServerTimestamp is not None:
1048 1
            s += ', ServerTimestamp:{0}'.format(self.ServerTimestamp)
1049 1
        if self.SourcePicoseconds is not None:
1050
            s += ', SourcePicoseconds:{0}'.format(self.SourcePicoseconds)
1051 1
        if self.ServerPicoseconds is not None:
1052
            s += ', ServerPicoseconds:{0}'.format(self.ServerPicoseconds)
1053 1
        s += ')'
1054 1
        return s
1055
1056 1
    __repr__ = __str__
1057
1058
1059 1
def datatype_to_varianttype(int_type):
1060
    """
1061
    Takes a NodeId or int and return a VariantType
1062
    This is only supported if int_type < 63 due to VariantType encoding
1063
    At low level we do not have access to address space thus decoding is limited
1064
    a better version of this method can be find in ua_utils.py
1065
    """
1066 1
    if isinstance(int_type, NodeId):
1067
        int_type = int_type.Identifier
1068
1069 1
    if int_type <= 25:
1070 1
        return VariantType(int_type)
1071
    else:
1072 1
        return VariantTypeCustom(int_type)
1073
1074
1075 1
def get_default_value(vtype):
1076
    """
1077
    Given a variant type return default value for this type
1078
    """
1079 1
    if vtype == VariantType.Null:
1080
        return None
1081 1
    elif vtype == VariantType.Boolean:
1082 1
        return False
1083 1
    elif vtype in (VariantType.SByte, VariantType.Byte):
1084
        return 0
1085 1
    elif vtype == VariantType.ByteString:
1086
        return b""
1087 1
    elif 4 <= vtype.value <= 9:
1088 1
        return 0
1089 1
    elif vtype in (VariantType.Float, VariantType.Double):
1090 1
        return 0.0
1091 1
    elif vtype == VariantType.String:
1092 1
        return None  # a string can be null
1093
    elif vtype == VariantType.DateTime:
1094
        return datetime.utcnow()
1095
    elif vtype == VariantType.Guid:
1096
        return uuid.uuid4()
1097
    elif vtype == VariantType.XmlElement:
1098
        return None  #Not sure this is correct
1099
    elif vtype == VariantType.NodeId:
1100
        return NodeId()
1101
    elif vtype == VariantType.ExpandedNodeId:
1102
        return NodeId()
1103
    elif vtype == VariantType.StatusCode:
1104
        return StatusCode()
1105
    elif vtype == VariantType.QualifiedName:
1106
        return QualifiedName()
1107
    elif vtype == VariantType.LocalizedText:
1108
        return LocalizedText()
1109
    elif vtype == VariantType.ExtensionObject:
1110
        return ExtensionObject()
1111
    elif vtype == VariantType.DataValue:
1112
        return DataValue()
1113
    elif vtype == VariantType.Variant:
1114
        return Variant()
1115
    else:
1116
        raise RuntimeError("function take a uatype as argument, got:", vtype)
1117
1118
1119
# These dictionnaries are used to register extensions classes for automatic
1120
# decoding and encoding
1121 1
extension_object_classes = {}
1122 1
extension_object_ids = {}
1123
1124
1125 1
def register_extension_object(name, nodeid, class_type):
1126
    """
1127
    Register a new extension object for automatic decoding and make them available in ua module
1128
    """
1129
    logger.warning("registring new extension object: %s %s %s", name, nodeid, class_type)
1130
    extension_object_classes[nodeid] = class_type
1131
    extension_object_ids[name] = nodeid
1132
    # FIXME: Next line is not exactly a Python best practices, so feel free to propose something else
1133
    import opcua.ua
1134
    setattr(opcua.ua, name, class_type)
1135
1136 1
def get_extensionobject_class_type(typeid):
1137
    """
1138
    Returns the registered class type for typid of an extension object
1139
    """
1140
    if typeid in extension_object_classes:
1141
        return extension_object_classes[typeid]
1142
    else:
1143
        return None
1144
1145 1
def extensionobject_from_binary(data):
1146
    """
1147
    Convert binary-coded ExtensionObject to a Python object.
1148
    Returns an object, or None if TypeId is zero
1149
    """
1150 1
    typeid = NodeId.from_binary(data)
1151 1
    Encoding = ord(data.read(1))
1152 1
    body = None
1153 1
    if Encoding & (1 << 0):
1154 1
        length = uabin.Primitives.Int32.unpack(data)
1155 1
        if length < 1:
1156
            body = Buffer(b"")
1157
        else:
1158 1
            body = data.copy(length)
1159 1
            data.skip(length)
1160 1
    if typeid.Identifier == 0:
1161 1
        return None
1162 1
    elif typeid in extension_object_classes:
1163 1
        klass = extension_object_classes[typeid]
1164 1
        if body is None:
1165
            raise UaError("parsing ExtensionObject {0} without data".format(klass.__name__))
1166 1
        return klass.from_binary(body)
1167
    else:
1168 1
        e = ExtensionObject()
1169 1
        e.TypeId = typeid
1170 1
        e.Encoding = Encoding
1171 1
        if body is not None:
1172 1
            e.Body = body.read(len(body))
1173 1
        return e
1174
1175
1176 1
def extensionobject_to_binary(obj):
1177
    """
1178
    Convert Python object to binary-coded ExtensionObject.
1179
    If obj is None, convert to empty ExtensionObject (TypeId = 0, no Body).
1180
    Returns a binary string
1181
    """
1182 1
    if isinstance(obj, ExtensionObject):
1183 1
        return obj.to_binary()
1184 1
    if obj is None:
1185 1
        TypeId = NodeId()
1186 1
        Encoding = 0
1187 1
        Body = None
1188
    else:
1189 1
        TypeId = extension_object_ids[obj.__class__.__name__]
1190 1
        Encoding = 0x01
1191 1
        Body = obj.to_binary()
1192 1
    packet = []
1193 1
    packet.append(TypeId.to_binary())
1194 1
    packet.append(uabin.Primitives.UInt8.pack(Encoding))
1195 1
    if Body:
1196 1
        packet.append(uabin.Primitives.Bytes.pack(Body))
1197
    return b''.join(packet)
1198