Completed
Pull Request — master (#490)
by Olivier
10:23
created

DataValue.from_binary()   C

Complexity

Conditions 7

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 7.457

Importance

Changes 0
Metric Value
cc 7
c 0
b 0
f 0
dl 0
loc 22
ccs 15
cts 19
cp 0.7895
crap 7.457
rs 5.7894
1
"""
2
implement ua datatypes
3
"""
4
5 1
import logging
6 1
from enum import Enum, IntEnum
7 1
from calendar import timegm
8 1
import sys
9 1
import os
10 1
import uuid
11 1
import re
12 1
import itertools
13 1
from datetime import datetime, timedelta, MAXYEAR, tzinfo
14
15 1
from opcua.ua import status_codes
16 1
from opcua.ua import ObjectIds
17 1
from opcua.ua.uaerrors import UaError
18 1
from opcua.ua.uaerrors import UaStatusCodeError
19 1
from opcua.ua.uaerrors import UaStringParsingError
20 1
21 1
logger = logging.getLogger(__name__)
22
23
if sys.version_info.major > 2:
24 1
    unicode = str
25
26
27 1
EPOCH_AS_FILETIME = 116444736000000000  # January 1, 1970 as MS file time
28
HUNDREDS_OF_NANOSECONDS = 10000000
29 1
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
30 1
31
32
class UTC(tzinfo):
33 1
    """
34
    UTC
35
    """
36
37
    def utcoffset(self, dt):
38
        return timedelta(0)
39
40 1
    def tzname(self, dt):
41
        return "UTC"
42 1
43 1
    def dst(self, dt):
44
        return timedelta(0)
45
46 1
47
48
# method copied from David Buxton <[email protected]> sample code
49 1
def datetime_to_win_epoch(dt):
50
    if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
51
        dt = dt.replace(tzinfo=UTC())
52
    ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
53
    return ft + (dt.microsecond * 10)
54
55
56 1
def get_win_epoch():
57
    return win_epoch_to_datetime(0)
58
59 1
60
def win_epoch_to_datetime(epch):
61
    try:
62
        return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
63
    except OverflowError:
64
        # FILETIMEs after 31 Dec 9999 can't be converted to datetime
65 1
        logger.warning("datetime overflow: %s", epch)
66 1
        return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
67 1
68 1
69 1
class _FrozenClass(object):
70
    """
71 1
    Make it impossible to add members to a class.
72 1
    Not pythonic at all but we found out it prevents many many
73 1
    bugs in use of protocol structures
74
    """
75
    _freeze = False
76 1
77
    def __setattr__(self, key, value):
78 1
        if self._freeze and not hasattr(self, key):
79
            raise TypeError("Error adding member '{0}' to class '{1}', class is frozen, members are {2}".format(
80
                key, self.__class__.__name__, self.__dict__.keys()))
81 1
        object.__setattr__(self, key, value)
82
83 1
84
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
85 1
    # typo check is cpu consuming, but it will make debug easy.
86
    # if typo check is not need (in production), please set env PYOPCUA_NO_TYPO_CHECK.
87
    # this will make all uatype class inherit from object intead of _FrozenClass
88
    # and skip the typo check.
89
    FrozenClass = object
90 1
else:
91 1
    FrozenClass = _FrozenClass
92
93 1
94
class ValueRank(IntEnum):
95 1
    """
96
    Defines dimensions of a variable.
97 1
    This enum does not support all cases since ValueRank support any n>0
98
    but since it is an IntEnum it can be replace by a normal int
99 1
    """
100
    ScalarOrOneDimension = -3
101
    Any = -2
102
    Scalar = -1
103
    OneOrMoreDimensions = 0
104
    OneDimension = 1
105 1
    # the next names are not in spec but so common we express them here
106
    TwoDimensions = 2
107 1
    ThreeDimensions = 3
108 1
    FourDimensions = 4
109 1
110 1
111 1
class _MaskEnum(IntEnum):
112 1
    @classmethod
113
    def parse_bitfield(cls, the_int):
114
        """ Take an integer and interpret it as a set of enum values. """
115 1
        assert isinstance(the_int, int)
116
117
        return {cls(b) for b in cls._bits(the_int)}
118
119
    @classmethod
120
    def to_bitfield(cls, collection):
121 1
        """ Takes some enum values and creates an integer from them. """
122 1
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
123 1
        # iterator)
124 1
        iter1, iter2 = itertools.tee(iter(collection))
125 1
        assert all(isinstance(x, cls) for x in iter1)
126 1
127 1
        return sum(x.mask for x in iter2)
128
129
    @property
130 1
    def mask(self):
131
        return 1 << self.value
132
133
    @staticmethod
134
    def _bits(n):
135
        """ Iterate over the bits in n.
136 1
137 1
            e.g. bits(44) yields at 2, 3, 5
138 1
        """
139 1
        assert n >= 0  # avoid infinite recursion
140 1
141 1
        pos = 0
142 1
        while n:
143 1
            if n & 0x1:
144 1
                yield pos
145 1
            n = n // 2
146 1
            pos += 1
147 1
148 1
149 1
class AccessLevel(_MaskEnum):
150 1
    """
151 1
    Bit index to indicate what the access level is.
152 1
153 1
    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
154 1
    """
155 1
    CurrentRead = 0
156 1
    CurrentWrite = 1
157 1
    HistoryRead = 2
158
    HistoryWrite = 3
159
    SemanticChange = 4
160 1
    StatusWrite = 5
161
    TimestampWrite = 6
162
163
164
class WriteMask(_MaskEnum):
165
    """
166 1
    Bit index to indicate which attribute of a node is writable
167
168 1
    Spec Part 3, Paragraph 5.2.7 WriteMask
169 1
    """
170
    AccessLevel = 0
171
    ArrayDimensions = 1
172 1
    BrowseName = 2
173
    ContainsNoLoops = 3
174
    DataType = 4
175
    Description = 5
176
    DisplayName = 6
177
    EventNotifier = 7
178
    Executable = 8
179
    Historizing = 9
180
    InverseName = 10
181
    IsAbstract = 11
182 1
    MinimumSamplingInterval = 12
183 1
    NodeClass = 13
184 1
    NodeId = 14
185 1
    Symmetric = 15
186
    UserAccessLevel = 16
187 1
    UserExecutable = 17
188 1
    UserWriteMask = 18
189 1
    ValueRank = 19
190
    WriteMask = 20
191 1
    ValueForVariableType = 21
192 1
193
194 1
class EventNotifier(_MaskEnum):
195
    """
196 1
    Bit index to indicate how a node can be used for events.
197 1
198 1
    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
199
    """
200 1
    SubscribeToEvents = 0
201
    # Reserved        = 1
202
    HistoryRead = 2
203
    HistoryWrite = 3
204
205
206 1
class StatusCode(FrozenClass):
207 1
    """
208
    :ivar value:
209 1
    :vartype value: int
210
    :ivar name:
211
    :vartype name: string
212
    :ivar doc:
213 1
    :vartype doc: string
214 1
    """
215 1
216
    ua_types = [("value", "UInt32")]
217 1
218
    def __init__(self, value=0):
219 1
        if isinstance(value, str):
220 1
            self.name = value
221 1
            self.value = getattr(status_codes.StatusCodes, value)
222
        else:
223 1
            self.value = value
224 1
            self.name, self.doc = status_codes.get_name_and_doc(value)
225
        self._freeze = True
226 1
227
    def check(self):
228
        """
229
        Raises an exception if the status code is anything else than 0 (good).
230 1
231 1
        Use the is_good() method if you do not want an exception.
232 1
        """
233 1
        if not self.is_good():
234 1
            raise UaStatusCodeError(self.value)
235 1
236 1
    def is_good(self):
237
        """
238
        return True if status is Good.
239 1
        """
240
        mask = 3 << 30
241
        if mask & self.value:
242
            return False
243
        else:
244
            return True
245
246
    def __str__(self):
247
        return 'StatusCode({0})'.format(self.name)
248
249
    __repr__ = __str__
250
251
    def __eq__(self, other):
252
        return self.value == other.value
253
254
    def __ne__(self, other):
255
        return not self.__eq__(other)
256
257
258
class NodeIdType(IntEnum):
259 1
    TwoByte = 0
260
    FourByte = 1
261 1
    Numeric = 2
262 1
    String = 3
263 1
    Guid = 4
264 1
    ByteString = 5
265 1
266 1
267 1
class NodeId(FrozenClass):
268 1
    """
269 1
    NodeId Object
270 1
271 1
    Args:
272 1
        identifier: The identifier might be an int, a string, bytes or a Guid
273 1
        namespaceidx(int): The index of the namespace
274 1
        nodeidtype(NodeIdType): The type of the nodeid if it cannor be guess or you want something special like twobyte nodeid or fourbytenodeid
275 1
276 1
277 1
    :ivar Identifier:
278 1
    :vartype Identifier: NodeId
279
    :ivar NamespaceIndex:
280 1
    :vartype NamespaceIndex: Int
281 1
    :ivar NamespaceUri:
282
    :vartype NamespaceUri: String
283
    :ivar ServerIndex:
284
    :vartype ServerIndex: Int
285 1
    """
286 1
287
    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
288 1
289 1
        self.Identifier = identifier
290
        self.NamespaceIndex = namespaceidx
291 1
        self.NodeIdType = nodeidtype
292 1
        self.NamespaceUri = ""
293
        self.ServerIndex = 0
294 1
        self._freeze = True
295 1
        if not isinstance(self.NamespaceIndex, int):
296
            raise UaError("NamespaceIndex must be an int")
297 1
        if self.Identifier is None:
298 1
            self.Identifier = 0
299
            self.NodeIdType = NodeIdType.TwoByte
300 1
            return
301 1
        if self.NodeIdType is None:
302
            if isinstance(self.Identifier, int):
303 1
                self.NodeIdType = NodeIdType.Numeric
304
            elif isinstance(self.Identifier, str):
305 1
                self.NodeIdType = NodeIdType.String
306 1
            elif isinstance(self.Identifier, bytes):
307 1
                self.NodeIdType = NodeIdType.ByteString
308 1
            elif isinstance(self.Identifier, uuid.UUID):
309
                self.NodeIdType = NodeIdType.Guid
310 1
            else:
311 1
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
312 1
313 1
    def _key(self):
314 1
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):
315 1
            # twobyte, fourbyte and numeric may represent the same node
