Completed
Pull Request — master (#376)
by
unknown
03:45
created

XMLParser._parse_value()   F

Complexity

Conditions 15

Size

Total Lines 58

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 15.2842

Importance

Changes 5
Bugs 1 Features 1
Metric Value
cc 15
c 5
b 1
f 1
dl 0
loc 58
ccs 33
cts 37
cp 0.8919
crap 15.2842
rs 3.2148

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like XMLParser._parse_value() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

"""
Parse XML files from the opcua-spec.
"""
4 1
import logging
5 1
import datetime
6 1
import re
7 1
import sys
8
9 1
import xml.etree.ElementTree as ET
10
11 1
from opcua.common import ua_utils
12 1
from opcua import ua
13
14
15 1
def ua_type_to_python(val, uatype_as_str):
    """
    Converts a string value to a python value according to ua_utils.

    val is the text to convert. uatype_as_str is the name of a member of
    ua.VariantType (for instance "Boolean"); it is looked up with getattr,
    so an unknown name raises AttributeError.
    """
    variant_type = getattr(ua.VariantType, uatype_as_str)
    return ua_utils.string_to_val(val, variant_type)
20
21 1
def _to_bool(val):
    """
    Easy access to boolean conversion.

    Delegates to ua_type_to_python with the "Boolean" variant type.
    """
    return ua_type_to_python(val, "Boolean")
26
27
28 1
class NodeData(object):
    """
    Plain container for everything parsed from one node of the XML file.

    All fields start with a default and are filled in by XMLParser.
    """

    def __init__(self):
        # attributes common to every node
        self.nodetype = None
        self.nodeid = None
        self.browsename = None
        self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = None
        self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []  # a fresh list per instance
        self.nodeclass = None
        self.eventnotifier = 0

        # variable
        self.datatype = None
        self.rank = -1  # check default value
        self.value = None
        self.valuetype = None
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # referencetype
        self.inversename = ""
        self.abstract = False
        self.symmetric = False

        # datatype
        self.definition = []  # a fresh list per instance

    def __str__(self):
        return "NodeData(nodeid:{0})".format(self.nodeid)
    __repr__ = __str__
65
66
67 1
class RefStruct(object):
    """
    One reference of a node: its type, its direction and its target.
    """

    def __init__(self):
        self.reftype = None
        self.forward = True
        self.target = None

    def __str__(self):
        # Same debugging aid the other containers in this module provide.
        return "RefStruct({0}, forward={1}, target={2})".format(
            self.reftype, self.forward, self.target)
    __repr__ = __str__
73
74
75 1
class ExtObj(object):
    """
    Container for a parsed extension object.
    """

    def __init__(self):
        self.typeid = None
        self.objname = None
        self.bodytype = None
        # Starts as an empty dict; XMLParser._parse_ext_obj replaces it with
        # the list of (tag, value) pairs returned by XMLParser._parse_body.
        self.body = {}

    def __str__(self):
        return "ExtObj({0}, {1})".format(self.objname, self.body)
    __repr__ = __str__
86
87
88 1
class XMLParser(object):
    """
    Parser for an opcua-spec XML file.

    The file is loaded with ElementTree when the parser is created. The
    get_* methods then return the namespace uris, the aliases and one
    NodeData object per node found under the root element.
    """

    # Top-level XML tags that don't contain nodes.
    _NON_NODE_TAGS = ("Aliases", "NamespaceUris", "Extensions", "Models")

    # Value tags whose parsing relies directly on ua_type_to_python.
    _ELEMENTARY_TYPES = (
        "SByte", "Byte",
        "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64",
        "Float", "Double",
        "Boolean",
        "DateTime",  # FIXME Check format in a specific elif?
    )

    # XML attribute name -> NodeData attribute, grouped by conversion.
    _TEXT_ATTRS = {
        "NodeId": "nodeid",
        "BrowseName": "browsename",
        "SymbolicName": "symname",
        "ParentNodeId": "parent",
        "DataType": "datatype",
    }
    _BOOL_ATTRS = {
        "IsAbstract": "abstract",
        "Executable": "executable",
        "Symmetric": "symmetric",
    }
    _INT_ATTRS = {
        "EventNotifier": "eventnotifier",
        "ValueRank": "rank",
        "MinimumSamplingInterval": "minsample",
        "AccessLevel": "accesslevel",
        "UserAccessLevel": "useraccesslevel",
    }

    # Lower-cased IsForward spellings meaning "inverse reference".
    _FALSE_STRINGS = ("false", "0")

    def __init__(self, xmlpath):
        self.logger = logging.getLogger(__name__)
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
        self.ns = {
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
            'xsd': "http://www.w3.org/2001/XMLSchema",
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
        }

    def _tag_name(self, el):
        """
        Return the tag of el without its "{namespace}" prefix.

        A tag that carries no namespace is returned unchanged instead of
        failing on the unmatched regular expression.
        """
        match = self._retag.match(el.tag)
        if match is None:
            return el.tag
        return match.group(2)

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file
        """
        for child in self.root:
            if self._tag_name(child) == 'NamespaceUris':
                return [ns_element.text for ns_element in child]
        return []

    def get_aliases(self):
        """
        Return the used node aliases in this import file
        """
        for child in self.root:
            if self._tag_name(child) == 'Aliases':
                return dict((el.attrib["Alias"], el.text) for el in child)
        return {}

    def get_node_datas(self):
        """
        Return one NodeData per child of the root that describes a node.
        """
        nodes = []
        for child in self.root:
            tag = self._tag_name(child)
            if tag not in self._NON_NODE_TAGS:
                nodes.append(self._parse_node(tag, child))
        return nodes

    def _parse_node(self, nodetype, child):
        """
        Parse a XML node and create a NodeData object.
        """
        obj = NodeData()
        obj.nodetype = nodetype
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_attr(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute key/val on obj, converting val as needed.

        Unknown attributes are only logged.
        """
        if key in self._TEXT_ATTRS:
            setattr(obj, self._TEXT_ATTRS[key], val)
        elif key in self._BOOL_ATTRS:
            setattr(obj, self._BOOL_ATTRS[key], _to_bool(val))
        elif key in self._INT_ATTRS:
            setattr(obj, self._INT_ATTRS[key], int(val))
        elif key == "ArrayDimensions":
            obj.dimensions = [int(i) for i in val.split(",")]
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_attr(self, el, obj):
        """
        Store the content of the child element el on obj.

        Unknown tags are only logged.
        """
        tag = self._tag_name(el)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_contained_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            obj.definition.extend(el)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_contained_value(self, el, obj):
        """
        Parse the child of el as a constant.
        """
        # First direct child; there should be only one. This is the element
        # the former ".//" path selected, without depending on ElementPath
        # silently expanding a trailing slash.
        val_el = el.find("*")
        self._parse_value(val_el, obj)

    def _parse_value(self, val_el, obj):
        """
        Parse the node val_el as a constant.

        Sets obj.valuetype to the tag of val_el ("Null" when val_el is None)
        and obj.value to the parsed value. Unsupported types are logged and
        leave obj.value untouched.
        """
        ntag = "Null" if val_el is None else self._tag_name(val_el)

        obj.valuetype = ntag
        if ntag == "Null":
            obj.value = None
        elif ntag in self._ELEMENTARY_TYPES:
            obj.value = ua_type_to_python(val_el.text, ntag)
        elif ntag in ("ByteString", "String"):
            obj.value = self._parse_string_value(val_el, ntag)
        elif ntag == "Guid":
            self._parse_contained_value(val_el, obj)
            obj.valuetype = obj.datatype  # Override parsed string type to guid.
        elif ntag == "NodeId":
            id_el = val_el.find("uax:Identifier", self.ns)
            if id_el is not None:
                obj.value = id_el.text
        elif ntag == "LocalizedText":
            obj.value = self._parse_body(val_el)
        elif ntag == "ListOfLocalizedText":
            obj.value = self._parse_list_of_localized_text(val_el)
        elif ntag == "ListOfExtensionObject":
            obj.value = self._parse_list_of_extension_object(val_el)
        elif ntag.startswith("ListOf"):
            # Default case for "ListOf" types.
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
            obj.value = self._parse_list_value(val_el)
        elif ntag == "ExtensionObject":
            obj.value = self._parse_ext_obj(val_el)
        else:
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
            # QualifiedName, StatusCode.
            # Missing according to ua.VariantType (also missing in string_to_val):
            # DataValue, Variant, DiagnosticInfo.
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)

    def _parse_string_value(self, val_el, ntag):
        """
        Return the text of val_el converted as ntag, without line breaks.
        """
        text = ua_type_to_python(val_el.text, ntag)
        if text is None:
            # Support importing null strings.
            text = ""
        return text.replace('\n', '').replace('\r', '')

    def _parse_list_value(self, list_el):
        """
        Return the parsed value of every child of a generic "ListOf" element.
        """
        values = []
        for item_el in list_el:
            tmp = NodeData()  # scratch holder, only its value is kept
            self._parse_value(item_el, tmp)
            # FIXME Do some checks on parsed element?
            values.append(tmp.value)
        return values

    def _get_text(self, el):
        """
        Return all text found in el and below, each piece stripped, joined.
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list_of_localized_text(self, el):
        """
        Return the text of every Text element of the localized texts in el.
        """
        value = []
        for localized_text in el:
            for child in localized_text:
                if self._tag_name(child) == 'Text':
                    value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        """
        return [self._parse_ext_obj(extension_object) for extension_object in el]

    def _parse_ext_obj(self, el):
        """
        Parse an extension object element and return it as an ExtObj.

        TypeId and Body children are read; anything else is logged.
        """
        ext = ExtObj()
        for part in el:
            ntag = self._tag_name(part)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(part)
            elif ntag == 'Body':
                first = part.find('*')
                if first is None:
                    # Nothing to name the object after; keep the defaults.
                    self.logger.warning("Empty Body in extension object")
                    continue
                ext.objname = self._tag_name(first)
                ext.body = self._parse_body(part)
            else:
                self.logger.warning("Unknown tag in extension object: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Return the children of el as a list of (tag, value) tuples.

        value is the stripped text for a leaf and a nested list of the same
        shape otherwise. Children with an empty value are left out.
        """
        body = []
        for body_item in el:
            if len(body_item):
                val = self._parse_body(body_item)
            else:
                val = self._get_text(body_item)
            if val:
                body.append((self._tag_name(body_item), val))
        return body

    def _parse_refs(self, el, obj):
        """
        Store the references listed in el on obj.

        A HasTypeDefinition reference sets obj.typedef, an inverse reference
        sets obj.parent and obj.parentlink, and every other reference is
        appended to obj.refs as a RefStruct.
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            raw_forward = ref.attrib.get("IsForward", "true")
            is_forward = raw_forward.strip().lower() not in self._FALSE_STRINGS
            if reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif not is_forward:
                obj.parent = ref.text
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                # Keep a real boolean here, not the raw attribute text.
                struct.forward = is_forward
                struct.target = ref.text
                struct.reftype = reftype
                obj.refs.append(struct)
338