Test Failed
Pull Request — master (#494)
by Olivier
03:40
created

XMLParser._parse_value()   F

Complexity

Conditions 21

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 41
CRAP Score 21.5664

Importance

Changes 4
Bugs 1 Features 1
Metric Value
cc 21
dl 0
loc 64
ccs 41
cts 46
cp 0.8913
crap 21.5664
rs 2.8997
c 4
b 1
f 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like XMLParser._parse_value() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
from pytz import utc
6 1
import uuid
7 1
import re
8 1
import sys
9 1
import base64
10
11 1
import xml.etree.ElementTree as ET
12
13 1
from opcua.common import ua_utils
14 1
from opcua import ua
15
16
17 1
def ua_type_to_python(val, uatype_as_str):
18
    """
19
    Converts a string value to a python value according to ua_utils.
20
    """
21 1
    return ua_utils.string_to_val(val, getattr(ua.VariantType, uatype_as_str))
22
23 1
def _to_bool(val):
24
    """
25
    Easy access to boolean conversion.
26
    """
27
    return ua_type_to_python(val, "Boolean")
28
29
30 1
class NodeData(object):
31
32 1
    def __init__(self):
33 1
        self.nodetype = None
34 1
        self.nodeid = None
35 1
        self.browsename = None
36 1
        self.displayname = None
37 1
        self.symname = None  # FIXME: this param is never used, why?
38 1
        self.parent = None
39 1
        self.parentlink = None
40 1
        self.desc = ""
41 1
        self.typedef = None
42 1
        self.refs = []
43 1
        self.nodeclass = None
44 1
        self.eventnotifier = 0
45
46
        # variable
47 1
        self.datatype = None
48 1
        self.rank = -1  # check default value
49 1
        self.value = None
50 1
        self.valuetype = None
51 1
        self.dimensions = None
52 1
        self.accesslevel = None
53 1
        self.useraccesslevel = None
54 1
        self.minsample = None
55
56
        # referencetype
57 1
        self.inversename = ""
58 1
        self.abstract = False
59 1
        self.symmetric = False
60
61
        # datatype
62 1
        self.definition = []
63
64 1
    def __str__(self):
65
        return "NodeData(nodeid:{0})".format(self.nodeid)
66 1
    __repr__ = __str__
67
68
69 1
class RefStruct(object):
70
71 1
    def __init__(self):
72 1
        self.reftype = None
73 1
        self.forward = True
74 1
        self.target = None
75
76
77 1
class ExtObj(object):
78
79 1
    def __init__(self):
80 1
        self.typeid = None
81 1
        self.objname = None
82 1
        self.bodytype = None
83 1
        self.body = {}
84
85 1
    def __str__(self):
86 1
        return "ExtObj({0}, {1})".format(self.objname, self.body)
87 1
    __repr__ = __str__
88
89
90 1
class XMLParser(object):
91
92 1
    def __init__(self, xmlpath):
93 1
        self.logger = logging.getLogger(__name__)
94 1
        self._retag = re.compile(r"(\{.*\})(.*)")
95 1
        self.path = xmlpath
96
97 1
        self.tree = ET.parse(xmlpath)
98 1
        self.root = self.tree.getroot()
99
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
100 1
        self.ns = {
101
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
102
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
103
            'xsd': "http://www.w3.org/2001/XMLSchema",
104
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
105
        }
106
107 1
    def get_used_namespaces(self):
108
        """
109
        Return the used namespace uris in this import file
110
        """
111 1
        namespaces_uris = []
112 1
        for child in self.root:
113 1
            tag = self._retag.match(child.tag).groups()[1]
114 1
            if tag == 'NamespaceUris':
115 1
                namespaces_uris = [ns_element.text for ns_element in child]
116 1
                break
117 1
        return namespaces_uris
118
119 1
    def get_aliases(self):
120
        """
121
        Return the used node aliases in this import file
122
        """
123 1
        aliases = {}
124 1
        for child in self.root:
125 1
            tag = self._retag.match(child.tag).groups()[1]
126 1
            if tag == 'Aliases':
127 1
                for el in child:
128 1
                    aliases[el.attrib["Alias"]] = el.text
129 1
                break
130 1
        return aliases
131
132 1
    def get_node_datas(self):
133 1
        nodes = []
134 1
        for child in self.root:
135 1
            tag = self._retag.match(child.tag).groups()[1]
136 1
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
137 1
                node = self._parse_node(tag, child)
138 1
                nodes.append(node)
139 1
        return nodes
140
141 1
    def _parse_node(self, nodetype, child):
142
        """
143
        Parse a XML node and create a NodeData object.
144
        """
145 1
        obj = NodeData()
146 1
        obj.nodetype = nodetype
147 1
        for key, val in child.attrib.items():
148 1
            self._set_attr(key, val, obj)
149 1
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
150 1
        obj.displayname = obj.browsename  # give a default value to display name
151 1
        for el in child:
152 1
            self._parse_attr(el, obj)
153 1
        return obj
154
155 1
    def _set_attr(self, key, val, obj):
156 1
        if key == "NodeId":
157 1
            obj.nodeid = val
158 1
        elif key == "BrowseName":
159 1
            obj.browsename = val
160 1
        elif key == "SymbolicName":
161
            obj.symname = val
162 1
        elif key == "ParentNodeId":
163 1
            obj.parent = val
164 1
        elif key == "DataType":
165 1
            obj.datatype = val
166 1
        elif key == "IsAbstract":
167
            obj.abstract = _to_bool(val)
168 1
        elif key == "Executable":
169
            obj.executable = _to_bool(val)
170 1
        elif key == "EventNotifier":
171
            obj.eventnotifier = int(val)
172 1
        elif key == "ValueRank":
173 1
            obj.rank = int(val)
174 1
        elif key == "ArrayDimensions":
175 1
            obj.dimensions = [int(i) for i in val.split(",")]
176
        elif key == "MinimumSamplingInterval":
177
            obj.minsample = int(val)
178
        elif key == "AccessLevel":
179
            obj.accesslevel = int(val)
180
        elif key == "UserAccessLevel":
181
            obj.useraccesslevel = int(val)
182
        elif key == "Symmetric":
183
            obj.symmetric = _to_bool(val)
184
        else:
185
            self.logger.info("Attribute not implemented: %s:%s", key, val)
186
187 1
    def _parse_attr(self, el, obj):
188 1
        tag = self._retag.match(el.tag).groups()[1]
189
190 1
        if tag == "DisplayName":
191 1
            obj.displayname = el.text
192 1
        elif tag == "Description":
193 1
            obj.desc = el.text
194 1
        elif tag == "References":
195 1
            self._parse_refs(el, obj)
196 1
        elif tag == "Value":
197 1
            self._parse_contained_value(el, obj)
198 1
        elif tag == "InverseName":
199
            obj.inversename = el.text
200 1
        elif tag == "Definition":
201 1
            for field in el:
202 1
                obj.definition.append(field)
203
        else:
204
            self.logger.info("Not implemented tag: %s", el)
205
206 1
    def _parse_contained_value(self, el, obj):
207
        """
208
        Parse the child of el as a constant.
209
        """
210 1
        val_el = el.find(".//")  # should be only one child
211 1
        self._parse_value(val_el, obj)
212
213 1
    def _parse_value(self, val_el, obj):
214
        """
215
        Parse the node val_el as a constant.
216
        """
217 1
        if val_el is not None and val_el.text is not None:
218 1
            ntag = self._retag.match(val_el.tag).groups()[1]
219
        else:
220 1
            ntag = "Null"
221
222 1
        obj.valuetype = ntag
223 1
        if ntag == "Null":
224 1
            obj.value = None
225 1
        elif hasattr(ua.ua_binary.Primitives1, ntag):
226
            # Elementary types have their parsing directly relying on ua_type_to_python.
227 1
            obj.value = ua_type_to_python(val_el.text, ntag)
228 1
        elif ntag == "DateTime":
229 1
            obj.value = ua_type_to_python(val_el.text, ntag)
230
            # According to specs, DateTime should be either UTC or with a timezone.
231 1
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
232 1
                utc.localize(obj.value) # FIXME Forcing to UTC if unaware, maybe should raise?
233 1
        elif ntag == "ByteString":
234 1
            if val_el.text is None:
235
                mytext = b""
236
            else:
237 1
                mytext = bytes(val_el.text, "utf-8")
238
                mytext = base64.b64decode(mytext)
239
            obj.value = mytext
240 1
        elif ntag == "String":
241 1
            mytext = val_el.text
242 1
            if mytext is None:
243
                # Support importing null strings.
244
                mytext = ""
245
            #mytext = mytext.replace('\n', '').replace('\r', '')
246 1
            obj.value = mytext
247 1
        elif ntag == "Guid":
248 1
            self._parse_contained_value(val_el, obj)
249
            # Override parsed string type to guid.
250 1
            obj.valuetype = ntag
251 1
        elif ntag == "NodeId":
252 1
            id_el = val_el.find("uax:Identifier", self.ns)
253 1
            if id_el is not None:
254 1
                obj.value = id_el.text
255 1
        elif ntag == "ExtensionObject":
256 1
            obj.value = self._parse_ext_obj(val_el)
257 1
        elif ntag == "LocalizedText":
258 1
            obj.value = self._parse_body(val_el)
259 1
        elif ntag == "ListOfLocalizedText":
260 1
            obj.value = self._parse_list_of_localized_text(val_el)
261 1
        elif ntag == "ListOfExtensionObject":
262 1
            obj.value = self._parse_list_of_extension_object(val_el)
263 1
        elif ntag.startswith("ListOf"):
264
            # Default case for "ListOf" types.
265
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
266 1
            obj.value = []
267 1
            for val_el in val_el:
268 1
                tmp = NodeData()
269 1
                self._parse_value(val_el, tmp)
270 1
                obj.value.append(tmp.value)
271
        else:
272
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
273
            # QualifiedName, StatusCode.
274
            # Missing according to ua.VariantType (also missing in string_to_val):
275
            # DataValue, Variant, DiagnosticInfo.
276
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)
277
278 1
    def _get_text(self, el):
