Completed
Push — master ( 960975...2f9535 )
by Olivier
06:03
created

XMLParser._parse_contained_value()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 6
ccs 3
cts 3
cp 1
crap 1
rs 9.4285
c 0
b 0
f 0
1
"""
2
Parse XML nodeset files from the OPC UA specification.
3
"""
4 1
import logging
5 1
from pytz import utc
6 1
import uuid
7 1
import re
8 1
import sys
9
10 1
import xml.etree.ElementTree as ET
11
12 1
from opcua.common import ua_utils
13 1
from opcua import ua
14
15
16 1
def ua_type_to_python(val, uatype_as_str):
    """
    Convert the string ``val`` into a python value.

    ``uatype_as_str`` is the name of a ``ua.VariantType`` member; the member
    is looked up with ``getattr`` (so an unknown name raises AttributeError)
    and the conversion itself is delegated to ``ua_utils.string_to_val``.
    """
    variant_type = getattr(ua.VariantType, uatype_as_str)
    return ua_utils.string_to_val(val, variant_type)
21
22 1
def _to_bool(val):
    """
    Shortcut: convert ``val`` through ua_type_to_python using the
    "Boolean" variant type.
    """
    return ua_type_to_python(val, uatype_as_str="Boolean")
27
28
29 1
class NodeData(object):
    """
    Plain container for everything parsed out of one node element.

    All fields start with the defaults below and are filled in by XMLParser.
    NOTE: ``executable`` is not initialised here; XMLParser._set_attr only
    creates it when the XML element carries an "Executable" attribute.
    """

    def __init__(self):
        # common to every node
        self.nodetype = self.nodeid = self.browsename = self.displayname = None
        self.symname = None  # FIXME: this param is never used, why?
        self.parent = self.parentlink = None
        self.desc = ""
        self.typedef = None
        self.refs = []
        self.nodeclass = None
        self.eventnotifier = 0

        # variable
        self.datatype = None
        self.rank = -1  # check default value
        self.value = self.valuetype = self.dimensions = None
        self.accesslevel = self.useraccesslevel = self.minsample = None

        # referencetype
        self.inversename = ""
        self.abstract = self.symmetric = False

        # datatype
        self.definition = []

    def __str__(self):
        template = "NodeData(nodeid:{0})"
        return template.format(self.nodeid)

    # repr() and str() give the same short description.
    __repr__ = __str__
66
67
68 1
class RefStruct(object):
    """
    Holds one parsed reference: its type, its direction and its target.
    """

    def __init__(self):
        # A reference is considered forward until told otherwise.
        self.reftype, self.forward, self.target = None, True, None
74
75
76 1
class ExtObj(object):
    """
    Holds the pieces of one parsed extension object: type id, name of the
    object found in the body, body type and the parsed body itself.
    """

    def __init__(self):
        self.typeid = self.objname = self.bodytype = None
        self.body = {}

    def __str__(self):
        template = "ExtObj({0}, {1})"
        return template.format(self.objname, self.body)

    # repr() and str() give the same description.
    __repr__ = __str__
87
88
89 1
class XMLParser(object):
90
91 1
    def __init__(self, xmlpath):
92 1
        self.logger = logging.getLogger(__name__)
93 1
        self._retag = re.compile(r"(\{.*\})(.*)")
94 1
        self.path = xmlpath
95
96 1
        self.tree = ET.parse(xmlpath)
97 1
        self.root = self.tree.getroot()
98
        # FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
99 1
        self.ns = {
100
            'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
101
            'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
102
            'xsd': "http://www.w3.org/2001/XMLSchema",
103
            'xsi': "http://www.w3.org/2001/XMLSchema-instance"
104
        }
105
106 1
    def get_used_namespaces(self):
107
        """
108
        Return the used namespace uris in this import file
109
        """
110 1
        namespaces_uris = []
111 1
        for child in self.root:
112 1
            tag = self._retag.match(child.tag).groups()[1]
113 1
            if tag == 'NamespaceUris':
114 1
                namespaces_uris = [ns_element.text for ns_element in child]
115 1
                break
116 1
        return namespaces_uris
117
118 1
    def get_aliases(self):
119
        """
120
        Return the used node aliases in this import file
121
        """
122 1
        aliases = {}
123 1
        for child in self.root:
124 1
            tag = self._retag.match(child.tag).groups()[1]
125 1
            if tag == 'Aliases':
126 1
                for el in child:
127 1
                    aliases[el.attrib["Alias"]] = el.text
128 1
                break
129 1
        return aliases
130
131 1
    def get_node_datas(self):
132 1
        nodes = []
133 1
        for child in self.root:
