Completed
Push — master ( 0e5e79...d552f1 )
by Olivier
05:52
created

XMLParser._set_attr()   F

Complexity

Conditions 16

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 19.2506

Importance

Changes 0
Metric Value
cc 16
c 0
b 0
f 0
dl 0
loc 31
ccs 23
cts 30
cp 0.7667
crap 19.2506
rs 2.7326

How to fix   Complexity   

Complexity

Complex classes like XMLParser._set_attr() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
from pytz import utc
6 1
import uuid
7 1
import re
8 1
import sys
9 1
import base64
10
11 1
import xml.etree.ElementTree as ET
12
13 1
from opcua.common import ua_utils
14 1
from opcua import ua
15
16
17 1
def ua_type_to_python(val, uatype_as_str):
18
    """
19
    Converts a string value to a python value according to ua_utils.
20
    """
21 1
    return ua_utils.string_to_val(val, getattr(ua.VariantType, uatype_as_str))
22
23 1
def _to_bool(val):
24
    """
25
    Easy access to boolean conversion.
26
    """
27
    return ua_type_to_python(val, "Boolean")
28
29
30 1
class NodeData(object):
31
32 1
    def __init__(self):
33 1
        self.nodetype = None
34 1
        self.nodeid = None
35 1
        self.browsename = None
36 1
        self.displayname = None
37 1
        self.symname = None  # FIXME: this param is never used, why?
38 1
        self.parent = None
39 1
        self.parentlink = None
40 1
        self.desc = ""
41 1
        self.typedef = None
42 1
        self.refs = []
43 1
        self.nodeclass = None
44 1
        self.eventnotifier = 0
45
46
        # variable
47 1
        self.datatype = None
48 1
        self.rank = -1  # check default value
49 1
        self.value = None
50 1
        self.valuetype = None
51 1
        self.dimensions = None
52 1
        self.accesslevel = None
53 1
        self.useraccesslevel = None
54 1
        self.minsample = None
55
56
        # referencetype
57 1
        self.inversename = ""
58 1
        self.abstract = False
59 1
        self.symmetric = False
60
61
        # datatype
62 1
        self.definition = []
63
64 1
    def __str__(self):
65
        return "NodeData(nodeid:{0})".format(self.nodeid)
66 1
    __repr__ = __str__
67
68
69 1
class RefStruct(object):
70
71 1
    def __init__(self):
72 1
        self.reftype = None
73 1
        self.forward = True
74 1
        self.target = None
75
76
77 1
class ExtObj(object):
78
79 1
    def __init__(self):
80 1
        self.typeid = None
81 1
        self.objname = None
82 1
        self.bodytype = None
83 1
        self.body = {}
84
85 1
    def __str__(self):
86 1
        return "ExtObj({0}, {1})".format(self.objname, self.body)
87 1
    __repr__ = __str__
88
89
90 1
class XMLParser(object):
91
92 1
    def __init__(self, xmlpath=None, xmlstring=None):
93 1
        self.logger = logging.getLogger(__name__)
94 1
        self._retag = re.compile(r"(\{.*\})(.*)")
95 1
        self.path = xmlpath
96
97 1
        if xmlstring:
98 1
            self.root = ET.fromstring(xmlstring)
99
        else:
100 1
            self.root = ET.parse(xmlpath).getroot()
101
102
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
103 1
        self.ns = {
104
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
105
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
106
            'xsd': "http://www.w3.org/2001/XMLSchema",
107
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
108
        }
109
110 1
    def get_used_namespaces(self):
111
        """
112
        Return the used namespace uris in this import file
113
        """
114 1
        namespaces_uris = []
115 1
        for child in self.root:
116 1
            tag = self._retag.match(child.tag).groups()[1]
117 1
            if tag == 'NamespaceUris':
118 1
                namespaces_uris = [ns_element.text for ns_element in child]
119 1
                break
120 1
        return namespaces_uris
121
122 1
    def get_aliases(self):
123
        """
124
        Return the used node aliases in this import file
125
        """
126 1
        aliases = {}
127 1
        for child in self.root:
128 1
            tag = self._retag.match(child.tag).groups()[1]
129 1
            if tag == 'Aliases':
130 1
                for el in child:
131 1
                    aliases[el.attrib["Alias"]] = el.text
132 1
                break
133 1
        return aliases
