Completed
Push — master ( 63a85c...1bf806 )
by Olivier
03:59
created

XmlExporter._val_to_etree()   C

Complexity

Conditions 9

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 9

Importance

Changes 2
Bugs 1 Features 0
Metric Value
cc 9
c 2
b 1
f 0
dl 0
loc 19
ccs 17
cts 17
cp 1
crap 9
rs 6.4615
1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6 1
from collections import OrderedDict
7 1
import xml.etree.ElementTree as Et
8 1
from copy import copy
9
10 1
from opcua import ua
11 1
from opcua.ua import object_ids as o_ids
12 1
from opcua.common.ua_utils import get_base_data_type
13
14
15 1
class XmlExporter(object):
16
17 1
    def __init__(self, server):
18 1
        self.logger = logging.getLogger(__name__)
19 1
        self.server = server
20 1
        self.aliases = {}
21 1
        self._addr_idx_to_xml_idx = {}
22
23 1
        node_set_attributes = OrderedDict()
24 1
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
25 1
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
26 1
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
27 1
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
28
29 1
        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))
30
31 1
    def build_etree(self, node_list, uris=None):
32
        """
33
        Create an XML etree object from a list of nodes; custom namespace uris are optional
34
        Namespaces used by nodes are always exported for consistency.
35
        Args:
36
            node_list: list of Node objects for export
37
            uris: list of namespace uri strings
38
39
        Returns:
40
        """
41 1
        self.logger.info('Building XML etree')
42
43 1
        self._add_namespaces(node_list, uris)
44
45
        # add all nodes in the list to the XML etree
46 1
        for node in node_list:
47 1
            self.node_to_etree(node)
48
49
        # add aliases to the XML etree
50 1
        self._add_alias_els()
51
52 1
    def _add_namespaces(self, nodes, uris):
53 1
        idxs = self._get_ns_idxs_of_nodes(nodes)
54
55 1
        ns_array = self.server.get_namespace_array()
56
57
        # now add index of provided uris if necessary
58 1
        if uris:
59
            self._add_idxs_from_uris(idxs, uris, ns_array)
60
61
        # now create a dict of idx_in_address_space to idx_in_exported_file
62 1
        self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array)
63 1
        ns_to_export = [ns_array[i] for i in sorted(list(self._addr_idx_to_xml_idx.keys())) if i != 0]
64
65
        # write namespaces to xml
66 1
        self._add_namespace_uri_els(ns_to_export)
67
68 1
    def _make_idx_dict(self, idxs, ns_array):
69 1
        idxs.sort()
70 1
        addr_idx_to_xml_idx = {0: 0}
71 1
        for xml_idx, addr_idx in enumerate(idxs):
72 1
            if addr_idx >= len(ns_array):
73 1
                break
74 1
            addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
75 1
        return addr_idx_to_xml_idx
76
77 1
    def _get_ns_idxs_of_nodes(self, nodes):
78
        """
79
        get a list of all indexes used or references by nodes
80
        """
81 1
        idxs = []
82 1
        for node in nodes:
83 1
            node_idxs = [node.nodeid.NamespaceIndex]
84 1
            node_idxs.append(node.get_browse_name().NamespaceIndex)
85 1
            node_idxs.extend(ref.NodeId.NamespaceIndex for ref in node.get_references())
86 1
            node_idxs = list(set(node_idxs))  # remove duplicates
87 1
            for i in node_idxs:
88 1
                if i != 0 and i not in idxs:
89 1
                    idxs.append(i)
90 1
        return idxs
91
92 1
    def _add_idxs_from_uris(self, idxs, uris, ns_array):
93
        for uri in uris:
94
            if uri in ns_array:
95
                i = ns_array.index(uri)
96
                if i not in idxs:
97
                    idxs.append(i)
98
99
100 1
    def write_xml(self, xmlpath, pretty=True):
101
        """
102
        Write the XML etree in the exporter object to a file
103
        Args:
104
            xmlpath: string representing the path/file name
105
106
        Returns:
107
        """
108
        # try to write the XML etree to a file
109 1
        self.logger.info('Exporting XML file to %s', xmlpath)
110
        # from IPython import embed
111
        # embed()
112 1
        if pretty:
113 1
            self.indent(self.etree.getroot())