134 1
            tag = self._retag.match(child.tag).groups()[1]
135 1
            if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]:  # these XML tags don't contain nodes
136 1
                node = self._parse_node(tag, child)
137 1
                nodes.append(node)
138 1
        return nodes
139
140 1
    def _parse_node(self, nodetype, child):
        """
        Build a NodeData object from the XML element ``child``.

        ``nodetype`` is stored as-is. The element's XML attributes are
        applied first, then its sub-elements.
        """
        node = NodeData()
        node.nodetype = nodetype
        for name in child.attrib:
            self._set_attr(name, child.attrib[name], node)
        self.logger.info("Parsing node: %s %s", node.nodeid, node.browsename)
        # Display name falls back to the browse name unless a DisplayName
        # sub-element overrides it below.
        node.displayname = node.browsename
        for sub_element in child:
            self._parse_attr(sub_element, node)
        return node
153
154 1
    def _set_attr(self, key, val, obj):
155 1
        if key == "NodeId":
156 1
            obj.nodeid = val
157 1
        elif key == "BrowseName":
158 1
            obj.browsename = val
159 1
        elif key == "SymbolicName":
160
            obj.symname = val
161 1
        elif key == "ParentNodeId":
162 1
            obj.parent = val
163 1
        elif key == "DataType":
164 1
            obj.datatype = val
165 1
        elif key == "IsAbstract":
166
            obj.abstract = _to_bool(val)
167 1
        elif key == "Executable":
168
            obj.executable = _to_bool(val)
169 1
        elif key == "EventNotifier":
170
            obj.eventnotifier = int(val)
171 1
        elif key == "ValueRank":
172 1
            obj.rank = int(val)
173 1
        elif key == "ArrayDimensions":
174 1
            obj.dimensions = [int(i) for i in val.split(",")]
175
        elif key == "MinimumSamplingInterval":
176
            obj.minsample = int(val)
177
        elif key == "AccessLevel":
178
            obj.accesslevel = int(val)
179
        elif key == "UserAccessLevel":
180
            obj.useraccesslevel = int(val)
181
        elif key == "Symmetric":
182
            obj.symmetric = _to_bool(val)
183
        else:
184
            self.logger.info("Attribute not implemented: %s:%s", key, val)
185
186 1
    def _parse_attr(self, el, obj):
187 1
        tag = self._retag.match(el.tag).groups()[1]
188
189 1
        if tag == "DisplayName":
190 1
            obj.displayname = el.text
191 1
        elif tag == "Description":
192 1
            obj.desc = el.text
193 1
        elif tag == "References":
194 1
            self._parse_refs(el, obj)
195 1
        elif tag == "Value":
196 1
            self._parse_contained_value(el, obj)
197 1
        elif tag == "InverseName":
198
            obj.inversename = el.text
199 1
        elif tag == "Definition":
200 1
            for field in el:
201 1
                obj.definition.append(field)
202
        else:
203
            self.logger.info("Not implemented tag: %s", el)
204
205 1
    def _parse_contained_value(self, el, obj):
206
        """
207
        Parse the child of el as a constant.
208
        """
209 1
        val_el = el.find(".//")  # should be only one child
210 1
        self._parse_value(val_el, obj)
211
212 1
    def _parse_value(self, val_el, obj):
213
        """
214
        Parse the node val_el as a constant.
215
        """
216 1
        if val_el is not None:
217 1
            ntag = self._retag.match(val_el.tag).groups()[1]
218
        else:
219
            ntag = "Null"
220
221 1
        obj.valuetype = ntag
222 1
        if ntag == "Null":
223
            obj.value = None
224 1
        elif hasattr(ua.ua_binary.Primitives1, ntag):
225
            # Elementary types have their parsing directly relying on ua_type_to_python.
226 1
            obj.value = ua_type_to_python(val_el.text, ntag)
227 1
        elif ntag == "DateTime":
228 1
            obj.value = ua_type_to_python(val_el.text, ntag)
229
            # According to specs, DateTime should be either UTC or with a timezone.
230 1
            if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
231 1
                utc.localize(obj.value) # FIXME Forcing to UTC if unaware, maybe should raise?
232 1
        elif ntag in ("ByteString", "String"):
233 1
            mytext = ua_type_to_python(val_el.text, ntag)
234 1
            if mytext is None:
235
                # Support importing null strings.
236
                mytext = ""
237 1
            mytext = mytext.replace('\n', '').replace('\r', '')
238 1
            obj.value = mytext
239 1
        elif ntag == "Guid":
240 1
            self._parse_contained_value(val_el, obj)
241
            # Override parsed string type to guid.
242 1
            obj.valuetype = ntag
