Completed
Push — master ( e184dd...515bc6 )
by Olivier
07:33 queued 02:48
created

XMLParser._parse_value()   F

Complexity

Conditions 19

Size

Total Lines 57

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 39
CRAP Score 19.0418

Importance

Changes 4
Bugs 1 Features 1
Metric Value
cc 19
c 4
b 1
f 1
dl 0
loc 57
ccs 39
cts 41
cp 0.9512
crap 19.0418
rs 3.3414

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

A commonly applied refactoring here is Extract Method.

Complexity

Complex classes like XMLParser._parse_value() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
from pytz import utc
6 1
import uuid
7 1
import re
8 1
import sys
9
10 1
import xml.etree.ElementTree as ET
11
12 1
from opcua.common import ua_utils
13 1
from opcua import ua
14
15
16 1
def ua_type_to_python(val, uatype_as_str):
    """
    Convert the string ``val`` to a python value.

    ``uatype_as_str`` is the name of an attribute of ``ua.VariantType``; the
    conversion itself is delegated to ``ua_utils.string_to_val``.
    """
    variant_type = getattr(ua.VariantType, uatype_as_str)
    return ua_utils.string_to_val(val, variant_type)
21
22 1
def _to_bool(val):
    """
    Shortcut: convert ``val`` through ua_type_to_python using the "Boolean" type.
    """
    boolean_type_name = "Boolean"
    return ua_type_to_python(val, boolean_type_name)
27
28
29 1
class NodeData(object):
    """
    Plain container for the data of one node read from the XML file.
    """

    def __init__(self):
        # attributes common to every node
        self.nodetype = self.nodeid = self.browsename = self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0

        # variable
        self.datatype = None
        self.rank = -1  # check default value
        self.value = self.valuetype = self.dimensions = None
        self.accesslevel = self.useraccesslevel = self.minsample = None

        # referencetype
        self.inversename = ""
        self.abstract = self.symmetric = False

        # datatype
        self.definition = []

    def __str__(self):
        template = "NodeData(nodeid:{0})"
        return template.format(self.nodeid)

    __repr__ = __str__
66
67
68 1
class RefStruct(object):
    """
    Plain container for one reference: its type, direction flag and target.
    """

    def __init__(self):
        self.reftype, self.forward, self.target = None, True, None
74
75
76 1
class ExtObj(object):
    """
    Plain container for a parsed extension object.
    """

    def __init__(self):
        self.typeid = self.objname = self.bodytype = None
        self.body = {}

    def __str__(self):
        template = "ExtObj({0}, {1})"
        return template.format(self.objname, self.body)

    __repr__ = __str__
87
88
89 1
class XMLParser(object):
90
91 1
    def __init__(self, xmlpath):
92 1
        self.logger = logging.getLogger(__name__)
93 1
        self._retag = re.compile(r"(\{.*\})(.*)")
94 1
        self.path = xmlpath
95
96 1
        self.tree = ET.parse(xmlpath)
97 1
        self.root = self.tree.getroot()
98
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
99 1
        self.ns = {
100
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
101
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
102
            'xsd': "http://www.w3.org/2001/XMLSchema",
103
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
104
        }
105
106 1
    def get_used_namespaces(self):
107
        """
108
        Return the used namespace uris in this import file
109
        """
110 1
        namespaces_uris = []
111 1
        for child in self.root:
112 1
            tag = self._retag.match(child.tag).groups()[1]
113 1
            if tag == 'NamespaceUris':
114 1
                namespaces_uris = [ns_element.text for ns_element in child]
115 1
                break
116 1
        return namespaces_uris
117
118 1
    def get_aliases(self):
119
        """
120
        Return the used node aliases in this import file
121
        """
122 1
        aliases = {}
123 1
        for child in self.root:
124 1
            tag = self._retag.match(child.tag).groups()[1]
125 1
            if tag == 'Aliases':
126 1
                for el in child:
127 1
                    aliases[el.attrib["Alias"]] = el.text
128 1
                break
129 1
        return aliases
130
131 1
    def get_node_datas(self):
132 1
        nodes = []
133 1
        for child in self.root:
134 1
            tag = self._retag.match(child.tag).groups()[1]
135 1
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
136 1
                node = self._parse_node(tag, child)