114 1
            self.etree.write(xmlpath,
115
                             encoding='utf-8',
116
                             xml_declaration=True
117
                            )
118
        else:
119
            self.etree.write(xmlpath,
120
                             encoding='utf-8',
121
                             xml_declaration=True
122
                            )
123
124 1
    def dump_etree(self):
125
        """
126
        Dump etree to console for debugging
127
        Returns:
128
        """
129
        self.logger.info('Dumping XML etree to console')
130
        Et.dump(self.etree)
131
132 1
    def node_to_etree(self, node):
133
        """
134
        Add the necessary XML sub elements to the etree for exporting the node
135
        Args:
136
            node: Node object which will be added to XML etree
137
138
        Returns:
139
        """
140 1
        node_class = node.get_node_class()
141
142 1
        if node_class is ua.NodeClass.Object:
143 1
            self.add_etree_object(node)
144 1
        elif node_class is ua.NodeClass.ObjectType:
145
            self.add_etree_object_type(node)
146 1
        elif node_class is ua.NodeClass.Variable:
147 1
            self.add_etree_variable(node)
148 1
        elif node_class is ua.NodeClass.VariableType:
149
            self.add_etree_variable_type(node)
150 1
        elif node_class is ua.NodeClass.ReferenceType:
151
            self.add_etree_reference_type(node)
152 1
        elif node_class is ua.NodeClass.DataType:
153
            self.add_etree_datatype(node)
154 1
        elif node_class is ua.NodeClass.Method:
155 1
            self.add_etree_method(node)
156
        else:
157
            self.logger.info("Exporting node class not implemented: %s ", node_class)
158
159 1
    def _add_sub_el(self, el, name, text):
160 1
        child_el = Et.SubElement(el, name)
161 1
        child_el.text = text
162 1
        return child_el
163
164 1
    def _node_to_string(self, nodeid):
165 1
        if not isinstance(nodeid, ua.NodeId):
166 1
            nodeid = nodeid.nodeid
167
168 1
        if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
169 1
            nodeid = copy(nodeid)
170 1
            nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
171 1
        return nodeid.to_string()
172
173 1
    def _bname_to_string(self, bname):
174 1
        if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
175 1
            bname = copy(bname)
176 1
            bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
177 1
        return bname.to_string()
178
179 1
    def _add_node_common(self, nodetype, node):
180 1
        browsename = node.get_browse_name()
181 1
        nodeid = node.nodeid
182 1
        parent = node.get_parent()
183 1
        displayname = node.get_display_name().Text.decode('utf-8')
184 1
        desc = node.get_description().Text
185 1
        node_el = Et.SubElement(self.etree.getroot(), nodetype)
186 1
        node_el.attrib["NodeId"] = self._node_to_string(nodeid)
187 1
        node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
188 1
        if parent is not None:
189 1
            node_class = node.get_node_class()
190 1
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
191 1
                node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
192 1
        self._add_sub_el(node_el, 'DisplayName', displayname)
193 1
        if desc not in (None, ""):
194 1
            self._add_sub_el(node_el, 'Description', desc.decode('utf-8'))
195
        # FIXME: add WriteMask and UserWriteMask
196 1
        return node_el
197
198 1
    def add_etree_object(self, node):
199
        """
200
        Add a UA object element to the XML etree
201
        """
202 1
        obj_el = self._add_node_common("UAObject", node)
203 1
        var = node.get_attribute(ua.AttributeIds.EventNotifier)
204 1
        if var.Value.Value != 0:
205
            obj_el.attrib["EventNotifier"] = str(var.Value.Value)
206 1
        self._add_ref_els(obj_el, node)
207
208 1
    def add_etree_object_type(self, node):
209
        """
210
        Add a UA object type element to the XML etree
211
        """
212
        obj_el = self._add_node_common("UAObjectType", node)
213
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
214
        if abstract:
215
            obj_el.attrib["IsAbstract"] = 'true'
216
        self._add_ref_els(obj_el, node)
217
218 1
    def add_variable_common(self, node, el):
219 1
        dtype = node.get_data_type()
220 1
        if dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames:
221 1
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
222 1
            self.aliases[dtype] = dtype_name
223
        else:
224 1
            dtype_name = dtype.to_string()
225 1
        rank = node.get_value_rank()
