Passed
Push — master ( 72629f...de3f23 )
by Olivier
03:10
created

XmlExporter._extobj_to_etree()   A

Complexity

Conditions 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 12
ccs 11
cts 11
cp 1
crap 3
rs 9.4285
1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6 1
from collections import OrderedDict
7 1
import xml.etree.ElementTree as Et
8 1
from copy import copy
9 1
import base64
10
11 1
from opcua import ua
12 1
from opcua.ua import object_ids as o_ids
13 1
from opcua.common.ua_utils import get_base_data_type
14 1
from opcua.ua.uatypes import extension_object_ids
15
16
17 1
class XmlExporter(object):
18
19
    '''
    If the members of a value must be written by _extobj_to_etree in a certain
    order, that order can be added to the dictionary below.
    '''
22 1
    extobj_ordered_elements = {
23
        ua.NodeId(ua.ObjectIds.Argument) : ['Name',
24
                                            'DataType',
25
                                            'ValueRank',
26
                                            'ArrayDimensions',
27
                                            'Description']
28
        }
29
30 1
    def __init__(self, server):
31 1
        self.logger = logging.getLogger(__name__)
32 1
        self.server = server
33 1
        self.aliases = {}
34 1
        self._addr_idx_to_xml_idx = {}
35
36 1
        node_set_attributes = OrderedDict()
37 1
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
38 1
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
39 1
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
40 1
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
41
42 1
        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))
43
44 1
    def build_etree(self, node_list, uris=None):
        """
        Fill the XML etree from a list of nodes.

        Namespaces used by the nodes are always exported; additional namespace
        uris can be requested through ``uris``.

        Args:
            node_list: list of Node objects for export
            uris: optional list of namespace uri strings

        Returns:
            None
        """
        self.logger.info('Building XML etree')

        # namespaces come first: this also sets the index mapping used when node ids are written
        self._add_namespaces(node_list, uris)

        # one XML element per node in the list
        for exported_node in node_list:
            self.node_to_etree(exported_node)

        # aliases are collected while the nodes are written, so they are added last
        self._add_alias_els()
64
65 1
    def _add_namespaces(self, nodes, uris):
        """
        Work out which namespaces have to be exported and write them to the etree.

        Stores the address-space-index -> exported-file-index mapping in
        ``self._addr_idx_to_xml_idx``.

        Args:
            nodes: list of Node objects for export
            uris: optional list of namespace uri strings to export as well
        """
        used_idxs = self._get_ns_idxs_of_nodes(nodes)
        ns_array = self.server.get_namespace_array()

        # indexes of explicitly requested uris are added on top of the used ones
        if uris:
            self._add_idxs_from_uris(used_idxs, uris, ns_array)

        idx_map = self._make_idx_dict(used_idxs, ns_array)
        self._addr_idx_to_xml_idx = idx_map

        # namespace 0 is never listed among the exported uris
        exported_uris = []
        for addr_idx in sorted(idx_map):
            if addr_idx != 0:
                exported_uris.append(ns_array[addr_idx])

        self._add_namespace_uri_els(exported_uris)
80
81 1
    def _make_idx_dict(self, idxs, ns_array):
82 1
        idxs.sort()
83 1
        addr_idx_to_xml_idx = {0: 0}
84 1
        for xml_idx, addr_idx in enumerate(idxs):
85 1
            if addr_idx >= len(ns_array):
86 1
                break
87 1
            addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
88 1
        return addr_idx_to_xml_idx
89
90 1
    def _get_ns_idxs_of_nodes(self, nodes):
91
        """
92
        get a list of all indexes used or references by nodes
93
        """
94 1
        idxs = []
95 1
        for node in nodes:
96 1
            node_idxs = [node.nodeid.NamespaceIndex]
97 1
            node_idxs.append(node.get_browse_name().NamespaceIndex)
98 1
            node_idxs.extend(ref.NodeId.NamespaceIndex for ref in node.get_references())
99 1
            node_idxs = list(set(node_idxs))  # remove duplicates
100 1
            for i in node_idxs:
101 1
                if i != 0 and i not in idxs:
102 1
                    idxs.append(i)
103 1
        return idxs
104
105 1
    def _add_idxs_from_uris(self, idxs, uris, ns_array):
106
        for uri in uris:
107
            if uri in ns_array:
108
                i = ns_array.index(uri)
109
                if i not in idxs:
110
                    idxs.append(i)
111
112
113 1
    def write_xml(self, xmlpath, pretty=True):
