Completed
Pull Request — master (#337)
by
unknown
04:05
created

XMLParser._parse_node()   F

Complexity

Conditions 9

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 20.5013

Importance

Changes 0
Metric Value
cc 9
c 0
b 0
f 0
dl 0
loc 30
ccs 11
cts 23
cp 0.4783
crap 20.5013
rs 3
1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
def _to_bool(val):
12
    if val in ("True", "true", "on", "On", "1"):
13 1
        return True
14 1
    else:
15 1
        return False
16 1
17 1
18 1
def ua_type_to_python(val, uatype):
    """
    Convert the text of an XML value to the Python type matching its UA type name.

    Args:
        val: text content of the value element
        uatype: UA type name (the element tag without namespace)

    Returns:
        int for types starting with "Int"/"UInt", bool for types starting
        with "bool" (case-insensitive), float for "Double"/"Float", the
        unchanged text for "String", and for "Bytes"/"ByteString"/"ByteArray"
        the utf8-encoded bytes on Python 3 (the unchanged text on Python 2).

    Raises:
        Exception: if uatype is none of the handled type names
    """
    if uatype.startswith(("Int", "UInt")):
        return int(val)
    if uatype.lower().startswith("bool"):
        return _to_bool(val)
    if uatype in ("Double", "Float"):
        return float(val)
    # Compare with ==: `uatype in ("String")` is a substring test on a plain
    # string, which wrongly accepted names such as "" or "ring".
    if uatype == "String":
        return val
    if uatype in ("Bytes", "ByteString", "ByteArray"):
        if sys.version_info.major > 2:
            return bytes(val, 'utf8')
        return val
    raise Exception("uatype not handled: {} for val {}".format(uatype, val))
34 1
35 1
36
class NodeData(object):
    """
    Plain container for the data collected about one node of the XML document.

    All fields start with a neutral default and are filled in by the parser.
    """

    def __init__(self):
        # attributes shared by every node
        self.nodetype = None
        self.nodeid = None
        self.browsename = None
        self.displayname = None
        # FIXME: set by the parser but not read anywhere in this module, why?
        self.symname = None
        self.parent = None
        self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0

        # attributes of variable nodes
        self.datatype = None
        self.rank = -1  # check default value
        self.value = None
        self.valuetype = None
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # attributes of reference type nodes
        self.inversename = ""
        self.abstract = False
        self.symmetric = False

        # attributes of data type nodes
        self.definition = []

    def __repr__(self):
        return "NodeData(nodeid:{})".format(self.nodeid)

    # str() and repr() give the same text
    __str__ = __repr__
73
74
75 1
class RefStruct(object):
    """
    Plain container describing one reference of a node: its type, its
    direction and its target.
    """

    def __init__(self):
        self.reftype = self.target = None
        # references are considered forward unless stated otherwise
        self.forward = True
81 1
82 1
83
class ExtObj(object):
    """
    Plain container for one parsed extension object.
    """

    def __init__(self):
        self.typeid = self.objname = self.bodytype = None
        # starts as an empty dict; the parser replaces it with the parsed body
        self.body = {}
90 1
91 1
92 1
class XMLParser(object):
    """
    XML Parser class which traverses an XML document to collect all information required for creating an address space
    """

    # Type names grouped by the Python type their values are converted to.
    _INT_TYPES = ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64")
    _FLOAT_TYPES = ("Float", "Double")
    _STRING_TYPES = ("ByteString", "String")

    # XML attribute name -> NodeData attribute name, grouped by conversion.
    _STR_ATTRS = {
        "NodeId": "nodeid",
        "BrowseName": "browsename",
        "SymbolicName": "symname",
        "ParentNodeId": "parent",
        "DataType": "datatype",
    }
    _BOOL_ATTRS = {
        "IsAbstract": "abstract",
        "Executable": "executable",
        "Symmetric": "symmetric",
    }
    _INT_ATTRS = {
        "EventNotifier": "eventnotifier",
        "ValueRank": "rank",
        "MinimumSamplingInterval": "minsample",
        "AccessLevel": "accesslevel",
        "UserAccessLevel": "useraccesslevel",
    }

    # IsForward spellings that mark an inverse reference ("0" is the other
    # XML Schema boolean spelling of false).
    _NOT_FORWARD = ("false", "False", "0")

    def __init__(self, xmlpath, enable_default_values=False):
        """
        Constructor for XML parser
        Args:
            xmlpath: path to the xml document; the document must be in OPC UA defined format
            enable_default_values: false results in xml variable nodes with no Value element being converted to Null
                                   true results in XML variable nodes to keep defined datatype with a default value
        """
        self.logger = logging.getLogger(__name__)
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath
        self.enable_default_values = enable_default_values

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        self.it = None

    def _tag_name(self, el):
        """
        Return the tag of el without its "{namespace}" prefix.
        A tag that carries no namespace prefix is returned unchanged.
        """
        match = self._retag.match(el.tag)
        if match is None:
            return el.tag
        return match.group(2)

    def _find_root_child(self, name):
        """
        Return the first direct child of the document root whose tag name is name, or None.
        """
        for child in self.root:
            if self._tag_name(child) == name:
                return child
        return None

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file
        """
        section = self._find_root_child('NamespaceUris')
        if section is None:
            return []
        return [ns_element.text for ns_element in section]

    def get_aliases(self):
        """
        Return the used node aliases in this import file
        """
        section = self._find_root_child('Aliases')
        if section is None:
            return {}
        return dict((el.attrib["Alias"], el.text) for el in section)

    def get_node_datas(self):
        """
        Return a list with one NodeData per child of the root, skipping the
        Aliases and NamespaceUris sections.
        """
        nodes = []
        for child in self.root:
            name = self._tag_name(child)
            if name not in ("Aliases", "NamespaceUris"):
                nodes.append(self._parse_node(name, child))
        return nodes

    def __next__(self):
        """
        Return the next item of self.it.
        self.it is None after construction and must be set to an iterator first.
        """
        # the next() builtin works on both python2 and python3 iterators
        return next(self.it)

    def next(self):  # support for python2
        return self.__next__()

    def _parse_node(self, name, child):
        """
        Parse a XML node and create a NodeData object.
        """
        obj = NodeData()
        obj.nodetype = name
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("\n     Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name

        # set default Value based on type; avoids nodes with Value=None being converted to type Null by server
        if self.enable_default_values and obj.datatype is not None:
            self._set_default_value(obj)

        # child elements are parsed last so an explicit Value overrides the default
        for el in child:
            self._parse_tag(el, obj)
        return obj

    def _set_default_value(self, obj):
        """
        Give obj a default value and value type matching its datatype.
        Datatypes other than the integer, float, Boolean and string ones are left untouched.
        """
        datatype = obj.datatype
        if datatype in self._INT_TYPES:
            default = 0
        elif datatype in self._FLOAT_TYPES:
            default = 0.0
        elif datatype == "Boolean":
            default = False
        elif datatype in self._STRING_TYPES:
            default = ""
        else:
            return
        obj.valuetype = datatype
        obj.value = default

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute key/val on obj, converted to the matching Python type.
        Attributes that are not handled are only logged.
        """
        if key in self._STR_ATTRS:
            setattr(obj, self._STR_ATTRS[key], val)
        elif key in self._BOOL_ATTRS:
            setattr(obj, self._BOOL_ATTRS[key], _to_bool(val))
        elif key in self._INT_ATTRS:
            setattr(obj, self._INT_ATTRS[key], int(val))
        elif key == "ArrayDimensions":
            # skip empty items so an empty attribute yields an empty list
            obj.dimensions = [int(i) for i in val.split(",") if i.strip()]
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_tag(self, el, obj):
        """
        Store the content of the child element el of a node on obj.
        """
        tag = self._tag_name(el)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            obj.definition.extend(el)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, el, obj):
        """
        Parse the children of a Value element and set obj.value and obj.valuetype.
        Value types that are not handled are logged with a warning.
        """
        self.logger.info("Parsing value")
        for val in el:
            self.logger.info("tag %s", val.tag)
            ntag = self._tag_name(val)
            obj.valuetype = ntag
            if ntag in self._INT_TYPES:
                obj.value = int(val.text)
            elif ntag in self._FLOAT_TYPES:
                obj.value = float(val.text)
            elif ntag == "Boolean":
                obj.value = _to_bool(val.text)
            elif ntag in self._STRING_TYPES:
                mytext = val.text
                if mytext is None:  # support importing null strings
                    mytext = ""
                obj.value = mytext.replace('\n', '').replace('\r', '')
            elif ntag == "DateTime":
                obj.value = val.text
            elif ntag == "Guid":
                # the guid text sits in a nested element, parse it like a value
                self._parse_value(val, obj)
                obj.valuetype = obj.datatype  # override parsed string type to guid
            elif ntag in ("LocalizedText", "NodeId"):
                # NodeId used to be stored in a misspelled `obj.Value` attribute
                obj.value = self._parse_body(el)
            elif ntag == "ListOfExtensionObject":
                obj.value = self._parse_list_of_extension_object(el)
            elif ntag == "ListOfLocalizedText":
                obj.value = self._parse_list_of_localized_text(el)
            elif ntag.startswith("ListOf"):
                obj.value = self._parse_list(el[0])
            elif ntag == "ExtensionObject":
                obj.value = self._parse_extension_object(el)
            else:
                self.logger.warning("Value type not implemented: '%s'", ntag)

    def _get_text(self, el):
        """
        Return all text found in el and its descendants, each piece stripped and then concatenated.
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list(self, el):
        """
        Convert the children of a ListOf... element to a Python list; nested lists are parsed recursively.
        """
        value = []
        for val_el in el:
            ntag = self._tag_name(val_el)
            if ntag.startswith("ListOf"):
                val = self._parse_list(val_el)
            else:
                val = ua_type_to_python(val_el.text, ntag)
            value.append(val)
        return value

    def _parse_list_of_localized_text(self, el):
        """
        Return the list of the Text contents found in a Value element holding a ListOfLocalizedText.
        """
        value = []
        for localized_text_list in el:
            for localized_text in localized_text_list:
                for child in localized_text:
                    if self._tag_name(child) == 'Text':
                        value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        """
        value = []
        for extension_object_list in el:
            for extension_object in extension_object_list:
                value.append(self._parse_ext_obj(extension_object))
        return value

    def _parse_extension_object(self, el):
        """
        Parse the first child of el as an extension object; return None when el has no child.
        """
        for ext_obj in el:
            return self._parse_ext_obj(ext_obj)
        return None

    def _parse_ext_obj(self, el):
        """
        Build an ExtObj from the TypeId and Body children of an ExtensionObject element.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._tag_name(extension_object_part)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                first = extension_object_part.find('*')
                if first is not None:  # an empty Body has no object name
                    ext.objname = self._tag_name(first)
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown tag in extension object: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Return the children of el as a list of (tag, value) tuples.
        The value is the stripped text of a child without children, or the
        recursively parsed list otherwise; children with an empty value are skipped.
        """
        body = []
        for body_item in el:
            otag = self._tag_name(body_item)
            if len(body_item):
                val = self._parse_body(body_item)
            else:
                val = self._get_text(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Parse the References element el of a node.
        A HasTypeDefinition reference sets obj.typedef, an inverse reference
        sets obj.parent and obj.parentlink, every other reference is appended
        to obj.refs as a forward RefStruct.
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            if reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif ref.attrib.get("IsForward") in self._NOT_FORWARD:
                obj.parent = ref.text
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                # only forward references reach this branch; store a real
                # bool instead of the raw attribute text
                struct.forward = True
                struct.target = ref.text
                struct.reftype = reftype
                obj.refs.append(struct)
367