316
            return (NodeIdType.Numeric, self.NamespaceIndex, self.Identifier)
317 1
        return (self.NodeIdType, self.NamespaceIndex, self.Identifier)
318
319 1
    def __eq__(self, node):
320 1
        return isinstance(node, NodeId) and self._key() == node._key()
321 1
322 1
    def __ne__(self, other):
323
        return not self.__eq__(other)
324 1
325
    def __hash__(self):
326 1
        return hash(self._key())
327 1
328 1
    def __lt__(self, other):
329 1
        if not isinstance(other, NodeId):
330 1
            raise AttributeError("Can only compare to NodeId")
331 1
        return self._key() < other._key()
332 1
333 1
    def is_null(self):
334 1
        if self.NamespaceIndex != 0:
335 1
            return False
336 1
        return self.has_null_identifier()
337 1
338 1
    def has_null_identifier(self):
339 1
        if not self.Identifier:
340 1
            return True
341 1
        if self.NodeIdType == NodeIdType.Guid and re.match(b'0.', self.Identifier):
342 1
            return True
343 1
        return False
344 1
345 1
    @staticmethod
346 1
    def from_string(string):
347
        try:
348
            return NodeId._from_string(string)
349 1
        except ValueError as ex:
350
            raise UaStringParsingError("Error parsing string {0}".format(string), ex)
