Completed
Pull Request — master (#494)
by Olivier
04:24
created

XMLParser._parse_value()   F

Complexity

Conditions 21

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 43
CRAP Score 21.1222

Importance

Changes 4
Bugs 1 Features 1
Metric Value
cc 21
c 4
b 1
f 1
dl 0
loc 64
ccs 43
cts 46
cp 0.9348
crap 21.1222
rs 2.8997

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like XMLParser._parse_value() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
from pytz import utc
6 1
import uuid
7 1
import re
8 1
import sys
9 1
import base64
10
11 1
import xml.etree.ElementTree as ET
12
13 1
from opcua.common import ua_utils
14 1
from opcua import ua
15
16
17 1
def ua_type_to_python(val, uatype_as_str):
18
    """
19
    Converts a string value to a python value according to ua_utils.
20
    """
21 1
    return ua_utils.string_to_val(val, getattr(ua.VariantType, uatype_as_str))
22
23 1
def _to_bool(val):
24
    """
25
    Easy access to boolean conversion.
26
    """
27
    return ua_type_to_python(val, "Boolean")
28
29
30 1
class NodeData(object):
31
32 1
    def __init__(self):
33 1
        self.nodetype = None
34 1
        self.nodeid = None
35 1
        self.browsename = None
36 1
        self.displayname = None
37 1
        self.symname = None  # FIXME: this param is never used, why?
38 1
        self.parent = None
39 1
        self.parentlink = None
40 1
        self.desc = ""
41 1
        self.typedef = None
42 1
        self.refs = []
43 1
        self.nodeclass = None
44 1
        self.eventnotifier = 0
45
46
        # variable
47 1
        self.datatype = None
48 1
        self.rank = -1  # check default value
49 1
        self.value = None
50 1
        self.valuetype = None
51 1
        self.dimensions = None
52 1
        self.accesslevel = None
53 1
        self.useraccesslevel = None
54 1
        self.minsample = None
55
56
        # referencetype
57 1
        self.inversename = ""
58 1
        self.abstract = False
59 1
        self.symmetric = False
60
61
        # datatype
62 1
        self.definition = []
63
64 1
    def __str__(self):
65
        return "NodeData(nodeid:{0})".format(self.nodeid)
66 1
    __repr__ = __str__
67
68
69 1
class RefStruct(object):
70
71 1
    def __init__(self):
72 1
        self.reftype = None
73 1
        self.forward = True
74 1
        self.target = None
75
76
77 1
class ExtObj(object):
78
79 1
    def __init__(self):
80 1
        self.typeid = None
81 1
        self.objname = None
82 1
        self.bodytype = None
83 1
        self.body = {}
84
85 1
    def __str__(self):
86 1
        return "ExtObj({0}, {1})".format(self.objname, self.body)
87 1
    __repr__ = __str__
88
89
90 1
class XMLParser(object):
91
92 1
    def __init__(self, xmlpath):
93 1
        self.logger = logging.getLogger(__name__)
94 1
        self._retag = re.compile(r"(\{.*\})(.*)")
95 1
        self.path = xmlpath
96
97 1
        self.tree = ET.parse(xmlpath)
98 1
        self.root = self.tree.getroot()
99
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
100 1
        self.ns = {
101
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
102
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
103
            'xsd': "http://www.w3.org/2001/XMLSchema",
104
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
105
        }
106
107 1
    def get_used_namespaces(self):
108
        """
109
        Return the used namespace uris in this import file
110
        """
111 1
        namespaces_uris = []
112 1
        for child in self.root:
113 1
            tag = self._retag.match(child.tag).groups()[1]
114 1
            if tag == 'NamespaceUris':
115 1
                namespaces_uris = [ns_element.text for ns_element in child]
116 1
                break
117 1
        return namespaces_uris
118
119 1
    def get_aliases(self):
120
        """
121
        Return the used node aliases in this import file
122
        """
123 1
        aliases = {}
124 1
        for child in self.root:
125 1
            tag = self._retag.match(child.tag).groups()[1]
126 1
            if tag == 'Aliases':
127 1
                for el in child:
128 1
                    aliases[el.attrib["Alias"]] = el.text
129 1
                break
130 1
        return aliases
131
132 1
    def get_node_datas(self):
133 1
        nodes = []
134 1
        for child in self.root:
135 1
            tag = self._retag.match(child.tag).groups()[1]
136 1
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
137 1
                node = self._parse_node(tag, child)
138 1
                nodes.append(node)
139 1
        return nodes
140
141 1
    def _parse_node(self, nodetype, child):
142
        """
143
        Parse a XML node and create a NodeData object.
144
        """
145 1
        obj = NodeData()
146 1
        obj.nodetype = nodetype
147 1
        for key, val in child.attrib.items():
148 1
            self._set_attr(key, val, obj)
149 1
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
150 1
        obj.displayname = obj.browsename  # give a default value to display name
151 1
        for el in child:
152 1
            self._parse_attr(el, obj)
153 1
        return obj
154
155 1
    def _set_attr(self, key, val, obj):
156 1
        if key == "NodeId":
157 1
            obj.nodeid = val
158 1
        elif key == "BrowseName":
159 1
            obj.browsename = val
160 1
        elif key == "SymbolicName":
161
            obj.symname = val
162 1
        elif key == "ParentNodeId":
163 1
            obj.parent = val
164 1
        elif key == "DataType":
165 1
            obj.datatype = val
166 1
        elif key == "IsAbstract":
167
            obj.abstract = _to_bool(val)
168 1
        elif key == "Executable":
169
            obj.executable = _to_bool(val)
170 1
        elif key == "EventNotifier":
171
            obj.eventnotifier = int(val)
172 1
        elif key == "ValueRank":
173 1
            obj.rank = int(val)
174 1
        elif key == "ArrayDimensions":
175 1
            obj.dimensions = [int(i) for i in val.split(",")]
176
        elif key == "MinimumSamplingInterval":
177
            obj.minsample = int(val)
178
        elif key == "AccessLevel":
179
            obj.accesslevel = int(val)
180
        elif key == "UserAccessLevel":
181
            obj.useraccesslevel = int(val)
182
        elif key == "Symmetric":
183
            obj.symmetric = _to_bool(val)
184
        else:
185
            self.logger.info("Attribute not implemented: %s:%s", key, val)
186
187 1
    def _parse_attr(self, el, obj):
188 1
        tag = self._retag.match(el.tag).groups()[1]
189
190 1
        if tag == "DisplayName":
191 1
            obj.displayname = el.text
192 1
        elif tag == "Description":
193 1
            obj.desc = el.text
194 1
        elif tag == "References":
195 1
            self._parse_refs(el, obj)
196 1
        elif tag == "Value":
197 1
            self._parse_contained_value(el, obj)
198 1
        elif tag == "InverseName":
199
            obj.inversename = el.text
200 1
        elif tag == "Definition":
201 1
            for field in el:
202 1
                obj.definition.append(field)
203
        else:
204
            self.logger.info("Not implemented tag: %s", el)
205
206 1
    def _parse_contained_value(self, el, obj):
207
        """
208
        Parse the child of el as a constant.
209
        """
210 1
        val_el = el.find(".//")  # should be only one child
211 1
        self._parse_value(val_el, obj)
212
213 1
    def _parse_value(self, val_el, obj):
214
        """
215
        Parse the node val_el as a constant.
216
        """
217 1
        if val_el is not None and val_el.text is not None:
218 1
            ntag = self._retag.match(val_el.tag).groups()[1]
219
        else:
220 1
            ntag = "Null"
221
222 1
        obj.valuetype = ntag
223 1
        if ntag == "Null":
224 1
            obj.value = None
225 1
        elif hasattr(ua.ua_binary.Primitives1, ntag):
226
            # Elementary types have their parsing directly relying on ua_type_to_python.
227 1
            obj.value = ua_type_to_python(val_el.text, ntag)
228 1
        elif ntag == "DateTime":
229 1
            obj.value = ua_type_to_python(val_el.text, ntag)
230
            # According to specs, DateTime should be either UTC or with a timezone.
231 1
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
232 1
                utc.localize(obj.value) # FIXME Forcing to UTC if unaware, maybe should raise?
233 1
        elif ntag == "ByteString":
234 1
            if val_el.text is None:
235
                mytext = b""
236
            else:
237 1
                mytext = val_el.text.encode()
238 1
                mytext = base64.b64decode(mytext)
239 1
            obj.value = mytext
240 1
        elif ntag == "String":
241 1
            mytext = val_el.text
242 1
            if mytext is None:
243
                # Support importing null strings.
244
                mytext = ""
245
            #mytext = mytext.replace('\n', '').replace('\r', '')
246 1
            obj.value = mytext
247 1
        elif ntag == "Guid":
248 1
            self._parse_contained_value(val_el, obj)
249
            # Override parsed string type to guid.
250 1
            obj.valuetype = ntag
251 1
        elif ntag == "NodeId":
252 1
            id_el = val_el.find("uax:Identifier", self.ns)
253 1
            if id_el is not None:
254 1
                obj.value = id_el.text
255 1
        elif ntag == "ExtensionObject":
256 1
            obj.value = self._parse_ext_obj(val_el)
257 1
        elif ntag == "LocalizedText":
258 1
            obj.value = self._parse_body(val_el)
259 1
        elif ntag == "ListOfLocalizedText":
260 1
            obj.value = self._parse_list_of_localized_text(val_el)
261 1
        elif ntag == "ListOfExtensionObject":
262 1
            obj.value = self._parse_list_of_extension_object(val_el)
263 1
        elif ntag.startswith("ListOf"):
264
            # Default case for "ListOf" types.
265
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
266 1
            obj.value = []
267 1
            for val_el in val_el:
268 1
                tmp = NodeData()
269 1
                self._parse_value(val_el, tmp)
270 1
                obj.value.append(tmp.value)
271
        else:
272
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
273
            # QualifiedName, StatusCode.
274
            # Missing according to ua.VariantType (also missing in string_to_val):
275
            # DataValue, Variant, DiagnosticInfo.
276
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)
277
278 1
    def _get_text(self, el):
