Completed
Push — master ( 420f43...b08d7a )
by Olivier
04:20
created

XmlExporter.dump_etree()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 7
rs 9.4285
1
"""
From a list of nodes in the address space, build an XML file.
The format is the one defined by the OPC UA specification.
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
from copy import copy
9
10
from opcua import ua
11
from opcua.ua import object_ids as o_ids
12
from opcua.common.ua_utils import get_node_supertypes, get_variable_basetype
13
14
15
class XmlExporter(object):
    """
    Build a UANodeSet XML document from a list of nodes.

    The document is accumulated in ``self.etree``: call build_etree() to fill it,
    then write_xml() to save it or dump_etree() to print it.
    """

    def __init__(self, server):
        """
        Args:
            server: object providing get_namespace_array(); it is also handed to
                get_variable_basetype() when values are exported
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # NodeId of every data type / reference type met during export -> alias name
        self.aliases = {}
        # namespace index in the address space -> namespace index in the exported file
        self._addr_idx_to_xml_idx = {}

        node_set_attributes = OrderedDict()
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'

        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))

    def build_etree(self, node_list, uris=None):
        """
        Create an XML etree object from a list of nodes; custom namespace uris are optional
        Namespaces used by nodes are always exported for consistency.
        Args:
            node_list: list of Node objects for export
            uris: list of namespace uri strings

        Returns:
            None, the result is stored in self.etree
        """
        self.logger.info('Building XML etree')

        self._add_namespaces(node_list, uris)

        # add all nodes in the list to the XML etree
        for node in node_list:
            self.node_to_etree(node)

        # add aliases to the XML etree
        self._add_alias_els()

    def _add_namespaces(self, nodes, uris):
        """
        Compute the namespace index mapping and write the NamespaceUris element
        Args:
            nodes: list of Node objects for export
            uris: optional list of extra namespace uri strings to export
        """
        idxs = self._get_ns_idxs_of_nodes(nodes)

        ns_array = self.server.get_namespace_array()

        # now add index of provided uris if necessary
        if uris:
            self._add_idxs_from_uris(idxs, uris, ns_array)

        # now create a dict of idx_in_address_space to idx_in_exported_file
        self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array)
        ns_to_export = [ns_array[i] for i in sorted(self._addr_idx_to_xml_idx) if i != 0]

        # write namespaces to xml
        self._add_namespace_uri_els(ns_to_export)

    def _make_idx_dict(self, idxs, ns_array):
        """
        Map namespace indexes of the address space to consecutive indexes 1, 2, ...
        Args:
            idxs: list of namespace indexes, sorted in place
            ns_array: namespace array, indexes beyond its end are not mapped

        Returns:
            dict of address space index -> exported index, always containing {0: 0}
        """
        idxs.sort()
        addr_idx_to_xml_idx = {0: 0}
        xml_idx = 0
        for addr_idx in idxs:
            if addr_idx == 0:
                # index 0 always stays 0; renumbering it would shift every other
                # namespace away from its position in the exported uri list
                continue
            if addr_idx >= len(ns_array):
                break
            xml_idx += 1
            addr_idx_to_xml_idx[addr_idx] = xml_idx
        return addr_idx_to_xml_idx

    def _get_ns_idxs_of_nodes(self, nodes):
        """
        get a list of all non-zero namespace indexes used or referenced by nodes
        """
        idxs = []
        for node in nodes:
            node_idxs = {node.nodeid.NamespaceIndex, node.get_browse_name().NamespaceIndex}
            node_idxs.update(ref.NodeId.NamespaceIndex for ref in node.get_references())
            for i in node_idxs:
                if i != 0 and i not in idxs:
                    idxs.append(i)
        return idxs

    def _add_idxs_from_uris(self, idxs, uris, ns_array):
        """
        Append to idxs the index of every uri found in ns_array
        Uris missing from ns_array are ignored; index 0 is never added since it is
        not part of the exported uri list.
        """
        for uri in uris:
            if uri in ns_array:
                i = ns_array.index(uri)
                if i != 0 and i not in idxs:
                    idxs.append(i)

    def write_xml(self, xmlpath, pretty=True):
        """
        Write the XML etree in the exporter object to a file
        Args:
            xmlpath: string representing the path/file name
            pretty: when true, indent the tree (in place) before writing

        Returns:
        """
        self.logger.info('Exporting XML file to %s', xmlpath)
        if pretty:
            self.indent(self.etree.getroot())
        self.etree.write(xmlpath, encoding='utf-8', xml_declaration=True)

    def dump_etree(self):
        """
        Dump etree to console for debugging
        Returns:
        """
        self.logger.info('Dumping XML etree to console')
        Et.dump(self.etree)

    def node_to_etree(self, node):
        """
        Add the necessary XML sub elements to the etree for exporting the node
        Args:
            node: Node object which will be added to XML etree

        Returns:
        """
        node_class = node.get_node_class()

        # built here rather than at class level so that bound methods are used
        handlers = {
            ua.NodeClass.Object: self.add_etree_object,
            ua.NodeClass.ObjectType: self.add_etree_object_type,
            ua.NodeClass.Variable: self.add_etree_variable,
            ua.NodeClass.VariableType: self.add_etree_variable_type,
            ua.NodeClass.ReferenceType: self.add_etree_reference_type,
            ua.NodeClass.DataType: self.add_etree_datatype,
            ua.NodeClass.Method: self.add_etree_method,
        }
        handler = handlers.get(node_class)
        if handler is None:
            self.logger.info("Exporting node class not implemented: %s ", node_class)
        else:
            handler(node)

    @staticmethod
    def _to_text(value):
        """
        Return value decoded as utf-8 if it is a bytes object, unchanged otherwise
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def _add_sub_el(self, el, name, text):
        """
        Append a child element called name with the given text to el and return it
        """
        child_el = Et.SubElement(el, name)
        child_el.text = text
        return child_el

    def _node_to_string(self, nodeid):
        """
        Return the string form of a NodeId using the namespace index of the exported file
        Args:
            nodeid: a ua.NodeId, or any object exposing one as its nodeid attribute
        """
        if not isinstance(nodeid, ua.NodeId):
            nodeid = nodeid.nodeid

        if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
            # work on a copy, the original id must keep its address space index
            nodeid = copy(nodeid)
            nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
        return nodeid.to_string()

    def _bname_to_string(self, bname):
        """
        Return the string form of a browse name using the namespace index of the exported file
        """
        if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
            bname = copy(bname)
            bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
        return bname.to_string()

    def _add_node_common(self, nodetype, node):
        """
        Create the element shared by all node classes
        Sets NodeId and BrowseName, ParentNodeId for objects, variables and methods
        that have a parent, and adds DisplayName and a non-empty Description.
        Args:
            nodetype: tag of the element to create, e.g. "UAObject"
            node: Node object to export

        Returns:
            the new element, already attached to the root of the etree
        """
        browsename = node.get_browse_name()
        nodeid = node.nodeid
        parent = node.get_parent()
        # text may come as bytes or as str, only bytes need decoding
        displayname = self._to_text(node.get_display_name().Text)
        desc = self._to_text(node.get_description().Text)
        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.attrib["NodeId"] = self._node_to_string(nodeid)
        node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
        if parent is not None:
            node_class = node.get_node_class()
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
                node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
        self._add_sub_el(node_el, 'DisplayName', displayname)
        if desc:
            self._add_sub_el(node_el, 'Description', desc)
        # FIXME: add WriteMask and UserWriteMask
        return node_el

    def add_etree_object(self, node):
        """
        Add a UA object element to the XML etree
        """
        obj_el = self._add_node_common("UAObject", node)
        notifier = node.get_attribute(ua.AttributeIds.EventNotifier).Value.Value
        # a missing value is skipped too, instead of being written as "None"
        if notifier not in (None, 0):
            obj_el.attrib["EventNotifier"] = str(notifier)
        self._add_ref_els(obj_el, node)

    def add_etree_object_type(self, node):
        """
        Add a UA object type element to the XML etree
        """
        obj_el = self._add_node_common("UAObjectType", node)
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
        if abstract:
            obj_el.attrib["IsAbstract"] = 'true'
        self._add_ref_els(obj_el, node)

    def add_variable_common(self, node, el):
        """
        Add the attributes shared by variables and variable types, and the value
        Writes ValueRank (unless -1), ArrayDimensions (when set) and DataType, then
        the Value sub element.
        Args:
            node: Node object being exported
            el: element created for that node
        """
        dtype = node.get_data_type()
        if dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames:
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype] = dtype_name
        else:
            # NOTE(review): the namespace index is not remapped here, unlike in
            # _node_to_string -- confirm whether this is intended
            dtype_name = dtype.to_string()
        rank = node.get_value_rank()
        if rank != -1:
            el.attrib["ValueRank"] = str(rank)
        dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
        if dim.Value.Value:
            el.attrib["ArrayDimensions"] = ",".join(str(i) for i in dim.Value.Value)
        el.attrib["DataType"] = dtype_name
        self.value_to_etree(el, dtype_name, dtype, node)

    def add_etree_variable(self, node):
        """
        Add a UA variable element to the XML etree
        """
        var_el = self._add_node_common("UAVariable", node)
        self.add_variable_common(node, var_el)

        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        # A missing value (None) is skipped as well.
        defaults = (None, 0, ua.AccessLevel.CurrentRead.mask)
        if accesslevel not in defaults:
            var_el.attrib["AccessLevel"] = str(accesslevel)
        if useraccesslevel not in defaults:
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)

        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
        if var.Value.Value:
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
        var = node.get_attribute(ua.AttributeIds.Historizing)
        if var.Value.Value:
            var_el.attrib["Historizing"] = 'true'
        self._add_ref_els(var_el, node)

    def add_etree_variable_type(self, node):
        """
        Add a UA variable type element to the XML etree
        """
        var_el = self._add_node_common("UAVariableType", node)
        self.add_variable_common(node, var_el)

        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
        if abstract.Value.Value:
            var_el.attrib["IsAbstract"] = "true"

        self._add_ref_els(var_el, node)

    def add_etree_method(self, node):
        """
        Add a UA method element to the XML etree
        Executable and UserExecutable are only written when they are False.
        """
        obj_el = self._add_node_common("UAMethod", node)

        var = node.get_attribute(ua.AttributeIds.Executable)
        if var.Value.Value is False:
            obj_el.attrib["Executable"] = "false"
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
        if var.Value.Value is False:
            obj_el.attrib["UserExecutable"] = "false"
        self._add_ref_els(obj_el, node)

    def add_etree_reference_type(self, obj):
        """
        Add a UA reference type element to the XML etree
        """
        obj_el = self._add_node_common("UAReferenceType", obj)
        var = obj.get_attribute(ua.AttributeIds.InverseName)
        if var is not None and var.Value.Value is not None:
            self._add_sub_el(obj_el, 'InverseName', self._to_text(var.Value.Value.Text))
        self._add_ref_els(obj_el, obj)

    def add_etree_datatype(self, obj):
        """
        Add a UA data type element to the XML etree
        """
        obj_el = self._add_node_common("UADataType", obj)
        self._add_ref_els(obj_el, obj)

    def _add_namespace_uri_els(self, uris):
        """
        Insert a NamespaceUris element, with one Uri child per uri, as first child of the root
        """
        nuris_el = Et.Element('NamespaceUris')

        for uri in uris:
            self._add_sub_el(nuris_el, 'Uri', uri)

        self.etree.getroot().insert(0, nuris_el)

    def _add_alias_els(self):
        """
        Insert an Aliases element, built from self.aliases, as first child of the root
        Aliases are written in the sort order of their node ids.
        """
        aliases_el = Et.Element('Aliases')

        for nodeid in sorted(self.aliases):
            name = self.aliases[nodeid]
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=name)
            ref_el.text = nodeid.to_string()

        self.etree.getroot().insert(0, aliases_el)

    def _add_ref_els(self, parent_el, obj):
        """
        Add a References element listing every reference of obj to parent_el
        Each reference type met is also recorded in self.aliases.
        """
        refs = obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')

        for ref in refs:
            ref_type = ref.ReferenceTypeId
            # names of o_ids.ObjectIdNames are only looked up for namespace 0, same
            # as for data types; a numeric id of another namespace may collide
            if ref_type.NamespaceIndex == 0 and ref_type.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref_type.Identifier]
            else:
                ref_name = ref_type.to_string()
            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            ref_el.text = self._node_to_string(ref.NodeId)

            self.aliases[ref_type] = ref_name

    def member_to_etree(self, el, name, dtype, val):
        """
        Add the element "uax:<name>" holding a structure member to el
        Args:
            el: parent element
            name: member name
            dtype: NodeId of the member data type
            val: member value, a list/tuple is exported item by item
        """
        member_el = Et.SubElement(el, "uax:" + name)
        if isinstance(val, (list, tuple)):
            for v in val:
                self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
        else:
            self._val_to_etree(member_el, dtype, val)

    def _val_to_etree(self, el, dtype, val):
        """
        Write a single value into el
        NodeId and Guid values get a dedicated sub element, objects exposing ua_types
        are exported member by member, anything else becomes the text of el.
        None is written as an empty text.
        """
        if dtype == ua.NodeId(ua.ObjectIds.NodeId):
            id_el = Et.SubElement(el, "uax:Identifier")
            id_el.text = "" if val is None else val.to_string()
        elif dtype == ua.NodeId(ua.ObjectIds.Guid):
            id_el = Et.SubElement(el, "uax:String")
            id_el.text = "" if val is None else str(val)
        elif not hasattr(val, "ua_types"):
            if val is None:
                el.text = ""
            elif isinstance(val, bytes):
                el.text = val.decode("utf-8")
            else:
                el.text = str(val)
        else:
            for name, vtype in val.ua_types.items():
                self.member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))

    def value_to_etree(self, el, dtype_name, dtype, node):
        """
        Add a Value element with the current value of node to el, unless that value is None
        """
        var = node.get_data_value().Value
        if var.Value is not None:
            val_el = Et.SubElement(el, 'Value')
            self._value_to_etree(val_el, dtype_name, dtype, var.Value)

    def _value_to_etree(self, el, type_name, dtype, val):
        """
        Write val, of data type dtype, below el
        Lists and tuples are wrapped in a "uax:ListOf..." element and exported item
        by item. Other values are exported according to the base type returned by
        get_variable_basetype(): namespace 0 types with an id up to 21 as
        "uax:<type name>", everything else as an extension object.
        """
        if val is None:
            return

        if isinstance(val, (list, tuple)):
            if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
                elname = "uax:ListOf" + type_name
            else:  # this is an extentionObject:
                elname = "uax:ListOfExtensionObject"

            list_el = Et.SubElement(el, elname)
            for nval in val:
                self._value_to_etree(list_el, type_name, dtype, nval)
        else:
            dtype_base = get_variable_basetype(self.server, dtype)

            # enumerations are exported as Int32
            if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
                dtype_base = ua.NodeId(ua.ObjectIds.Int32)
                type_name = ua.ObjectIdNames[dtype_base.Identifier]

            if dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
                type_name = ua.ObjectIdNames[dtype_base.Identifier]
                val_el = Et.SubElement(el, "uax:" + type_name)
                self._val_to_etree(val_el, dtype, val)
            else:
                self._extobj_to_etree(el, type_name, dtype, val)

    def _extobj_to_etree(self, val_el, name, dtype, val):
        """
        Add val to val_el as a "uax:ExtensionObject" element
        The element holds a TypeId/Identifier with the string form of dtype and a
        Body containing "uax:<name>" with one sub element per entry of val.ua_types.
        """
        obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
        type_el = Et.SubElement(obj_el, "uax:TypeId")
        id_el = Et.SubElement(type_el, "uax:Identifier")
        id_el.text = dtype.to_string()
        body_el = Et.SubElement(obj_el, "uax:Body")
        struct_el = Et.SubElement(body_el, "uax:" + name)
        for member_name, vtype in val.ua_types.items():
            member_dtype = ua.NodeId(getattr(ua.ObjectIds, vtype))
            self.member_to_etree(struct_el, member_name, member_dtype, getattr(val, member_name))

    def indent(self, elem, level=0):
        """
        adapted from http://effbot.org/zone/element-lib.htm#prettyprint
        it basically walks your tree and adds spaces and newlines so the tree is
        printed in a nice way
        Args:
            elem: element to indent in place, together with all its descendants
            level: nesting depth of elem, two spaces are used per level
        """
        i = "\n" + level * "  "
        children = list(elem)
        if children:
            if not elem.text or not elem.text.strip():
                elem.text = i + "  "
            if not elem.tail or not elem.tail.strip():
                elem.tail = i
            for child in children:
                self.indent(child, level + 1)
            # the closing tag of elem follows the last child: align it with elem
            last = children[-1]
            if not last.tail or not last.tail.strip():
                last.tail = i
        else:
            if level and (not elem.tail or not elem.tail.strip()):
                elem.tail = i
429