Completed
Push — master ( 6ab15d...466e15 )
by Olivier
03:43
created

XMLParser.__next__()   A

Complexity

Conditions 3

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 8.2077

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 7
ccs 1
cts 6
cp 0.1666
crap 8.2077
rs 9.4285
c 0
b 0
f 0
1
"""
2
parse xml file from opcua-spec
3
"""
4 1
import logging
5 1
import re
6 1
import sys
7
8 1
import xml.etree.ElementTree as ET
9
10
11 1
def _to_bool(val):
12
    return val in ("True", "true", "on", "On", "1")
13 1
14 1
15 1
def ua_type_to_python(val, uatype):
    """
    Convert the text of an XML element to a Python value.

    val is the element text and uatype the UA type name used to pick the
    conversion (e.g. "Int32", "Boolean", "Double", "String", "ByteString").
    Raise Exception if uatype is not one of the handled type names.
    """
    if uatype.startswith(("Int", "UInt")) or uatype in ("Byte", "SByte"):
        return int(val)
    elif uatype.lower().startswith("bool"):
        return _to_bool(val)
    elif uatype in ("Double", "Float"):
        return float(val)
    elif uatype == "String":
        return val
    elif uatype in ("Bytes", "ByteString", "ByteArray"):
        if val is None:
            # an empty element has no text: treat it as an empty byte string
            val = ""
        if sys.version_info.major > 2:
            return bytes(val, 'utf8')
        # on Python 2 the text is returned unchanged
        return val
    else:
        raise Exception("uatype not handled", uatype, " for val ", val)
31 1
32 1
33 1
class NodeData(object):
    """
    Plain container for the data read from one node element of the XML file.

    All attributes start with a default value and are filled in by XMLParser.
    """

    def __init__(self):
        self.nodetype = None
        self.nodeid = None
        self.browsename = None
        self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = None
        self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0

        # variable
        self.datatype = None
        self.rank = -1  # check default value
        self.value = None
        self.valuetype = None
        self.dimensions = None
        self.accesslevel = None
        self.useraccesslevel = None
        self.minsample = None

        # method
        # set by XMLParser when the node has an "Executable" attribute;
        # None means the attribute was not present
        self.executable = None

        # referencetype
        self.inversename = ""
        self.abstract = False
        self.symmetric = False

        # datatype
        self.definition = []

    def __str__(self):
        return "NodeData(nodeid:{})".format(self.nodeid)
    __repr__ = __str__
70 1
71 1
72 1
class RefStruct(object):
    """
    Plain container for one reference of a node.
    """

    def __init__(self):
        self.reftype = None
        self.forward = True
        self.target = None

    def __str__(self):
        return "RefStruct({}, forward:{}, target:{})".format(self.reftype, self.forward, self.target)
    __repr__ = __str__
78
79
80
class ExtObj(object):
    """
    Plain container for one parsed extension object.
    """

    def __init__(self):
        self.typeid = None
        self.objname = None
        self.bodytype = None
        # list of (tag, value) tuples, the same shape XMLParser._parse_body returns
        self.body = []

    def __str__(self):
        return "ExtObj({}, {})".format(self.objname, self.body)
    __repr__ = __str__
