asyncua.common.xmlparser.XMLParser._set_attr()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 31
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 15
eloc 30
nop 4
dl 0
loc 31
rs 2.9998
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like asyncua.common.xmlparser.XMLParser._set_attr() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
parse xml file from asyncua-spec
3
"""
4
import re
5
import asyncio
6
import base64
7
import logging
8
from pytz import utc
9
10
11
import xml.etree.ElementTree as ET
12
13
from .ua_utils import string_to_val
14
from asyncua import ua
15
16
17
def ua_type_to_python(val, uatype_as_str):
18
    """
19
    Converts a string value to a python value according to ua_utils.
20
    """
21
    return string_to_val(val, getattr(ua.VariantType, uatype_as_str))
22
23
24
def _to_bool(val):
25
    """
26
    Easy access to boolean conversion.
27
    """
28
    return ua_type_to_python(val, "Boolean")
29
30
31
class NodeData:
32
33
    def __init__(self):
34
        self.nodetype = None
35
        self.nodeid = None
36
        self.browsename = None
37
        self.displayname = None
38
        self.symname = None  # FIXME: this param is never used, why?
39
        self.parent = None
40
        self.parentlink = None
41
        self.desc = ""
42
        self.typedef = None
43
        self.refs = []
44
        self.nodeclass = None
45
        self.eventnotifier = 0
46
47
        # variable
48
        self.datatype = None
49
        self.rank = -1  # check default value
50
        self.value = None
51
        self.valuetype = None
52
        self.dimensions = None
53
        self.accesslevel = None
54
        self.useraccesslevel = None
55
        self.minsample = None
56
57
        # referencetype
58
        self.inversename = ""
59
        self.abstract = False
60
        self.symmetric = False
61
62
        # datatype
63
        self.definitions = []
64
65
    def __str__(self):
66
        return f"NodeData(nodeid:{self.nodeid})"
67
68
    __repr__ = __str__
69
70
class Field:
71
    def __init__(self, data):
72
        self.datatype = data.get("DataType", "")
73
        self.name = data.get("Name")
74
        self.dname = data.get("DisplayName", "")
75
        self.optional = bool(data.get("IsOptional", False))
76
        self.valuerank = int(data.get("ValueRank", -1))
77
        self.arraydim = data.get("ArrayDimensions", None) #FIXME: check type
78
        self.value = int(data.get("Value", 0))
79
        self.desc = data.get("Description", "")
80
81
class RefStruct:
82
83
    def __init__(self):
84
        self.reftype = None
85
        self.forward = True
86
        self.target = None
87
88
    def __str__(self):
89
        return f"RefStruct({self.reftype, self.forward, self.target})"
90
    __repr__ = __str__
91
92
93
class ExtObj:
94
95
    def __init__(self):
96
        self.typeid = None
97
        self.objname = None
98
        self.bodytype = None
99
        self.body = {}
100
101
    def __str__(self):
102
        return f"ExtObj({self.objname}, {self.body})"
103
104
    __repr__ = __str__
105
106
107
class XMLParser:
108
109
    def __init__(self):
110
        self.logger = logging.getLogger(__name__)
111
        self._retag = re.compile(r"(\{.*\})(.*)")
112
        self.root = None
113
        self.ns = {
114
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
115
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
116
            'xsd': "http://www.w3.org/2001/XMLSchema",
117
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
118
        }
119
120
    async def parse(self, xmlpath=None, xmlstring=None):
121
        if xmlstring:
122
            self.root = ET.fromstring(xmlstring)
123
        else:
124
            tree = await asyncio.get_event_loop().run_in_executor(None, ET.parse, xmlpath)
125
            self.root = tree.getroot()
126
127
    def parse_sync(self, xmlpath=None, xmlstring=None):
128
        if xmlstring:
129
            self.root = ET.fromstring(xmlstring)
130
        else:
131
            tree = ET.parse(xmlpath)
132
            self.root = tree.getroot()
133
134
    def get_used_namespaces(self):
135
        """
136
        Return the used namespace uris in this import file
137
        """
138
        namespaces_uris = []
139
        for child in self.root:
140
            tag = self._retag.match(child.tag).groups()[1]
141
            if tag == 'NamespaceUris':
142
                namespaces_uris = [ns_element.text for ns_element in child]
143
                break
144
        return namespaces_uris
145
146
    def get_aliases(self) -> dict:
147
        """
148
        Return the used node aliases in this import file
