Passed
Pull Request — master (#358)
by
unknown
03:50 queued 01:08
created

XMLParser.list_required_models()   A

Complexity

Conditions 3

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 1
dl 0
loc 13
rs 10
c 0
b 0
f 0
1
"""
2
parse xml file from asyncua-spec
3
"""
4
import re
5
import asyncio
6
import base64
7
import logging
8
from pytz import utc
9
10
import xml.etree.ElementTree as ET
11
12
from .ua_utils import string_to_val
13
from asyncua import ua
14
15
16
def ua_type_to_python(val, uatype_as_str):
17
    """
18
    Converts a string value to a python value according to ua_utils.
19
    """
20
    return string_to_val(val, getattr(ua.VariantType, uatype_as_str))
21
22
23
def _to_bool(val):
24
    """
25
    Easy access to boolean conversion.
26
    """
27
    return ua_type_to_python(val, "Boolean")
28
29
30
class NodeData:
31
32
    def __init__(self):
33
        self.nodetype = None
34
        self.nodeid = None
35
        self.browsename = None
36
        self.displayname = None
37
        self.symname = None  # FIXME: this param is never used, why?
38
        self.parent = None
39
        self.parentlink = None
40
        self.desc = ""
41
        self.typedef = None
42
        self.refs = []
43
        self.nodeclass = None
44
        self.eventnotifier = 0
45
46
        # variable
47
        self.datatype = None
48
        self.rank = -1  # check default value
49
        self.value = None
50
        self.valuetype = None
51
        self.dimensions = None
52
        self.accesslevel = None
53
        self.useraccesslevel = None
54
        self.minsample = None
55
56
        # referencetype
57
        self.inversename = ""
58
        self.abstract = False
59
        self.symmetric = False
60
61
        # datatype
62
        self.definitions = []
63
64
    def __str__(self):
65
        return f"NodeData(nodeid:{self.nodeid})"
66
67
    __repr__ = __str__
68
69
70
class Field:
71
    def __init__(self, data):
72
        self.datatype = data.get("DataType", "")
73
        self.name = data.get("Name")
74
        self.dname = data.get("DisplayName", "")
75
        self.optional = bool(data.get("IsOptional", False))
76
        self.valuerank = int(data.get("ValueRank", -1))
77
        self.arraydim = data.get("ArrayDimensions", None)  # FIXME: check type
78
        self.value = int(data.get("Value", 0))
79
        self.desc = data.get("Description", "")
80
81
82
class RefStruct:
83
84
    def __init__(self):
85
        self.reftype = None
86
        self.forward = True
87
        self.target = None
88
89
    def __str__(self):
90
        return f"RefStruct({self.reftype, self.forward, self.target})"
91
92
    __repr__ = __str__
93
94
95
class ExtObj:
96
97
    def __init__(self):
98
        self.typeid = None
99
        self.objname = None
100
        self.bodytype = None
101
        self.body = {}
102
103
    def __str__(self):
104
        return f"ExtObj({self.objname}, {self.body})"
105
106
    __repr__ = __str__
107
108
109
class XMLParser:
110
111
    def __init__(self):
112
        self.logger = logging.getLogger(__name__)
113
        self._retag = re.compile(r"(\{.*\})(.*)")
114
        self.root = None
115
        self.ns = {
116
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
117
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
118
            'xsd': "http://www.w3.org/2001/XMLSchema",
119
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
120
        }
121
122
    async def parse(self, xmlpath=None, xmlstring=None):
123
        if xmlstring:
124
            self.root = ET.fromstring(xmlstring)
125
        else:
126
            tree = await asyncio.get_event_loop().run_in_executor(None, ET.parse, xmlpath)
127
            self.root = tree.getroot()
128
129
    def parse_sync(self, xmlpath=None, xmlstring=None):
130
        if xmlstring:
131
            self.root = ET.fromstring(xmlstring)
132
        else:
133
            tree = ET.parse(xmlpath)
134
            self.root = tree.getroot()
135
136
    def get_used_namespaces(self):
137
        """
138
        Return the used namespace uris in this import file
139
        """
140
        namespaces_uris = []
141
        for child in self.root:
142
            tag = self._retag.match(child.tag).groups()[1]
143
            if tag == 'NamespaceUris':
144
                namespaces_uris = [ns_element.text for ns_element in child]
145
                break
146
        return namespaces_uris
147
148
    def get_aliases(self) -> dict:
149
        """
150
        Return the used node aliases in this import file
151
        """
152
        aliases = {}
153
        for child in self.root:
154
            tag = self._retag.match(child.tag).groups()[1]
155
            if tag == 'Aliases':
156
                for el in child:
157
                    aliases[el.attrib["Alias"]] = el.text
158
                break
159
        return aliases
160
161
    def get_node_datas(self):
162
        nodes = []
163
        for child in self.root:
164
            tag = self._retag.match(child.tag).groups()[1]
165
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
166
                node = self._parse_node(tag, child)
167
                nodes.append(node)
168
        return nodes
169
170
    def _parse_node(self, nodetype, child):
171
        """
172
        Parse a XML node and create a NodeData object.
173
        """
174
        obj = NodeData()
175
        obj.nodetype = nodetype
176
        for key, val in child.attrib.items():
177
            self._set_attr(key, val, obj)
178
        self.logger.debug("Parsing node: %s %s", obj.nodeid, obj.browsename)
179
        obj.displayname = obj.browsename  # give a default value to display name
180
        for el in child:
181
            self._parse_attr(el, obj)
182
        return obj
183
184
    def _set_attr(self, key, val, obj):
185
        if key == "NodeId":
186
            obj.nodeid = val
187
        elif key == "BrowseName":
188
            obj.browsename = val
189
        elif key == "SymbolicName":
190
            obj.symname = val
191
        elif key == "ParentNodeId":
192
            obj.parent = val
193
        elif key == "DataType":
194
            obj.datatype = val
195
        elif key == "IsAbstract":
196
            obj.abstract = _to_bool(val)
197
        elif key == "Executable":
198
            obj.executable = _to_bool(val)
199
        elif key == "EventNotifier":
200
            obj.eventnotifier = int(val)
201
        elif key == "ValueRank":
202
            obj.rank = int(val)
203
        elif key == "ArrayDimensions":
204
            obj.dimensions = [int(i) for i in val.split(",")]
205
        elif key == "MinimumSamplingInterval":
206
            obj.minsample = int(val)
207
        elif key == "AccessLevel":
208
            obj.accesslevel = int(val)
209
        elif key == "UserAccessLevel":
210
            obj.useraccesslevel = int(val)
211
        elif key == "Symmetric":
212
            obj.symmetric = _to_bool(val)
213
        else:
214
            self.logger.info("Attribute not implemented: %s:%s", key, val)
215
216
    def _parse_attr(self, el, obj):
217
        tag = self._retag.match(el.tag).groups()[1]
218
219
        if tag == "DisplayName":
220
            obj.displayname = el.text
221
        elif tag == "Description":
222
            obj.desc = el.text
223
        elif tag == "References":
224
            self._parse_refs(el, obj)
225
        elif tag == "Value":
226
            self._parse_contained_value(el, obj)
227
        elif tag == "InverseName":
228
            obj.inversename = el.text
229
        elif tag == "Definition":
230
            for field in el:
231
                field = self._parse_field(field)
232
                obj.definitions.append(field)
233
        else:
234
            self.logger.info("Not implemented tag: %s", el)
235
236
    def _parse_field(self, field):
237
        return Field(field)
238
239
    def _parse_contained_value(self, el, obj):
240
        """
241
        Parse the child of el as a constant.
242
        """
243
        val_el = el.find(".//")  # should be only one child
244
        self._parse_value(val_el, obj)
245
246
    def _parse_value(self, val_el, obj):
247
        """
248
        Parse the node val_el as a constant.
249
        """
250
        if val_el is not None and val_el.text is not None:
251
            ntag = self._retag.match(val_el.tag).groups()[1]
252
        else:
253
            ntag = "Null"
254
255
        obj.valuetype = ntag
256
        if ntag == "Null":
257
            obj.value = None
258
        elif hasattr(ua.ua_binary.Primitives1, ntag):
259
            # Elementary types have their parsing directly relying on ua_type_to_python.
260
            obj.value = ua_type_to_python(val_el.text, ntag)
261
        elif ntag == "DateTime":
262
            obj.value = ua_type_to_python(val_el.text, ntag)
263
            # According to specs, DateTime should be either UTC or with a timezone.
264
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
265
                utc.localize(obj.value)  # FIXME Forcing to UTC if unaware, maybe should raise?
266
        elif ntag == "ByteString":
267
            if val_el.text is None:
268
                mytext = b""
269
            else:
270
                mytext = val_el.text.encode()
271
                mytext = base64.b64decode(mytext)
272
            obj.value = mytext
273
        elif ntag == "String":
274
            mytext = val_el.text
275
            if mytext is None:
276
                # Support importing null strings.
277
                mytext = ""
278
            # mytext = mytext.replace('\n', '').replace('\r', '')
279
            obj.value = mytext
280
        elif ntag == "Guid":
281
            self._parse_contained_value(val_el, obj)
282
            # Override parsed string type to guid.
283
            obj.valuetype = ntag
284
        elif ntag == "NodeId":
285
            id_el = val_el.find("uax:Identifier", self.ns)
286
            if id_el is not None:
287
                obj.value = id_el.text
288
        elif ntag == "ExtensionObject":
289
            obj.value = self._parse_ext_obj(val_el)
290
        elif ntag == "LocalizedText":
291
            obj.value = self._parse_body(val_el)
292
        elif ntag == "ListOfLocalizedText":
293
            obj.value = self._parse_list_of_localized_text(val_el)
294
        elif ntag == "ListOfExtensionObject":
295
            obj.value = self._parse_list_of_extension_object(val_el)
296
        elif ntag.startswith("ListOf"):
297
            # Default case for "ListOf" types.
298
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
299
            obj.value = []
300
            for val_el in val_el:
301
                tmp = NodeData()
302
                self._parse_value(val_el, tmp)
303
                obj.value.append(tmp.value)
304
        else:
305
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
306
            # QualifiedName, StatusCode.
307
            # Missing according to ua.VariantType (also missing in string_to_val):
308
            # DataValue, Variant, DiagnosticInfo.
309
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)
310
311
    def _get_text(self, el):