351
352 1
    @staticmethod
353 1
    def _from_string(string):
354
        l = string.split(";")
355
        identifier = None
356 1
        namespace = 0
357 1
        ntype = None
358 1
        srv = None
359 1
        nsu = None
360 1
        for el in l:
361 1
            if not el:
362
                continue
363 1
            k, v = el.split("=", 1)
364 1
            k = k.strip()
365 1
            v = v.strip()
366 1
            if k == "ns":
367 1
                namespace = int(v)
368 1
            elif k == "i":
369 1
                ntype = NodeIdType.Numeric
370 1
                identifier = int(v)
371 1
            elif k == "s":
372 1
                ntype = NodeIdType.String
373 1
                identifier = v
374 1
            elif k == "g":
375 1
                ntype = NodeIdType.Guid
376
                identifier = v
377
            elif k == "b":
378
                ntype = NodeIdType.ByteString
379
                identifier = v
380 1
            elif k == "srv":
381 1
                srv = v
382
            elif k == "nsu":
383 1
                nsu = v
384
        if identifier is None:
385 1
            raise UaStringParsingError("Could not find identifier in string: " + string)
386
        nodeid = NodeId(identifier, namespace, ntype)
387 1
        nodeid.NamespaceUri = nsu
388 1
        nodeid.ServerIndex = srv
