Completed
Pull Request — master (#327)
by Olivier
06:13 queued 02:31
created

XMLParser.get_node_datas()   A

Complexity

Conditions 3

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3.0261

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 9
ccs 6
cts 7
cp 0.8571
crap 3.0261
rs 9.6666
c 0
b 0
f 0
1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
def _to_bool(val):
    """
    Interpret *val* as a boolean flag.

    Returns True only when val equals one of "True", "true", "on", "On"
    or "1"; any other value gives False.
    """
    truthy = ("True", "true", "on", "On", "1")
    return val in truthy
16 1
17 1
18 1
def ua_type_to_python(val, uatype):
    """
    Convert the text *val* into the Python value matching the type name *uatype*.

    Handled type names:
      * names starting with "Int" or "UInt"      -> int
      * names starting with "bool" (any case)    -> bool, through _to_bool
      * "Float", "Double"                        -> float
      * "String"                                 -> val returned unchanged
      * "Bytes", "ByteString", "ByteArray"       -> bytes (utf8 encoded) on
        Python 3, val returned unchanged on Python 2

    Raises Exception for any other type name.
    """
    if uatype.startswith(("Int", "UInt")):
        return int(val)
    if uatype.lower().startswith("bool"):
        return _to_bool(val)
    if uatype in ("Float", "Double"):
        return float(val)
    # exact comparison: `uatype in ("String")` would be a substring test on
    # the string "String" and would accept e.g. "Str" or ""
    if uatype == "String":
        return val
    if uatype in ("Bytes", "ByteString", "ByteArray"):
        if sys.version_info.major > 2:
            return bytes(val, 'utf8')
        return val
    raise Exception("uatype nopt handled", uatype, " for val ", val)
32 1
33 1
34 1
35 1
class NodeData(object):
    """
    Container for the values collected while parsing one node element.

    Every attribute is given its default here; the parser overwrites the
    ones it finds in the XML.
    """

    def __init__(self):
        # attributes shared by every kind of node
        self.nodetype = self.nodeid = self.browsename = None
        self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0

        # variable
        self.datatype = None
        self.rank = -1  # check default value
        self.value = self.valuetype = self.dimensions = None
        self.accesslevel = self.useraccesslevel = self.minsample = None

        # referencetype
        self.inversename = ""
        self.abstract = self.symmetric = False

        # datatype
        self.definition = []

    def __str__(self):
        template = "NodeData(nodeid:{})"
        return template.format(self.nodeid)
    __repr__ = __str__
72 1
73
74
class RefStruct(object):
    """
    One reference of a node: its reference type, direction and target.
    """

    def __init__(self):
        # a reference is forward unless stated otherwise
        self.reftype, self.forward, self.target = None, True, None
80
81 1
82 1
class ExtObj(object):
    """
    Holder for a parsed extension object: type id, object name, body type
    and body.
    """

    def __init__(self):
        self.typeid = self.objname = self.bodytype = None
        # starts as an empty dict; a fresh one per instance
        self.body = {}
89 1
90 1
91 1
class XMLParser(object):
    """
    Read an XML file with ElementTree and convert the children of its root
    element into NodeData objects.
    """

    def __init__(self, xmlpath):
        """
        Parse the document at *xmlpath* and keep its tree and root element.
        """
        self.logger = logging.getLogger(__name__)
        # splits "{namespace}name" into the braced namespace and the local name
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        # NOTE(review): nothing in this class ever assigns an iterator here,
        # so __next__()/next() fail until a caller sets it -- confirm intent
        self.it = None

    def _local_name(self, tag):
        """
        Return *tag* without its "{namespace}" prefix.

        A tag that carries no namespace is returned unchanged instead of
        raising.
        """
        match = self._retag.match(tag)
        if match is None:
            return tag
        return match.group(2)

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file
        """
        namespaces_uris = []
        for child in self.root:
            if self._local_name(child.tag) == 'NamespaceUris':
                namespaces_uris = [ns_element.text for ns_element in child]
                # only the first NamespaceUris element is considered
                break
        return namespaces_uris

    def get_aliases(self):
        """
        Return the used node aliases in this import file
        """
        aliases = {}
        for child in self.root:
            if self._local_name(child.tag) == 'Aliases':
                for el in child:
                    aliases[el.attrib["Alias"]] = el.text
                # only the first Aliases element is considered
                break
        return aliases

    def get_node_datas(self):
        """
        Return a list with one NodeData per child of the root element,
        skipping the Aliases and NamespaceUris elements.
        """
        nodes = []
        for child in self.root:
            name = self._local_name(child.tag)
            if name not in ("Aliases", "NamespaceUris"):
                nodes.append(self._parse_node(name, child))
        return nodes

    def __next__(self):
        """
        Return the next item of the iterator stored in self.it.
        """
        if sys.version_info[0] < 3:
            return self.it.next()
        return self.it.__next__()

    def next(self):  # support for python2
        return self.__next__()

    def _parse_node(self, name, child):
        """
        Parse a XML node and create a NodeData object.
        """
        obj = NodeData()
        obj.nodetype = name
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("\n     Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_tag(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute *key* with text *val* on *obj*, converting
        it to int/bool/list where needed. Unknown attributes are only logged.
        """
        if key == "NodeId":
            obj.nodeid = val
        elif key == "BrowseName":
            obj.browsename = val
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = val
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            obj.abstract = _to_bool(val)
        elif key == "Executable":
            # this attribute only exists on obj when the XML carries it
            obj.executable = _to_bool(val)
        elif key == "EventNotifier":
            obj.eventnotifier = int(val)
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            obj.dimensions = [int(i) for i in val.split(",")]
        elif key == "MinimumSamplingInterval":
            obj.minsample = int(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = _to_bool(val)
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_tag(self, el, obj):
        """
        Handle one child element *el* of a node and store its content on *obj*.
        """
        tag = self._local_name(el.tag)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            # the raw field elements are kept, not parsed
            for field in el:
                obj.definition.append(field)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, el, obj):
        """
        Parse the children of the Value element *el*; set obj.valuetype to
        the child's tag name and obj.value to the converted content.
        """
        self.logger.info("Parsing value")
        for val in el:
            self.logger.info("tag %s", val.tag)
            ntag = self._local_name(val.tag)
            obj.valuetype = ntag
            if ntag in ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
                obj.value = int(val.text)
            elif ntag in ("Float", "Double"):
                obj.value = float(val.text)
            elif ntag == "Boolean":
                obj.value = _to_bool(val.text)
            elif ntag in ("ByteString", "String"):
                mytext = val.text or ""  # support importing null strings
                obj.value = mytext.replace('\n', '').replace('\r', '')
            elif ntag == "Guid":
                # the content sits in a child element; the recursion also
                # overwrites obj.valuetype with that child's tag name
                self._parse_value(val, obj)
            elif ntag == "ListOfExtensionObject":
                obj.value = self._parse_list_of_extension_object(el)
            elif ntag == "ListOfLocalizedText":
                obj.value = self._parse_list_of_localized_text(el)
            elif ntag.startswith("ListOf"):
                obj.value = self._parse_list(el[0])
            elif ntag == "ExtensionObject":
                obj.value = self._parse_extension_object(el)
            else:
                self.logger.warning("Value type not implemented: '%s'", ntag)

    def _get_text(self, el):
        """
        Return all text inside *el*, each piece stripped, joined together.
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list(self, el):
        """
        Convert the children of the ListOf... element *el* into a list of
        Python values; nested ListOf... elements become nested lists.
        """
        value = []
        for val_el in el:
            ntag = self._local_name(val_el.tag)
            if ntag.startswith("ListOf"):
                val = self._parse_list(val_el)
            else:
                val = ua_type_to_python(val_el.text, ntag)
            value.append(val)
        return value

    def _parse_list_of_localized_text(self, el):
        """
        Return the text of every Text element found in the localized text
        entries below *el*.
        """
        value = []
        for localized_text_list in el:
            for localized_text in localized_text_list:
                for child in localized_text:
                    if self._local_name(child.tag) == 'Text':
                        value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        '''
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        '''
        value = []
        for extension_object_list in el:
            for extension_object in extension_object_list:
                value.append(self._parse_ext_obj(extension_object))
        return value

    def _parse_extension_object(self, el):
        """
        Return the ExtObj parsed from the first child of *el*, or None when
        *el* has no children.
        """
        for ext_obj in el:
            return self._parse_ext_obj(ext_obj)
        return None

    def _parse_ext_obj(self, el):
        """
        Build an ExtObj from the TypeId and Body children of *el*.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._local_name(extension_object_part.tag)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                first = extension_object_part.find('*')
                # an empty Body leaves objname unset instead of raising
                if first is not None:
                    ext.objname = self._local_name(first.tag)
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown ndtag: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Return the children of *el* as a list of (tag name, value) tuples.

        The value is the element text for a leaf and a nested list of tuples
        otherwise; children with an empty value are left out.
        """
        body = []
        for body_item in el:
            otag = self._local_name(body_item.tag)
            if len(body_item):
                val = self._parse_body(body_item)
            else:
                val = self._get_text(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Parse the reference elements below *el* into *obj*.

        HasTypeDefinition sets obj.typedef, a reference with IsForward
        "false"/"False" sets obj.parent and obj.parentlink, and every other
        reference is appended to obj.refs as a RefStruct.
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            if reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif ref.attrib.get("IsForward") in ("false", "False"):
                # a later inverse reference silently replaces an earlier parent
                obj.parent = ref.text
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                # everything reaching this branch counts as forward; store a
                # real boolean rather than the raw attribute string
                struct.forward = True
                struct.target = ref.text
                struct.reftype = reftype
                obj.refs.append(struct)
330