Passed
Pull Request — master (#238)
by Olivier
02:33
created

asyncua.common.xmlparser.XMLParser._parse_attr()   B

Complexity

Conditions 8

Size

Total Lines 19
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 17
nop 3
dl 0
loc 19
rs 7.3333
c 0
b 0
f 0
1
"""
2
parse xml file from asyncua-spec
3
"""
4
import re
5
import asyncio
6
import base64
7
import logging
8
from pytz import utc
9
10
11
import xml.etree.ElementTree as ET
12
13
from .ua_utils import string_to_val
14
from asyncua import ua
15
16
17
def ua_type_to_python(val, uatype_as_str):
18
    """
19
    Converts a string value to a python value according to ua_utils.
20
    """
21
    return string_to_val(val, getattr(ua.VariantType, uatype_as_str))
22
23
24
def _to_bool(val):
25
    """
26
    Easy access to boolean conversion.
27
    """
28
    return ua_type_to_python(val, "Boolean")
29
30
31
class NodeData:
32
33
    def __init__(self):
34
        self.nodetype = None
35
        self.nodeid = None
36
        self.browsename = None
37
        self.displayname = None
38
        self.symname = None  # FIXME: this param is never used, why?
39
        self.parent = None
40
        self.parentlink = None
41
        self.desc = ""
42
        self.typedef = None
43
        self.refs = []
44
        self.nodeclass = None
45
        self.eventnotifier = 0
46
47
        # variable
48
        self.datatype = None
49
        self.rank = -1  # check default value
50
        self.value = None
51
        self.valuetype = None
52
        self.dimensions = None
53
        self.accesslevel = None
54
        self.useraccesslevel = None
55
        self.minsample = None
56
57
        # referencetype
58
        self.inversename = ""
59
        self.abstract = False
60
        self.symmetric = False
61
62
        # datatype
63
        self.definitions = []
64
65
    def __str__(self):
66
        return f"NodeData(nodeid:{self.nodeid})"
67
68
    __repr__ = __str__
69
70
class Field:
71
    def __init__(self, data):
72
        self.datatype = data.get("DataType")
73
        self.name = data.get("Name")
74
        self.optional = bool(data.get("IsOptional", False))
75
        self.valuerank = int(data.get("ValueRank", -1))
76
        self.arraydim = data.get("ArrayDimensions", None) #FIXME: check type
77
78
class RefStruct:
79
80
    def __init__(self):
81
        self.reftype = None
82
        self.forward = True
83
        self.target = None
84
85
    def __str__(self):
86
        return f"RefStruct({self.reftype, self.forward, self.target})"
87
    __repr__ = __str__
88
89
90
class ExtObj:
91
92
    def __init__(self):
93
        self.typeid = None
94
        self.objname = None
95
        self.bodytype = None
96
        self.body = {}
97
98
    def __str__(self):
99
        return f"ExtObj({self.objname}, {self.body})"
100
101
    __repr__ = __str__
102
103
104
class XMLParser:
105
106
    def __init__(self):
107
        self.logger = logging.getLogger(__name__)
108
        self._retag = re.compile(r"(\{.*\})(.*)")
109
        self.root = None
110
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
111
        self.ns = {
112
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
113
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
114
            'xsd': "http://www.w3.org/2001/XMLSchema",
115
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
116
        }
117
118
    async def parse(self, xmlpath=None, xmlstring=None):
119
        if xmlstring:
120
            self.root = ET.fromstring(xmlstring)
121
        else:
122
            tree = await asyncio.get_event_loop().run_in_executor(None, ET.parse, xmlpath)
123
            self.root = tree.getroot()
124
125
    def parse_sync(self, xmlpath=None, xmlstring=None):
126
        if xmlstring:
127
            self.root = ET.fromstring(xmlstring)
128
        else:
129
            tree = ET.parse(xmlpath)
130
            self.root = tree.getroot()
131
132
    def get_used_namespaces(self):
133
        """
134
        Return the used namespace uris in this import file
135
        """
136
        namespaces_uris = []
137
        for child in self.root:
138
            tag = self._retag.match(child.tag).groups()[1]
139
            if tag == 'NamespaceUris':
140
                namespaces_uris = [ns_element.text for ns_element in child]
141
                break
142
        return namespaces_uris
143
144
    def get_aliases(self) -> dict:
145
        """
146
        Return the used node aliases in this import file
147
        """
148
        aliases = {}
149
        for child in self.root:
150
            tag = self._retag.match(child.tag).groups()[1]
151
            if tag == 'Aliases':
152
                for el in child:
153
                    aliases[el.attrib["Alias"]] = el.text
154
                break
155
        return aliases
156
157
    def get_node_datas(self):
158
        nodes = []
159
        for child in self.root:
160
            tag = self._retag.match(child.tag).groups()[1]
161
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
162
                node = self._parse_node(tag, child)
163
                nodes.append(node)
164
        return nodes
165
166
    def _parse_node(self, nodetype, child):
167
        """
168
        Parse a XML node and create a NodeData object.
169
        """
170
        obj = NodeData()
171
        obj.nodetype = nodetype
172
        for key, val in child.attrib.items():
173
            self._set_attr(key, val, obj)
174
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
175
        obj.displayname = obj.browsename  # give a default value to display name
176
        for el in child:
177
            self._parse_attr(el, obj)
178
        return obj
179
180
    def _set_attr(self, key, val, obj):
181
        if key == "NodeId":
182
            obj.nodeid = val
183
        elif key == "BrowseName":
184
            obj.browsename = val
185
        elif key == "SymbolicName":
186
            obj.symname = val
187
        elif key == "ParentNodeId":
188
            obj.parent = val
189
        elif key == "DataType":
190
            obj.datatype = val
191
        elif key == "IsAbstract":
192
            obj.abstract = _to_bool(val)
193
        elif key == "Executable":
194
            obj.executable = _to_bool(val)
195
        elif key == "EventNotifier":
196
            obj.eventnotifier = int(val)
197
        elif key == "ValueRank":
198
            obj.rank = int(val)
199
        elif key == "ArrayDimensions":
200
            obj.dimensions = [int(i) for i in val.split(",")]
201
        elif key == "MinimumSamplingInterval":
202
            obj.minsample = int(val)
203
        elif key == "AccessLevel":
204
            obj.accesslevel = int(val)
205
        elif key == "UserAccessLevel":
206
            obj.useraccesslevel = int(val)
207
        elif key == "Symmetric":
208
            obj.symmetric = _to_bool(val)
209
        else:
210
            self.logger.info("Attribute not implemented: %s:%s", key, val)
211
212
    def _parse_attr(self, el, obj):
213
        tag = self._retag.match(el.tag).groups()[1]
214
215
        if tag == "DisplayName":
216
            obj.displayname = el.text
217
        elif tag == "Description":
218
            obj.desc = el.text
219
        elif tag == "References":
220
            self._parse_refs(el, obj)
221
        elif tag == "Value":
222
            self._parse_contained_value(el, obj)
223
        elif tag == "InverseName":
224
            obj.inversename = el.text
225
        elif tag == "Definition":
226
            for field in el:
227
                field = self._parse_field(field)
228
                obj.definitions.append(field)
229
        else:
230
            self.logger.info("Not implemented tag: %s", el)
231
232
    def _parse_field(self, field):
233
        return Field(field)
234
235
    def _parse_contained_value(self, el, obj):
236
        """
237
        Parse the child of el as a constant.
238
        """
239
        val_el = el.find(".//")  # should be only one child
240
        self._parse_value(val_el, obj)
241
242
    def _parse_value(self, val_el, obj):
243
        """
244
        Parse the node val_el as a constant.
245
        """
246
        if val_el is not None and val_el.text is not None:
247
            ntag = self._retag.match(val_el.tag).groups()[1]
248
        else:
249
            ntag = "Null"
250
251
        obj.valuetype = ntag
252
        if ntag == "Null":
253
            obj.value = None
254
        elif hasattr(ua.ua_binary.Primitives1, ntag):
255
            # Elementary types have their parsing directly relying on ua_type_to_python.
256
            obj.value = ua_type_to_python(val_el.text, ntag)
257
        elif ntag == "DateTime":
258
            obj.value = ua_type_to_python(val_el.text, ntag)
259
            # According to specs, DateTime should be either UTC or with a timezone.
260
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
261
                utc.localize(obj.value)  # FIXME Forcing to UTC if unaware, maybe should raise?
262
        elif ntag == "ByteString":
263
            if val_el.text is None:
264
                mytext = b""
265
            else:
266
                mytext = val_el.text.encode()
267
                mytext = base64.b64decode(mytext)
268
            obj.value = mytext
269
        elif ntag == "String":
270
            mytext = val_el.text
271
            if mytext is None:
272
                # Support importing null strings.
273
                mytext = ""
274
            # mytext = mytext.replace('\n', '').replace('\r', '')
275
            obj.value = mytext
276
        elif ntag == "Guid":
277
            self._parse_contained_value(val_el, obj)
278
            # Override parsed string type to guid.
279
            obj.valuetype = ntag
280
        elif ntag == "NodeId":
281
            id_el = val_el.find("uax:Identifier", self.ns)
282
            if id_el is not None:
283
                obj.value = id_el.text
284
        elif ntag == "ExtensionObject":
285
            obj.value = self._parse_ext_obj(val_el)
286
        elif ntag == "LocalizedText":
287
            obj.value = self._parse_body(val_el)
288
        elif ntag == "ListOfLocalizedText":
289
            obj.value = self._parse_list_of_localized_text(val_el)
290
        elif ntag == "ListOfExtensionObject":
291
            obj.value = self._parse_list_of_extension_object(val_el)
292
        elif ntag.startswith("ListOf"):
293
            # Default case for "ListOf" types.
294
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
295
            obj.value = []
296
            for val_el in val_el:
297
                tmp = NodeData()
298
                self._parse_value(val_el, tmp)
299
                obj.value.append(tmp.value)
300
        else:
301
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
302
            # QualifiedName, StatusCode.
303
            # Missing according to ua.VariantType (also missing in string_to_val):
304
            # DataValue, Variant, DiagnosticInfo.
305
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)
306
307
    def _get_text(self, el):