312
        txtlist = [txt.strip() for txt in el.itertext()]
313
        return "".join(txtlist)
314
315
    def _parse_list_of_localized_text(self, el):
316
        value = []
317
        for localized_text in el:
318
            mylist = self._parse_body(localized_text)
319
            # each localized text is in a dictionary with "Locale" and "Text" keys
320
            item = {"Text": None, "Locale": None}
321
            for name, val in mylist:
322
                item.update({str(name): val})
323
            # value is an array of dictionaries with localized texts
324
            value.append(item)
325
        return value
326
327
    def _parse_list_of_extension_object(self, el):
328
        """
329
        Parse a uax:ListOfExtensionObject Value
330
        Return an list of ExtObj
331
        """
332
        value = []
333
        for extension_object in el:
334
            ext_obj = self._parse_ext_obj(extension_object)
335
            value.append(ext_obj)
336
        return value
337
338
    def _parse_ext_obj(self, el):
339
        ext = ExtObj()
340
        for extension_object_part in el:
341
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
342
            if ntag == 'TypeId':
343
                ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
344
                ext.typeid = self._get_text(extension_object_part)
345
            elif ntag == 'Body':
346
                ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
347
                ext.body = self._parse_body(extension_object_part)
348
            else:
349
                self.logger.warning("Unknown ntag", ntag)