149
        """
150
        aliases = {}
151
        for child in self.root:
152
            tag = self._retag.match(child.tag).groups()[1]
153
            if tag == 'Aliases':
154
                for el in child:
155
                    aliases[el.attrib["Alias"]] = el.text
156
                break
157
        return aliases
158
159
    def get_node_datas(self):
160
        nodes = []
161
        for child in self.root:
162
            tag = self._retag.match(child.tag).groups()[1]
163
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
164
                node = self._parse_node(tag, child)
165
                nodes.append(node)
166
        return nodes
167
168
    def _parse_node(self, nodetype, child):
169
        """
170
        Parse a XML node and create a NodeData object.
171
        """
172
        obj = NodeData()
173
        obj.nodetype = nodetype
174
        for key, val in child.attrib.items():
175
            self._set_attr(key, val, obj)
176
        self.logger.debug("Parsing node: %s %s", obj.nodeid, obj.browsename)
177
        obj.displayname = obj.browsename  # give a default value to display name
178
        for el in child:
179
            self._parse_attr(el, obj)
180
        return obj
181
182
    def _set_attr(self, key, val, obj):
183
        if key == "NodeId":
184
            obj.nodeid = val
185
        elif key == "BrowseName":
186
            obj.browsename = val
187
        elif key == "SymbolicName":
188
            obj.symname = val
189
        elif key == "ParentNodeId":
190
            obj.parent = val
191
        elif key == "DataType":
192
            obj.datatype = val
193
        elif key == "IsAbstract":
194
            obj.abstract = _to_bool(val)
195
        elif key == "Executable":
196
            obj.executable = _to_bool(val)
197
        elif key == "EventNotifier":
198
            obj.eventnotifier = int(val)
199
        elif key == "ValueRank":
200
            obj.rank = int(val)
201
        elif key == "ArrayDimensions":
202
            obj.dimensions = [int(i) for i in val.split(",")]
203
        elif key == "MinimumSamplingInterval":
204
            obj.minsample = int(val)
205
        elif key == "AccessLevel":
206
            obj.accesslevel = int(val)
207
        elif key == "UserAccessLevel":
208
            obj.useraccesslevel = int(val)
209
        elif key == "Symmetric":
210
            obj.symmetric = _to_bool(val)
211
        else:
212
            self.logger.info("Attribute not implemented: %s:%s", key, val)
213
214
    def _parse_attr(self, el, obj):
215
        tag = self._retag.match(el.tag).groups()[1]
216
217
        if tag == "DisplayName":
218
            obj.displayname = el.text
219
        elif tag == "Description":
220
            obj.desc = el.text
221
        elif tag == "References":
222
            self._parse_refs(el, obj)
223
        elif tag == "Value":
224
            self._parse_contained_value(el, obj)
225
        elif tag == "InverseName":
226
            obj.inversename = el.text
227
        elif tag == "Definition":
228
            for field in el:
229
                field = self._parse_field(field)
230
                obj.definitions.append(field)
231
        else:
232
            self.logger.info("Not implemented tag: %s", el)
233
234
    def _parse_field(self, field):
235
        return Field(field)
236
237
    def _parse_contained_value(self, el, obj):
238
        """
239
        Parse the child of el as a constant.
240
        """
241
        val_el = el.find(".//")  # should be only one child
242
        self._parse_value(val_el, obj)
243
244
    def _parse_value(self, val_el, obj):
245
        """
246
        Parse the node val_el as a constant.