243 1
        elif ntag == "NodeId":
244 1
            id_el = val_el.find("uax:Identifier", self.ns)
245 1
            if id_el is not None:
246 1
                obj.value = id_el.text
247 1
        elif ntag == "ExtensionObject":
248 1
            obj.value = self._parse_ext_obj(val_el)
249 1
        elif ntag == "LocalizedText":
250 1
            obj.value = self._parse_body(val_el)
251 1
        elif ntag == "ListOfLocalizedText":
252 1
            obj.value = self._parse_list_of_localized_text(val_el)
253 1
        elif ntag == "ListOfExtensionObject":
254 1
            obj.value = self._parse_list_of_extension_object(val_el)
255 1
        elif ntag.startswith("ListOf"):
256
            # Default case for "ListOf" types.
257
            # Should stay after particular cases (e.g.: "ListOfLocalizedText").
258 1
            obj.value = []
259 1
            for val_el in val_el:
260 1
                tmp = NodeData()
261 1
                self._parse_value(val_el, tmp)
262 1
                obj.value.append(tmp.value)
263
        else:
264
            # Missing according to string_to_val: XmlElement, ExpandedNodeId,
265
            # QualifiedName, StatusCode.
266
            # Missing according to ua.VariantType (also missing in string_to_val):
267
            # DataValue, Variant, DiagnosticInfo.
268
            self.logger.warning("Parsing value of type '%s' not implemented", ntag)
269
270 1
    def _get_text(self, el):
271 1
        txtlist = [txt.strip() for txt in el.itertext()]
272 1
        return "".join(txtlist)
273
274 1
    def _parse_list_of_localized_text(self, el):
275
        # FIXME Why not calling parse_body as for LocalizedText without list?
276 1
        value = []
277 1
        for localized_text in el:
278 1
            ntag = self._retag.match(localized_text.tag).groups()[1]
279 1
            for child in localized_text:
280 1
                ntag = self._retag.match(child.tag).groups()[1]
281 1
                if ntag == 'Text':
282 1
                    value.append(self._get_text(child))
283 1
        return value
284
285 1
    def _parse_list_of_extension_object(self, el):
286
        """
287
        Parse a uax:ListOfExtensionObject Value
288
        Return an list of ExtObj
289
        """
290 1
        value = []
291 1
        for extension_object in el:
292 1
            ext_obj = self._parse_ext_obj(extension_object)
293 1
            value.append(ext_obj)
294 1
        return value
295
296 1
    def _parse_ext_obj(self, el):
        """
        Parse one extension object element into an ExtObj.

        The text of the "TypeId" child becomes ``typeid``. For the "Body"
        child, the tag of its first child becomes ``objname`` and the
        parsed content becomes ``body``. Any other child is reported
        through the logger and skipped.
        """
        ext = ExtObj()
        for extension_object_part in el:
            ntag = self._retag.match(extension_object_part.tag).groups()[1]
            if ntag == 'TypeId':
                ext.typeid = self._get_text(extension_object_part)
            elif ntag == 'Body':
                first_child = extension_object_part.find('*')
                # An empty Body has no child to name the object after;
                # objname then keeps its default instead of crashing.
                if first_child is not None:
                    ext.objname = self._retag.match(first_child.tag).groups()[1]
                ext.body = self._parse_body(extension_object_part)
            else:
                self.logger.warning("Unknown tag in extension object: %s", ntag)
        return ext
309
310 1
    def _parse_body(self, el):
311 1
        body = []
312 1
        for body_item in el:
313 1
            otag = self._retag.match(body_item.tag).groups()[1]
314 1
            childs = [i for i in body_item]
315 1
            if not childs:
316 1
                val = self._get_text(body_item)
317
            else:
318 1
                val = self._parse_body(body_item)
319 1
            if val:
320 1
                body.append((otag, val))
321 1
        return body
322
323 1
    def _parse_refs(self, el, obj):
324 1
        for ref in el:
325 1
            if ref.attrib["ReferenceType"] == "HasTypeDefinition":
326 1
                obj.typedef = ref.text
327 1
            elif "IsForward" in ref.attrib and ref.attrib["IsForward"] in ("false", "False"):
328
                # if obj.parent:
329
                    # sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
330 1
                obj.parent = ref.text
331 1
                obj.parentlink = ref.attrib["ReferenceType"]
332
            else:
333 1
                struct = RefStruct()
334 1
                if "IsForward" in ref.attrib:
335
                    struct.forward = ref.attrib["IsForward"]
336 1
                struct.target = ref.text
337 1
                struct.reftype = ref.attrib["ReferenceType"]
338
                obj.refs.append(struct)
339