Completed
Push — master ( c97137...1c6e5d )
by Olivier
03:49
created

XMLParser.get_aliases()   A

Complexity

Conditions 4

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4.0961

Importance

Changes 0
Metric Value
cc 4
c 0
b 0
f 0
dl 0
loc 12
ccs 9
cts 11
cp 0.8182
crap 4.0961
rs 9.2
1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
def _to_bool(val):
12
    if val in ("True", "true", "on", "On", "1"):
13 1
        return True
14 1
    else:
15 1
        return False
16 1
17 1
18 1
19 1
class NodeData(object):
20 1
21 1
    def __init__(self):
22 1
        self.nodetype = None
23 1
        self.nodeid = None
24 1
        self.browsename = None
25 1
        self.displayname = None
26
        self.symname = None  # FIXME: this param is never used, why?
27
        self.parent = None
28 1
        self.parentlink = None
29 1
        self.desc = ""
30 1
        self.typedef = None
31 1
        self.refs = []
32 1
        self.nodeclass = None
33 1
        self.eventnotifier = 0
34 1
35 1
        # variable
36
        self.datatype = None
37
        self.rank = -1  # check default value
38 1
        self.value = None
39 1
        self.valuetype = None
40 1
        self.dimensions = None
41
        self.accesslevel = None
42
        self.useraccesslevel = None
43 1
        self.minsample = None
44
45
        # referencetype
46 1
        self.inversename = ""
47
        self.abstract = False
48 1
        self.symmetric = False
49
50
        # datatype
51
        self.definition = []
52
53
54 1
class RefStruct(object):
55
56 1
    def __init__(self):
57 1
        self.reftype = None
58 1
        self.forward = True
59 1
        self.target = None
60 1
61
62 1
class ExtObj(object):
63 1
64 1
    def __init__(self):
65
        self.typeid = None
66 1
        self.objname = None
67 1
        self.bodytype = None
68 1
        self.body = {}
69
70 1
71 1
class XMLParser(object):
72 1
73
    def __init__(self, xmlpath, server):
74
        self.server = server  # POC
75 1
        self.logger = logging.getLogger(__name__)
76 1
        self._retag = re.compile(r"(\{.*\})(.*)")
77 1
        self.path = xmlpath
78
79
        self.tree = ET.parse(xmlpath)
80
        self.root = self.tree.getroot()
81 1
        self.it = None
82 1
83
    def get_used_namespaces(self):
84 1
        """
85
        Return the used namespace uris in this import file
86
        """
87 1
        namespaces_uris = []
88 1
        for child in self.root:
89 1
            name = self._retag.match(child.tag).groups()[1]
90 1
            if name == 'NamespaceUris':
91 1
                namespaces_uris = [ns_element.text for ns_element in child]
92 1
                break
93 1
        return namespaces_uris
94 1
95 1
    def get_aliases(self):
96
        """
97 1
        Return the used node aliases in this import file
98 1
        """
99 1
        aliases = {}
100 1
        for child in self.root:
101 1
            name = self._retag.match(child.tag).groups()[1]
102 1
            if name == 'Aliases':
103
                for el in child:
104 1
                    aliases[el.attrib["Alias"]] = el.text
105
                break
106 1
        return aliases
107 1
108
    def __iter__(self):
109
        nodes = []
110
        for child in self.root:
111
            name = self._retag.match(child.tag).groups()[1]
112
            if name not in ["Aliases", "NamespaceUris"]:
113
                node = self._parse_node(name, child)
114
                nodes.append(node)
115
116
        self.it = iter(nodes)
117
        return self
118
119
    def __next__(self):
120
        while True:
121
            if sys.version_info[0] < 3:
122
                child = self.it.next()
123
            else:
124
                child = self.it.__next__()
125
            return child
126
127
    def next(self):  # support for python2
128 1
        return self.__next__()
129 1
130
    def _parse_node(self, name, child):
131 1
        """
132
        Parse a XML node and create a NodeData object.
133 1
        """
134 1
        obj = NodeData()
135 1
        obj.nodetype = name
136 1
        for key, val in child.attrib.items():
137 1
            self._set_attr(key, val, obj)
138 1
        self.logger.info("\n     Parsing node: %s %s", obj.nodeid, obj.browsename)
139
        obj.displayname = obj.browsename  # give a default value to display name
140
        for el in child:
141
            self._parse_tag(el, obj)
142
        return obj
143
144
    def _set_attr(self, key, val, obj):
145
        if key == "NodeId":
146
            obj.nodeid = val
147 1
        elif key == "BrowseName":
148 1
            obj.browsename = val
149 1
        elif key == "SymbolicName":
150 1
            obj.symname = val
151 1
        elif key == "ParentNodeId":
152 1
            obj.parent = val
153 1
        elif key == "DataType":
154
            obj.datatype = val
155 1
        elif key == "IsAbstract":
156
            obj.abstract = _to_bool(val)
157
        elif key == "Executable":
158
            obj.executable = _to_bool(val)
159
        elif key == "EventNotifier":
160 1
            obj.eventnotifier = int(val)
161 1
        elif key == "ValueRank":
