Completed
Push — master ( eb5e78...c75f71 )
by Olivier
04:30
created

XMLParser._set_attr()   F

Complexity

Conditions 17

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 306

Importance

Changes 0
Metric Value
cc 17
c 0
b 0
f 0
dl 0
loc 29
ccs 0
cts 0
cp 0
crap 306
rs 2.7204

How to fix   Complexity   

Complexity

Complex methods like XMLParser._set_attr() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
class NodeData(object):
    """
    Plain container for everything read from one node element of the XML file.

    Instances are created with neutral defaults and then filled in by the
    parser; no validation or conversion happens here.
    """

    def __init__(self):
        # --- attributes common to every node ---
        self.nodetype = None      # local tag name of the XML element
        self.nodeid = None
        self.browsename = None
        self.displayname = None
        # Filled from the "SymbolicName" XML attribute by the parser; nothing
        # in this file reads it back.
        self.symname = None
        self.parent = None        # node id of the parent node
        self.parentlink = None    # reference type linking the node to its parent
        self.desc = ""
        self.typedef = None       # target of the HasTypeDefinition reference
        self.refs = []            # remaining references (RefStruct objects)
        self.nodeclass = None
        self.eventnotifier = 0

        # --- variable ---
        self.datatype = None
        self.rank = -1  # check default value
        self.value = None
        self.valuetype = None
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # --- referencetype ---
        self.inversename = ""
        # NOTE(review): both defaults below are the *string* "false", which is
        # truthy in Python, while the parser stores a real bool for
        # ``symmetric``. Confirm how consumers test these before relying on
        # their truth value.
        self.abstract = "false"
        self.symmetric = "false"

        # --- datatype ---
        self.definition = []      # child elements of the Definition tag
44
45
46 1
class RefStruct(object):
    """
    Plain container describing one reference from a node to another node.
    """

    def __init__(self):
        self.reftype = None   # value of the "ReferenceType" XML attribute
        self.forward = True   # direction flag; a reference is forward by default
        self.target = None    # node id the reference points to
