Test Failed
Pull Request — master (#331)
by
unknown
02:55
created

asyncua.common.xmlparser.XMLParser._parse_attr()   C

Complexity

Conditions 9

Size

Total Lines 21
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 19
nop 3
dl 0
loc 21
rs 6.6666
c 0
b 0
f 0
1
"""
2
parse xml file from asyncua-spec
3
"""
4
import re
5
import asyncio
6
import base64
7
import logging
8
from pytz import utc
9
10
11
import xml.etree.ElementTree as ET
12
13
from .ua_utils import string_to_val
14
from asyncua import ua
15
16
17
def ua_type_to_python(val, uatype_as_str):
    """
    Convert the string ``val`` to a python value through ``string_to_val``.

    ``uatype_as_str`` is the name of a ``ua.VariantType`` member
    (for instance "Boolean"); it is looked up with ``getattr``, so an
    unknown name raises ``AttributeError``.
    """
    variant_type = getattr(ua.VariantType, uatype_as_str)
    return string_to_val(val, variant_type)
22
23
24
def _to_bool(val):
    """
    Shortcut converting the string ``val`` with the "Boolean" ua type.
    """
    return ua_type_to_python(val, uatype_as_str="Boolean")
29
30
31
class NodeData:
    """
    Attribute container filled by the XML parser for one node element.

    Every attribute gets a default here; list-valued attributes are created
    per instance so that instances never share them.
    """

    def __init__(self):
        # Bookkeeping about the node itself and its place in the tree.
        self.nodetype = None
        self.parent = None
        self.parentlink = None
        self.typedef = None
        self.nodeclass = None
        self.eventnotifier = 0

        # Attributes common to every node (UANode - Part6 Table F.2).
        self.nodeid = None
        self.browsename = None
        self.symname = None
        self.writemask = None
        self.userwritemask = None
        self.accessrestrictions = None
        self.displayname = None
        self.desc = ""
        self.category = []
        self.documentation = None
        self.refs = []
        self.rolepermissions = []
        self.extensions = None

        # Attributes used by variables.
        self.datatype = None
        self.rank = -1  # check default value
        self.value = None
        self.valuetype = None
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # Attributes used by reference types.
        self.inversename = ""
        self.abstract = False
        self.symmetric = False

        # Attributes used by data types.
        self.definitions = []

    def __repr__(self):
        return f"NodeData(nodeid:{self.nodeid})"

    # str() and repr() share the same rendering.
    __str__ = __repr__
78
79
class Field:
    """
    One field of a data type definition.

    ``data`` is any object exposing ``get(key, default)``; the XML parser
    passes the child elements of a ``Definition`` tag, whose ``get`` reads
    XML attributes (always strings). Missing keys fall back to the defaults
    used below.

    Raises ``ValueError`` when ``ValueRank`` or ``Value`` is present but is
    not a valid integer literal.
    """

    def __init__(self, data):
        self.datatype = data.get("DataType", "")
        self.name = data.get("Name")
        self.dname = data.get("DisplayName", "")

        # XML attributes arrive as strings, and bool("false") is True, so the
        # textual boolean has to be interpreted explicitly. Non-string values
        # keep the plain truthiness conversion.
        raw_optional = data.get("IsOptional", False)
        if isinstance(raw_optional, str):
            self.optional = raw_optional.strip().lower() in ("true", "1")
        else:
            self.optional = bool(raw_optional)

        self.valuerank = int(data.get("ValueRank", -1))
        self.arraydim = data.get("ArrayDimensions", None)  # FIXME: check type
        self.value = int(data.get("Value", 0))
        self.desc = data.get("Description", "")
89
90
class RefStruct:
    """
    Plain record describing one reference: its type, direction and target.
    """

    def __init__(self):
        self.reftype = None
        self.forward = True
        self.target = None

    def __repr__(self):
        # The three fields are rendered as one tuple inside the parentheses.
        fields = (self.reftype, self.forward, self.target)
        return f"RefStruct({fields})"

    # str() and repr() share the same rendering.
    __str__ = __repr__
100
101
102
class ExtObj:
    """
    Plain record for a parsed extension object: type id, object name,
    body type and body content.
    """

    def __init__(self):
        self.typeid = None
        self.objname = None
        self.bodytype = None
        self.body = {}

    def __repr__(self):
        return f"ExtObj({self.objname}, {self.body})"

    # str() and repr() share the same rendering.
    __str__ = __repr__
114
115
116
class XMLParser:
    """
    Parser for UANodeSet XML documents.

    ``parse()`` / ``parse_sync()`` load a document and keep its root element
    in ``self.root``. The ``get_*`` methods then walk the direct children of
    that root and return the namespace uris, the aliases and one ``NodeData``
    object per node element.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Splits an ElementTree tag "{namespace}name" into ("{namespace}", "name").
        self._retag = re.compile(r"({.*})(.*)")
        self.root = None
        # Prefix -> namespace uri map used for prefixed find() queries.
        self.ns = {
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
            'xsd': "http://www.w3.org/2001/XMLSchema",
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
        }

    async def parse(self, xmlpath=None, xmlstring=None):
        """
        Load a document and store its root element in ``self.root``.

        A truthy ``xmlstring`` is parsed directly; otherwise the file at
        ``xmlpath`` is parsed in the event loop's default executor so the
        file access does not run on the loop itself.
        """
        if xmlstring:
            self.root = ET.fromstring(xmlstring)
        else:
            tree = await asyncio.get_event_loop().run_in_executor(None, ET.parse, xmlpath)
            self.root = tree.getroot()

    def parse_sync(self, xmlpath=None, xmlstring=None):
        """
        Blocking counterpart of ``parse()``: same arguments, same effect.
        """
        if xmlstring:
            self.root = ET.fromstring(xmlstring)
        else:
            tree = ET.parse(xmlpath)
            self.root = tree.getroot()

    def _tag_name(self, tag):
        """
        Return ``tag`` without its leading "{namespace}" part.

        A tag that carries no namespace is returned unchanged instead of
        failing on the unmatched regular expression.
        """
        match = self._retag.match(tag)
        return match.group(2) if match else tag

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file
        """
        for child in self.root:
            if self._tag_name(child.tag) == 'NamespaceUris':
                # Only the first NamespaceUris element is considered.
                return [ns_element.text for ns_element in child]
        return []

    def get_aliases(self) -> dict:
        """
        Return the used node aliases in this import file
        """
        for child in self.root:
            if self._tag_name(child.tag) == 'Aliases':
                # Only the first Aliases element is considered.
                return {el.attrib["Alias"]: el.text for el in child}
        return {}

    def get_node_datas(self):
        """
        Return a list with one ``NodeData`` per node element under the root.
        """
        # these XML tags don't contain nodes
        skipped_tags = ("Aliases", "NamespaceUris", "Extensions", "Models")
        nodes = []
        for child in self.root:
            tag = self._tag_name(child.tag)
            if tag not in skipped_tags:
                nodes.append(self._parse_node(tag, child))
        return nodes

    def _parse_node(self, nodetype, child):
        """
        Parse a XML node and create a NodeData object.

        XML attributes are applied first, then every child element.
        """
        obj = NodeData()
        obj.nodetype = nodetype
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.debug("Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_attr(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute ``key`` (string value ``val``) on ``obj``.

        Integer attributes are converted with ``int`` (``ValueError`` on a
        malformed value), booleans with ``_to_bool``. Unknown attributes are
        only logged.
        """
        if key == "NodeId":
            obj.nodeid = val
        elif key == "BrowseName":
            obj.browsename = val
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = val
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            obj.abstract = _to_bool(val)
        elif key == "Executable":
            # This attribute only exists on obj when the XML provides it.
            obj.executable = _to_bool(val)
        elif key == "EventNotifier":
            obj.eventnotifier = int(val)
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            obj.dimensions = [int(i) for i in val.split(",")]
        elif key == "MinimumSamplingInterval":
            obj.minsample = int(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = _to_bool(val)
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_attr(self, el, obj):
        """
        Store the content of the child element ``el`` on ``obj``.

        Unknown tags are only logged.
        """
        tag = self._tag_name(el.tag)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "Documentation":
            obj.documentation = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_contained_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            for field in el:
                obj.definitions.append(self._parse_field(field))
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_field(self, field):
        """
        Wrap one child element of a Definition tag in a ``Field``.
        """
        return Field(field)

    def _parse_contained_value(self, el, obj):
        """
        Parse the child of el as a constant.
        """
        # should be only one child; "*" selects the first one (None if absent)
        val_el = el.find("*")
        self._parse_value(val_el, obj)

    def _parse_value(self, val_el, obj):
        """
        Parse the node val_el as a constant.

        Sets ``obj.valuetype`` to the tag name of ``val_el`` and ``obj.value``
        to the parsed content. Types without a parser are logged and leave
        ``obj.value`` untouched.
        """
        if val_el is not None and val_el.text is not None:
            ntag = self._tag_name(val_el.tag)
        else:
            # NOTE(review): any element without text lands here, so the None
            # checks in the ByteString and String branches below cannot
            # trigger as long as this gate is kept - confirm this is intended.
            ntag = "Null"

        obj.valuetype = ntag
        if ntag == "Null":
            obj.value = None
        elif hasattr(ua.ua_binary.Primitives1, ntag):
            # Elementary types have their parsing directly relying on ua_type_to_python.
            obj.value = ua_type_to_python(val_el.text, ntag)
        elif ntag == "DateTime":
            obj.value = ua_type_to_python(val_el.text, ntag)
            # According to specs, DateTime should be either UTC or with a timezone.
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
                # FIXME Forcing to UTC if unaware, maybe should raise?
                # datetime objects are immutable: the aware copy has to be
                # assigned back, otherwise the value stays naive.
                obj.value = obj.value.replace(tzinfo=utc)
        elif ntag == "ByteString":
            if val_el.text is None:
                mytext = b""
            else:
                mytext = base64.b64decode(val_el.text.encode())
            obj.value = mytext
        elif ntag == "String":
            mytext = val_el.text
            if mytext is None:
                # Support importing null strings.
                mytext = ""
            obj.value = mytext
        elif ntag == "Guid":
            self._parse_contained_value(val_el, obj)
            # Override parsed string type to guid.
            obj.valuetype = ntag
        elif ntag == "NodeId":
            id_el = val_el.find("uax:Identifier", self.ns)
            if id_el is not None:
                obj.value = id_el.text
        elif ntag == "ExtensionObject":
            obj.value = self._parse_ext_obj(val_el)
        elif ntag == "LocalizedText":
            obj.value = self._parse_body(val_el)
        elif ntag == "ListOfLocalizedText":
            obj.value = self._parse_list_of_localized_text(val_el)
        elif ntag == "ListOfExtensionObject":
            obj.value = self._parse_list_of_extension_object(val_el)
        elif ntag.startswith("ListOf"):
            # Default case for "ListOf" types.
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
            obj.value = []
            for item_el in val_el:
                # Each item is parsed into a scratch NodeData to reuse this method.
                tmp = NodeData()
                self._parse_value(item_el, tmp)
                obj.value.append(tmp.value)
        else:
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
            # QualifiedName, StatusCode.
            # Missing according to ua.VariantType (also missing in string_to_val):
            # DataValue, Variant, DiagnosticInfo.
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)

    def _get_text(self, el):
        """
        Return all text found in ``el`` and its descendants, each piece
        stripped of surrounding whitespace and concatenated.
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list_of_localized_text(self, el):
        """
        Parse a ListOfLocalizedText value.

        Return a list of dictionaries, one per child element, each holding at
        least the keys "Text" and "Locale" (``None`` when not provided).
        """
        value = []
        for localized_text in el:
            # each localized text is in a dictionary with "Locale" and "Text" keys
            item = {"Text": None, "Locale": None}
            for name, val in self._parse_body(localized_text):
                item[str(name)] = val
            # value is an array of dictionaries with localized texts
            value.append(item)
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        """
        return [self._parse_ext_obj(extension_object) for extension_object in el]

    def _parse_ext_obj(self, el):
        """
        Parse an extension object element into an ``ExtObj``.

        ``TypeId`` provides ``typeid``; ``Body`` provides ``objname`` (tag of
        its first child, left at ``None`` when the body is empty) and
        ``body``. Other children are logged and skipped.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._tag_name(extension_object_part.tag)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                body_el = extension_object_part.find('*')
                if body_el is not None:
                    ext.objname = self._tag_name(body_el.tag)
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown ntag %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Recursively turn the children of ``el`` into ``(tag, value)`` tuples.

        ``value`` is the stripped text for a leaf element and a nested list
        of tuples otherwise. Children whose value is empty are left out.
        """
        body = []
        for body_item in el:
            otag = self._tag_name(body_item.tag)
            if len(body_item):
                val = self._parse_body(body_item)
            else:
                val = self._get_text(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Parse a References element: fill ``obj.refs`` and ``obj.typedef`` and
        work out ``obj.parent`` / ``obj.parentlink``.

        Raises ``KeyError`` when a reference has no ``ReferenceType`` attribute.
        """
        parent, parentlink = obj.parent, None

        for ref in el:
            struct = RefStruct()
            # A reference is forward unless IsForward is explicitly false.
            struct.forward = ref.attrib.get("IsForward") not in ("false", "False")
            struct.target = ref.text
            struct.reftype = ref.attrib["ReferenceType"]
            obj.refs.append(struct)

            if struct.reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif not struct.forward:
                # An inverse reference is a parent candidate; the last one
                # seen is kept as fallback parent.
                parent, parentlink = struct.target, struct.reftype
                # Take its type as parent link when it points at the declared
                # parent, or when no parent link has been chosen yet.
                if obj.parent == parent or not obj.parentlink:
                    obj.parentlink = parentlink

        if obj.parent and not obj.parentlink:
            # the case of a simple parent attribute without any reverse link
            obj.parentlink = "HasComponent"
        if not obj.parent:
            obj.parent, obj.parentlink = parent, parentlink
        if not obj.parent:
            self.logger.info("Could not find parent for node '%s'", obj.nodeid)
398