137 1
                nodes.append(node)
138 1
        return nodes
139
140 1
    def _parse_node(self, nodetype, child):
        """
        Build a NodeData object from the XML element ``child``.

        XML attributes are applied through _set_attr, then every sub-element
        is handled by _parse_attr.
        """
        node = NodeData()
        node.nodetype = nodetype
        for attr_name, attr_value in child.attrib.items():
            self._set_attr(attr_name, attr_value, node)
        self.logger.info("Parsing node: %s %s", node.nodeid, node.browsename)
        # The browse name is the default display name; a DisplayName
        # sub-element parsed below may replace it.
        node.displayname = node.browsename
        for sub_element in child:
            self._parse_attr(sub_element, node)
        return node
153
154 1
    def _set_attr(self, key, val, obj):
155 1
        if key == "NodeId":
156 1
            obj.nodeid = val
157 1
        elif key == "BrowseName":
158 1
            obj.browsename = val
159 1
        elif key == "SymbolicName":
160
            obj.symname = val
161 1
        elif key == "ParentNodeId":
162 1
            obj.parent = val
163 1
        elif key == "DataType":
164 1
            obj.datatype = val
165 1
        elif key == "IsAbstract":
166
            obj.abstract = _to_bool(val)
167 1
        elif key == "Executable":
168
            obj.executable = _to_bool(val)
169 1
        elif key == "EventNotifier":
170
            obj.eventnotifier = int(val)
171 1
        elif key == "ValueRank":
172 1
            obj.rank = int(val)
173 1
        elif key == "ArrayDimensions":
174 1
            obj.dimensions = [int(i) for i in val.split(",")]
175
        elif key == "MinimumSamplingInterval":
176
            obj.minsample = int(val)
177
        elif key == "AccessLevel":
178
            obj.accesslevel = int(val)
179
        elif key == "UserAccessLevel":
180
            obj.useraccesslevel = int(val)
181
        elif key == "Symmetric":
182
            obj.symmetric = _to_bool(val)
183
        else:
184
            self.logger.info("Attribute not implemented: %s:%s", key, val)
185
186 1
    def _parse_attr(self, el, obj):
187 1
        tag = self._retag.match(el.tag).groups()[1]
188
189 1
        if tag == "DisplayName":
190 1
            obj.displayname = el.text
191 1
        elif tag == "Description":
192 1
            obj.desc = el.text
193 1
        elif tag == "References":
194 1
            self._parse_refs(el, obj)
195 1
        elif tag == "Value":
196 1
            self._parse_contained_value(el, obj)
197 1
        elif tag == "InverseName":
198
            obj.inversename = el.text
199 1
        elif tag == "Definition":
200 1
            for field in el:
201 1
                obj.definition.append(field)
202
        else:
203
            self.logger.info("Not implemented tag: %s", el)
204
205 1
    def _parse_contained_value(self, el, obj):
206
        """
207
        Parse the child of el as a constant.
208
        """
209 1
        val_el = el.find(".//")  # should be only one child
210 1
        self._parse_value(val_el, obj)
211
212 1
    def _parse_value(self, val_el, obj):
213
        """
214
        Parse the node val_el as a constant.
215
        """
216 1
        if val_el is not None and val_el.text is not None:
217 1
            ntag = self._retag.match(val_el.tag).groups()[1]
218
        else:
219 1
            ntag = "Null"
220
221 1
        obj.valuetype = ntag
222 1
        if ntag == "Null":
223 1
            obj.value = None
224 1
        elif hasattr(ua.ua_binary.Primitives1, ntag):
225
            # Elementary types have their parsing directly relying on ua_type_to_python.
226 1
            obj.value = ua_type_to_python(val_el.text, ntag)
227 1
        elif ntag == "DateTime":
228 1
            obj.value = ua_type_to_python(val_el.text, ntag)
229
            # According to specs, DateTime should be either UTC or with a timezone.
230 1
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
231 1
                utc.localize(obj.value) # FIXME Forcing to UTC if unaware, maybe should raise?
232 1
        elif ntag in ("ByteString", "String"):
233 1
            mytext = ua_type_to_python(val_el.text, ntag)
234 1
            if mytext is None:
235
                # Support importing null strings.
236
                mytext = ""
237 1
            mytext = mytext.replace('\n', '').replace('\r', '')
238 1
            obj.value = mytext
239 1
        elif ntag == "Guid":
240 1
            self._parse_contained_value(val_el, obj)
241
            # Override parsed string type to guid.
242 1
            obj.valuetype = ntag
243 1
        elif ntag == "NodeId":
244 1
            id_el = val_el.find("uax:Identifier", self.ns)
245 1
            if id_el is not None:
246 1
                obj.value = id_el.text
247 1
        elif ntag == "ExtensionObject":
248 1
            obj.value = self._parse_ext_obj(val_el)
249 1
        elif ntag == "LocalizedText":
250 1
            obj.value = self._parse_body(val_el)
251 1
        elif ntag == "ListOfLocalizedText":
252 1
            obj.value = self._parse_list_of_localized_text(val_el)
253 1
        elif ntag == "ListOfExtensionObject":
254 1
            obj.value = self._parse_list_of_extension_object(val_el)
255 1
        elif ntag.startswith("ListOf"):
256
            # Default case for "ListOf" types.
257
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
258 1
            obj.value = []
259 1
            for val_el in val_el:
260 1
                tmp = NodeData()
261 1
                self._parse_value(val_el, tmp)
262 1
                obj.value.append(tmp.value)
263
        else:
264
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
265
            # QualifiedName, StatusCode.
266
            # Missing according to ua.VariantType (also missing in string_to_val):
267
            # DataValue, Variant, DiagnosticInfo.
268
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)
269
270 1
    def _get_text(self, el):
271 1
        txtlist = [txt.strip() for txt in el.itertext()]
272 1
        return "".join(txtlist)
273
274 1
    def _parse_list_of_localized_text(self, el):
275
        # FIXME Why not calling parse_body as for LocalizedText without list?
276 1
        value = []
277 1
        for localized_text in el:
278 1
            ntag = self._retag.match(localized_text.tag).groups()[1]
279 1
            for child in localized_text:
280 1
                ntag = self._retag.match(child.tag).groups()[1]
281 1
                if ntag == 'Text':
282 1
                    value.append(self._get_text(child))
283 1
        return value
284
285 1
    def _parse_list_of_extension_object(self, el):
286
        """
287
        Parse a uax:ListOfExtensionObject Value
288
        Return an list of ExtObj
289
        """
290 1
        value = []
291 1
        for extension_object in el:
292 1
            ext_obj = self._parse_ext_obj(extension_object)
293 1
            value.append(ext_obj)
294 1
        return value
295
296 1
    def _parse_ext_obj(self, el):
        """
        Parse an extension object element and return an ExtObj.

        The text found under 'TypeId' becomes ext.typeid. For 'Body', the tag
        of its first child becomes ext.objname and the result of _parse_body
        becomes ext.body. Any other child is reported through the logger.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                body_el = extension_object_part.find('*')
                # An empty Body has no child to name the object after.
                if body_el is not None:
                    ext.objname = self._retag.match(body_el.tag).groups()[1]
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown extension object tag: %s", ntag)
        return ext
309
310 1
    def _parse_body(self, el):
311 1
        body = []
312 1
        for body_item in el:
313 1
            otag = self._retag.match(body_item.tag).groups()[1]
314 1
            childs = [i for i in body_item]
315 1
            if not childs:
316 1
                val = self._get_text(body_item)
317
            else:
318 1
                val = self._parse_body(body_item)
319 1
            if val:
320 1
                body.append((otag, val))
321 1
        return body
322
323 1
    def _parse_refs(self, el, obj):
324 1
        for ref in el:
325 1
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
326 1
                obj.typedef = ref.text
327 1
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] in ("false", "False"):
328
                # if obj.parent:
329
                    # sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
330 1
                obj.parent = ref.text
331 1
                obj.parentlink = ref.attrib["ReferenceType"]
332
            else:
333 1
                struct = RefStruct()
334 1
                if "IsForward" in ref.attrib:
335
                    struct.forward = ref.attrib["IsForward"]
336 1
                struct.target = ref.text
337 1
                struct.reftype = ref.attrib["ReferenceType"]
338
                obj.refs.append(struct)
339