134
135 1
    def get_node_datas(self):
136 1
        nodes = []
137 1
        for child in self.root:
138 1
            tag = self._retag.match(child.tag).groups()[1]
139 1
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
140 1
                node = self._parse_node(tag, child)
141 1
                nodes.append(node)
142 1
        return nodes
143
144 1
    def _parse_node(self, nodetype, child):
145
        """
146
        Parse a XML node and create a NodeData object.
147
        """
148 1
        obj = NodeData()
149 1
        obj.nodetype = nodetype
150 1
        for key, val in child.attrib.items():
151 1
            self._set_attr(key, val, obj)
152 1
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
153 1
        obj.displayname = obj.browsename  # give a default value to display name
154 1
        for el in child:
155 1
            self._parse_attr(el, obj)
156 1
        return obj
157
158 1
    def _set_attr(self, key, val, obj):
159 1
        if key == "NodeId":
160 1
            obj.nodeid = val
161 1
        elif key == "BrowseName":
162 1
            obj.browsename = val
163 1
        elif key == "SymbolicName":
164 1
            obj.symname = val
165 1
        elif key == "ParentNodeId":
166 1
            obj.parent = val
167 1
        elif key == "DataType":
168 1
            obj.datatype = val
169 1
        elif key == "IsAbstract":
170
            obj.abstract = _to_bool(val)
171 1
        elif key == "Executable":
172
            obj.executable = _to_bool(val)
173 1
        elif key == "EventNotifier":
174
            obj.eventnotifier = int(val)
175 1
        elif key == "ValueRank":
176 1
            obj.rank = int(val)
177 1
        elif key == "ArrayDimensions":
178 1
            obj.dimensions = [int(i) for i in val.split(",")]
179 1
        elif key == "MinimumSamplingInterval":
180
            obj.minsample = int(val)
181 1
        elif key == "AccessLevel":
182 1
            obj.accesslevel = int(val)
183 1
        elif key == "UserAccessLevel":
184 1
            obj.useraccesslevel = int(val)
185
        elif key == "Symmetric":
186
            obj.symmetric = _to_bool(val)
187
        else:
188
            self.logger.info("Attribute not implemented: %s:%s", key, val)
189
190 1
    def _parse_attr(self, el, obj):
191 1
        tag = self._retag.match(el.tag).groups()[1]
192
193 1
        if tag == "DisplayName":
194 1
            obj.displayname = el.text
195 1
        elif tag == "Description":
196 1
            obj.desc = el.text
197 1
        elif tag == "References":
198 1
            self._parse_refs(el, obj)
199 1
        elif tag == "Value":
200 1
            self._parse_contained_value(el, obj)
201 1
        elif tag == "InverseName":
202
            obj.inversename = el.text
203 1
        elif tag == "Definition":
204 1
            for field in el:
205 1
                obj.definition.append(field)
206
        else:
207
            self.logger.info("Not implemented tag: %s", el)
208
209 1
    def _parse_contained_value(self, el, obj):
210
        """
211
        Parse the child of el as a constant.
212
        """
213 1
        val_el = el.find(".//")  # should be only one child
214 1
        self._parse_value(val_el, obj)
215
216 1
    def _parse_value(self, val_el, obj):
217
        """
218
        Parse the node val_el as a constant.
219
        """
220 1
        if val_el is not None and val_el.text is not None:
221 1
            ntag = self._retag.match(val_el.tag).groups()[1]
222
        else:
223 1
            ntag = "Null"
224
225 1
        obj.valuetype = ntag
226 1
        if ntag == "Null":
227 1
            obj.value = None
228 1
        elif hasattr(ua.ua_binary.Primitives1, ntag):
229
            # Elementary types have their parsing directly relying on ua_type_to_python.
230 1
            obj.value = ua_type_to_python(val_el.text, ntag)
231 1
        elif ntag == "DateTime":
232 1
            obj.value = ua_type_to_python(val_el.text, ntag)
233
            # According to specs, DateTime should be either UTC or with a timezone.
234 1
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
235 1
                utc.localize(obj.value) # FIXME Forcing to UTC if unaware, maybe should raise?
236 1
        elif ntag == "ByteString":
237 1
            if val_el.text is None:
238
                mytext = b""
239
            else:
240 1
                mytext = val_el.text.encode()