279 1
        txtlist = [txt.strip() for txt in el.itertext()]
280 1
        return "".join(txtlist)
281
282 1
    def _parse_list_of_localized_text(self, el):
283 1
        value = []
284 1
        for localized_text in el:
285 1
            mylist = self._parse_body(localized_text)
286
            # small hack since we did not handle LocalizedText as ExtensionObject at begynning
287 1
            for name, val in mylist:
288 1
                if name == "Text":
289 1
                    value.append(val)
290 1
        return value
291
292 1
    def _parse_list_of_extension_object(self, el):
293
        """
294
        Parse a uax:ListOfExtensionObject Value
295
        Return an list of ExtObj
296
        """
297 1
        value = []
298 1
        for extension_object in el:
299 1
            ext_obj = self._parse_ext_obj(extension_object)
300 1
            value.append(ext_obj)
301 1
        return value
302
303 1
    def _parse_ext_obj(self, el):
304 1
        ext = ExtObj()
305 1
        for extension_object_part in el:
306 1
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
307 1
            if ntag == 'TypeId':
308 1
                ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
309 1
                ext.typeid = self._get_text(extension_object_part)
310 1
            elif ntag == 'Body':
311 1
                ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
312 1
                ext.body = self._parse_body(extension_object_part)
313
            else:
314
                self.logger.warning("Unknown ntag", ntag)
315 1
        return ext
316
317 1
    def _parse_body(self, el):
318 1
        body = []
319 1
        for body_item in el:
320 1
            otag = self._retag.match(body_item.tag).groups()[1]
321 1
            childs = [i for i in body_item]
322 1
            if not childs:
323 1
                val = self._get_text(body_item)
324
            else:
325 1
                val = self._parse_body(body_item)
326 1
            if val:
327 1
                body.append((otag, val))
328 1
        return body
329
330 1
    def _parse_refs(self, el, obj):
331 1
        for ref in el:
332 1
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
333 1
                obj.typedef = ref.text
334 1
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] in ("false", "False"):
335
                # if obj.parent:
336
                    # sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
337 1
                obj.parent = ref.text
338 1
                obj.parentlink = ref.attrib["ReferenceType"]
339
            else:
340 1
                struct = RefStruct()
341 1
                if "IsForward" in ref.attrib:
342
                    struct.forward = ref.attrib["IsForward"]
343 1
                struct.target = ref.text
344 1
                struct.reftype = ref.attrib["ReferenceType"]
345
                obj.refs.append(struct)
346