279 1
        txtlist = [txt.strip() for txt in el.itertext()]
280 1
        return "".join(txtlist)
281
282 1
    def _parse_list_of_localized_text(self, el):
283 1
        value = []
284 1
        for localized_text in el:
285 1
            mylist = self._parse_body(localized_text)
286
            # small hack since we did not handle LocalizedText as ExtensionObject at begynning
287 1
            for name, val in mylist:
288 1
                if name == "Text":
289 1
                    value.append(val)
290 1
        return value
291
292 1
    def _parse_list_of_extension_object(self, el):
293
        """
294
        Parse a uax:ListOfExtensionObject Value
295
        Return an list of ExtObj
296
        """
297 1
        value = []
298 1
        for extension_object in el:
299 1
            ext_obj = self._parse_ext_obj(extension_object)
300 1
            value.append(ext_obj)
301 1
        return value
302
303 1
    def _parse_ext_obj(self, el):
304 1
        ext = ExtObj()
305 1
        for extension_object_part in el:
306 1
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
307 1
            if ntag == 'TypeId':
308 1
                ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
309 1
                ext.typeid = self._get_text(extension_object_part)
310 1
            elif ntag == 'Body':
311 1
                ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
312 1
                ext.body = self._parse_body(extension_object_part)
313
            else:
314
                self.logger.warning("Unknown ntag", ntag)
315 1
        return ext
316
317 1
    def _parse_body(self, el):
318 1
        body = []
319 1
        for body_item in el:
320 1
            otag = self._retag.match(body_item.tag).groups()[1]
321 1
            childs = [i for i in body_item]
322 1
            if not childs:
323 1
                val = self._get_text(body_item)
324
            else:
325 1
                val = self._parse_body(body_item)
326 1
            if val:
327 1
                body.append((otag, val))
328 1
        return body
329
330 1
    def _parse_refs(self, el, obj):
331 1
        for ref in el:
332 1
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
333 1
                obj.typedef = ref.text
334 1
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] in ("false", "False"):
335
                # if obj.parent:
336
                    # sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
337 1
                obj.parent = ref.text
338 1
                obj.parentlink = ref.attrib["ReferenceType"]
339
            else:
340 1
                struct = RefStruct()
341 1
                if "IsForward" in ref.attrib:
342
                    struct.forward = ref.attrib["IsForward"]
343 1
                struct.target = ref.text
344 1
                struct.reftype = ref.attrib["ReferenceType"]
345
                obj.refs.append(struct)
346