114
        """
115
        Write the XML etree in the exporter object to a file
116
        Args:
117
            xmlpath: string representing the path/file name
118
119
        Returns:
120
        """
121
        # try to write the XML etree to a file
122 1
        self.logger.info('Exporting XML file to %s', xmlpath)
123
        # from IPython import embed
124
        # embed()
125 1
        if pretty:
126 1
            self.indent(self.etree.getroot())
127 1
            self.etree.write(xmlpath,
128
                             encoding='utf-8',
129
                             xml_declaration=True
130
                            )
131
        else:
132
            self.etree.write(xmlpath,
133
                             encoding='utf-8',
134
                             xml_declaration=True
135
                            )
136
137 1
    def dump_etree(self):
138
        """
139
        Dump etree to console for debugging
140
        Returns:
141
        """
142
        self.logger.info('Dumping XML etree to console')
143
        Et.dump(self.etree)
144
145 1
    def node_to_etree(self, node):
        """
        Add the XML sub elements needed to export ``node`` to the etree.

        The node class selects the exporting method; a node class without an
        exporting method is only logged.

        Args:
            node: Node object which will be added to XML etree

        Returns:
            None
        """
        node_class = node.get_node_class()

        # (node class, exporting method) pairs, checked in this order
        exporters = (
            (ua.NodeClass.Object, self.add_etree_object),
            (ua.NodeClass.ObjectType, self.add_etree_object_type),
            (ua.NodeClass.Variable, self.add_etree_variable),
            (ua.NodeClass.VariableType, self.add_etree_variable_type),
            (ua.NodeClass.ReferenceType, self.add_etree_reference_type),
            (ua.NodeClass.DataType, self.add_etree_datatype),
            (ua.NodeClass.Method, self.add_etree_method),
        )
        for candidate, exporter in exporters:
            if node_class is candidate:
                exporter(node)
                return

        self.logger.info("Exporting node class not implemented: %s ", node_class)
171
172 1
    def _add_sub_el(self, el, name, text):
173 1
        child_el = Et.SubElement(el, name)
174 1
        child_el.text = text
175 1
        return child_el
176
177 1
    def _node_to_string(self, nodeid):
        """
        Return the string form of a node id, using the namespace index of the exported file.

        Args:
            nodeid: a ua.NodeId, or an object exposing one as ``.nodeid``

        Returns:
            result of ``to_string()``; the namespace index is replaced when it has
            an entry in ``self._addr_idx_to_xml_idx``
        """
        if not isinstance(nodeid, ua.NodeId):
            nodeid = nodeid.nodeid

        xml_idx = self._addr_idx_to_xml_idx.get(nodeid.NamespaceIndex)
        if xml_idx is None:
            return nodeid.to_string()

        # work on a copy so the caller's node id keeps its address space index
        remapped = copy(nodeid)
        remapped.NamespaceIndex = xml_idx
        return remapped.to_string()
185
186 1
    def _bname_to_string(self, bname):
        """
        Return the string form of a browse name, using the namespace index of the exported file.

        The namespace index is replaced when it has an entry in
        ``self._addr_idx_to_xml_idx``; ``bname`` itself is not modified.
        """
        xml_idx = self._addr_idx_to_xml_idx.get(bname.NamespaceIndex)
        if xml_idx is None:
            return bname.to_string()

        remapped = copy(bname)
        remapped.NamespaceIndex = xml_idx
        return remapped.to_string()
191
192 1
    def _add_node_common(self, nodetype, node):
        """
        Create the element for ``node`` and fill in what all node classes share.

        Writes the NodeId and BrowseName attributes, the DisplayName sub element
        and, when not empty, the Description sub element. ParentNodeId is written
        only for Object, Variable and Method nodes that have a parent.

        Args:
            nodetype: tag name of the element, e.g. "UAObject"
            node: Node object to export

        Returns:
            the element created under the root of the etree
        """
        browse_name = node.get_browse_name()
        node_id = node.nodeid
        parent_node = node.get_parent()
        display_text = node.get_display_name().Text
        description_text = node.get_description().Text

        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.attrib["NodeId"] = self._node_to_string(node_id)
        node_el.attrib["BrowseName"] = self._bname_to_string(browse_name)

        classes_with_parent = (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method)
        if parent_node is not None and node.get_node_class() in classes_with_parent:
            node_el.attrib["ParentNodeId"] = self._node_to_string(parent_node)

        self._add_sub_el(node_el, 'DisplayName', display_text)
        if description_text not in (None, ""):
            self._add_sub_el(node_el, 'Description', description_text)
        # FIXME: add WriteMask and UserWriteMask
        return node_el