247
        """
248
        if val_el is not None and val_el.text is not None:
249
            ntag = self._retag.match(val_el.tag).groups()[1]
250
        else:
251
            ntag = "Null"
252
253
        obj.valuetype = ntag
254
        if ntag == "Null":
255
            obj.value = None
256
        elif hasattr(ua.ua_binary.Primitives1, ntag):
257
            # Elementary types have their parsing directly relying on ua_type_to_python.
258
            obj.value = ua_type_to_python(val_el.text, ntag)
259
        elif ntag == "DateTime":
260
            obj.value = ua_type_to_python(val_el.text, ntag)
261
            # According to specs, DateTime should be either UTC or with a timezone.
262
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
263
                utc.localize(obj.value)  # FIXME Forcing to UTC if unaware, maybe should raise?
264
        elif ntag == "ByteString":
265
            if val_el.text is None:
266
                mytext = b""
267
            else:
268
                mytext = val_el.text.encode()
269
                mytext = base64.b64decode(mytext)
270
            obj.value = mytext
271
        elif ntag == "String":
272
            mytext = val_el.text
273
            if mytext is None:
274
                # Support importing null strings.
275
                mytext = ""
276
            # mytext = mytext.replace('\n', '').replace('\r', '')
277
            obj.value = mytext
278
        elif ntag == "Guid":
279
            self._parse_contained_value(val_el, obj)
280
            # Override parsed string type to guid.
281
            obj.valuetype = ntag
282
        elif ntag == "NodeId":
283
            id_el = val_el.find("uax:Identifier", self.ns)
284
            if id_el is not None:
285
                obj.value = id_el.text
286
        elif ntag == "ExtensionObject":
287
            obj.value = self._parse_ext_obj(val_el)
288
        elif ntag == "LocalizedText":
289
            obj.value = self._parse_body(val_el)
290
        elif ntag == "ListOfLocalizedText":
291
            obj.value = self._parse_list_of_localized_text(val_el)
292
        elif ntag == "ListOfExtensionObject":
293
            obj.value = self._parse_list_of_extension_object(val_el)
294
        elif ntag.startswith("ListOf"):
295
            # Default case for "ListOf" types.
296
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
297
            obj.value = []
298
            for val_el in val_el:
299
                tmp = NodeData()
300
                self._parse_value(val_el, tmp)
301
                obj.value.append(tmp.value)
302
        else:
303
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
304
            # QualifiedName, StatusCode.
305
            # Missing according to ua.VariantType (also missing in string_to_val):
306
            # DataValue, Variant, DiagnosticInfo.
307
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)
308
309
    def _get_text(self, el):
310
        txtlist = [txt.strip() for txt in el.itertext()]
311
        return "".join(txtlist)
312
313
    def _parse_list_of_localized_text(self, el):
314
        value = []
315
        for localized_text in el:
316
            mylist = self._parse_body(localized_text)
317
            # each localized text is in a dictionary with "Locale" and "Text" keys
318
            item = {"Text": None, "Locale": None}
319
            for name, val in mylist:
320
                item.update({str(name): val})
321
            # value is an array of dictionaries with localized texts
322
            value.append(item)
323
        return value
324
325
    def _parse_list_of_extension_object(self, el):
326
        """
327
        Parse a uax:ListOfExtensionObject Value
328
        Return an list of ExtObj
329
        """
330
        value = []
331
        for extension_object in el:
332
            ext_obj = self._parse_ext_obj(extension_object)
333
            value.append(ext_obj)
334
        return value
335
336
    def _parse_ext_obj(self, el):
337
        ext = ExtObj()
338
        for extension_object_part in el:
339
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
340
            if ntag == 'TypeId':
341
                ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
342
                ext.typeid = self._get_text(extension_object_part)
343
            elif ntag == 'Body':
344
                ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
345
                ext.body = self._parse_body(extension_object_part)
346
            else:
347
                self.logger.warning("Unknown ntag", ntag)
348
        return ext
349
350
    def _parse_body(self, el):
351
        body = []
352
        for body_item in el:
353
            otag = self._retag.match(body_item.tag).groups()[1]
354
            childs = [i for i in body_item]
355
            if not childs:
356
                val = self._get_text(body_item)
357
            else:
358
                val = self._parse_body(body_item)
359
            if val:
360
                body.append((otag, val))
361
        return body
362
363
    def _parse_refs(self, el, obj):
364
        parent, parentlink = obj.parent, None
365
366
        for ref in el:
367
            struct = RefStruct()
368
            struct.forward = "IsForward" not in ref.attrib or ref.attrib["IsForward"] not in ("false", "False")
369
            struct.target = ref.text
370
            struct.reftype = ref.attrib["ReferenceType"]
371
            obj.refs.append(struct)
372
373
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
374
                obj.typedef = ref.text
375
            elif not struct.forward:
376
                parent, parentlink = struct.target, struct.reftype
377
                if obj.parent == parent or obj.parent != parent and not obj.parentlink:
378
                    obj.parentlink = parentlink
379
380
        if obj.parent and not obj.parentlink:
381
            # the case of asimple parent attribute without any reverse link
382
            obj.parentlink = "HasComponent"
383
        if not obj.parent:
384
            obj.parent, obj.parentlink = parent, parentlink
385
        if not obj.parent:
386
            self.logger.info("Could not find parent for node '%s'", obj.nodeid)
387