Completed
Pull Request — master (#296)
by Olivier
04:13
created

pack_datetime()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""
2
implement ua datatypes
3
"""
4 1
import logging
5 1
import struct
6 1
from enum import Enum, IntEnum, EnumMeta
7 1
from datetime import datetime
8 1
import sys
9 1
import os
10 1
import uuid
11 1
import re
12 1
import itertools
13 1
if sys.version_info.major > 2:
14 1
    unicode = str
15
16 1
from opcua.ua import ua_binary as uabin
17 1
from opcua.ua import status_codes
18 1
from opcua.ua import ObjectIds
19 1
from opcua.common.uaerrors import UaError
20
from opcua.common.uaerrors import UaStatusCodeError
21
from opcua.common.uaerrors import UaStringParsingError
22 1
23
24
# Module-level logger, registered under the 'opcua.uaprotocol' name.
logger = logging.getLogger('opcua.uaprotocol')
25
26 1
27
def get_win_epoch():
    """
    Return the datetime that corresponds to Windows epoch tick 0.

    Thin wrapper around ``uabin.win_epoch_to_datetime(0)``.
    """
    return uabin.win_epoch_to_datetime(0)
29 1
30 1
31 1
class _FrozenClass(object):
32
33
    """
34 1
    make it impossible to add members to a class.
35
    This is a hack since I found out that most bugs are due to misspelling a variable in protocol
36
    """
37
    _freeze = False
38 1
39
    def __setattr__(self, key, value):
40
        if self._freeze and not hasattr(self, key):
41 1
            raise TypeError("Error adding member '{}' to class '{}', class is frozen, members are {}".format(
42
                key, self.__class__.__name__, self.__dict__.keys()))
43
        object.__setattr__(self, key, value)
44 1
45 1
if "PYOPCUA_NO_TYPO_CHECK" in os.environ:
    # The typo check is CPU consuming, but it makes debugging easy.
    # If the typo check is not needed (in production), set the environment
    # variable PYOPCUA_NO_TYPO_CHECK. All ua type classes then inherit from
    # object instead of _FrozenClass and the typo check is skipped.
    FrozenClass = object
else:
    FrozenClass = _FrozenClass
53 1
54
55
class ValueRank(IntEnum):
    """
    Defines dimensions of a variable.
    This enum does not support all cases since ValueRank supports any n>0
    but since it is an IntEnum it can be replaced by a normal int
    """
    ScalarOrOneDimension = -3
    Any = -2
    Scalar = -1
    OneOrMoreDimensions = 0
    OneDimension = 1
    # the next names are not in spec but so common we express them here
    TwoDimensions = 2
    ThreeDimensions = 3
    FourDimensions = 4
70
71
72
class _MaskEnum(IntEnum):
73 1
74 1
    @classmethod
75
    def parse_bitfield(cls, the_int):
76
        """ Take an integer and interpret it as a set of enum values. """
77 1
        assert isinstance(the_int, int)
78
79
        return {cls(b) for b in cls._bits(the_int)}
80 1
81
    @classmethod
82
    def to_bitfield(cls, collection):
83 1
        """ Takes some enum values and creates an integer from them. """
84 1
        # make sure all elements are of the correct type (use itertools.tee in case we get passed an
85 1
        # iterator)
86
        iter1, iter2 = itertools.tee(iter(collection))
87 1
        assert all(isinstance(x, cls) for x in iter1)
88
89
        return sum(x.mask for x in iter2)
90 1
91 1
    @property
92
    def mask(self):
93 1
        return 1 << self.value
94 1
95 1
    @staticmethod
96 1
    def _bits(n):
97 1
        """ Iterate over the bits in n.
98 1
99 1
            e.g. bits(44) yields at 2, 3, 5
100 1
        """
101 1
        assert n >= 0  # avoid infinite recursion
102 1
103
        pos = 0
104
        while n:
105 1
            if n & 0x1:
106 1
                yield pos
107 1
            n = n // 2
108
            pos += 1
109 1
110 1
111 1
class AccessLevel(_MaskEnum):
    """
    Bit index to indicate what the access level is.

    Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
    """
    CurrentRead = 0
    CurrentWrite = 1
    HistoryRead = 2
    HistoryWrite = 3
    SemanticChange = 4
    StatusWrite = 5
    TimestampWrite = 6
124
125 1
126
class WriteMask(_MaskEnum):
    """
    Bit index to indicate which attribute of a node is writable

    Spec Part 3, Paragraph 5.2.7 WriteMask
    """
    AccessLevel = 0
    ArrayDimensions = 1
    BrowseName = 2
    ContainsNoLoops = 3
    DataType = 4
    Description = 5
    DisplayName = 6
    EventNotifier = 7
    Executable = 8
    Historizing = 9
    InverseName = 10
    IsAbstract = 11
    MinimumSamplingInterval = 12
    NodeClass = 13
    NodeId = 14
    Symmetric = 15
    UserAccessLevel = 16
    UserExecutable = 17
    UserWriteMask = 18
    ValueRank = 19
    WriteMask = 20
    ValueForVariableType = 21
154
155
156
class EventNotifier(_MaskEnum):
    """
    Bit index to indicate how a node can be used for events.

    Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
    """
    SubscribeToEvents = 0
    # Reserved        = 1
    HistoryRead = 2
    HistoryWrite = 3
166 1
167 1
168 1
class Guid(FrozenClass):

    """
    A GUID value, wrapping a ``uuid.UUID``.

    A new instance holds a random (version 4) UUID.

    :ivar uuid:
    :vartype uuid: uuid.UUID
    """

    def __init__(self):
        self.uuid = uuid.uuid4()
        self._freeze = True

    def to_binary(self):
        """Return the 16 raw bytes of the wrapped UUID."""
        # NOTE(review): this writes uuid.bytes (big-endian field order). The
        # OPC UA binary encoding stores the first three Guid fields
        # little-endian, which looks like uuid.bytes_le - confirm against
        # the spec and from_binary() before changing.
        return self.uuid.bytes

    def __hash__(self):
        return hash(self.uuid.bytes)

    @staticmethod
    def from_binary(data):
        """Build a Guid from the next 16 bytes read from the stream ``data``."""
        g = Guid()
        g.uuid = uuid.UUID(bytes=data.read(16))
        return g

    def __eq__(self, other):
        return isinstance(other, Guid) and self.uuid == other.uuid

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__; keep both in sync.
        return not self.__eq__(other)
188
189 1
190 1
class StatusCode(FrozenClass):

    """
    An OPC-UA status code.

    Can be created from a numeric value or from the name of an attribute
    of ``status_codes.StatusCodes``.

    :ivar value:
    :vartype value: int
    :ivar name:
    :vartype name: string
    :ivar doc:
    :vartype doc: string
    """

    def __init__(self, value=0):
        if isinstance(value, str):
            self.name = value
            self.value = getattr(status_codes.StatusCodes, value)
            # The class is frozen below, so doc must be created here too;
            # otherwise instances built from a name would never have it.
            self.doc = status_codes.get_name_and_doc(self.value)[1]
        else:
            self.value = value
            self.name, self.doc = status_codes.get_name_and_doc(value)
        self._freeze = True

    def to_binary(self):
        """Pack the numeric value as a UInt32."""
        return uabin.Primitives.UInt32.pack(self.value)

    @staticmethod
    def from_binary(data):
        """Read a UInt32 from the stream ``data`` and wrap it in a StatusCode."""
        val = uabin.Primitives.UInt32.unpack(data)
        sc = StatusCode(val)
        return sc

    def check(self):
        """
        Raises an exception if the status code is anything else than 0 (good).

        Use the is_good() method if you do not want an exception.
        """
        if self.value != 0:
            raise UaStatusCodeError(self.value)

    def is_good(self):
        """
        return True if status is Good.
        """
        return self.value == 0

    def __str__(self):
        return 'StatusCode({})'.format(self.name)
    __repr__ = __str__

    def __eq__(self, other):
        # Comparing with a foreign object (None, int, ...) is simply False
        # rather than an AttributeError.
        return isinstance(other, StatusCode) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)
245
246 1
247 1
class NodeIdType(Enum):
    """
    Encoding type of a NodeId.

    NodeId.to_binary() writes the member value as the first byte of the
    encoded NodeId, and NodeId.from_binary() reads it back from the low six
    bits of that byte.
    """
    TwoByte = 0
    FourByte = 1
    Numeric = 2
    String = 3
    Guid = 4
    ByteString = 5
254
255
256 1
class NodeId(FrozenClass):

    """
    NodeId Object

    Args:
        identifier: The identifier might be an int, a string, bytes or a Guid
        namespaceidx(int): The index of the namespace
        nodeidtype(NodeIdType): The type of the nodeid if it cannot be guessed or you want something special like twobyte nodeid or fourbytenodeid


    :ivar Identifier:
    :vartype Identifier: int, string, bytes or Guid
    :ivar NamespaceIndex:
    :vartype NamespaceIndex: Int
    :ivar NodeIdType:
    :vartype NodeIdType: NodeIdType
    :ivar NamespaceUri:
    :vartype NamespaceUri: String
    :ivar ServerIndex:
    :vartype ServerIndex: Int
    """

    def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
        self.Identifier = identifier
        self.NamespaceIndex = namespaceidx
        self.NodeIdType = nodeidtype
        self.NamespaceUri = ""
        self.ServerIndex = 0
        self._freeze = True
        if not isinstance(self.NamespaceIndex, int):
            raise UaError("NamespaceIndex must be an int")
        if self.Identifier is None:
            # No identifier at all: the null NodeId, in its most compact form.
            self.Identifier = 0
            self.NodeIdType = NodeIdType.TwoByte
            return
        if self.NodeIdType is None:
            if isinstance(self.Identifier, int):
                self.NodeIdType = NodeIdType.Numeric
            elif isinstance(self.Identifier, str):
                self.NodeIdType = NodeIdType.String
            elif isinstance(self.Identifier, bytes):
                self.NodeIdType = NodeIdType.ByteString
            elif isinstance(self.Identifier, Guid):
                self.NodeIdType = NodeIdType.Guid
            else:
                raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")

    def __key(self):
        if self.NodeIdType in (NodeIdType.TwoByte, NodeIdType.FourByte, NodeIdType.Numeric):  # twobyte, fourbyte and numeric may represent the same node
            return self.NamespaceIndex, self.Identifier
        else:
            return self.NodeIdType, self.NamespaceIndex, self.Identifier

    def __eq__(self, node):
        return isinstance(node, NodeId) and self.__key() == node.__key()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.__key())

    def is_null(self):
        """Return True for a null identifier in namespace 0."""
        if self.NamespaceIndex != 0:
            return False
        return self.has_null_identifier()

    def has_null_identifier(self):
        """
        Return True if the identifier is empty (0, "", b"", None) or, for a
        Guid NodeId, denotes the all-zero GUID.
        """
        if not self.Identifier:
            return True
        if self.NodeIdType == NodeIdType.Guid:
            return self._is_null_guid(self.Identifier)
        return False

    @staticmethod
    def _is_null_guid(identifier):
        """Return True if identifier (Guid, uuid.UUID, str or bytes) is the all-zero GUID."""
        if isinstance(identifier, Guid):
            identifier = identifier.uuid
        if isinstance(identifier, uuid.UUID):
            return identifier.int == 0
        if isinstance(identifier, bytes):
            try:
                identifier = identifier.decode("ascii")
            except UnicodeDecodeError:
                return False
        try:
            return uuid.UUID(identifier).int == 0
        except (AttributeError, TypeError, ValueError):
            # not something that can be read as a GUID: cannot be the null GUID
            return False

    @staticmethod
    def from_string(string):
        """
        Parse a string such as "ns=2;i=42" or "srv=1;ns=2;s=name;nsu=uri".

        Raises UaStringParsingError if the string cannot be parsed.
        """
        try:
            return NodeId._from_string(string)
        except ValueError as ex:
            raise UaStringParsingError("Error parsing string {}".format(string), ex)

    @staticmethod
    def _from_string(string):
        l = string.split(";")
        identifier = None
        namespace = 0
        ntype = None
        srv = None
        nsu = None
        for el in l:
            if not el:
                continue
            # a missing "=" raises ValueError, translated by from_string()
            k, v = el.split("=", 1)
            k = k.strip()
            v = v.strip()
            if k == "ns":
                namespace = int(v)
            elif k == "i":
                ntype = NodeIdType.Numeric
                identifier = int(v)
            elif k == "s":
                ntype = NodeIdType.String
                identifier = v
            elif k == "g":
                ntype = NodeIdType.Guid
                identifier = v
            elif k == "b":
                ntype = NodeIdType.ByteString
                identifier = v
            elif k == "srv":
                # ServerIndex is numeric everywhere else (see from_binary)
                srv = int(v)
            elif k == "nsu":
                nsu = v
        if identifier is None:
            raise UaStringParsingError("Could not find identifier in string: " + string)
        nodeid = NodeId(identifier, namespace, ntype)
        # Only override the constructor defaults ("" and 0) when the string
        # actually carried a value.
        if nsu is not None:
            nodeid.NamespaceUri = nsu
        if srv is not None:
            nodeid.ServerIndex = srv
        return nodeid

    def to_string(self):
        """
        Return the string form of the NodeId, e.g. "ns=2;i=42".

        The optional parts are separated by ";" so that the result can be
        parsed back with from_string(): "srv=1;ns=2;i=42;nsu=uri".
        """
        string = ""
        if self.NamespaceIndex != 0:
            string += "ns={};".format(self.NamespaceIndex)
        ntype = None
        if self.NodeIdType in (NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte):
            ntype = "i"
        elif self.NodeIdType == NodeIdType.String:
            ntype = "s"
        elif self.NodeIdType == NodeIdType.Guid:
            ntype = "g"
        elif self.NodeIdType == NodeIdType.ByteString:
            ntype = "b"
        string += "{}={}".format(ntype, self.Identifier)
        if self.ServerIndex:
            string = "srv=" + str(self.ServerIndex) + ";" + string
        if self.NamespaceUri:
            string += ";nsu={}".format(self.NamespaceUri)
        return string

    def __str__(self):
        return "{}NodeId({})".format(self.NodeIdType.name, self.to_string())
    __repr__ = __str__

    def to_binary(self):
        """Return the binary encoding of the NodeId as bytes."""
        # FIXME: Missing NamespaceURI and ServerIndex
        if self.NodeIdType == NodeIdType.TwoByte:
            return struct.pack("<BB", self.NodeIdType.value, self.Identifier)
        elif self.NodeIdType == NodeIdType.FourByte:
            return struct.pack("<BBH", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.Numeric:
            return struct.pack("<BHI", self.NodeIdType.value, self.NamespaceIndex, self.Identifier)
        elif self.NodeIdType == NodeIdType.String:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                uabin.Primitives.String.pack(self.Identifier)
        elif self.NodeIdType == NodeIdType.ByteString:
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                uabin.Primitives.Bytes.pack(self.Identifier)
        else:
            # Guid: the identifier object serializes itself
            return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
                self.Identifier.to_binary()

    @staticmethod
    def from_binary(data):
        """Decode a NodeId from the stream ``data``."""
        nid = NodeId()
        encoding = ord(data.read(1))
        # the low six bits carry the type, bits 6 and 7 are flags (see below)
        nid.NodeIdType = NodeIdType(encoding & 0b00111111)

        if nid.NodeIdType == NodeIdType.TwoByte:
            nid.Identifier = ord(data.read(1))
        elif nid.NodeIdType == NodeIdType.FourByte:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH", data.read(3))
        elif nid.NodeIdType == NodeIdType.Numeric:
            nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI", data.read(6))
        elif nid.NodeIdType == NodeIdType.String:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = uabin.Primitives.String.unpack(data)
        elif nid.NodeIdType == NodeIdType.ByteString:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = uabin.Primitives.Bytes.unpack(data)
        elif nid.NodeIdType == NodeIdType.Guid:
            nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
            nid.Identifier = Guid.from_binary(data)
        else:
            raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))

        if uabin.test_bit(encoding, 7):
            nid.NamespaceUri = uabin.Primitives.String.unpack(data)
        if uabin.test_bit(encoding, 6):
            nid.ServerIndex = uabin.Primitives.UInt32.unpack(data)

        return nid
449 1
450 1
451 1
class TwoByteNodeId(NodeId):
    """NodeId forced to the TwoByte encoding; the namespace index is always 0."""

    def __init__(self, identifier):
        NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
455 1
456 1
457
class FourByteNodeId(NodeId):
    """NodeId forced to the FourByte encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
461 1
462 1
463
class NumericNodeId(NodeId):
    """NodeId forced to the Numeric encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
467 1
468
469 1
class ByteStringNodeId(NodeId):
    """NodeId forced to the ByteString encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
473 1
474 1
475
class GuidNodeId(NodeId):
    """NodeId forced to the Guid encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
479 1
480 1
481 1
class StringNodeId(NodeId):
    """NodeId forced to the String encoding."""

    def __init__(self, identifier, namespace=0):
        NodeId.__init__(self, identifier, namespace, NodeIdType.String)
485 1
486 1
487 1
# ExpandedNodeId is an alias of NodeId, which already carries the
# NamespaceUri and ServerIndex attributes.
ExpandedNodeId = NodeId
488 1
489 1
490 1
class QualifiedName(FrozenClass):

    '''
    A string qualified with a namespace index.

    :ivar NamespaceIndex:
    :vartype NamespaceIndex: int
    :ivar Name:
    :vartype Name: string
    '''

    def __init__(self, name=None, namespaceidx=0):
        if not isinstance(namespaceidx, int):
            raise UaError("namespaceidx must be an int")
        self.NamespaceIndex = namespaceidx
        self.Name = name
        self._freeze = True

    def to_string(self):
        """Return the name as "<NamespaceIndex>:<Name>"."""
        return "{}:{}".format(self.NamespaceIndex, self.Name)

    @staticmethod
    def from_string(string):
        """
        Parse "<idx>:<name>"; a string without ":" is a name in namespace 0.

        Raises UaStringParsingError if the part before the first ":" is not
        an integer.
        """
        if ":" in string:
            try:
                idx, name = string.split(":", 1)
                idx = int(idx)
            except (TypeError, ValueError) as ex:
                raise UaStringParsingError("Error parsing string {}".format(string), ex)
        else:
            idx = 0
            name = string
        return QualifiedName(name, idx)

    def to_binary(self):
        """Pack the namespace index (UInt16) followed by the name (String)."""
        packet = []
        packet.append(uabin.Primitives.UInt16.pack(self.NamespaceIndex))
        packet.append(uabin.Primitives.String.pack(self.Name))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Decode a QualifiedName from the stream ``data``."""
        obj = QualifiedName()
        obj.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
        obj.Name = uabin.Primitives.String.unpack(data)
        return obj

    def __eq__(self, bname):
        return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex

    def __ne__(self, other):
        return not self.__eq__(other)

    def __lt__(self, other):
        # order by namespace index first, then by name
        if not isinstance(other, QualifiedName):
            raise TypeError("Cannot compare QualifiedName and {}".format(other))
        if self.NamespaceIndex == other.NamespaceIndex:
            return self.Name < other.Name
        else:
            return self.NamespaceIndex < other.NamespaceIndex

    def __str__(self):
        return 'QualifiedName({}:{})'.format(self.NamespaceIndex, self.Name)

    __repr__ = __str__
550 1
551 1
552 1
class LocalizedText(FrozenClass):

    '''
    A text with an optional locale.

    Text and Locale are stored as bytes; a unicode text passed to the
    constructor is encoded as UTF-8.

    :ivar Encoding: bit 0 set if Locale is present, bit 1 set if Text is present
    :vartype Encoding: int
    :ivar Text:
    :vartype Text: bytes
    :ivar Locale:
    :vartype Locale: bytes
    '''

    ua_types = {
        "Text": "Bytes",
        "Locale": "Bytes"
    }

    def __init__(self, text=None):
        self.Encoding = 0
        self.Text = text
        if isinstance(self.Text, unicode):
            self.Text = self.Text.encode('utf-8')
        if self.Text:
            self.Encoding |= (1 << 1)
        self.Locale = None
        self._freeze = True

    def to_binary(self):
        """Return the encoding byte followed by the fields it announces."""
        # Rebuild the mask from the current fields instead of only adding
        # bits: a bit left over from an earlier state (e.g. Text cleared
        # after construction) would announce a field that is not written.
        encoding = 0
        if self.Locale:
            encoding |= (1 << 0)
        if self.Text:
            encoding |= (1 << 1)
        self.Encoding = encoding
        packet = [uabin.Primitives.UInt8.pack(encoding)]
        if self.Locale:
            packet.append(uabin.Primitives.Bytes.pack(self.Locale))
        if self.Text:
            packet.append(uabin.Primitives.Bytes.pack(self.Text))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Decode a LocalizedText from the stream ``data``."""
        obj = LocalizedText()
        obj.Encoding = ord(data.read(1))
        if obj.Encoding & (1 << 0):
            obj.Locale = uabin.Primitives.Bytes.unpack(data)
        if obj.Encoding & (1 << 1):
            obj.Text = uabin.Primitives.Bytes.unpack(data)
        return obj

    def to_string(self):
        """Return the text decoded from UTF-8, or "" if there is no text."""
        # FIXME: use local
        if self.Text is None:
            return ""
        return self.Text.decode('utf-8')

    def __str__(self):
        return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
            'Locale:' + str(self.Locale) + ', ' + \
            'Text:' + str(self.Text) + ')'
    __repr__ = __str__

    def __eq__(self, other):
        # Encoding is derived from the two fields, so it is not compared
        return isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text

    def __ne__(self, other):
        return not self.__eq__(other)
615 1
616
617
class ExtensionObject(FrozenClass):

    '''

    Any UA object packed as an ExtensionObject


    :ivar TypeId:
    :vartype TypeId: NodeId
    :ivar Encoding: bit 0 set if a Body is present
    :vartype Encoding: int
    :ivar Body:
    :vartype Body: bytes

    '''

    def __init__(self):
        self.TypeId = NodeId()
        self.Encoding = 0
        self.Body = b''
        self._freeze = True

    def to_binary(self):
        """Return TypeId, the encoding byte and, if present, the body."""
        # Derive the encoding from the current Body rather than only adding
        # the bit: a bit left over from a decoded object whose Body was
        # emptied afterwards would announce a body that is not written.
        self.Encoding = (1 << 0) if self.Body else 0
        packet = [
            self.TypeId.to_binary(),
            uabin.Primitives.UInt8.pack(self.Encoding),
        ]
        if self.Body:
            packet.append(uabin.Primitives.ByteString.pack(self.Body))
        return b''.join(packet)

    @staticmethod
    def from_binary(data):
        """Decode an ExtensionObject from the stream ``data``."""
        obj = ExtensionObject()
        obj.TypeId = NodeId.from_binary(data)
        obj.Encoding = uabin.Primitives.UInt8.unpack(data)
        if obj.Encoding & (1 << 0):
            obj.Body = uabin.Primitives.ByteString.unpack(data)
        return obj

    @staticmethod
    def from_object(obj):
        """
        Wrap ``obj`` in an ExtensionObject.

        The type id is looked up in ObjectIds under the name
        "<ClassName>_Encoding_DefaultBinary" and the body is obj.to_binary().
        """
        ext = ExtensionObject()
        oid = getattr(ObjectIds, "{}_Encoding_DefaultBinary".format(obj.__class__.__name__))
        ext.TypeId = FourByteNodeId(oid)
        ext.Body = obj.to_binary()
        return ext

    def __str__(self):
        return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
            'Encoding:' + str(self.Encoding) + ', ' + str(len(self.Body)) + ' bytes)'

    __repr__ = __str__
669 1
670 1
671 1
class VariantType(Enum):

    '''
    The possible types of a variant.

    :ivar Null:
    :ivar Boolean:
    :ivar SByte:
    :ivar Byte:
    :ivar Int16:
    :ivar UInt16:
    :ivar Int32:
    :ivar UInt32:
    :ivar Int64:
    :ivar UInt64:
    :ivar Float:
    :ivar Double:
    :ivar String:
    :ivar DateTime:
    :ivar Guid:
    :ivar ByteString:
    :ivar XmlElement:
    :ivar NodeId:
    :ivar ExpandedNodeId:
    :ivar StatusCode:
    :ivar QualifiedName:
    :ivar LocalizedText:
    :ivar ExtensionObject:
    :ivar DataValue:
    :ivar Variant:
    :ivar DiagnosticInfo:
    '''
    Null = 0
    Boolean = 1
    SByte = 2
    Byte = 3
    Int16 = 4
    UInt16 = 5
    Int32 = 6
    UInt32 = 7
    Int64 = 8
    UInt64 = 9
    Float = 10
    Double = 11
    String = 12
    DateTime = 13
    Guid = 14
    ByteString = 15
    XmlElement = 16
    NodeId = 17
    ExpandedNodeId = 18
    StatusCode = 19
    QualifiedName = 20
    LocalizedText = 21
    ExtensionObject = 22
    DataValue = 23
    Variant = 24
    DiagnosticInfo = 25
732 1
733
734
class VariantTypeCustom(object):
    """
    Looks like sometimes we get variants with other values than those
    defined in VariantType.
    FIXME: We should not need this class, as far as I understand the spec
    variants can only be of VariantType

    Exposes ``name`` and ``value`` like an enum member does.
    Raises UaError if the value does not fit in six bits.
    """

    def __init__(self, val):
        self.name = "Custom"
        self.value = val
        if self.value > 0b00111111:
            raise UaError("Cannot create VariantType. VariantType must be {} > x > {}, received {}".format(0b111111, 25, val))

    def __str__(self):
        return "VariantType.Custom:{}".format(self.value)
    __repr__ = __str__

    def __eq__(self, other):
        # Anything exposing the same ``value`` compares equal (this includes
        # VariantType members); objects without a ``value`` are not equal
        # instead of raising AttributeError.
        missing = object()
        other_value = getattr(other, "value", missing)
        return other_value is not missing and self.value == other_value

    def __ne__(self, other):
        return not self.__eq__(other)
754
755
756
class Variant(FrozenClass):

    """
    Create an OPC-UA Variant object.
    if no argument a Null Variant is created.
    if no variant type is given, attempts to guess type from python type
    if a variant is given as value, the new object becomes a copy of the argument

    :ivar Value:
    :vartype Value: Any supported type
    :ivar VariantType:
    :vartype VariantType: VariantType
    :ivar Dimensions: shape of a multi-dimensional array value, else None
    :vartype Dimensions: list of int
    """

    def __init__(self, value=None, varianttype=None, dimensions=None):
        self.Value = value
        self.VariantType = varianttype
        self.Dimensions = dimensions
        self._freeze = True
        if isinstance(value, Variant):
            self.Value = value.Value
            self.VariantType = value.VariantType
            # a copy keeps the source dimensions unless new ones were given
            if self.Dimensions is None:
                self.Dimensions = value.Dimensions
        if self.VariantType is None:
            self.VariantType = self._guess_type(self.Value)
        if self.Dimensions is None and type(self.Value) in (list, tuple):
            dims = get_shape(self.Value)
            # one-dimensional arrays do not need explicit dimensions
            if len(dims) > 1:
                self.Dimensions = dims

    def __eq__(self, other):
        return isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value

    def __ne__(self, other):
        return not self.__eq__(other)

    def _guess_type(self, val):
        """
        Guess the VariantType of a python value.

        For (nested) lists and tuples the type of the first leaf element is
        used; an empty sequence cannot be guessed and raises UaError.
        """
        error_val = val
        while isinstance(val, (list, tuple)):
            if len(val) == 0:
                raise UaError("could not guess UA type of variable {}".format(error_val))
            val = val[0]
        if val is None:
            return VariantType.Null
        elif isinstance(val, bool):
            # bool must be tested before int: bool is a subclass of int
            return VariantType.Boolean
        elif isinstance(val, float):
            return VariantType.Double
        elif isinstance(val, int):
            return VariantType.Int64
        elif isinstance(val, (str, unicode)):
            return VariantType.String
        elif isinstance(val, bytes):
            return VariantType.ByteString
        elif isinstance(val, datetime):
            return VariantType.DateTime
        else:
            # ua classes are named after their VariantType member; anything
            # else is sent as an ExtensionObject
            try:
                return getattr(VariantType, val.__class__.__name__)
            except AttributeError:
                return VariantType.ExtensionObject

    def __str__(self):
        return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
    __repr__ = __str__

    def to_binary(self):
        """Return the encoding byte followed by the value (and dimensions)."""
        b = []
        # low six bits: type, bit 6: dimensions present, bit 7: array
        encoding = self.VariantType.value & 0b111111
        if type(self.Value) in (list, tuple):
            if self.Dimensions is not None:
                encoding = uabin.set_bit(encoding, 6)
            encoding = uabin.set_bit(encoding, 7)
            b.append(uabin.Primitives.UInt8.pack(encoding))
            b.append(uabin.pack_uatype_array(self.VariantType, flatten(self.Value)))
            if self.Dimensions is not None:
                b.append(uabin.pack_uatype_array(VariantType.Int32, self.Dimensions))
        else:
            b.append(uabin.Primitives.UInt8.pack(encoding))
            b.append(uabin.pack_uatype(self.VariantType, self.Value))

        return b"".join(b)

    @staticmethod
    def from_binary(data):
        """Decode a Variant from the stream ``data``."""
        dimensions = None
        encoding = ord(data.read(1))
        int_type = encoding & 0b00111111
        vtype = DataType_to_VariantType(int_type)
        if vtype == VariantType.Null:
            # a Null variant has no dimensions (the encoding byte must not
            # end up in Dimensions)
            return Variant(None, vtype)
        if uabin.test_bit(encoding, 7):
            value = uabin.unpack_uatype_array(vtype, data)
        else:
            value = uabin.unpack_uatype(vtype, data)
        if uabin.test_bit(encoding, 6):
            dimensions = uabin.unpack_uatype_array(VariantType.Int32, data)
            value = reshape(value, dimensions)

        return Variant(value, vtype, dimensions)
861 1
862 1
863 1
def reshape(flat, dims):
    """
    Turn the flat sequence ``flat`` into nested lists of shape ``dims``.

    If ``flat`` holds fewer elements than ``dims`` requires, the result is
    padded with empty lists. A dimension of 0 after the first one is
    counted as 1 when computing the size of a sub-array.
    The input sequence is never modified; a new list is returned.
    """
    subdims = list(dims[1:])
    subsize = 1
    for dim in subdims:
        subsize *= dim if dim != 0 else 1
    # work on a copy so that padding does not leak into the caller's list
    flat = list(flat)
    missing = dims[0] * subsize - len(flat)
    if missing > 0:
        flat.extend([] for _ in range(missing))
    if not subdims or subdims == [0]:
        return flat
    return [reshape(flat[i: i + subsize], subdims) for i in range(0, len(flat), subsize)]
875 1
876
877 1
def _split_list(l, n):
878
    n = max(1, n)
879 1
    return [l[i:i + n] for i in range(0, len(l), n)]
880 1
881
882
def flatten_and_get_shape(mylist):
    """
    Flatten nested lists/tuples and return ``(flat_list, dims)``.

    ``dims`` holds the length of each nesting level, measured on the first
    element of that level. An empty input yields ``(mylist, [0])``.
    """
    dims = [len(mylist)]
    # len() guard: an empty (or emptied) list has no first element to look at
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        dims.append(len(mylist[0]))
        mylist = [item for sublist in mylist for item in sublist]
    return mylist, dims
891
892
893
def flatten(mylist):
    """
    Flatten nested lists/tuples into a single flat list.

    Nesting is detected on the first element only. Input that is not
    nested (or is empty) is returned unchanged.
    """
    while len(mylist) > 0 and isinstance(mylist[0], (list, tuple)):
        mylist = [item for sublist in mylist for item in sublist]
    return mylist
901 1
902 1
903 1
def get_shape(mylist):
    """
    Return the length of each nesting level of ``mylist`` as a list.

    Only the first element of every level is inspected. A value that is
    neither a list nor a tuple has the shape ``[]``.
    """
    dims = []
    while isinstance(mylist, (list, tuple)):
        dims.append(len(mylist))
        if len(mylist) == 0:
            # nothing to descend into
            break
        mylist = mylist[0]
    return dims
911
912 1
913 1
class XmlElement(FrozenClass):
    '''
    An XML element encoded as an UTF-8 string.

    :ivar Value:
    :vartype Value: string
    '''

    def __init__(self, binary=None):
        if binary is not None:
            self._binary_init(binary)
        else:
            # NOTE(review): the default is a list although to_binary() packs
            # Value with the String primitive - confirm whether an empty
            # string or None is the intended default.
            self.Value = []
        self._freeze = True

    def to_binary(self):
        """Pack Value with the String primitive."""
        return uabin.Primitives.String.pack(self.Value)

    @staticmethod
    def from_binary(data):
        """Decode an XmlElement from the stream ``data``."""
        return XmlElement(data)

    def _binary_init(self, data):
        self.Value = uabin.Primitives.String.unpack(data)

    def __str__(self):
        return 'XmlElement(Value:' + str(self.Value) + ')'

    __repr__ = __str__
940 1
941
942 1
class DataValue(FrozenClass):

    '''
    A value with an associated timestamp, and quality.
    Automatically generated from xml, copied and modified here to fix errors in xml spec

    :ivar Encoding: bit mask of the fields present in the binary form
        (bit 0 Value, 1 StatusCode, 2 SourceTimestamp, 3 ServerTimestamp,
        4 SourcePicoseconds, 5 ServerPicoseconds)
    :vartype Encoding: int
    :ivar Value:
    :vartype Value: Variant
    :ivar StatusCode:
    :vartype StatusCode: StatusCode
    :ivar SourceTimestamp:
    :vartype SourceTimestamp: datetime
    :ivar SourcePicoseconds:
    :vartype SourcePicoseconds: int
    :ivar ServerTimestamp:
    :vartype ServerTimestamp: datetime
    :ivar ServerPicoseconds:
    :vartype ServerPicoseconds: int

    '''

    def __init__(self, variant=None, status=None):
        """
        :param variant: the value; anything that is not already a Variant
            is wrapped in one
        :param status: a StatusCode, or None for a default StatusCode()
        """
        self.Encoding = 0
        if not isinstance(variant, Variant):
            variant = Variant(variant)
        self.Value = variant
        if status is None:
            self.StatusCode = StatusCode()
        else:
            self.StatusCode = status
        self.SourceTimestamp = None  # DateTime()
        self.SourcePicoseconds = None
        self.ServerTimestamp = None  # DateTime()
        self.ServerPicoseconds = None
        self._freeze = True

    def to_binary(self):
        """
        Serialise to bytes: one mask byte followed by the fields that are set.

        The mask is rebuilt from scratch on every call so that it always
        matches the fields actually written. Bits left over from a previous
        call or from from_binary() would otherwise announce fields that are
        not in the packet. Falsy fields (None, 0 picoseconds) are omitted.

        :return: the encoded packet
        :rtype: bytes
        """
        encoding = 0
        body = []
        if self.Value:
            encoding |= (1 << 0)
            body.append(self.Value.to_binary())
        if self.StatusCode:
            encoding |= (1 << 1)
            body.append(self.StatusCode.to_binary())
        # Wire order follows OPC UA Part 6: each picoseconds field comes
        # right after its own timestamp, although the mask bits are grouped
        # differently (timestamps on bits 2/3, picoseconds on bits 4/5).
        if self.SourceTimestamp:
            encoding |= (1 << 2)
            body.append(uabin.Primitives.DateTime.pack(self.SourceTimestamp))
        if self.SourcePicoseconds:
            encoding |= (1 << 4)
            body.append(uabin.Primitives.UInt16.pack(self.SourcePicoseconds))
        if self.ServerTimestamp:
            encoding |= (1 << 3)
            body.append(uabin.Primitives.DateTime.pack(self.ServerTimestamp))
        if self.ServerPicoseconds:
            encoding |= (1 << 5)
            body.append(uabin.Primitives.UInt16.pack(self.ServerPicoseconds))
        # keep the attribute in sync with what was really encoded
        self.Encoding = encoding
        return uabin.Primitives.UInt8.pack(encoding) + b''.join(body)

    @staticmethod
    def from_binary(data):
        """
        Decode a DataValue from a readable binary stream.

        :param data: object with a ``read`` method positioned at the mask byte
        :return: the decoded DataValue; fields absent from the mask keep the
            constructor defaults
        """
        encoding = ord(data.read(1))
        if encoding & (1 << 0):
            value = Variant.from_binary(data)
        else:
            value = None
        if encoding & (1 << 1):
            status = StatusCode.from_binary(data)
        else:
            status = None
        obj = DataValue(value, status)
        obj.Encoding = encoding
        # Same field order as to_binary(): source pair first, then server pair.
        if encoding & (1 << 2):
            obj.SourceTimestamp = uabin.Primitives.DateTime.unpack(data)
        if encoding & (1 << 4):
            obj.SourcePicoseconds = uabin.Primitives.UInt16.unpack(data)
        if encoding & (1 << 3):
            obj.ServerTimestamp = uabin.Primitives.DateTime.unpack(data)
        if encoding & (1 << 5):
            obj.ServerPicoseconds = uabin.Primitives.UInt16.unpack(data)
        return obj

    def __str__(self):
        s = 'DataValue(Value:{}'.format(self.Value)
        if self.StatusCode is not None:
            s += ', StatusCode:{}'.format(self.StatusCode)
        if self.SourceTimestamp is not None:
            s += ', SourceTimestamp:{}'.format(self.SourceTimestamp)
        if self.ServerTimestamp is not None:
            s += ', ServerTimestamp:{}'.format(self.ServerTimestamp)
        if self.SourcePicoseconds is not None:
            s += ', SourcePicoseconds:{}'.format(self.SourcePicoseconds)
        if self.ServerPicoseconds is not None:
            s += ', ServerPicoseconds:{}'.format(self.ServerPicoseconds)
        s += ')'
        return s

    __repr__ = __str__
1046
1047 1
1048
def DataType_to_VariantType(int_type):
    """
    Take a NodeId or an int and return the matching VariantType.

    This is only supported if int_type < 63 due to VariantType encoding.
    At this low level we do not have access to the address space, thus
    decoding is limited; a better version of this method can be found
    in ua_utils.py.
    """
    type_id = int_type.Identifier if isinstance(int_type, NodeId) else int_type

    if type_id <= 25:
        return VariantType(type_id)
    # anything above 25 is wrapped as a custom type
    return VariantTypeCustom(type_id)
1062