308
        txtlist = [txt.strip() for txt in el.itertext()]
309
        return "".join(txtlist)
310
311
    def _parse_list_of_localized_text(self, el):
312
        value = []
313
        for localized_text in el:
314
            mylist = self._parse_body(localized_text)
315
            # each localized text is in a dictionary with "Locale" and "Text" keys
316
            item = {"Text": None, "Locale": None}
317
            for name, val in mylist:
318
                item.update({str(name): val})
319
            # value is an array of dictionaries with localized texts
320
            value.append(item)
321
        return value
322
323
    def _parse_list_of_extension_object(self, el):
324
        """
325
        Parse a uax:ListOfExtensionObject Value
326
        Return an list of ExtObj
327
        """
328
        value = []
329
        for extension_object in el:
330
            ext_obj = self._parse_ext_obj(extension_object)
331
            value.append(ext_obj)
332
        return value
333
334
    def _parse_ext_obj(self, el):
335
        ext = ExtObj()
336
        for extension_object_part in el:
337
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
338
            if ntag == 'TypeId':
339
                ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
340
                ext.typeid = self._get_text(extension_object_part)
341
            elif ntag == 'Body':
342
                ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
343
                ext.body = self._parse_body(extension_object_part)
344
            else:
345
                self.logger.warning("Unknown ntag", ntag)