210
211 1
    def add_etree_object(self, node):
        """
        Add a UA object element to the XML etree

        The EventNotifier attribute is written only when its value is not 0.
        """
        object_el = self._add_node_common("UAObject", node)
        notifier = node.get_attribute(ua.AttributeIds.EventNotifier).Value.Value
        if notifier != 0:
            object_el.attrib["EventNotifier"] = str(notifier)
        self._add_ref_els(object_el, node)
220
221 1
    def add_etree_object_type(self, node):
        """
        Add a UA object type element to the XML etree

        The IsAbstract attribute is written only when the type is abstract.
        """
        type_el = self._add_node_common("UAObjectType", node)
        is_abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
        if is_abstract:
            type_el.attrib["IsAbstract"] = 'true'
        self._add_ref_els(type_el, node)
230
231 1
    def add_variable_common(self, node, el):
        """
        Write what variables and variable types share: data type, value rank,
        array dimensions and the value itself.

        Args:
            node: Node object being exported
            el: element of the node, extended in place
        """
        dtype = node.get_data_type()
        has_standard_name = dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames
        if has_standard_name:
            # named data types are written by name and registered as an alias
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype] = dtype_name
        else:
            dtype_name = dtype.to_string()

        value_rank = node.get_value_rank()
        if value_rank != -1:
            el.attrib["ValueRank"] = str(int(value_rank))

        dimensions = node.get_attribute(ua.AttributeIds.ArrayDimensions).Value.Value
        if dimensions:
            el.attrib["ArrayDimensions"] = ",".join(str(dim) for dim in dimensions)

        el.attrib["DataType"] = dtype_name
        self.value_to_etree(el, dtype_name, dtype, node)
246
247 1
    def add_etree_variable(self, node):
        """
        Add a UA variable element to the XML etree

        AccessLevel, UserAccessLevel, MinimumSamplingInterval and Historizing are
        written only when they differ from the values treated as defaults here.
        """
        variable_el = self._add_node_common("UAVariable", node)
        self._add_ref_els(variable_el, node)
        self.add_variable_common(node, variable_el)

        access = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
        user_access = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        skipped_levels = (0, ua.AccessLevel.CurrentRead.mask)
        if access not in skipped_levels:
            variable_el.attrib["AccessLevel"] = str(access)
        if user_access not in skipped_levels:
            variable_el.attrib["UserAccessLevel"] = str(user_access)

        sampling = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval).Value.Value
        if sampling:
            variable_el.attrib["MinimumSamplingInterval"] = str(sampling)

        historizing = node.get_attribute(ua.AttributeIds.Historizing).Value.Value
        if historizing:
            variable_el.attrib["Historizing"] = 'true'
271
272 1
    def add_etree_variable_type(self, node):
        """
        Add a UA variable type element to the XML etree

        The IsAbstract attribute is written only when the type is abstract.
        """
        type_el = self._add_node_common("UAVariableType", node)
        self.add_variable_common(node, type_el)

        is_abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
        if is_abstract:
            type_el.attrib["IsAbstract"] = "true"

        self._add_ref_els(type_el, node)
285
286 1
    def add_etree_method(self, node):
        """
        Add a UA method element to the XML etree

        Executable and UserExecutable are written only when their value is False.
        """
        method_el = self._add_node_common("UAMethod", node)

        # the XML attribute name is the same as the name of the ua attribute id
        for attr_name in ("Executable", "UserExecutable"):
            flag = node.get_attribute(getattr(ua.AttributeIds, attr_name)).Value.Value
            if flag is False:
                method_el.attrib[attr_name] = "false"

        self._add_ref_els(method_el, node)
296
297 1
    def add_etree_reference_type(self, obj):
        """
        Add a UA reference type element to the XML etree

        An InverseName sub element is added when the node has an inverse name text.
        """
        ref_type_el = self._add_node_common("UAReferenceType", obj)
        self._add_ref_els(ref_type_el, obj)

        inverse = obj.get_attribute(ua.AttributeIds.InverseName)
        if inverse is None:
            return
        inverse_name = inverse.Value.Value
        if inverse_name is not None and inverse_name.Text is not None:
            self._add_sub_el(ref_type_el, 'InverseName', inverse_name.Text)
303
304 1
    def add_etree_datatype(self, obj):
        """
        Add a UA data type element, including its references, to the XML etree
        """
        datatype_el = self._add_node_common("UADataType", obj)
        self._add_ref_els(datatype_el, obj)