226 1
        if rank != -1:
227 1
            el.attrib["ValueRank"] = str(rank)
228 1
        dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
229 1
        if dim.Value.Value:
230 1
            el.attrib["ArrayDimensions"] = ",".join([str(i) for i in dim.Value.Value])
231 1
        el.attrib["DataType"] = dtype_name
232 1
        self.value_to_etree(el, dtype_name, dtype, node)
233
234 1
    def add_etree_variable(self, node):
235
        """
236
        Add a UA variable element to the XML etree
237
        """
238 1
        var_el = self._add_node_common("UAVariable", node)
239 1
        self._add_ref_els(var_el, node)
240 1
        self.add_variable_common(node, var_el)
241
242 1
        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
243 1
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value
244
245
        # We only write these values if they are different from defaults
246
        # Not sure where default is defined....
247 1
        if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
248
            var_el.attrib["AccessLevel"] = str(accesslevel)
249 1
        if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
250
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)
251
252 1
        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
253 1
        if var.Value.Value:
254
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
255 1
        var = node.get_attribute(ua.AttributeIds.Historizing)
256 1
        if var.Value.Value:
257
            var_el.attrib["Historizing"] = 'true'
258
259 1
    def add_etree_variable_type(self, node):
260
        """
261
        Add a UA variable type element to the XML etree
262
        """
263
264
        var_el = self._add_node_common("UAVariableType", node)
265
        self.add_variable_common(node, var_el)
266
267
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
268
        if abstract.Value.Value:
269
            var_el.attrib["IsAbstract"] = "true"
270
271
        self._add_ref_els(var_el, node)
272
273 1
    def add_etree_method(self, node):
274 1
        obj_el = self._add_node_common("UAMethod", node)
275
276 1
        var = node.get_attribute(ua.AttributeIds.Executable)
277 1
        if var.Value.Value is False:
278
            obj_el.attrib["Executable"] = "false"
279 1
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
280 1
        if var.Value.Value is False:
281
            obj_el.attrib["UserExecutable"] = "false"
282 1
        self._add_ref_els(obj_el, node)
283
284 1
    def add_etree_reference_type(self, obj):
285
        obj_el = self._add_node_common("UAReferenceType", obj)
286
        var = obj.get_attribute(ua.AttributeIds.InverseName)
287
        if var is not None and var.Value.Value is not None:
288
            self._add_sub_el(obj_el, 'InverseName', var.Value.Value.Text.decode('utf-8'))
289
        self._add_ref_els(obj_el, obj)
290
291 1
    def add_etree_datatype(self, obj):
292
        """
293
        Add a UA data type element to the XML etree
294
        """
295
        obj_el = self._add_node_common("UADataType", obj)
296
        self._add_ref_els(obj_el, obj)
297
298 1
    def _add_namespace_uri_els(self, uris):
299 1
        nuris_el = Et.Element('NamespaceUris')
300
301 1
        for uri in uris:
302 1
            self._add_sub_el(nuris_el, 'Uri', uri)
303
304 1
        self.etree.getroot().insert(0, nuris_el)
305
306 1
    def _add_alias_els(self):
307 1
        aliases_el = Et.Element('Aliases')
308
309 1
        ordered_keys = list(self.aliases.keys())
310 1
        ordered_keys.sort()
311 1
        for nodeid in ordered_keys:
312 1
            name = self.aliases[nodeid]
313 1
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=name)
314 1
            ref_el.text = nodeid.to_string()
315
316
        # insert behind the namespace element
317 1
        self.etree.getroot().insert(1, aliases_el)
318
319 1
    def _add_ref_els(self, parent_el, obj):
320 1
        refs = obj.get_references()
321 1
        refs_el = Et.SubElement(parent_el, 'References')
322
323 1
        for ref in refs:
324 1
            if ref.ReferenceTypeId.Identifier in o_ids.ObjectIdNames:
325 1
                ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
326
            else:
327
                ref_name = ref.ReferenceTypeId.to_string()
328 1
            ref_el = Et.SubElement(refs_el, 'Reference')
329 1
            ref_el.attrib['ReferenceType'] = ref_name
330 1
            if not ref.IsForward:
331 1
                ref_el.attrib['IsForward'] = 'false'
332 1
            ref_el.text = self._node_to_string(ref.NodeId)
