Completed
Pull Request — master (#296)
by Olivier
04:13
created

XMLParser._parse_ext_obj()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 15
ccs 0
cts 0
cp 0
crap 20
rs 9.2
c 0
b 0
f 0
1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
class NodeData(object):
12
13 1
    def __init__(self):
14 1
        self.nodetype = None
15 1
        self.nodeid = None
16 1
        self.browsename = None
17 1
        self.displayname = None
18 1
        self.symname = None  # FIXME: this param is never used, why?
19 1
        self.parent = None
20 1
        self.parentlink = None
21 1
        self.desc = ""
22 1
        self.typedef = None
23 1
        self.refs = []
24 1
        self.nodeclass = None
25 1
        self.eventnotifier = 0
26
27
        # variable
28 1
        self.datatype = None
29 1
        self.rank = -1  # check default value
30 1
        self.value = None
31 1
        self.valuetype = None
32 1
        self.dimensions = None
33 1
        self.accesslevel = None
34 1
        self.useraccesslevel = None
35 1
        self.minsample = None
36
37
        # referencetype
38 1
        self.inversename = ""
39 1
        self.abstract = False 
40 1
        self.symmetric = False 
41
42
        # datatype
43 1
        self.definition = []
44
45
46 1
class RefStruct(object):
47
48 1
    def __init__(self):
49
        self.reftype = None
50
        self.forward = True
51
        self.target = None
52
53
54 1
class ExtObj(object):
55
56 1
    def __init__(self):
57 1
        self.typeid = None
58 1
        self.objname = None
59 1
        self.bodytype = None
60 1
        self.body = {}
61
62 1
63 1
class XMLParser(object):
64 1
65
    def __init__(self, xmlpath, server):
66 1
        self.server = server  # POC
67 1
        self.logger = logging.getLogger(__name__)
68 1
        self._retag = re.compile(r"(\{.*\})(.*)")
69
        self.path = xmlpath
70 1
        self.aliases = {}
71 1
72 1
        self.tree = ET.parse(xmlpath)
73
        self.root = self.tree.getroot()
74
        self.it = None
75 1
76 1
        self.namespaces = {}
77 1
        self._re_nodeid = re.compile(r"^ns=(?P<ns>\d+[^;]*);i=(?P<i>\d+)")
78
79
    def __iter__(self):
80
        nodes = []
81 1
        for child in self.root:
82 1
            name = self._retag.match(child.tag).groups()[1]
83
            if name == "Aliases":
84 1
                for el in child:
85
                    self.aliases[el.attrib["Alias"]] = self._get_node_id(el.text)
86
            elif name == 'NamespaceUris':
87 1
                for ns_index, ns_element in enumerate(child):
88 1
                    ns_uri = ns_element.text
89 1
                    ns_server_index = self.server.register_namespace(ns_uri)
90 1
                    self.namespaces[ns_index + 1] = (ns_server_index, ns_uri)
91 1
            else:
92 1
                node = self._parse_node(name, child)
93 1
                nodes.append(node)
94 1
95 1
        # The ordering of nodes currently only works if namespaces are
96
        # defined in XML.
97 1
        # Also, it is recommended not to use node ids without namespace prefix!
98 1
        if self.namespaces:
99 1
            nodes = self._sort_nodes_by_parentid(nodes)
100 1
101 1
        self.it = iter(nodes)
102 1
        return self
103
104 1
    def __next__(self):
105
        while True:
106 1
            if sys.version_info[0] < 3:
107 1
                child = self.it.next()
108
            else:
109
                child = self.it.__next__()
110
            return child
111
112
    def next(self):  # support for python2
113
        return self.__next__()
114
115
    def _sort_nodes_by_parentid(self, nodes):
116
        """
117
        Sort the list of nodes according theire parent node in order to respect
118
        the depency between nodes.
119
120
        :param nodes: list of NodeDataObjects
121
        :returns: list of sorted nodes
122
        """
123
        _nodes = list(nodes)
124
        # list of node ids that are already sorted / inserted
125
        sorted_nodes_ids = []
126
        # list of sorted nodes (i.e. XML Elements)
127
        sorted_nodes = []
128 1
        # list of namespace indexes that are relevant for this import
129 1
        # we can only respect ordering nodes for namespaces indexes that
130
        # are defined in the xml file itself. Thus we assume that all other
131 1
        # references namespaces are already known to the server and should
132
        # not create any dependency problems (like "NodeNotFound")
133 1
        relevant_namespaces = [str(i[0]) for i in self.namespaces.values()]
134 1
        while len(_nodes) > 0:
135 1
            pop_nodes = []
136 1
            for node in _nodes:
137 1
                insert = None
138 1
                # Get the node and parent node namespace and id parts
139
                node_ns, node_id = self._split_node_id(node.nodeid)
140
                parent_ns, parent_id = self._split_node_id(node.parent)
141
142
                # Insert nodes that
143
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
144
                #   (2) ns is not in list of relevant namespaces
145
                if (parent_ns is None or node_ns not in relevant_namespaces or
146
                    parent_id is None):
147 1
                    insert = 0
148 1
                else:
149 1
                    # Check if the nodes parent is already in the list of
150 1
                    # inserted nodes
151 1
                    if node.parent in sorted_nodes_ids:
152 1
                        insert = -1
153 1
154
                if insert == 0:
155 1
                    sorted_nodes.insert(insert, node)
156
                    sorted_nodes_ids.insert(insert, node.nodeid)
157
                    pop_nodes.append(node)
158
                elif insert == -1:
159
                    sorted_nodes.append(node)
160 1
                    sorted_nodes_ids.append(node.nodeid)
161 1
                    pop_nodes.append(node)
162 1
163
            # Remove inserted nodes from the list
164 1
            for node in pop_nodes:
165
                _nodes.pop(_nodes.index(node))
166 1
        return sorted_nodes
167
168
    def _split_node_id(self, value):
169
        """
170
        Split the fq node id into namespace and id part.
171
172
        :returns: (namespace, id)
173
        """
174 1
        if not value:
175 1
            return (None, value)
176 1
        r_match = self._re_nodeid.search(value)
177 1
        if r_match:
178 1
            return r_match.groups()
179
180
        return (None, value)
181 1
182 1
    def _get_node_id(self, value):
183
        """
184
        Check if the nodeid given in the xml model file must be converted
185
        to a already existing namespace id based on the files namespace uri
186
187
        :returns: NodeId (str)
188
        """
189
        result = value
190
191
        node_ns, node_id = self._split_node_id(value)
192
        if node_ns:
193
            ns_server = self.namespaces.get(int(node_ns), None)
194
            if ns_server:
195
                result = "ns={};i={}".format(ns_server[0], node_id)
196
        return result
197
198
    def _parse_node(self, name, child):
199
        """
200
        Parse a XML node and create a NodeData object.
201
        """
202
        obj = NodeData()
203
        obj.nodetype = name
204
        for key, val in child.attrib.items():
205
            self._set_attr(key, val, obj)
206
        self.logger.info("\n     Parsing node: %s %s", obj.nodeid, obj.browsename)
207
        obj.displayname = obj.browsename  # give a default value to display name
208
        for el in child:
209
            self._parse_tag(el, obj)
210
        return obj
211
212
    def _set_attr(self, key, val, obj):
213
        if key == "NodeId":
214
            obj.nodeid = self._get_node_id(val)
215
            print("PARSING", obj.nodeid)
216
        elif key == "BrowseName":
217
            obj.browsename = val
218
        elif key == "SymbolicName":
219
            obj.symname = val
220
        elif key == "ParentNodeId":
221
            obj.parent = self._get_node_id(val)
222
        elif key == "DataType":
223
            obj.datatype = val
224
        elif key == "IsAbstract":
225
            obj.abstract = self._to_bool(val)
226
        elif key == "Executable":
227
            obj.executable = self._to_bool(val)
228
        elif key == "EventNotifier":
229
            obj.eventnotifier = 1 if val == "1" else 0
230
        elif key == "ValueRank":
231
            obj.rank = int(val)
232
        elif key == "ArrayDimensions":
233
            obj.dimensions = [int(i) for i in val.split(",")]
234
        elif key == "MinimumSamplingInterval":
235
            obj.minsample = int(val)
236
        elif key == "AccessLevel":
237
            obj.accesslevel = int(val)
238
        elif key == "UserAccessLevel":
239
            obj.useraccesslevel = int(val)
240
        elif key == "Symmetric":
241
            obj.symmetric = self._to_bool(val)
242
        else:
243
            self.logger.info("Attribute not implemented: %s:%s", key, val)
244
245
    def _to_bool(self, val):
246
        if val in ("False, false"):
247
            return False
248
        else:
249
            return True
250
    def _parse_tag(self, el, obj):
251
        tag = self._retag.match(el.tag).groups()[1]
252
253
        if tag == "DisplayName":
254
            obj.displayname = el.text
255
        elif tag == "Description":
256
            obj.desc = el.text
257
        elif tag == "References":
258
            self._parse_refs(el, obj)
259
        elif tag == "Value":
260
            self._parse_value(el, obj)
261
        elif tag == "InverseName":
262
            obj.inversename = el.text
263
        elif tag == "Definition":
264
            for field in el:
265
                obj.definition.append(field)
266
        else:
267
            self.logger.info("Not implemented tag: %s", el)
268
269
    def _parse_value(self, el, obj):
270
        self.logger.info("Parsing value")
271
        for val in el:
272
            self.logger.info("tag %s", val.tag)
273
            ntag = self._retag.match(val.tag).groups()[1]
274
            obj.valuetype = ntag
275
            if ntag in ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
276
                obj.value = int(val.text)
277
            elif ntag in ("Float", "Double"):
278
                obj.value = float(val.text)
279
            elif ntag in ("Boolean"):
280
                if val.text in ("True", "true", "1", "on", "On"):
281
                    obj.value = bool(1)
282
                else:
283
                    obj.value = bool(0)
284
            elif ntag in ("ByteString", "String"):
285
                mytext = val.text
286
                if mytext is None:  # support importing null strings
287
                    mytext = ""
288
                mytext = mytext.replace('\n', '').replace('\r', '')
289
                # obj.value.append('b"{}"'.format(mytext))
290
                obj.value = mytext
291
            elif ntag in ("Guid"):
292
                self._parse_value(val, obj)
293
            elif ntag == "ListOfExtensionObject":
294
                obj.valuetype = "ExtensionObject"
295
                obj.value = self._parse_list_of_extension_object(el)
296
            elif ntag == "ListOfLocalizedText":
297
                obj.valuetype = "LocalizedText"
298
                obj.value = self._parse_list_of_localized_text(el)
299
            else:
300
                self.logger.info("Value type not implemented: '%s', %s", ntag)
301
302
    def _get_text(self, el):
303
        txt = ""
304
        for text in el.itertext():
305
            txt += text
306
        txt = txt.strip()
307
        return txt
308
309
    def _parse_list_of_localized_text(self, el):
310
        value = []
311
        for localized_text_list in el:
312
            for localized_text in localized_text_list:
313
                ntag = self._retag.match(localized_text.tag).groups()[1]
314
                for child in localized_text:
315
                    ntag = self._retag.match(child.tag).groups()[1]
316
                    if ntag == 'Text':
317
                        value.append(self._get_text(child))
318
        return value
319
320
    def _parse_list_of_extension_object(self, el):
321
        '''
322
        Parse a uax:ListOfExtensionObject Value
323
        
324
        Return an array with a value of each uax:ExtensionObject/*/* (each element is convert to a netry in a dict.
325
               also the valuetype is returned. The valuetype is  uax:ExtensionObject/*/tag()
326
        '''
327
        value = []
328
        for extension_object_list in el:
329
            for extension_object in extension_object_list:
330
                ext_obj = self._parse_ext_obj(extension_object)
331
                value.append(ext_obj)
332
        return value
333
334
    def _parse_ext_obj(self, el):
335
        ext = ExtObj()
336
        for extension_object_part in el:
337
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
338
            if ntag == 'TypeId':
339
                ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
340
                ext.typeid = self._get_text(extension_object_part)
341
            elif ntag == 'Body':
342
                ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
343
                ext.body = self._parse_body(extension_object_part)
344
                print("body", ext.body)
345
            else:
346
                print("Uknoen ndtag", ntag)
347
        print("ext_obj", ext.objname, ext.typeid, ext.body)
348
        return ext
349
350
    def _parse_body(self, el):
351
        body = {}
352
        #ntag = self._retag.match(el.find('*').tag).groups()[1]
353
        #body[ntag] = {}
354
        for body_item in el:
355
            otag = self._retag.match(body_item.tag).groups()[1]
356
            childs = [i for i in body_item]
357
            if not childs:
358
                val = self._get_text(body_item)
359
            elif len(childs) == 1:
360
                val = self._get_text(childs[0])
361
            else:
362
                val = self._parse_body(body_item)
363
            if val:
364
                body[otag] = val
365
        return body
366
367
    def _parse_refs(self, el, obj):
368
        for ref in el:
369
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
370
                obj.typedef = self._get_node_id(ref.text)
371
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] in ("false", "False"):
372
                # if obj.parent:
373
                    # sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
374
                obj.parent = self._get_node_id(ref.text)
375
                obj.parentlink = ref.attrib["ReferenceType"]
376
            else:
377
                struct = RefStruct()
378
                if "IsForward" in ref.attrib:
379
                    struct.forward = ref.attrib["IsForward"]
380
                struct.target = self._get_node_id(ref.text)
381
                struct.reftype = ref.attrib["ReferenceType"]
382
                obj.refs.append(struct)
383