52
53
54 1
class XMLParser(object):
    """
    Parse an XML model file (from the opcua spec) into NodeData objects.

    Iterating over the parser walks the children of the XML root element:
    ``Aliases`` and ``NamespaceUris`` fill ``self.aliases`` and
    ``self.namespaces``; every other child is turned into a NodeData object
    and yielded by the iteration.
    """

    # Splits an ElementTree tag "{namespace}local" into its two parts.
    _retag = re.compile(r"(\{.*\})(.*)")
    # Matches fully qualified numeric node ids such as "ns=2;i=1234".
    _re_nodeid = re.compile(r"^ns=(?P<ns>\d+[^;]*);i=(?P<i>\d+)")

    _INT_TYPES = ("Int8", "UInt8", "Int16", "UInt16",
                  "Int32", "UInt32", "Int64", "UInt64")
    _FLOAT_TYPES = ("Float", "Double")
    _TRUE_STRINGS = ("True", "true", "1", "on", "On")

    # XML attributes holding a node id -> NodeData attribute name.
    _NODEID_ATTRIBUTES = {
        "NodeId": "nodeid",
        "ParentNodeId": "parent",
    }
    # Other XML attributes -> (NodeData attribute name, converter).
    # A converter of None stores the raw attribute string unchanged.
    _ATTRIBUTES = {
        "BrowseName": ("browsename", None),
        "SymbolicName": ("symname", None),
        "DataType": ("datatype", None),
        "IsAbstract": ("abstract", None),
        "EventNotifier": ("eventnotifier", lambda val: 1 if val == "1" else 0),
        "ValueRank": ("rank", int),
        "ArrayDimensions": ("dimensions",
                            lambda val: [int(i) for i in val.split(",")]),
        "MinimumSamplingInterval": ("minsample", int),
        "AccessLevel": ("accesslevel", int),
        "UserAccessLevel": ("useraccesslevel", int),
        "Symmetric": ("symmetric", lambda val: val == "true"),
    }

    def __init__(self, xmlpath, server):
        """
        :param xmlpath: file name or file object handed to ``ET.parse``
        :param server: object whose ``register_namespace(uri)`` is called for
            every entry of the ``NamespaceUris`` element
        """
        self.server = server  # POC
        self.logger = logging.getLogger(__name__)
        self.path = xmlpath
        self.aliases = {}

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        self.it = None

        # index used inside the file -> (index on the server, namespace uri)
        self.namespaces = {}

    def __iter__(self):
        nodes = []
        for child in self.root:
            name = self._local_name(child.tag)
            if name == "Aliases":
                for el in child:
                    self.aliases[el.attrib["Alias"]] = self._get_node_id(el.text)
            elif name == 'NamespaceUris':
                for ns_index, ns_element in enumerate(child):
                    ns_uri = ns_element.text
                    ns_server_index = self.server.register_namespace(ns_uri)
                    # file-local namespace indexes start at 1
                    self.namespaces[ns_index + 1] = (ns_server_index, ns_uri)
            else:
                nodes.append(self._parse_node(name, child))

        # The ordering of nodes currently only works if namespaces are
        # defined in XML.
        # Also, it is recommended not to use node ids without namespace prefix!
        if self.namespaces:
            nodes = self._sort_nodes_by_parentid(nodes)

        self.it = iter(nodes)
        return self

    def __next__(self):
        # The builtin next() works on both Python 2 and Python 3 iterators.
        return next(self.it)

    def next(self):  # support for python2
        return self.__next__()

    def _local_name(self, tag):
        """
        Return *tag* without its leading "{namespace}" part.

        Tags that carry no namespace prefix are returned unchanged.
        """
        match = self._retag.match(tag)
        return match.group(2) if match else tag

    def _sort_nodes_by_parentid(self, nodes):
        """
        Sort the list of nodes according to their parent node in order to
        respect the dependency between nodes.

        Nodes without a namespaced parent, or whose own namespace is not one
        of the namespaces declared in the file, are placed first (in reverse
        order of appearance). All other nodes follow, each one after its
        parent. A node whose parent never shows up (missing from the file, or
        part of a parent cycle) is emitted anyway, with a warning, instead of
        blocking the sort forever.

        :param nodes: list of NodeDataObjects
        :returns: list of sorted nodes
        """
        # We can only respect ordering for namespace indexes that are defined
        # in the xml file itself. All other referenced namespaces are assumed
        # to be already known to the server and should not create any
        # dependency problems (like "NodeNotFound").
        relevant_namespaces = set(str(ns[0]) for ns in self.namespaces.values())

        roots = []            # nodes that do not have to wait for a parent
        ordered = []          # nodes placed after their parent
        inserted_ids = set()  # ids of every node placed so far
        pending = list(nodes)

        while pending:
            still_pending = []
            for node in pending:
                node_ns, _ = self._split_node_id(node.nodeid)
                parent_ns, parent_id = self._split_node_id(node.parent)

                if (parent_ns is None or parent_id is None or
                        node_ns not in relevant_namespaces):
                    roots.append(node)
                elif node.parent in inserted_ids:
                    ordered.append(node)
                else:
                    still_pending.append(node)
                    continue
                inserted_ids.add(node.nodeid)

            if len(still_pending) == len(pending):
                # No progress in a full pass: the remaining nodes wait for a
                # parent that will never be placed. Prefer a node whose parent
                # is not waiting itself, so its children can still follow it.
                pending_ids = set(n.nodeid for n in still_pending)
                forced = next(
                    (n for n in still_pending if n.parent not in pending_ids),
                    still_pending[0])
                self.logger.warning(
                    "Parent %s of node %s not found, node is inserted anyway",
                    forced.parent, forced.nodeid)
                ordered.append(forced)
                inserted_ids.add(forced.nodeid)
                still_pending.remove(forced)

            pending = still_pending

        roots.reverse()
        return roots + ordered

    def _split_node_id(self, value):
        """
        Split the fq node id into namespace and id part.

        :returns: (namespace, id); namespace is None when *value* is empty
            or is not of the form "ns=<n>;i=<m>"
        """
        if not value:
            return (None, value)
        r_match = self._re_nodeid.search(value)
        if r_match:
            return r_match.groups()
        return (None, value)

    def _get_node_id(self, value):
        """
        Check if the nodeid given in the xml model file must be converted
        to an already existing namespace id based on the file's namespace uri.

        :returns: NodeId (str); *value* itself when no conversion applies
        """
        node_ns, node_id = self._split_node_id(value)
        if not node_ns:
            return value
        try:
            ns_index = int(node_ns)
        except ValueError:
            # the pattern tolerates trailing non-digits in the namespace part
            return value
        ns_server = self.namespaces.get(ns_index)
        if ns_server:
            return "ns={};i={}".format(ns_server[0], node_id)
        return value

    def _parse_node(self, name, child):
        """
        Parse a XML node and create a NodeData object.
        """
        obj = NodeData()
        obj.nodetype = name
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_tag(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute *key* with raw value *val* on *obj*.

        Unknown attributes are only logged.
        """
        if key in self._NODEID_ATTRIBUTES:
            setattr(obj, self._NODEID_ATTRIBUTES[key], self._get_node_id(val))
            return
        try:
            attr, convert = self._ATTRIBUTES[key]
        except KeyError:
            self.logger.info("Attribute not implemented: %s:%s", key, val)
            return
        setattr(obj, attr, val if convert is None else convert(val))

    def _parse_tag(self, el, obj):
        """
        Store the content of the child element *el* of a node on *obj*.
        """
        tag = self._local_name(el.tag)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            obj.definition.extend(el)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, el, obj):
        """
        Convert the children of a Value element and store the result in
        ``obj.value`` / ``obj.valuetype``.
        """
        for val in el:
            ntag = self._local_name(val.tag)
            obj.valuetype = ntag
            if ntag in self._INT_TYPES:
                obj.value = int(val.text)
            elif ntag in self._FLOAT_TYPES:
                obj.value = float(val.text)
            elif ntag == "Boolean":
                # exact comparison: ``ntag in ("Boolean")`` is a substring
                # test on a plain string, not tuple membership
                obj.value = val.text in self._TRUE_STRINGS
            elif ntag in ("ByteString", "String"):
                mytext = val.text
                if mytext is None:  # support importing null strings
                    mytext = ""
                obj.value = mytext.replace('\n', '').replace('\r', '')
            elif ntag == "ListOfExtensionObject":
                obj.value, obj.valuetype = self._parse_list_of_extension_object(el)
            elif ntag == "ListOfLocalizedText":
                obj.value = self._parse_list_of_localized_text(el)
            else:
                self.logger.info("Value type not implemented: %s", ntag)

    def _get_text(self, el):
        """
        Return all text found in *el* and its descendants, concatenated.
        """
        return "".join(el.itertext())

    def _parse_list_of_localized_text(self, el):
        """
        Return the content of every ``Text`` element found two levels below
        the children of *el*, as a list of strings.
        """
        value = []
        for localized_text_list in el:
            for localized_text in localized_text_list:
                for child in localized_text:
                    if self._local_name(child.tag) == 'Text':
                        value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        '''
        Parse a uax:ListOfExtensionObject Value

        Return a list with one dict per uax:ExtensionObject/Body (each
        uax:ExtensionObject/Body/*/* element is converted to an entry in the
        dict). Also the valuetype is returned. The valuetype is the tag of
        uax:ExtensionObject/Body/* of the last Body seen.
        '''
        value = []
        valuetype = None
        for extension_object_list in el:
            for extension_object in extension_object_list:
                for extension_object_part in extension_object:
                    if self._local_name(extension_object_part.tag) != 'Body':
                        continue
                    type_el = extension_object_part.find('*')
                    if type_el is None:
                        # an empty Body carries neither a type nor fields
                        self.logger.info("Empty Body in ExtensionObject, skipped")
                        continue
                    valuetype = self._local_name(type_el.tag)
                    data = {}
                    for body_item in extension_object_part.findall('*/*'):
                        key = self._local_name(body_item.tag)
                        # use the text of the first nested element when there
                        # is one, otherwise the text of the item itself
                        child = body_item.find('*')
                        if child is not None:
                            data[key] = self._get_text(child)
                        else:
                            data[key] = self._get_text(body_item)
                    value.append(data)
        return value, valuetype

    def _parse_refs(self, el, obj):
        """
        Dispatch the Reference children of *el*: the type definition and the
        inverse (parent) reference go to dedicated attributes of *obj*, every
        other reference is appended to ``obj.refs``.
        """
        for ref in el:
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
                obj.typedef = self._get_node_id(ref.text)
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] in ("false", "False"):
                obj.parent = self._get_node_id(ref.text)
                obj.parentlink = ref.attrib["ReferenceType"]
            else:
                struct = RefStruct()
                if "IsForward" in ref.attrib:
                    # store a real bool, consistent with the default True
                    struct.forward = ref.attrib["IsForward"] not in ("false", "False")
                struct.target = self._get_node_id(ref.text)
                struct.reftype = ref.attrib["ReferenceType"]
                obj.refs.append(struct)
340