310
311 1
    def _add_namespace_uri_els(self, uris):
312 1
        nuris_el = Et.Element('NamespaceUris')
313
314 1
        for uri in uris:
315 1
            self._add_sub_el(nuris_el, 'Uri', uri)
316
317 1
        self.etree.getroot().insert(0, nuris_el)
318
319 1
    def _add_alias_els(self):
320 1
        aliases_el = Et.Element('Aliases')
321
322 1
        ordered_keys = list(self.aliases.keys())
323 1
        ordered_keys.sort()
324 1
        for nodeid in ordered_keys:
325 1
            name = self.aliases[nodeid]
326 1
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=name)
327 1
            ref_el.text = nodeid.to_string()
328
329
        # insert behind the namespace element
330 1
        self.etree.getroot().insert(1, aliases_el)
331
332 1
    def _add_ref_els(self, parent_el, obj):
        """
        Add a ``References`` element listing every reference of ``obj``.

        A reference type in namespace 0 whose identifier is listed in
        ``ObjectIdNames`` is written by name; any other reference type is written
        as its node id string. Every reference type seen is recorded in
        ``self.aliases``.

        Args:
            parent_el: element of the node, extended in place
            obj: Node object whose references are exported
        """
        refs = obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')

        for ref in refs:
            ref_type = ref.ReferenceTypeId
            # Only namespace 0 identifiers may be resolved through ObjectIdNames:
            # a custom reference type with the same numeric identifier must not
            # be exported under a standard name (same check as for data types).
            if ref_type.NamespaceIndex == 0 and ref_type.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref_type.Identifier]
            else:
                ref_name = ref_type.to_string()

            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            # IsForward is written only for inverse references
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            ref_el.text = self._node_to_string(ref.NodeId)

            self.aliases[ref_type] = ref_name
348
349
350 1
    def member_to_etree(self, el, name, dtype, val):
        """
        Add a ``uax:<name>`` element for one member of a structure to ``el``.

        A list or tuple value is written item by item through ``_value_to_etree``;
        any other value is written through ``_val_to_etree``.

        Args:
            el: parent element
            name: member name
            dtype: node id of the member's data type
            val: member value
        """
        member_el = Et.SubElement(el, "uax:" + name)
        if not isinstance(val, (list, tuple)):
            self._val_to_etree(member_el, dtype, val)
            return

        for item in val:
            self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, item)
357
358
359 1
    def _val_to_etree(self, el, dtype, val):
        """
        Write a single value into ``el`` according to its data type.

        NodeId and Guid values get their own sub element, Boolean and ByteString
        values are encoded as text, and a value exposing ``ua_types`` is written
        member by member. Any other value becomes the text of ``el``; None
        leaves ``el`` without text.

        Args:
            el: element receiving the value
            dtype: node id of the data type of ``val``
            val: value to write
        """
        if dtype == ua.NodeId(ua.ObjectIds.NodeId):
            identifier_el = Et.SubElement(el, "uax:Identifier")
            identifier_el.text = val.to_string()
            return

        if dtype == ua.NodeId(ua.ObjectIds.Guid):
            guid_el = Et.SubElement(el, "uax:String")
            guid_el.text = str(val)
            return

        if dtype == ua.NodeId(ua.ObjectIds.Boolean):
            if val:
                el.text = 'true'
            else:
                el.text = 'false'
            return

        if dtype == ua.NodeId(ua.ObjectIds.ByteString):
            # a missing byte string is exported as an empty one
            raw = b"" if val is None else val
            el.text = base64.b64encode(raw).decode("utf-8")
            return

        if hasattr(val, "ua_types"):
            # structured value: one sub element per declared member
            for member_name, member_type in val.ua_types:
                member_dtype = ua.NodeId(getattr(ua.ObjectIds, member_type))
                self.member_to_etree(el, member_name, member_dtype, getattr(val, member_name))
            return

        if isinstance(val, bytes):
            # FIXME: should we also encode this (localized text I guess) using base64??
            el.text = val.decode("utf-8")
        elif val is not None:
            el.text = str(val)
383
384
385 1
    def value_to_etree(self, el, dtype_name, dtype, node):
        """
        Add a ``Value`` element holding the current value of ``node`` to ``el``.

        Nothing is added when the node's value is None.

        Args:
            el: element of the node
            dtype_name: name of the data type, used for the element names
            dtype: node id of the data type
            node: Node object whose value is exported
        """
        variant = node.get_data_value().Value
        if variant.Value is None:
            return
        value_el = Et.SubElement(el, 'Value')
        self._value_to_etree(value_el, dtype_name, dtype, variant.Value)