333
334 1
            self.aliases[ref.ReferenceTypeId] = ref_name
335
336
337 1
    def member_to_etree(self, el, name, dtype, val):
338 1
        member_el = Et.SubElement(el, "uax:" + name)
339 1
        if isinstance(val, (list, tuple)):
340 1
            for v in val:
341 1
                self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
342
        else:
343 1
            self._val_to_etree(member_el, dtype, val)
344
345
346 1
    def _val_to_etree(self, el, dtype, val):
347 1
        if val is None:
348 1
            val = ""
349 1
        if dtype == ua.NodeId(ua.ObjectIds.NodeId):
350 1
            id_el = Et.SubElement(el, "uax:Identifier")
351 1
            id_el.text = val.to_string()
352 1
        elif dtype == ua.NodeId(ua.ObjectIds.Guid):
353 1
            id_el = Et.SubElement(el, "uax:String")
354 1
            id_el.text = str(val)
355 1
        elif dtype == ua.NodeId(ua.ObjectIds.Boolean):
356 1
            el.text = 'true' if val else 'false'
357 1
        elif not hasattr(val, "ua_types"):
358 1
            if isinstance(val, bytes):
359 1
                el.text = val.decode("utf-8")
360
            else:
361 1
                el.text = str(val)
362
        else:
363 1
            for name, vtype in val.ua_types.items():
364 1
                self.member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
365
366
367 1
    def value_to_etree(self, el, dtype_name, dtype, node):
368 1
        var = node.get_data_value().Value
369 1
        if var.Value is not None:
370 1
            val_el = Et.SubElement(el, 'Value')
371 1
            self._value_to_etree(val_el, dtype_name, dtype, var.Value)
372
373
374 1
    def _value_to_etree(self, el, type_name, dtype, val):
375 1
        if val is None:
376
            return
377
378 1
        if isinstance(val, (list, tuple)):
379 1
            if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
380 1
                elname = "uax:ListOf" + type_name
381
            else:  # this is an extentionObject:
382 1
                elname = "uax:ListOfExtensionObject"
383
384 1
            list_el = Et.SubElement(el, elname)
385 1
            for nval in val:
386 1
                self._value_to_etree(list_el, type_name, dtype, nval)
387
        else:
388 1
            dtype_base = get_base_data_type(self.server.get_node(dtype))
389 1
            dtype_base = dtype_base.nodeid
390
391 1
            if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
392 1
                dtype_base = ua.NodeId(ua.ObjectIds.Int32)
393 1
                type_name = ua.ObjectIdNames[dtype_base.Identifier]
394
395 1
            if dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
396 1
                type_name = ua.ObjectIdNames[dtype_base.Identifier]
397 1
                val_el = Et.SubElement(el, "uax:" + type_name)
398 1
                self._val_to_etree(val_el, dtype_base, val)
399
            else:
400 1
                self._extobj_to_etree(el, type_name, dtype, val)
401
402
403 1
    def _extobj_to_etree(self, val_el, name, dtype, val):
404 1
        obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
405 1
        type_el = Et.SubElement(obj_el, "uax:TypeId")
406 1
        id_el = Et.SubElement(type_el, "uax:Identifier")
407 1
        id_el.text = dtype.to_string()
408 1
        body_el = Et.SubElement(obj_el, "uax:Body")
409 1
        struct_el = Et.SubElement(body_el, "uax:" + name)
410 1
        for name, vtype in val.ua_types.items():
411 1
            self.member_to_etree(struct_el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
412
413
414 1
    def indent(self, elem, level=0):
415
        """
416
        copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
417
        it basically walks your tree and adds spaces and newlines so the tree is
418
        printed in a nice way
419
        """
420 1
        i = "\n" + level * "  "
421 1
        if len(elem):
422 1
            if not elem.text or not elem.text.strip():
423 1
                elem.text = i + "  "
424 1
            if not elem.tail or not elem.tail.strip():
425 1
                elem.tail = i
426 1
            for elem in elem:
427 1
                self.indent(elem, level + 1)
428 1
            if not elem.tail or not elem.tail.strip():
429 1
                elem.tail = i
430
        else:
431 1
            if level and (not elem.tail or not elem.tail.strip()):
432
                elem.tail = i
433