241 1
                mytext = base64.b64decode(mytext)
242 1
            obj.value = mytext
243 1
        elif ntag == "String":
244 1
            mytext = val_el.text
245 1
            if mytext is None:
246
                # Support importing null strings.
247
                mytext = ""
248
            #mytext = mytext.replace('\n', '').replace('\r', '')
249 1
            obj.value = mytext
250 1
        elif ntag == "Guid":
251 1
            self._parse_contained_value(val_el, obj)
252
            # Override parsed string type to guid.
253 1
            obj.valuetype = ntag
254 1
        elif ntag == "NodeId":
255 1
            id_el = val_el.find("uax:Identifier", self.ns)
256 1
            if id_el is not None:
257 1
                obj.value = id_el.text
258 1
        elif ntag == "ExtensionObject":
259 1
            obj.value = self._parse_ext_obj(val_el)
260 1
        elif ntag == "LocalizedText":
261 1
            obj.value = self._parse_body(val_el)
262 1
        elif ntag == "ListOfLocalizedText":
263 1
            obj.value = self._parse_list_of_localized_text(val_el)
264 1
        elif ntag == "ListOfExtensionObject":
265 1
            obj.value = self._parse_list_of_extension_object(val_el)
266 1
        elif ntag.startswith("ListOf"):
267
            # Default case for "ListOf" types.
268
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
269 1
            obj.value = []
270 1
            for val_el in val_el:
271 1
                tmp = NodeData()
272 1
                self._parse_value(val_el, tmp)
273 1
                obj.value.append(tmp.value)
274
        else:
275
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
276
            # QualifiedName, StatusCode.
277
            # Missing according to ua.VariantType (also missing in string_to_val):
278
            # DataValue, Variant, DiagnosticInfo.
279
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)
280
281 1
    def _get_text(self, el):
282 1
        txtlist = [txt.strip() for txt in el.itertext()]
283 1
        return "".join(txtlist)
284
285 1
    def _parse_list_of_localized_text(self, el):
286 1
        value = []
287 1
        for localized_text in el:
288 1
            mylist = self._parse_body(localized_text)
289
            # small hack since we did not handle LocalizedText as ExtensionObject at begynning
290 1
            for name, val in mylist:
291 1
                if name == "Text":
292 1
                    value.append(val)
293 1
        return value
294
295 1
    def _parse_list_of_extension_object(self, el):
296
        """
297
        Parse a uax:ListOfExtensionObject Value
298
        Return an list of ExtObj
299
        """
300 1
        value = []
301 1
        for extension_object in el:
302 1
            ext_obj = self._parse_ext_obj(extension_object)
303 1
            value.append(ext_obj)
304 1
        return value
305
306 1
    def _parse_ext_obj(self, el):
307 1
        ext = ExtObj()
308 1
        for extension_object_part in el:
309 1
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
310 1
            if ntag == 'TypeId':
311 1
                ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
312 1
                ext.typeid = self._get_text(extension_object_part)
313 1
            elif ntag == 'Body':
314 1
                ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
315 1
                ext.body = self._parse_body(extension_object_part)
316
            else:
317
                self.logger.warning("Unknown ntag", ntag)
318 1
        return ext
319
320 1
    def _parse_body(self, el):
321 1
        body = []
322 1
        for body_item in el:
323 1
            otag = self._retag.match(body_item.tag).groups()[1]
324 1
            childs = [i for i in body_item]
325 1
            if not childs:
326 1
                val = self._get_text(body_item)
327
            else:
328 1
                val = self._parse_body(body_item)
329 1
            if val:
330 1
                body.append((otag, val))
331 1
        return body
332
333 1
    def _parse_refs(self, el, obj):
334 1
        for ref in el:
335 1
            struct = RefStruct()
336 1
            struct.forward = "IsForward" not in ref.attrib or ref.attrib["IsForward"] not in ("false", "False")
337 1
            struct.target = ref.text
338 1
            struct.reftype = ref.attrib["ReferenceType"]
339 1
            obj.refs.append(struct)
340
341 1
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
342 1
                obj.typedef = ref.text
343 1
            elif not struct.forward:
344 1
                if not obj.parent:
345 1
                    obj.parent = ref.text
346 1
                if obj.parent == ref.text:
347
                    obj.parentlink = ref.attrib["ReferenceType"]
348