Completed
Pull Request — master (#337)
by
unknown
04:53
created

XMLParser._parse_node()   F

Complexity

Conditions 9

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 19.125

Importance

Changes 0
Metric Value
cc 9
dl 0
loc 30
ccs 13
cts 26
cp 0.5
crap 19.125
rs 3
c 0
b 0
f 0
"""
Parse XML nodeset files from the OPC UA specification.
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
def _to_bool(val):
12
    if val in ("True", "true", "on", "On", "1"):
13 1
        return True
14 1
    else:
15 1
        return False
16 1
17 1
18 1
class NodeData(object):
    """
    Plain record holding the attributes collected for one parsed XML node.

    All fields start at the defaults assigned in the constructor; the parser
    overwrites them as it encounters the matching XML attributes and tags.
    """

    def __init__(self):
        # -- common to every node --
        self.nodetype = self.nodeid = None
        self.browsename = self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = self.parentlink = None
        self.desc = ''
        self.typedef = None
        self.refs = list()
        self.nodeclass = None
        self.eventnotifier = 0

        # -- variable --
        self.datatype = None
        self.rank = -1  # check default value
        self.value = self.valuetype = None
        self.dimensions = None
        self.accesslevel = self.useraccesslevel = None
        self.minsample = None

        # -- referencetype --
        self.inversename = ''
        self.abstract = self.symmetric = False

        # -- datatype --
        self.definition = list()
51
52
53
class RefStruct(object):
    """
    Record for one reference of a node.

    reftype and target start as None and forward starts as True; the parser
    fills them in from a reference element.
    """

    def __init__(self):
        self.reftype, self.forward, self.target = None, True, None
59 1
60 1
61
class ExtObj(object):
    """
    Record for one parsed extension object.

    typeid, objname and bodytype start as None; body starts as an empty dict
    and is replaced by the parser with the parsed body content.
    """

    def __init__(self):
        self.typeid = self.objname = self.bodytype = None
        self.body = dict()
68 1
69
70 1
class XMLParser(object):
    """
    XML Parser class which traverses an XML document to collect all
    information required for creating an address space.

    Iterating over an instance yields one NodeData object for every top-level
    element other than "Aliases" and "NamespaceUris".
    """

    # Value / DataType names grouped by the Python type used to hold them
    _INT_TYPES = ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64")
    _FLOAT_TYPES = ("Float", "Double")
    _STRING_TYPES = ("ByteString", "String")

    def __init__(self, xmlpath, server, enable_default_values=False):
        """
        Constructor for XML parser

        Args:
            xmlpath: path to the xml document (anything accepted by
                     ElementTree.parse); the document must be in OPC UA defined format
            server: the python opcua server object the nodes will be imported to;
                    only stored here, the parser itself never uses it
            enable_default_values: false results in xml variable nodes with no Value element being converted to Null
                                   true results in XML variable nodes to keep defined datatype with a default value
        """
        self.server = server  # POC
        self.logger = logging.getLogger(__name__)
        # splits "{namespace}Name" into the namespace part and the local name
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath
        self.enable_default_values = enable_default_values

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        self.it = None

    def _local_name(self, tag):
        """
        Return the tag name without its "{namespace}" prefix.

        Tags that carry no namespace prefix are returned unchanged instead of
        raising, so un-namespaced documents can be read as well.
        """
        match = self._retag.match(tag)
        if match is None:
            return tag
        return match.group(2)

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file

        The text of every child of the first "NamespaceUris" element is
        returned; an empty list is returned when there is no such element.
        """
        for child in self.root:
            if self._local_name(child.tag) == 'NamespaceUris':
                return [ns_element.text for ns_element in child]
        return []

    def get_aliases(self):
        """
        Return the used node aliases in this import file

        The result maps the "Alias" attribute of every child of the first
        "Aliases" element to that child's text; it is empty when there is no
        such element.
        """
        for child in self.root:
            if self._local_name(child.tag) == 'Aliases':
                return {el.attrib["Alias"]: el.text for el in child}
        return {}

    def __iter__(self):
        """
        Parse all node elements up front and start a new iteration over them.
        """
        nodes = []
        for child in self.root:
            name = self._local_name(child.tag)
            if name not in ("Aliases", "NamespaceUris"):
                nodes.append(self._parse_node(name, child))

        self.it = iter(nodes)
        return self

    def __next__(self):
        """
        Return the next parsed NodeData; StopIteration ends the iteration.
        """
        # the builtin next() works on both python2 and python3 iterators
        return next(self.it)

    def next(self):  # support for python2
        return self.__next__()

    def _parse_node(self, name, child):
        """
        Parse a XML node and create a NodeData object.

        Args:
            name: local tag name of the element, stored as the node type
            child: the XML element describing the node
        """
        obj = NodeData()
        obj.nodetype = name
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("\n     Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name

        # set default Value based on type; avoids nodes with Value=None being converted to type Null by server
        if self.enable_default_values:
            self._set_default_value(obj)

        # child tags are handled last so an explicit Value overrides the default
        for el in child:
            self._parse_tag(el, obj)
        return obj

    def _set_default_value(self, obj):
        """
        Give obj a type-appropriate default value when its datatype is a known builtin name.
        """
        datatype = obj.datatype
        if datatype in self._INT_TYPES:
            default = 0
        elif datatype in self._FLOAT_TYPES:
            default = 0.0
        elif datatype == "Boolean":  # exact match; `in ("Boolean")` would be a substring test
            default = False
        elif datatype in self._STRING_TYPES:
            default = ""
        else:
            # None or a type without a known default: leave value/valuetype untouched
            return
        obj.valuetype = datatype
        obj.value = default

    def _set_attr(self, key, val, obj):
        """
        Store one XML attribute of a node element on obj, converting it to bool/int where needed.

        Unknown attributes are only logged.
        """
        if key == "NodeId":
            obj.nodeid = val
        elif key == "BrowseName":
            obj.browsename = val
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = val
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            obj.abstract = _to_bool(val)
        elif key == "Executable":
            obj.executable = _to_bool(val)
        elif key == "EventNotifier":
            obj.eventnotifier = int(val)
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            obj.dimensions = [int(i) for i in val.split(",")]
        elif key == "MinimumSamplingInterval":
            obj.minsample = int(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = _to_bool(val)
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_tag(self, el, obj):
        """
        Store the content of one child element of a node on obj.

        Unknown tags are only logged.
        """
        tag = self._local_name(el.tag)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            obj.definition.extend(el)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, el, obj):
        """
        Parse the children of a value element into obj.value / obj.valuetype.

        obj.valuetype is set to the local tag name of each child, even when
        the type is not implemented (in which case only a warning is logged).
        """
        self.logger.info("Parsing value")
        for val in el:
            self.logger.info("tag %s", val.tag)
            ntag = self._local_name(val.tag)
            obj.valuetype = ntag
            if ntag in self._INT_TYPES:
                obj.value = int(val.text)
            elif ntag in self._FLOAT_TYPES:
                obj.value = float(val.text)
            elif ntag == "Boolean":  # exact match; `in ("Boolean")` would be a substring test
                obj.value = _to_bool(val.text)
            elif ntag in self._STRING_TYPES:
                mytext = val.text or ""  # support importing null strings
                obj.value = mytext.replace('\n', '').replace('\r', '')
            elif ntag == "Guid":
                # the actual value is carried by the child element of the Guid
                self._parse_value(val, obj)
            elif ntag == "ListOfExtensionObject":
                obj.value = self._parse_list_of_extension_object(el)
            elif ntag == "ListOfLocalizedText":
                obj.value = self._parse_list_of_localized_text(el)
            elif ntag == "ExtensionObject":
                obj.value = self._parse_extension_object(el)
            else:
                self.logger.warning("Value type not implemented: '%s'", ntag)

    def _get_text(self, el):
        """
        Return all text found in el and its descendants, each piece stripped and joined.
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list_of_localized_text(self, el):
        """
        Collect the text of every "Text" element found three levels below el.

        Return a list of strings.
        """
        value = []
        for localized_text_list in el:
            for localized_text in localized_text_list:
                for child in localized_text:
                    if self._local_name(child.tag) == 'Text':
                        value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        """
        value = []
        for extension_object_list in el:
            for extension_object in extension_object_list:
                value.append(self._parse_ext_obj(extension_object))
        return value

    def _parse_extension_object(self, el):
        """
        Parse the first child of el as an extension object.

        Return an ExtObj, or None when el has no children.
        """
        for ext_obj in el:
            return self._parse_ext_obj(ext_obj)
        return None

    def _parse_ext_obj(self, el):
        """
        Build an ExtObj from the "TypeId" and "Body" children of el.

        Other children are reported through the logger and skipped.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._local_name(extension_object_part.tag)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                first_child = extension_object_part.find('*')
                # an empty Body has no child to name the object after
                if first_child is not None:
                    ext.objname = self._local_name(first_child.tag)
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown ndtag: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Recursively convert the children of el into a list of (tag, value) tuples.

        A leaf child contributes its text, a child with children contributes
        its own nested list; children whose value is empty are dropped.
        """
        body = []
        for body_item in el:
            otag = self._local_name(body_item.tag)
            if len(body_item):
                val = self._parse_body(body_item)
            else:
                val = self._get_text(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Parse the reference elements below el into obj.

        A "HasTypeDefinition" reference sets obj.typedef, a reference marked
        IsForward="false"/"False" sets obj.parent and obj.parentlink, and
        every other reference is appended to obj.refs as a RefStruct.
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            is_forward = ref.attrib.get("IsForward")
            if reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif is_forward in ("false", "False"):
                obj.parent = ref.text
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                if is_forward is not None:
                    # keep forward a real bool, like its default, instead of the raw string
                    struct.forward = _to_bool(is_forward)
                struct.target = ref.text
                struct.reftype = reftype
                obj.refs.append(struct)
327