Completed
Push — master ( ae98a1...b0b448 )
by Olivier
03:37
created

XMLParser._parse_bname()   A

Complexity

Conditions 4

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 11
ccs 0
cts 0
cp 0
crap 20
rs 9.2
c 0
b 0
f 0
1
"""
Parse XML files from the OPC UA specification.
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
def _to_bool(val):
12
    if val in ("True", "true", "on", "On", "1"):
13 1
        return True
14 1
    else:
15 1
        return False
16 1
17 1
18 1
19 1
class NodeData(object):
    """
    Plain container for everything the parser extracts from one node element.

    XMLParser._parse_node creates one instance per node and fills it from the
    XML attributes and child tags; fields that do not occur in the XML keep
    the defaults assigned here.
    """

    def __init__(self):
        self.nodetype = None
        self.nodeid = None
        self.browsename = None
        self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = None
        self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0

        # variable
        self.datatype = None
        self.rank = -1  # check default value
        self.value = None
        self.valuetype = None
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # method
        # Written by XMLParser._set_attr for the "Executable" attribute; it is
        # initialised here so that nodes without that attribute can be read
        # safely. NOTE(review): True is believed to be the UANodeSet schema
        # default for Executable -- confirm against the schema.
        self.executable = True

        # referencetype
        self.inversename = ""
        self.abstract = False
        self.symmetric = False

        # datatype
        self.definition = []
52
53
54 1
class RefStruct(object):
    """
    One reference of a node as read from a References element.

    reftype holds the ReferenceType attribute, target the referenced node id
    and forward the direction of the reference.
    """

    def __init__(self):
        self.reftype = None
        self.forward = True
        self.target = None

    def __repr__(self):
        return "%s(reftype=%r, forward=%r, target=%r)" % (
            type(self).__name__, self.reftype, self.forward, self.target)
60 1
61
62 1
class ExtObj(object):
    """
    Parsed representation of one extension object found inside a Value.

    XMLParser._parse_ext_obj fills typeid from the TypeId part and objname
    and body from the Body part.
    """

    def __init__(self):
        self.typeid = None
        self.objname = None
        self.bodytype = None
        # NOTE(review): the default is a dict, but XMLParser._parse_body
        # assigns a list of (tag, value) tuples -- confirm which type the
        # consumers expect before unifying the two.
        self.body = {}

    def __repr__(self):
        return "%s(typeid=%r, objname=%r, bodytype=%r, body=%r)" % (
            type(self).__name__, self.typeid, self.objname, self.bodytype,
            self.body)
69
70 1
71 1
class XMLParser(object):
    """
    Iterable parser for an XML node description file.

    Iterating over an instance walks the children of the document root:
    "Aliases" entries are collected in self.aliases, "NamespaceUris" entries
    are registered on the server and remembered in self.namespaces, and every
    other element is parsed into a NodeData object which is then yielded.
    """

    def __init__(self, xmlpath, server):
        """
        :param xmlpath: source of the XML document, handed to ET.parse
        :param server: object providing register_namespace(uri); the value it
            returns is used as the namespace index of that uri on the server
        """
        self.server = server  # POC
        self.logger = logging.getLogger(__name__)
        # splits "{namespace}local" into its two parts
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath
        self.aliases = {}

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        self.it = None

        # index used inside the file -> (index on the server, namespace uri)
        self.namespaces = {}
        self._re_nodeid = re.compile(r"^ns=(?P<ns>\d+[^;]*);i=(?P<i>\d+)")

    def __iter__(self):
        """
        Parse the whole document and prepare the iteration over its nodes.

        :returns: self; the parsed nodes are then served by __next__
        """
        nodes = []
        for child in self.root:
            name = self._local_name(child.tag)
            if name == "Aliases":
                for el in child:
                    self.aliases[el.attrib["Alias"]] = self._get_node_id(el.text)
            elif name == 'NamespaceUris':
                # the indexes used inside the file start at 1
                for ns_index, ns_element in enumerate(child, 1):
                    ns_uri = ns_element.text
                    ns_server_index = self.server.register_namespace(ns_uri)
                    self.namespaces[ns_index] = (ns_server_index, ns_uri)
                    self.logger.debug("namespace offset %s %s", ns_index,
                                      (ns_server_index, ns_uri))
            else:
                nodes.append(self._parse_node(name, child))

        # The ordering of nodes currently only works if namespaces are
        # defined in XML.
        # Also, it is recommended not to use node ids without namespace prefix!
        if self.namespaces:
            nodes = self._sort_nodes_by_parentid(nodes)

        self.it = iter(nodes)
        return self

    def __next__(self):
        """
        Return the next parsed node.

        :raises StopIteration: when all nodes have been returned
        """
        # the next() builtin works on both Python 2 and Python 3 iterators
        return next(self.it)

    def next(self):  # support for python2
        return self.__next__()

    def _local_name(self, tag):
        """
        Return the tag name without its "{namespace}" prefix.

        A tag that carries no namespace prefix is returned unchanged.
        """
        match = self._retag.match(tag)
        if match is None:
            return tag
        return match.group(2)

    def _sort_nodes_by_parentid(self, nodes):
        """
        Sort the list of nodes according to their parent node in order to
        respect the dependency between nodes.

        Nodes whose parent cannot be found among the given nodes (missing
        parent or circular parent links) are appended at the end in their
        original order, so the sort always terminates.

        :param nodes: list of NodeData objects
        :returns: list of sorted nodes
        """
        pending = list(nodes)
        # ids of the nodes that are already sorted / inserted
        sorted_nodes_ids = set()
        sorted_nodes = []
        # We can only respect the ordering of nodes for namespace indexes that
        # are defined in the xml file itself. Thus we assume that all other
        # referenced namespaces are already known to the server and should
        # not create any dependency problems (like "NodeNotFound").
        relevant_namespaces = set(str(i[0]) for i in self.namespaces.values())
        while pending:
            remaining = []
            for node in pending:
                node_ns, _ = self._split_node_id(node.nodeid)
                parent_ns, parent_id = self._split_node_id(node.parent)

                if (parent_ns is None or parent_id is None or
                        node_ns not in relevant_namespaces):
                    # no parent / parent without namespace prefix (e.g.
                    # namespace 0), or node outside the relevant namespaces:
                    # goes to the front of the result
                    sorted_nodes.insert(0, node)
                    sorted_nodes_ids.add(node.nodeid)
                elif node.parent in sorted_nodes_ids:
                    # the parent has been placed already
                    sorted_nodes.append(node)
                    sorted_nodes_ids.add(node.nodeid)
                else:
                    remaining.append(node)

            if len(remaining) == len(pending):
                # A full pass placed nothing, so another pass would not
                # either: give up on ordering the rest instead of spinning.
                self.logger.warning(
                    "Could not find the parent of %d node(s), adding them unsorted",
                    len(remaining))
                sorted_nodes.extend(remaining)
                break
            pending = remaining
        return sorted_nodes

    def _split_node_id(self, value):
        """
        Split the fq node id into namespace and id part.

        :returns: (namespace, id) as strings when value has the form
            "ns=<n>;i=<id>", otherwise (None, value)
        """
        if not value:
            return (None, value)
        r_match = self._re_nodeid.search(value)
        if r_match:
            return r_match.groups()

        return (None, value)

    def _get_node_id(self, value):
        """
        Check if the nodeid given in the xml model file must be converted
        to an already existing namespace id based on the file's namespace uri

        :returns: NodeId (str); value itself when no conversion applies
        """
        node_ns, node_id = self._split_node_id(value)
        if not node_ns:
            return value
        try:
            ns_server = self.namespaces.get(int(node_ns))
        except ValueError:
            # the pattern tolerates trailing non-digits in the namespace part
            return value
        if ns_server:
            return "ns={};i={}".format(ns_server[0], node_id)
        return value

    def _parse_bname(self, bname):
        """
        Parse a browsename and correct the namespace index.

        Only a numeric prefix in front of the first colon is treated as a
        namespace index; any further colon belongs to the name itself.

        :returns: the browse name with the server namespace index, or bname
            unchanged when it has no known numeric namespace prefix
        """
        browse_ns, separator, browse_name = bname.partition(':')
        if not separator:
            return bname
        try:
            ns_index = int(browse_ns)
        except ValueError:
            return bname
        ns_server = self.namespaces.get(ns_index)
        if ns_server:
            return '%d:%s' % (ns_server[0], browse_name)
        return bname

    def _parse_node(self, name, child):
        """
        Parse a XML node and create a NodeData object.

        :param name: tag name of the element without namespace
        :param child: the XML element of the node
        """
        obj = NodeData()
        obj.nodetype = name
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("\n     Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_tag(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store one XML attribute of a node element on obj.

        Unknown attributes are only logged.
        """
        if key == "NodeId":
            obj.nodeid = self._get_node_id(val)
            self.logger.debug("Parsing %s", obj.nodeid)
        elif key == "BrowseName":
            obj.browsename = self._parse_bname(val)
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = self._get_node_id(val)
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            obj.abstract = _to_bool(val)
        elif key == "Executable":
            obj.executable = _to_bool(val)
        elif key == "EventNotifier":
            obj.eventnotifier = int(val)
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            # skip empty entries so that an empty attribute gives []
            obj.dimensions = [int(i) for i in val.split(",") if i.strip()]
        elif key == "MinimumSamplingInterval":
            obj.minsample = int(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = _to_bool(val)
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_tag(self, el, obj):
        """
        Store the content of one child element of a node on obj.

        Unknown tags are only logged.
        """
        tag = self._local_name(el.tag)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            obj.definition.extend(el)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, el, obj):
        """
        Parse the children of a Value element into obj.value / obj.valuetype.

        Value types that are not handled are logged and leave obj.value
        untouched.
        """
        self.logger.info("Parsing value")
        for val in el:
            self.logger.info("tag %s", val.tag)
            ntag = self._local_name(val.tag)
            obj.valuetype = ntag
            if ntag in ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
                obj.value = int(val.text)
            elif ntag in ("Float", "Double"):
                obj.value = float(val.text)
            elif ntag == "Boolean":
                obj.value = _to_bool(val.text)
            elif ntag in ("ByteString", "String"):
                mytext = val.text
                if mytext is None:  # support importing null strings
                    mytext = ""
                obj.value = mytext.replace('\n', '').replace('\r', '')
            elif ntag == "Guid":
                # the value is carried by the child element of the Guid; note
                # that the recursion also overwrites obj.valuetype with the
                # tag of that child
                self._parse_value(val, obj)
            elif ntag == "ListOfExtensionObject":
                obj.value = self._parse_list_of_extension_object(el)
            elif ntag == "ListOfLocalizedText":
                obj.value = self._parse_list_of_localized_text(el)
            elif ntag == "ExtensionObject":
                obj.value = self._parse_extension_object(el)
            else:
                self.logger.warning("Value type not implemented: '%s'", ntag)

    def _get_text(self, el):
        """
        Return all text inside el, each fragment stripped, joined together.
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list_of_localized_text(self, el):
        """
        Parse a ListOfLocalizedText Value.

        :param el: the Value element containing the list
        :returns: list with the text of every Text element found
        """
        value = []
        for localized_text_list in el:
            for localized_text in localized_text_list:
                for child in localized_text:
                    if self._local_name(child.tag) == 'Text':
                        value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value

        :param el: the Value element containing the list
        :returns: list of ExtObj
        """
        value = []
        for extension_object_list in el:
            for extension_object in extension_object_list:
                value.append(self._parse_ext_obj(extension_object))
        return value

    def _parse_extension_object(self, el):
        """
        Parse the first child of el as an extension object.

        :returns: ExtObj, or None when el has no child
        """
        for ext_obj in el:
            return self._parse_ext_obj(ext_obj)
        return None

    def _parse_ext_obj(self, el):
        """
        Build an ExtObj from the TypeId and Body parts of an extension object.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._local_name(extension_object_part.tag)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                first_child = extension_object_part.find('*')
                # an empty Body has no element to take the object name from
                if first_child is not None:
                    ext.objname = self._local_name(first_child.tag)
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown extension object tag: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Recursively convert the children of el into (tag, value) tuples.

        value is the stripped text for elements without children and a nested
        list of tuples otherwise; children with an empty value are skipped.
        """
        body = []
        for body_item in el:
            otag = self._local_name(body_item.tag)
            if len(body_item) == 0:
                val = self._get_text(body_item)
            else:
                val = self._parse_body(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Parse the References element of a node.

        A HasTypeDefinition reference sets obj.typedef, an inverse reference
        sets obj.parent / obj.parentlink, and every other reference is
        appended to obj.refs as a RefStruct.
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            # IsForward is a boolean attribute; a missing attribute means a
            # forward reference and "0" is accepted as a spelling of false
            forward = ref.attrib.get("IsForward") not in ("false", "False", "0")
            if reftype == "HasTypeDefinition":
                obj.typedef = self._get_node_id(ref.text)
            elif not forward:
                obj.parent = self._get_node_id(ref.text)
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                struct.forward = forward
                struct.target = self._get_node_id(ref.text)
                struct.reftype = reftype
                obj.refs.append(struct)
389