Passed
Push — dev ( 454bf6...555c2d )
by Olivier
04:48 queued 02:26
created

opcua.NodeData.__init__()   B

Complexity

Conditions 1

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 1
Metric Value
dl 0
loc 31
ccs 25
cts 25
cp 1
rs 8.8571
cc 1
crap 1
1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
class NodeData(object):
    """Plain attribute container describing one node read from a nodeset XML file.

    Every attribute is given a default here so that consumers can rely on the
    attribute existing even when the XML element did not provide it.  The
    attributes are grouped by the kind of node they apply to.
    """

    def __init__(self):
        # --- common to every node -------------------------------------
        self.nodetype = None
        self.nodeid = None
        self.browsename = None
        self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = None
        self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0

        # --- variable --------------------------------------------------
        self.datatype = None
        self.rank = -1  # check default value
        self.value = []
        self.valuetype = None
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # --- referencetype --------------------------------------------
        self.inversename = ""
        # NOTE(review): both flags default to the *string* "false" (which is
        # truthy).  Kept as-is for compatibility with existing consumers;
        # confirm how callers interpret these before changing the type.
        self.abstract = "false"
        self.symmetric = "false"

        # --- datatype --------------------------------------------------
        self.definition = []
44
45
46 1
class RefStruct(object):
    """Plain container for a single reference attached to a node."""

    def __init__(self):
        self.reftype = None   # reference type, as written in the XML
        self.forward = True   # references are forward unless stated otherwise
        self.target = None    # text content of the reference element
52
53
54 1
class XMLParser(object):
    """Iterate over the node elements of a nodeset XML file.

    The document is parsed with ElementTree when the parser is constructed.
    Iterating over the parser returns one ``NodeData`` per child of the root
    element, except for ``Aliases`` elements, whose entries are stored in
    ``self.aliases`` instead of being returned.
    """

    # Value tags converted with int() / float() / kept as text respectively.
    _INT_TAGS = ("Int8", "UInt8", "Int16", "UInt16",
                 "Int32", "UInt32", "Int64", "UInt64")
    _FLOAT_TAGS = ("Float", "Double")
    _TEXT_TAGS = ("ByteString", "String")
    # Spellings accepted as a true Boolean value; anything else is False.
    _TRUE_TEXTS = ("True", "true", "1", "on", "On")

    def __init__(self, xmlpath):
        """Parse *xmlpath*, which is handed unchanged to ``ElementTree.parse``."""
        self.logger = logging.getLogger(__name__)
        # Splits "{namespace}Name" into the namespace part and the local name.
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath
        self.aliases = {}

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        self.it = None

    def __iter__(self):
        """Restart iteration from the first child of the root element."""
        self.it = iter(self.root)
        return self

    def __next__(self):
        """Return the next parsed node, consuming any ``Aliases`` elements.

        Raises StopIteration once the children of the root are exhausted.
        """
        if self.it is None:
            # Allow next(parser) without a prior iter(parser).
            self.it = iter(self.root)
        while True:
            # The builtin next() works on both Python 2 and Python 3 iterators.
            child = next(self.it)
            name = self._local_name(child.tag)
            if name != "Aliases":
                return self._parse_node(name, child)
            for el in child:
                self.aliases[el.attrib["Alias"]] = el.text

    def next(self):  # support for python2
        return self.__next__()

    def _local_name(self, tag):
        """Return *tag* without its ``{namespace}`` prefix (unchanged if it has none)."""
        found = self._retag.match(tag)
        if found is None:
            return tag
        return found.group(2)

    def _parse_node(self, name, child):
        """Build a ``NodeData`` from the XML attributes and sub-elements of *child*."""
        obj = NodeData()
        obj.nodetype = name
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_tag(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """Store the XML attribute *key* = *val* on *obj*.

        Unknown attributes are logged at info level and otherwise ignored.
        Integer-valued attributes raise ValueError if *val* is not an integer.
        """
        if key == "NodeId":
            obj.nodeid = val
        elif key == "BrowseName":
            obj.browsename = val
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = val
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            # Stored as the raw string, unlike Symmetric below.
            obj.abstract = val
        elif key == "EventNotifier":
            obj.eventnotifier = 1 if val == "1" else 0
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            obj.dimensions = [int(i) for i in val.split(",")]
        elif key == "MinimumSamplingInterval":
            obj.minsample = int(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = val == "true"
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_tag(self, el, obj):
        """Store the content of the sub-element *el* on *obj*, dispatching on its tag."""
        tag = self._local_name(el.tag)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            # Keep the raw field elements; they are not interpreted here.
            obj.definition.extend(el)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, el, obj):
        """Convert the children of a ``Value`` element and store the result on *obj*.

        ``obj.valuetype`` is set to the local tag name of each child, and
        ``obj.value`` to its converted text when the type is supported.  If
        there are several children the last one wins.  Unsupported types are
        logged and leave ``obj.value`` untouched.
        """
        for val in el:
            ntag = self._local_name(val.tag)
            obj.valuetype = ntag
            if ntag in self._INT_TAGS:
                obj.value = int(val.text)
            elif ntag in self._FLOAT_TAGS:
                obj.value = float(val.text)
            elif ntag == "Boolean":
                # Equality, not `in ("Boolean")`: the latter is a substring
                # test on a plain string and matched tags such as "Bool".
                obj.value = val.text in self._TRUE_TEXTS
            elif ntag in self._TEXT_TAGS:
                mytext = val.text
                if mytext is None:  # support importing null strings
                    mytext = ""
                obj.value = mytext.replace('\n', '').replace('\r', '')
            else:
                # Includes ListOfExtensionObject and ListOfLocalizedText.
                self.logger.info("Value type not implemented: %s", ntag)

    def _parse_refs(self, el, obj):
        """Record the references listed under a ``References`` element on *obj*.

        * ``HasTypeDefinition`` references set ``obj.typedef``.
        * References with ``IsForward="false"`` set ``obj.parent`` and
          ``obj.parentlink``.
        * Every other reference is appended to ``obj.refs`` as a ``RefStruct``.

        Raises KeyError if a reference has no ``ReferenceType`` attribute.
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            if reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif ref.attrib.get("IsForward") == "false":
                obj.parent = ref.text
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                # Keep `forward` a bool, consistent with the RefStruct default,
                # instead of storing the raw attribute string.
                struct.forward = ref.attrib.get("IsForward", "true") != "false"
                struct.target = ref.text
                struct.reftype = reftype
                obj.refs.append(struct)
190