389 1
        return nodeid
390
391 1
    def to_string(self):
392 1
        string = []
393 1
        if self.NamespaceIndex != 0:
394 1
            string.append("ns={0}".format(self.NamespaceIndex))
395 1
        ntype = None
396 1
        if self.NodeIdType == NodeIdType.Numeric:
397 1
            ntype = "i"
398 1
        elif self.NodeIdType == NodeIdType.String:
399 1
            ntype = "s"
400
        elif self.NodeIdType == NodeIdType.TwoByte:
401 1
            ntype = "i"
402 1
        elif self.NodeIdType == NodeIdType.FourByte:
403
            ntype = "i"
404 1
        elif self.NodeIdType == NodeIdType.Guid:
405 1
            ntype = "g"
406
        elif self.NodeIdType == NodeIdType.ByteString:
407
            ntype = "b"
408
        string.append("{0}={1}".format(ntype, self.Identifier))
409
        if self.ServerIndex:
410
            string.append("srv={}".format(self.ServerIndex))
411
        if self.NamespaceUri:
412 1
            string.append("nsu={0}".format(self.NamespaceUri))
413
        return ";".join(string)
414 1
415 1
    def __str__(self):
416 1
        return "{0}NodeId({1})".format(self.NodeIdType.name, self.to_string())
417
418 1
    __repr__ = __str__
419 1
420 1
    def to_binary(self):
421 1
        import opcua
422 1
        return opcua.ua.ua_binary.nodeid_to_binary(self)
423 1
424 1
425 1
class TwoByteNodeId(NodeId):
426 1
    def __init__(self, identifier):
427 1
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
428 1
429 1
430 1
class FourByteNodeId(NodeId):
431 1
    def __init__(self, identifier, namespace=0):
432 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
433
434
435
class NumericNodeId(NodeId):
436 1
    def __init__(self, identifier, namespace=0):
437
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
438 1
439
440
class ByteStringNodeId(NodeId):
441 1
    def __init__(self, identifier, namespace=0):
442
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
443
444 1
445
class GuidNodeId(NodeId):
446 1
    def __init__(self, identifier, namespace=0):
447 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
448
449
450 1
class StringNodeId(NodeId):
451
    def __init__(self, identifier, namespace=0):
452 1
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
453 1
454
455
ExpandedNodeId = NodeId
456 1
457
458 1
class QualifiedName(FrozenClass):
459 1
    """
460
    A string qualified with a namespace index.
461
    """
462 1
463
    ua_types = [
464 1
        ('NamespaceIndex', 'UInt16'),
465 1
        ('Name', 'String'),
466
    ]
467
468 1
    def __init__(self, name=None, namespaceidx=0):
469
        if not isinstance(namespaceidx, int):
470 1
            raise UaError("namespaceidx must be an int")
471 1
        self.NamespaceIndex = namespaceidx
472
        self.Name = name
473
        self._freeze = True
474 1
475
    def to_string(self):
476 1
        return "{0}:{1}".format(self.NamespaceIndex, self.Name)
477 1
478
    @staticmethod
479
    def from_string(string):
480 1
        if ":" in string:
481
            try:
482
                idx, name = string.split(":", 1)
483 1
                idx = int(idx)
484
            except (TypeError, ValueError) as ex:
485
                raise UaStringParsingError("Error parsing string {0}".format(string), ex)
486
        else:
487
            idx = 0
488 1
            name = string
489 1
        return QualifiedName(name, idx)
490
491 1
    def __eq__(self, bname):