91 1
92 1
93 1
class XMLParser(object):
    """
    Read a nodeset XML file and expose its content as Python objects.
    """

    def __init__(self, xmlpath):
        """
        Parse the XML document found at xmlpath and keep its root element.
        """
        self.logger = logging.getLogger(__name__)
        # splits "{namespace}Tag" into the namespace part and the tag name
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
        self.ns = {
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
            'xsd': "http://www.w3.org/2001/XMLSchema",
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
        }

    def _local_tag(self, tag):
        """
        Return tag without its "{namespace}" prefix.

        A tag that carries no namespace prefix is returned unchanged.
        """
        match = self._retag.match(tag)
        if match is None:
            return tag
        return match.groups()[1]

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file
        """
        namespaces_uris = []
        for child in self.root:
            if self._local_tag(child.tag) == 'NamespaceUris':
                namespaces_uris = [ns_element.text for ns_element in child]
                break
        return namespaces_uris

    def get_aliases(self):
        """
        Return the used node aliases in this import file
        """
        aliases = {}
        for child in self.root:
            if self._local_tag(child.tag) == 'Aliases':
                for el in child:
                    aliases[el.attrib["Alias"]] = el.text
                break
        return aliases

    def get_node_datas(self):
        """
        Return a list with one NodeData per child of the root element,
        skipping the "Aliases" and "NamespaceUris" sections.
        """
        nodes = []
        for child in self.root:
            tag = self._local_tag(child.tag)
            if tag not in ("Aliases", "NamespaceUris"):
                nodes.append(self._parse_node(tag, child))
        return nodes

    def _parse_node(self, nodetype, child):
        """
        Parse a XML node and create a NodeData object.
        """
        obj = NodeData()
        obj.nodetype = nodetype
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_attr(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute key/val on the matching attribute of obj.

        Attributes that are not handled are only logged.
        """
        if key == "NodeId":
            obj.nodeid = val
        elif key == "BrowseName":
            obj.browsename = val
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = val
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            obj.abstract = _to_bool(val)
        elif key == "Executable":
            obj.executable = _to_bool(val)
        elif key == "EventNotifier":
            obj.eventnotifier = int(val)
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            # skip empty pieces so that an empty attribute gives an empty list
            obj.dimensions = [int(i) for i in val.split(",") if i.strip()]
        elif key == "MinimumSamplingInterval":
            obj.minsample = int(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = _to_bool(val)
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_attr(self, el, obj):
        """
        Store the content of the child element el on obj, depending on its tag.

        Tags that are not handled are only logged.
        """
        tag = self._local_tag(el.tag)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            for field in el:
                obj.definition.append(field)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_value(self, val_el, obj):
        """
        Parse the content of a Value element into obj.value and obj.valuetype.

        A Value element without any child element leaves obj untouched.
        """
        child_el = val_el.find(".//")  # should be only one child
        if child_el is None:
            self.logger.warning("Value element without content, nothing to parse")
            return
        ntag = self._local_tag(child_el.tag)
        obj.valuetype = ntag

        if ntag in ("SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16",
                    "Int32", "UInt32", "Int64", "UInt64"):
            obj.value = int(child_el.text)
        elif ntag in ("Float", "Double"):
            obj.value = float(child_el.text)
        elif ntag == "Boolean":
            obj.value = _to_bool(child_el.text)
        elif ntag in ("ByteString", "String"):
            mytext = child_el.text
            if mytext is None:  # support importing null strings
                mytext = ""
            mytext = mytext.replace('\n', '').replace('\r', '')
            obj.value = mytext
        elif ntag == "DateTime":
            obj.value = child_el.text
        elif ntag == "Guid":
            # the value is held by a child element, parse that one
            self._parse_value(child_el, obj)
            obj.valuetype = obj.datatype  # override parsed string type to guid
        elif ntag == "LocalizedText":
            obj.value = self._parse_body(child_el)
        elif ntag == "NodeId":
            id_el = child_el.find("uax:Identifier", self.ns)
            if id_el is not None:
                obj.value = id_el.text
        elif ntag == "ListOfExtensionObject":
            obj.value = self._parse_list_of_extension_object(child_el)
        elif ntag == "ListOfLocalizedText":
            obj.value = self._parse_list_of_localized_text(child_el)
        elif ntag.startswith("ListOf"):
            obj.value = self._parse_list(child_el)
        elif ntag == "ExtensionObject":
            obj.value = self._parse_ext_obj(child_el)
        else:
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)

    def _get_text(self, el):
        """
        Return all the text found in el and its descendants, each piece
        stripped and joined without separator.
        """
        txtlist = [txt.strip() for txt in el.itertext()]
        return "".join(txtlist)

    def _parse_list(self, el):
        """
        Parse a ListOf... element into a (possibly nested) Python list.
        """
        value = []
        for val_el in el:
            ntag = self._local_tag(val_el.tag)
            if ntag.startswith("ListOf"):
                val = self._parse_list(val_el)
            else:
                val = ua_type_to_python(val_el.text, ntag)
            value.append(val)
        return value

    def _parse_list_of_localized_text(self, el):
        """
        Return the list of the texts of the 'Text' elements found in the
        children of el.
        """
        value = []
        for localized_text in el:
            for child in localized_text:
                if self._local_tag(child.tag) == 'Text':
                    value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        """
        return [self._parse_ext_obj(extension_object) for extension_object in el]

    def _parse_ext_obj(self, el):
        """
        Parse an extension object element and return it as an ExtObj.

        Children other than 'TypeId' and 'Body' are only logged.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._local_tag(extension_object_part.tag)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                first_el = extension_object_part.find('*')
                if first_el is not None:  # an empty body has no object name
                    ext.objname = self._local_tag(first_el.tag)
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown tag in extension object: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Return the children of el as a list of (tag, value) tuples.

        value is the text of a child without children, else the result of
        parsing that child recursively. Children with an empty value are
        left out.
        """
        body = []
        for body_item in el:
            otag = self._local_tag(body_item.tag)
            if len(body_item) == 0:
                val = self._get_text(body_item)
            else:
                val = self._parse_body(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Store the references listed under el on obj.

        A "HasTypeDefinition" reference sets obj.typedef, a reference with
        IsForward "false"/"False" sets obj.parent and obj.parentlink, and
        every other reference is appended to obj.refs as a RefStruct.
        """
        for ref in el:
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
                obj.typedef = ref.text
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] in ("false", "False"):
                obj.parent = ref.text
                obj.parentlink = ref.attrib["ReferenceType"]
            else:
                struct = RefStruct()
                if "IsForward" in ref.attrib:
                    # keep a real boolean, like the default set by RefStruct
                    struct.forward = _to_bool(ref.attrib["IsForward"])
                struct.target = ref.text
                struct.reftype = ref.attrib["ReferenceType"]
                obj.refs.append(struct)
327