Completed
Pull Request — master (#376)
by
unknown
03:45
created

XMLParser._parse_contained_value()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 6
ccs 3
cts 3
cp 1
crap 1
rs 9.4285
1
"""
2
Parse XML files from the OPC-UA specification.
3
"""
4 1
import logging
5 1
import datetime
6 1
import re
7 1
import sys
8
9 1
import xml.etree.ElementTree as ET
10
11 1
from opcua.common import ua_utils
12 1
from opcua import ua
13
14
15 1
def ua_type_to_python(val, uatype_as_str):
    """
    Convert the string val into a python value.

    uatype_as_str is the name of a member of ua.VariantType; the member is
    looked up by name and the conversion itself is delegated to
    ua_utils.string_to_val.
    """
    variant_type = getattr(ua.VariantType, uatype_as_str)
    return ua_utils.string_to_val(val, variant_type)
20
21 1
def _to_bool(val):
    """
    Shortcut: convert the string val through the "Boolean" ua type.
    """
    converted = ua_type_to_python(val, "Boolean")
    return converted
26
27
28 1
class NodeData(object):
    """
    Plain container holding what the parser read for one node of the XML file.

    Every attribute is created with a default value here and is meant to be
    overwritten by the parser.
    """

    def __init__(self):
        # attributes shared by every kind of node
        self.nodetype = self.nodeid = self.browsename = None
        self.displayname = None
        # FIXME: only ever written (from "SymbolicName") in the visible parser code
        self.symname = None
        self.parent = self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0

        # attributes only meaningful for variables
        self.datatype = None
        self.rank = -1  # check default value
        self.value = self.valuetype = None
        self.dimensions = None
        self.accesslevel = self.useraccesslevel = None
        self.minsample = None

        # attributes only meaningful for reference types
        self.inversename = ""
        self.abstract = False
        self.symmetric = False

        # attributes only meaningful for data types
        self.definition = []

    def __repr__(self):
        return "NodeData(nodeid:{0})".format(self.nodeid)

    # str() and repr() share the same short form
    __str__ = __repr__
65
66
67 1
class RefStruct(object):
    """
    Description of one reference read from the References element of a node.
    """

    def __init__(self):
        self.reftype = None  # value of the "ReferenceType" XML attribute
        self.forward = True  # references are forward unless stated otherwise
        self.target = None  # text of the reference element

    def __str__(self):
        # Same debugging aid as NodeData and ExtObj: readable logs and lists.
        return "RefStruct({0}, {1}, {2})".format(self.reftype, self.forward, self.target)
    __repr__ = __str__
73
74
75 1
class ExtObj(object):
    """
    Container for the pieces of one extension object read from the XML file.
    """

    def __init__(self):
        self.typeid = self.objname = self.bodytype = None
        # NOTE(review): starts as a dict, but the parser later stores the list
        # built by XMLParser._parse_body here - confirm what callers expect.
        self.body = {}

    def __repr__(self):
        return "ExtObj({0}, {1})".format(self.objname, self.body)

    # str() and repr() share the same short form
    __str__ = __repr__
86
87
88 1
class XMLParser(object):
    """
    Read a nodeset XML file and expose its namespaces, aliases and nodes.

    The file is parsed once in the constructor; the get_* methods then walk
    the direct children of the root element.
    """

    def __init__(self, xmlpath):
        """
        Parse xmlpath (anything accepted by ElementTree.parse) and keep its root.
        """
        self.logger = logging.getLogger(__name__)
        # splits "{namespace}Tag" into the "{namespace}" part and the "Tag" part
        self._retag = re.compile(r"(\{.*\})(.*)")
        self.path = xmlpath

        self.tree = ET.parse(xmlpath)
        self.root = self.tree.getroot()
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
        self.ns = {
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
            'xsd': "http://www.w3.org/2001/XMLSchema",
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
        }

    def _tag_name(self, el):
        """
        Return the tag of el without its "{namespace}" prefix.

        A tag carrying no namespace is returned unchanged instead of raising.
        """
        match = self._retag.match(el.tag)
        if match is None:
            return el.tag
        return match.groups()[1]

    def get_used_namespaces(self):
        """
        Return the used namespace uris in this import file

        Only the first NamespaceUris element is read; an empty list is
        returned when there is none.
        """
        for child in self.root:
            if self._tag_name(child) == 'NamespaceUris':
                return [ns_element.text for ns_element in child]
        return []

    def get_aliases(self):
        """
        Return the used node aliases in this import file

        The result maps each "Alias" attribute to the text of its element.
        Only the first Aliases element is read; an empty dict is returned
        when there is none.
        """
        for child in self.root:
            if self._tag_name(child) == 'Aliases':
                return {el.attrib["Alias"]: el.text for el in child}
        return {}

    def get_node_datas(self):
        """
        Return one NodeData per root child that describes a node.
        """
        # these XML tags don't contain nodes
        non_node_tags = ("Aliases", "NamespaceUris", "Extensions", "Models")
        nodes = []
        for child in self.root:
            tag = self._tag_name(child)
            if tag not in non_node_tags:
                nodes.append(self._parse_node(tag, child))
        return nodes

    def _parse_node(self, nodetype, child):
        """
        Parse a XML node and create a NodeData object.

        nodetype is the namespace-free tag of child. XML attributes are
        handled first, then the sub-elements.
        """
        obj = NodeData()
        obj.nodetype = nodetype
        for key, val in child.attrib.items():
            self._set_attr(key, val, obj)
        self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
        obj.displayname = obj.browsename  # give a default value to display name
        for el in child:
            self._parse_attr(el, obj)
        return obj

    def _set_attr(self, key, val, obj):
        """
        Store the XML attribute key (string value val) on obj.

        Numeric attributes are converted with int(), boolean ones with
        _to_bool; unknown attributes are only logged.
        """
        if key == "NodeId":
            obj.nodeid = val
        elif key == "BrowseName":
            obj.browsename = val
        elif key == "SymbolicName":
            obj.symname = val
        elif key == "ParentNodeId":
            obj.parent = val
        elif key == "DataType":
            obj.datatype = val
        elif key == "IsAbstract":
            obj.abstract = _to_bool(val)
        elif key == "Executable":
            obj.executable = _to_bool(val)
        elif key == "EventNotifier":
            obj.eventnotifier = int(val)
        elif key == "ValueRank":
            obj.rank = int(val)
        elif key == "ArrayDimensions":
            obj.dimensions = [int(i) for i in val.split(",")]
        elif key == "MinimumSamplingInterval":
            obj.minsample = int(val)
        elif key == "AccessLevel":
            obj.accesslevel = int(val)
        elif key == "UserAccessLevel":
            obj.useraccesslevel = int(val)
        elif key == "Symmetric":
            obj.symmetric = _to_bool(val)
        else:
            self.logger.info("Attribute not implemented: %s:%s", key, val)

    def _parse_attr(self, el, obj):
        """
        Store the content of the sub-element el of a node on obj.

        Unknown elements are only logged.
        """
        tag = self._tag_name(el)

        if tag == "DisplayName":
            obj.displayname = el.text
        elif tag == "Description":
            obj.desc = el.text
        elif tag == "References":
            self._parse_refs(el, obj)
        elif tag == "Value":
            self._parse_contained_value(el, obj)
        elif tag == "InverseName":
            obj.inversename = el.text
        elif tag == "Definition":
            # the raw field elements are kept, they are not interpreted here
            for field in el:
                obj.definition.append(field)
        else:
            self.logger.info("Not implemented tag: %s", el)

    def _parse_contained_value(self, el, obj):
        """
        Parse the child of el as a constant.

        When el has no child element the value is parsed as "Null".
        """
        val_el = el.find(".//*")  # should be only one child
        self._parse_value(val_el, obj)

    def _parse_value(self, val_el, obj):
        """
        Parse the node val_el as a constant.

        Sets obj.valuetype to the namespace-free tag of val_el ("Null" when
        val_el is None) and obj.value to the parsed value. Unsupported types
        are logged and leave obj.value untouched.
        """
        if val_el is not None:
            ntag = self._tag_name(val_el)
        else:
            ntag = "Null"

        obj.valuetype = ntag
        if ntag == "Null":
            obj.value = None
        elif ntag in (
                "SByte", "Byte",
                "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64",
                "Float", "Double",
                "Boolean",
                "DateTime"  # FIXME Check format in a specific elif?
        ):
            # Elementary types have their parsing directly relying on ua_type_to_python.
            obj.value = ua_type_to_python(val_el.text, ntag)
        elif ntag in ("ByteString", "String"):
            mytext = ua_type_to_python(val_el.text, ntag)
            if mytext is None:
                # Support importing null strings.
                mytext = ""
            mytext = mytext.replace('\n', '').replace('\r', '')
            obj.value = mytext
        elif ntag == "Guid":
            # The guid is carried by a child element, parsed like any value.
            self._parse_contained_value(val_el, obj)
            obj.valuetype = obj.datatype  # Override parsed string type to guid.
        elif ntag == "NodeId":
            id_el = val_el.find("uax:Identifier", self.ns)
            if id_el is not None:
                obj.value = id_el.text
        elif ntag == "LocalizedText":
            obj.value = self._parse_body(val_el)
        elif ntag == "ListOfLocalizedText":
            obj.value = self._parse_list_of_localized_text(val_el)
        elif ntag == "ListOfExtensionObject":
            obj.value = self._parse_list_of_extension_object(val_el)
        elif ntag.startswith("ListOf"):
            # Default case for "ListOf" types.
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
            obj.value = []
            for item_el in val_el:
                # A scratch NodeData receives each item so obj is not clobbered.
                tmp = NodeData()
                self._parse_value(item_el, tmp)
                # FIXME Do some checks on parsed element?
                obj.value.append(tmp.value)
        elif ntag == "ExtensionObject":
            obj.value = self._parse_ext_obj(val_el)
        else:
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
            # QualifiedName, StatusCode.
            # Missing according to ua.VariantType (also missing in string_to_val):
            # DataValue, Variant, DiagnosticInfo.
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)

    def _get_text(self, el):
        """
        Return all text found inside el, each fragment stripped, concatenated.
        """
        return "".join(txt.strip() for txt in el.itertext())

    def _parse_list_of_localized_text(self, el):
        """
        Return the list of texts of the Text grandchildren of el.
        """
        value = []
        for localized_text in el:
            for child in localized_text:
                if self._tag_name(child) == 'Text':
                    value.append(self._get_text(child))
        return value

    def _parse_list_of_extension_object(self, el):
        """
        Parse a uax:ListOfExtensionObject Value
        Return an list of ExtObj
        """
        return [self._parse_ext_obj(extension_object) for extension_object in el]

    def _parse_ext_obj(self, el):
        """
        Build an ExtObj from the TypeId and Body children of el.

        Other children are reported through the logger and skipped.
        """
        ext = ExtObj()
        for part in el:
            ntag = self._tag_name(part)
            if ntag == 'TypeId':
                ext.typeid = self._get_text(part)
            elif ntag == 'Body':
                # The object name is the tag of the first element of the body;
                # an empty body leaves it to None instead of raising.
                first = part.find('*')
                if first is not None:
                    ext.objname = self._tag_name(first)
                ext.body = self._parse_body(part)
            else:
                self.logger.warning("Unknown ExtensionObject tag: %s", ntag)
        return ext

    def _parse_body(self, el):
        """
        Return the children of el as a list of (tag, value) tuples.

        The value is the stripped text of a leaf child, or the recursively
        parsed list for a child having children. Children yielding an empty
        value are left out.
        """
        body = []
        for body_item in el:
            otag = self._tag_name(body_item)
            if len(body_item) == 0:
                val = self._get_text(body_item)
            else:
                val = self._parse_body(body_item)
            if val:
                body.append((otag, val))
        return body

    def _parse_refs(self, el, obj):
        """
        Dispatch every reference found in el onto obj.

        A "HasTypeDefinition" reference sets obj.typedef, an inverse reference
        (IsForward "false" or "False") sets obj.parent and obj.parentlink, and
        any other reference is appended to obj.refs as a RefStruct.
        """
        for ref in el:
            reftype = ref.attrib["ReferenceType"]
            is_forward = ref.attrib.get("IsForward") not in ("false", "False")
            if reftype == "HasTypeDefinition":
                obj.typedef = ref.text
            elif not is_forward:
                # A later inverse reference silently replaces an earlier parent.
                obj.parent = ref.text
                obj.parentlink = reftype
            else:
                struct = RefStruct()
                # Always a bool, never the raw attribute string.
                struct.forward = is_forward
                struct.target = ref.text
                struct.reftype = reftype
                obj.refs.append(struct)
338