492 1
        return isinstance(bname,
493 1
                          QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
494
495 1
    def __ne__(self, other):
496 1
        return not self.__eq__(other)
497
498 1
    def __lt__(self, other):
499
        if not isinstance(other, QualifiedName):
500 1
            raise TypeError("Cannot compare QualifiedName and {0}".format(other))
501 1
        if self.NamespaceIndex == other.NamespaceIndex:
502 1
            return self.Name < other.Name
503 1
        else:
504 1
            return self.NamespaceIndex < other.NamespaceIndex
505 1
506
    def __str__(self):
507 1
        return 'QualifiedName({0}:{1})'.format(self.NamespaceIndex, self.Name)
508 1
509 1
    __repr__ = __str__
510
511 1
512 1
class LocalizedText(FrozenClass):
513 1
    """
514 1
    A string qualified with a namespace index.
515 1
    """
516
517 1
    ua_switches = {
518
        'Locale': ('Encoding', 0),
519 1
        'Text': ('Encoding', 1),
520 1
    }
521 1
522 1
    ua_types = (
523
            ('Encoding', 'Byte'), 
524 1
            ('Locale', 'String'), 
525 1
            ('Text', 'String'), )
526
527 1
    def __init__(self, text=None):
528
        self.Encoding = 0
529
        if text is not None and not isinstance(text, str):
530 1
            raise ValueError("A LocalizedText object takes a string as argument, not a {}, {}".format(text, type(text)))
531
        self.Text = text
532
        if self.Text:
533
            self.Encoding |= (1 << 1)
534
        self.Locale = None
535
        self._freeze = True
536
537
    def to_string(self):
538 1
        # FIXME: use local
539 1
        if self.Text is None:
540
            return ""
541 1
        return self.Text
542
543
    def __str__(self):
544 1
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
545
            'Locale:' + str(self.Locale) + ', ' + \
546
            'Text:' + str(self.Text) +')'
547
548
    __repr__ = __str__
549 1
550
    def __eq__(self, other):
551
        if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
552
            return True
553
        return False
554 1
555 1
    def __ne__(self, other):
556 1
        return not self.__eq__(other)
557
558 1
559 1
class ExtensionObject(FrozenClass):
560 1
    """
561 1
    Any UA object packed as an ExtensionObject
562 1
563
    :ivar TypeId:
564 1
    :vartype TypeId: NodeId
565 1
    :ivar Body:
566 1
    :vartype Body: bytes
567
    """
568 1
    ua_switches = {
569 1
        'Body': ('Encoding', 0),
570 1
    }
571 1
572
    ua_types = (
573 1
            ("TypeId", "NodeId"), 
574 1
            ("Encoding", "Byte"), 
575 1
            ("Body", "ByteString"), 
576
            )
577 1
578
    def __init__(self):
579 1
        self.TypeId = NodeId()
580 1
        self.Encoding = 0
581 1
        self.Body = None
582
        self._freeze = True
583 1
584 1
    def __bool__(self):
585 1
        return self.Body is not None
586
    __nonzero__ = __bool__  # Python2 compatibilty
587 1
588
    def __str__(self):
589 1
        size = len(self.Body) if self.Body is not None else None
590
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
591 1
            'Encoding:' + str(self.Encoding) + ', ' + str(size) + ' bytes)'
592
593 1
    __repr__ = __str__
594 1
595
596
class VariantType(Enum):
597 1
    """
598
    The possible types of a variant.
599 1
600 1
    :ivar Null:
601 1
    :ivar Boolean:
602 1
    :ivar SByte:
603
    :ivar Byte:
604 1
    :ivar Int16:
605 1
    :ivar UInt16:
606
    :ivar Int32:
607
    :ivar UInt32:
608 1
    :ivar Int64:
609
    :ivar UInt64:
610
    :ivar Float:
611
    :ivar Double:
612
    :ivar String:
613
    :ivar DateTime:
614
    :ivar Guid:
615
    :ivar ByteString:
616
    :ivar XmlElement:
617
    :ivar NodeId:
618 1
    :ivar ExpandedNodeId:
619
    :ivar StatusCode:
620
    :ivar QualifiedName:
621
    :ivar LocalizedText:
622
    :ivar ExtensionObject:
623
    :ivar DataValue:
624 1
    :ivar Variant:
625 1
    :ivar DiagnosticInfo:
626 1
    """
627 1
628 1
    Null = 0
629
    Boolean = 1
630 1
    SByte = 2
631 1
    Byte = 3
632 1
    Int16 = 4
633 1
    UInt16 = 5
634 1
    Int32 = 6
635 1
    UInt32 = 7
636 1
    Int64 = 8
637 1
    UInt64 = 9
638 1
    Float = 10
639
    Double = 11
640 1
    String = 12
641
    DateTime = 13
642 1
    Guid = 14
643 1
    ByteString = 15
644 1
    XmlElement = 16
645 1
    NodeId = 17
646
    ExpandedNodeId = 18
647 1
    StatusCode = 19
648
    QualifiedName = 20
649 1
    LocalizedText = 21
650
    ExtensionObject = 22
651
    DataValue = 23
652
    Variant = 24
653
    DiagnosticInfo = 25
654
655
656
class VariantTypeCustom(object):
657 1
    """
658
    Looks like sometime we get variant with other values than those
659
    defined in VariantType.
660
    FIXME: We should not need this class, as far as I iunderstand the spec
661 1
    variants can only be of VariantType
662
    """
663
664 1
    def __init__(self, val):
665
        self.name = "Custom"
666
        self.value = val
667
        if self.value > 0b00111111:
668
            raise UaError(
669
                "Cannot create VariantType. VariantType must be {0} > x > {1}, received {2}".format(0b111111, 25, val))
670
671
    def __str__(self):
672
        return "VariantType.Custom:{0}".format(self.value)
673
674
    __repr__ = __str__
675
676
    def __eq__(self, other):
677
        return self.value == other.value
678
679
680
class Variant(FrozenClass):
681
    """
682
    Create an OPC-UA Variant object.
683
    if no argument a Null Variant is created.
684
    if not variant type is given, attemps to guess type from python type
685
    if a variant is given as value, the new objects becomes a copy of the argument
686
687
    :ivar Value:
688
    :vartype Value: Any supported type
689
    :ivar VariantType:
690
    :vartype VariantType: VariantType
691
    :ivar Dimension:
692
    :vartype Dimensions: The length of each dimensions. Usually guessed from value.
693
    :ivar is_array:
694
    :vartype is_array: If the variant is an array. Usually guessed from value.
695
    """
696 1
697 1
    def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
698 1
        self.Value = value
699 1
        self.VariantType = varianttype
700 1
        self.Dimensions = dimensions
701 1
        self.is_array = is_array
702 1
        if self.is_array is None:
703 1
            if isinstance(value, (list, tuple)):
704 1
                self.is_array = True
705 1
            else:
706 1
                self.is_array = False
707 1
        self._freeze = True
708 1
        if isinstance(value, Variant):
709 1
            self.Value = value.Value
710 1
            self.VariantType = value.VariantType
711 1
        if self.VariantType is None:
712 1
            self.VariantType = self._guess_type(self.Value)
713 1
        if self.Value is None and not self.is_array and self.VariantType not in (VariantType.Null, VariantType.String,
714 1
                                                                                 VariantType.DateTime):
715 1
            raise UaError("Non array Variant of type {0} cannot have value None".format(self.VariantType))
716 1
        if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
717 1
            dims = get_shape(self.Value)
718 1
            if len(dims) > 1:
719 1
                self.Dimensions = dims
720 1
721 1
    def __eq__(self, other):
722
        if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
723
            return True
724 1
        return False
725
726
    def __ne__(self, other):
727
        return not self.__eq__(other)
728
729
    def _guess_type(self, val):
730
        if isinstance(val, (list, tuple)):
731
            error_val = val
732 1
        while isinstance(val, (list, tuple)):
733 1
            if len(val) == 0:
734 1
                raise UaError("could not guess UA type of variable {0}".format(error_val))
735 1
            val = val[0]
736 1
        if val is None:
737
            return VariantType.Null
738 1
        elif isinstance(val, bool):
739
            return VariantType.Boolean
740 1
        elif isinstance(val, float):
741
            return VariantType.Double
742 1
        elif isinstance(val, IntEnum):
743 1
            return VariantType.Int32
744
        elif isinstance(val, int):
745
            return VariantType.Int64
746 1
        elif isinstance(val, (str, unicode)):
747
            return VariantType.String
748
        elif isinstance(val, bytes):
749
            return VariantType.ByteString
750
        elif isinstance(val, datetime):
751
            return VariantType.DateTime
752
        elif isinstance(val, uuid.UUID):
753
            return VariantType.Guid
754
        else:
755
            if isinstance(val, object):
756
                try:
757
                    return getattr(VariantType, val.__class__.__name__)
758
                except AttributeError:
759
                    return VariantType.ExtensionObject
760
            else:
761
                raise UaError("Could not guess UA type of {0} with type {1}, specify UA type".format(val, type(val)))
762
763 1
    def __str__(self):
764 1
        return "Variant(val:{0!s},type:{1})".format(self.Value, self.VariantType)
765 1
766 1
    __repr__ = __str__
767 1
768 1
    def to_binary(self):
769 1
        from opcua.ua.ua_binary import variant_to_binary
770 1
        return variant_to_binary(self)
771
772 1
773 1
def _split_list(l, n):
774 1
    n = max(1, n)
775 1
    return [l[i:i + n] for i in range(0, len(l), n)]
776 1
777 1
778 1
def flatten_and_get_shape(mylist):
779 1
    dims = []
780
    dims.append(len(mylist))
781
    while isinstance(mylist[0], (list, tuple)):
782
        dims.append(len(mylist[0]))
783
        mylist = [item for sublist in mylist for item in sublist]
784 1
        if len(mylist) == 0:
785 1
            break
786 1
    return mylist, dims
787 1
788
789 1
def flatten(mylist):
790 1
    if mylist is None:
791 1
        return None
792 1
    elif len(mylist) == 0:
793
        return mylist
794 1
    while isinstance(mylist[0], (list, tuple)):
795 1
        mylist = [item for sublist in mylist for item in sublist]
796
        if len(mylist) == 0:
797 1
            break
798 1
    return mylist
799 1
800 1
801 1
def get_shape(mylist):
802
    dims = []
803 1
    while isinstance(mylist, (list, tuple)):
804 1
        dims.append(len(mylist))
805 1 View Code Duplication
        if len(mylist) == 0:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
806 1
            break
807 1
        mylist = mylist[0]
808 1
    return dims
809 1
810 1
811 1
class DataValue(FrozenClass):
812 1
    """
813 1
    A value with an associated timestamp, and quality.
814 1
    Automatically generated from xml , copied and modified here to fix errors in xml spec
815 1
816 1
    :ivar Value:
817
    :vartype Value: Variant
818 1
    :ivar StatusCode:
819 1
    :vartype StatusCode: StatusCode
820 1
    :ivar SourceTimestamp:
821 1
    :vartype SourceTimestamp: datetime
822
    :ivar SourcePicoSeconds:
823 1
    :vartype SourcePicoSeconds: int
824 1
    :ivar ServerTimestamp:
825 1
    :vartype ServerTimestamp: datetime
826 1
    :ivar ServerPicoseconds:
827 1
    :vartype ServerPicoseconds: int
828
    """
829
830
    ua_switches = {
831 1
        'Value': ('Encoding', 0),
832 1
        'StatusCode': ('Encoding', 1),
833 1
        'SourceTimestamp': ('Encoding', 2),
834
        'ServerTimestamp': ('Encoding', 3),
835 1
        'SourcePicoseconds': ('Encoding', 4),
836 1
        'ServerPicoseconds': ('Encoding', 5),
837 1
    }
838 1
839 1
    ua_types = (
840 1
            ('Encoding', 'Byte'), 
841 1
            ('Value', 'Variant'), 
842 1
            ('StatusCode', 'StatusCode'), 
843 1
            ('SourceTimestamp', 'DateTime'),
844 1
            ('SourcePicoseconds', 'UInt16'), 
845 1
            ('ServerTimestamp', 'DateTime'), 
846 1
            ('ServerPicoseconds', 'UInt16'), 
847
            )
848 1
849 1
    def __init__(self, variant=None, status=None):
850
        self.Encoding = 0
851 1
        if not isinstance(variant, Variant):
852
            variant = Variant(variant)
853 1
        self.Value = variant
854
        if status is None:
855 1
            self.StatusCode = StatusCode()
856 1
        else:
857 1
            self.StatusCode = status
858 1
        self.SourceTimestamp = None  # DateTime()
859 1
        self.SourcePicoseconds = None
860 1
        self.ServerTimestamp = None  # DateTime()
861 1
        self.ServerPicoseconds = None
862 1
        self._freeze = True
863
864 1
    def __str__(self):
865 1
        s = 'DataValue(Value:{0}'.format(self.Value)
866 1
        if self.StatusCode is not None:
867 1
            s += ', StatusCode:{0}'.format(self.StatusCode)
868 1
        if self.SourceTimestamp is not None:
869
            s += ', SourceTimestamp:{0}'.format(self.SourceTimestamp)
870
        if self.ServerTimestamp is not None:
871 1
            s += ', ServerTimestamp:{0}'.format(self.ServerTimestamp)
872 1
        if self.SourcePicoseconds is not None:
873 1
            s += ', SourcePicoseconds:{0}'.format(self.SourcePicoseconds)
874 1
        if self.ServerPicoseconds is not None:
875 1
            s += ', ServerPicoseconds:{0}'.format(self.ServerPicoseconds)
876 1
        s += ')'
877 1
        return s
878 1
879 1
    __repr__ = __str__
880 1
881 1
882 1
def datatype_to_varianttype(int_type):
883
    """
884
    Takes a NodeId or int and return a VariantType
885 1
    This is only supported if int_type < 63 due to VariantType encoding
886
    At low level we do not have access to address space thus decoding is limited
887
    a better version of this method can be find in ua_utils.py
888
    """
889
    if isinstance(int_type, NodeId):
890 1
        int_type = int_type.Identifier
891
892
    if int_type <= 25:
893
        return VariantType(int_type)
894
    else:
895
        return VariantTypeCustom(int_type)
896
897
898
def get_default_value(vtype):
899
    """
900
    Given a variant type return default value for this type
901 1
    """
902 1
    if vtype == VariantType.Null:
903 1
        return None
904 1
    elif vtype == VariantType.Boolean:
905 1
        return False
906 1
    elif vtype in (VariantType.SByte, VariantType.Byte):
907 1
        return 0
908 1
    elif vtype == VariantType.ByteString:
909 1
        return b""
910 1
    elif 4 <= vtype.value <= 9:
911
        return 0
912
    elif vtype in (VariantType.Float, VariantType.Double):
913 1
        return 0.0
914 1
    elif vtype == VariantType.String:
915 1
        return None  # a string can be null
916 1
    elif vtype == VariantType.DateTime:
917 1
        return datetime.utcnow()
918 1
    elif vtype == VariantType.Guid:
919 1
        return uuid.uuid4()
920 1
    elif vtype == VariantType.XmlElement:
921
        return None  #Not sure this is correct
922
    elif vtype == VariantType.NodeId:
923 1
        return NodeId()
924
    elif vtype == VariantType.ExpandedNodeId:
925
        return NodeId()
926
    elif vtype == VariantType.StatusCode:
927
        return StatusCode()
928 1
    elif vtype == VariantType.QualifiedName:
929 1
        return QualifiedName()
930 1
    elif vtype == VariantType.LocalizedText:
931 1
        return LocalizedText()
932 1
    elif vtype == VariantType.ExtensionObject:
933 1
        return ExtensionObject()
934 1
    elif vtype == VariantType.DataValue:
935
        return DataValue()
936 1
    elif vtype == VariantType.Variant:
937 1
        return Variant()
938
    else:
939 1
        raise RuntimeError("function take a uatype as argument, got:", vtype)
940 1
941
942 1
# These dictionnaries are used to register extensions classes for automatic
943
# decoding and encoding
944 1
extension_object_classes = {}
945
extension_object_ids = {}
946 1
947 1
948
def register_extension_object(name, nodeid, class_type):
949 1
    """
950
    Register a new extension object for automatic decoding and make them available in ua module
951
    """
952 1
    logger.warning("registring new extension object: %s %s %s", name, nodeid, class_type)
953
    extension_object_classes[nodeid] = class_type
954
    extension_object_ids[name] = nodeid
955 1
    # FIXME: Next line is not exactly a Python best practices, so feel free to propose something else
956
    # add new extensions objects to ua modules to automate decoding
957
    import opcua.ua
958
    setattr(opcua.ua, name, class_type)
959
960
961
def get_extensionobject_class_type(typeid):
962
    """
963
    Returns the registered class type for typid of an extension object
964
    """
965
    if typeid in extension_object_classes:
966
        return extension_object_classes[typeid]
967
    else:
968
        return None
969