162 1
            obj.rank = int(val)
163
        elif key == "ArrayDimensions":
164 1
            obj.dimensions = [int(i) for i in val.split(",")]
165
        elif key == "MinimumSamplingInterval":
166 1
            obj.minsample = int(val)
167
        elif key == "AccessLevel":
168
            obj.accesslevel = int(val)
169
        elif key == "UserAccessLevel":
170
            obj.useraccesslevel = int(val)
171
        elif key == "Symmetric":
172
            obj.symmetric = _to_bool(val)
173
        else:
174 1
            self.logger.info("Attribute not implemented: %s:%s", key, val)
175 1
176 1
    def _parse_tag(self, el, obj):
177 1
        tag = self._retag.match(el.tag).groups()[1]
178 1
179
        if tag == "DisplayName":
180
            obj.displayname = el.text
181 1
        elif tag == "Description":
182 1
            obj.desc = el.text
183
        elif tag == "References":
184
            self._parse_refs(el, obj)
185
        elif tag == "Value":
186
            self._parse_value(el, obj)
187
        elif tag == "InverseName":
188
            obj.inversename = el.text
189
        elif tag == "Definition":
190
            for field in el:
191
                obj.definition.append(field)
192
        else:
193
            self.logger.info("Not implemented tag: %s", el)
194
195
    def _parse_value(self, el, obj):
196
        self.logger.info("Parsing value")
197
        for val in el:
198
            self.logger.info("tag %s", val.tag)
199
            ntag = self._retag.match(val.tag).groups()[1]
200
            obj.valuetype = ntag
201
            if ntag in ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
202
                obj.value = int(val.text)
203
            elif ntag in ("Float", "Double"):
204
                obj.value = float(val.text)
205
            elif ntag in ("Boolean"):
206
                obj.value = _to_bool(val.text)
207
            elif ntag in ("ByteString", "String"):
208
                mytext = val.text
209
                if mytext is None:  # support importing null strings
210
                    mytext = ""
211
                mytext = mytext.replace('\n', '').replace('\r', '')
212
                # obj.value.append('b"{}"'.format(mytext))
213
                obj.value = mytext
214
            elif ntag in ("Guid"):
215
                self._parse_value(val, obj)
216
            elif ntag == "ListOfExtensionObject":
217
                obj.value = self._parse_list_of_extension_object(el)
218
            elif ntag == "ListOfLocalizedText":
219
                obj.value = self._parse_list_of_localized_text(el)
220
            elif ntag == "ExtensionObject":
221
                obj.value = self._parse_extension_object(el)
222
            else:
223
                self.logger.warning("Value type not implemented: '%s'", ntag)
224
225
    def _get_text(self, el):
226
        txtlist = [txt.strip() for txt in el.itertext()]
227
        return "".join(txtlist)
228
229
    def _parse_list_of_localized_text(self, el):
230
        value = []
231
        for localized_text_list in el:
232
            for localized_text in localized_text_list:
233
                ntag = self._retag.match(localized_text.tag).groups()[1]
234
                for child in localized_text:
235
                    ntag = self._retag.match(child.tag).groups()[1]
236
                    if ntag == 'Text':
237
                        value.append(self._get_text(child))
238
        return value
239
240
    def _parse_list_of_extension_object(self, el):
241
        '''
242
        Parse a uax:ListOfExtensionObject Value
243
        Return an list of ExtObj
244
        '''
245
        value = []
246
        for extension_object_list in el:
247
            for extension_object in extension_object_list:
248
                ext_obj = self._parse_ext_obj(extension_object)
249
                value.append(ext_obj)
250
        return value
251
252
    def _parse_extension_object(self, el):
253
        for ext_obj in el:
254
            return self._parse_ext_obj(ext_obj)
255
256
    def _parse_ext_obj(self, el):
257
        ext = ExtObj()
258
        for extension_object_part in el:
259
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
260
            if ntag == 'TypeId':
261
                ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
262
                ext.typeid = self._get_text(extension_object_part)
263
            elif ntag == 'Body':
264
                ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
265
                ext.body = self._parse_body(extension_object_part)
266
            else:
267
                print("Uknown ndtag", ntag)
268
        return ext
269
270
    def _parse_body(self, el):
271
        body = []
272
        for body_item in el:
273
            otag = self._retag.match(body_item.tag).groups()[1]
274
            childs = [i for i in body_item]
275
            if not childs:
276
                val = self._get_text(body_item)
277
            else:
278
                val = self._parse_body(body_item)
279
            if val:
280
                body.append((otag, val))
281
        return body
282
283
    def _parse_refs(self, el, obj):
284
        for ref in el:
285
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
286
                obj.typedef = ref.text
287
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] in ("false", "False"):
288
                # if obj.parent:
289
                    # sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
290
                obj.parent = ref.text
291
                obj.parentlink = ref.attrib["ReferenceType"]
292
            else:
293
                struct = RefStruct()
294
                if "IsForward" in ref.attrib:
295
                    struct.forward = ref.attrib["IsForward"]
296
                struct.target = ref.text
297
                struct.reftype = ref.attrib["ReferenceType"]
298
                obj.refs.append(struct)
299