350
        return ext
351
352
    def _parse_body(self, el):
353
        body = []
354
        for body_item in el:
355
            otag = self._retag.match(body_item.tag).groups()[1]
356
            childs = [i for i in body_item]
357
            if not childs:
358
                val = self._get_text(body_item)
359
            else:
360
                val = self._parse_body(body_item)
361
            if val:
362
                body.append((otag, val))
363
        return body
364
365
    def _parse_refs(self, el, obj):
366
        parent, parentlink = obj.parent, None
367
368
        for ref in el:
369
            struct = RefStruct()
370
            struct.forward = "IsForward" not in ref.attrib or ref.attrib["IsForward"] not in ("false", "False")
371
            struct.target = ref.text
372
            struct.reftype = ref.attrib["ReferenceType"]
373
            obj.refs.append(struct)
374
375
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
376
                obj.typedef = ref.text
377
            elif not struct.forward:
378
                parent, parentlink = struct.target, struct.reftype
379
                if obj.parent == parent or obj.parent != parent and not obj.parentlink:
380
                    obj.parentlink = parentlink
381
382
        if obj.parent and not obj.parentlink:
383
            # the case of asimple parent attribute without any reverse link
384
            obj.parentlink = "HasComponent"
385
        if not obj.parent:
386
            obj.parent, obj.parentlink = parent, parentlink
387
        if not obj.parent:
388
            self.logger.info("Could not find parent for node '%s'", obj.nodeid)
389
390
    def get_running_models(self):
391
        # Get Nodes of Server -> Namespaces -> Uri, Version and Date
392
        pass
393
394
    def _check_required_models_with_server(self):
395
        # sort args in a nice Dict
396
        # -> await get_running_models(server) --> in nice dict as well
397
        # compare if requirement in server models
398
        # Yes? -> all okay, pass
399
        # No? -> raise Exception, not importable!
400
        pass
401
402
    @staticmethod
403
    def list_required_models(xml):
404
        """
405
        Try getting required XML Models, before parsing NodeSet
406
        """
407
        tree = ET.parse(xml)
408
        required_models = []
409
410
        for child in tree.iter():
411
            if child.tag.endswith("RequiredModel"):
412
                # check if ModelUri X, in Version Y from time Z was already imported
413
                required_models.append(child.attrib)
414
        return required_models
415
416