346
        return ext
347
348
    def _parse_body(self, el):
349
        body = []
350
        for body_item in el:
351
            otag = self._retag.match(body_item.tag).groups()[1]
352
            childs = [i for i in body_item]
353
            if not childs:
354
                val = self._get_text(body_item)
355
            else:
356
                val = self._parse_body(body_item)
357
            if val:
358
                body.append((otag, val))
359
        return body
360
361
    def _parse_refs(self, el, obj):
362
        parent, parentlink = obj.parent, None
363
364
        for ref in el:
365
            struct = RefStruct()
366
            struct.forward = "IsForward" not in ref.attrib or ref.attrib["IsForward"] not in ("false", "False")
367
            struct.target = ref.text
368
            struct.reftype = ref.attrib["ReferenceType"]
369
            obj.refs.append(struct)
370
371
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
372
                obj.typedef = ref.text
373
            elif not struct.forward:
374
                parent, parentlink = struct.target, struct.reftype
375
                if obj.parent == parent or obj.parent != parent and not obj.parentlink:
376
                    obj.parentlink = parentlink
377
378
        if obj.parent and not obj.parentlink:
379
            # the case of asimple parent attribute without any reverse link
380
            obj.parentlink = "HasComponent"
381
        if not obj.parent:
382
            obj.parent, obj.parentlink = parent, parentlink
383
        if not obj.parent:
384
            self.logger.info("Could not find parent for node '%s'", obj.nodeid)
385