390
391 1
    def _value_to_etree(self, el, type_name, dtype, val):
        """
        Write ``val`` below ``el``, recursing into lists and tuples.

        A list or tuple is wrapped in a ``uax:ListOf...`` element and each item is
        written recursively. A single value is written according to the base data
        type of ``dtype``, with enumerations written as Int32; everything else is
        handed to ``_extobj_to_etree``. None is ignored.

        Args:
            el: parent element
            type_name: name of the data type, used for the element names
            dtype: node id of the data type
            val: value, or list/tuple of values, to write
        """
        if val is None:
            return

        if isinstance(val, (list, tuple)):
            # NOTE(review): identifiers up to 21 in namespace 0 are presumably the
            # built-in types; anything else is treated as an extension object
            if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
                list_tag = "uax:ListOf" + type_name
            else:
                list_tag = "uax:ListOfExtensionObject"

            list_el = Et.SubElement(el, list_tag)
            for item in val:
                self._value_to_etree(list_el, type_name, dtype, item)
            return

        base_dtype = get_base_data_type(self.server.get_node(dtype)).nodeid

        if base_dtype == ua.NodeId(ua.ObjectIds.Enumeration):
            # enumerations are exported as Int32 values
            base_dtype = ua.NodeId(ua.ObjectIds.Int32)
            type_name = ua.ObjectIdNames[base_dtype.Identifier]

        is_builtin = base_dtype.NamespaceIndex == 0 and base_dtype.Identifier <= 21
        if not is_builtin:
            self._extobj_to_etree(el, type_name, dtype, val)
            return

        type_name = ua.ObjectIdNames[base_dtype.Identifier]
        value_el = Et.SubElement(el, "uax:" + type_name)
        self._val_to_etree(value_el, base_dtype, val)
418
419
420
421
422 1
    def _extobj_to_etree(self, val_el, name, dtype, val):
        """
        Write ``val`` as a ``uax:ExtensionObject`` element below ``val_el``.

        The element holds a ``uax:TypeId`` with the string form of ``dtype`` and a
        ``uax:Body`` containing one ``uax:<name>`` structure with a sub element
        per entry of ``val.ua_types``.

        Args:
            val_el: parent element
            name: name of the structure type, used as element name in the body
            dtype: node id of the data type
            val: object exposing ``ua_types`` as (member name, type name) pairs
        """
        ext_obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
        type_id_el = Et.SubElement(ext_obj_el, "uax:TypeId")
        identifier_el = Et.SubElement(type_id_el, "uax:Identifier")
        identifier_el.text = dtype.to_string()
        body_el = Et.SubElement(ext_obj_el, "uax:Body")
        struct_el = Et.SubElement(body_el, "uax:" + name)

        list_prefix = "ListOf"
        for member_name, member_type in val.ua_types:
            # FIXME: what happens if we have a custom type which is not part of ObjectIds?
            if member_type.startswith(list_prefix):
                # list members are looked up by the type of their items
                member_type = member_type[len(list_prefix):]
            member_dtype = ua.NodeId(getattr(ua.ObjectIds, member_type))
            self.member_to_etree(struct_el, member_name, member_dtype, getattr(val, member_name))
437
438 1
    def _get_member_order(self, dtype, val):
439
        '''
440
        If an dtype has an entry in XmlExporter.extobj_ordered_elements return the export order of the elements 
441
        else return the unordered members.
442
        '''
443
        if dtype not in XmlExporter.extobj_ordered_elements.keys():
444
            return val.ua_types.keys()
445
        else:
446
            member_keys = [name for name in XmlExporter.extobj_ordered_elements[dtype] if name in val.ua_types.keys() and getattr(val, name) is not None ]
447
448
        return member_keys
449
450 1
    def indent(self, elem, level=0):
451
        """
452
        copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
453
        it basically walks your tree and adds spaces and newlines so the tree is
454
        printed in a nice way
455
        """
456 1
        i = "\n" + level * "  "
457 1
        if len(elem):
458 1
            if not elem.text or not elem.text.strip():
459 1
                elem.text = i + "  "
460 1
            if not elem.tail or not elem.tail.strip():
461 1
                elem.tail = i
462 1
            for elem in elem:
463 1
                self.indent(elem, level + 1)
464 1
            if not elem.tail or not elem.tail.strip():
465 1
                elem.tail = i
466
        else:
467 1
            if level and (not elem.tail or not elem.tail.strip()):
468
                elem.tail = i
469