Completed
Pull Request — master (#354)
by
unknown
05:22
created

XmlExporter._val_to_etree()   B

Complexity

Conditions 7

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 7
dl 0
loc 17
rs 7.3333
c 1
b 1
f 0
1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
from copy import copy
9
10
from opcua import ua
11
from opcua.ua import object_ids as o_ids
12
from opcua.common.ua_utils import get_node_supertypes
13
14
15
class XmlExporter(object):
    """
    Export nodes of an address space to a UANodeSet XML tree.

    build_etree() fills an ElementTree rooted at a ``UANodeSet`` element,
    write_xml() serialises that tree.
    """

    def __init__(self, server):
        """
        Args:
            server: object giving access to the address space; this class calls
                get_namespace_array() and get_node() on it
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # NodeId of a data type / reference type -> name written in the Aliases element
        self.aliases = {}
        # namespace index in the address space -> namespace index in the exported file
        self._addr_idx_to_xml_idx = {}

        node_set_attributes = OrderedDict()
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'

        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))

    def build_etree(self, node_list, uris=None):
        """
        Create an XML etree object from a list of nodes; custom namespace uris are optional
        Namespaces used by nodes are always exported for consistency.
        Args:
            node_list: list of Node objects for export
            uris: list of namespace uri strings

        Returns:
            None, the result is stored in self.etree
        """
        self.logger.info('Building XML etree')

        self._add_namespaces(node_list, uris)

        # add all nodes in the list to the XML etree
        for node in node_list:
            self.node_to_etree(node)

        # add aliases to the XML etree; done last because exporting the nodes fills self.aliases
        self._add_alias_els()

    def _add_namespaces(self, nodes, uris):
        """
        Compute the namespace index mapping and write the NamespaceUris element
        Args:
            nodes: list of Node objects for export
            uris: optional list of additional namespace uri strings to export
        """
        idxs = self._get_ns_idxs_of_nodes(nodes)

        ns_array = self.server.get_namespace_array()

        # now add index of provided uris if necessary
        if uris:
            self._add_idxs_from_uris(idxs, uris, ns_array)

        # now create a dict of idx_in_address_space to idx_in_exported_file
        self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array)
        ns_to_export = [ns_array[i] for i in sorted(self._addr_idx_to_xml_idx) if i != 0]

        # write namespaces to xml
        self._add_namespace_uri_els(ns_to_export)

    def _make_idx_dict(self, idxs, ns_array):
        """
        Map address space namespace indexes to consecutive indexes for the XML file
        Args:
            idxs: namespace indexes to export (not modified)
            ns_array: namespace array of the server, used to drop unknown indexes

        Returns:
            dict of idx_in_address_space to idx_in_exported_file, index 0 always maps to 0
        """
        addr_idx_to_xml_idx = {0: 0}
        # index 0 keeps its fixed entry; letting it take part in the numbering
        # would overwrite that entry and shift every other index by one
        exported = sorted(set(idxs) - {0})
        for xml_idx, addr_idx in enumerate(exported, 1):
            if addr_idx >= len(ns_array):
                # indexes are sorted, so every following one is out of range too
                break
            addr_idx_to_xml_idx[addr_idx] = xml_idx
        return addr_idx_to_xml_idx

    def _get_ns_idxs_of_nodes(self, nodes):
        """
        get a list of all indexes used or referenced by nodes (index 0 excluded)
        """
        idxs = []
        for node in nodes:
            node_idxs = {node.nodeid.NamespaceIndex, node.get_browse_name().NamespaceIndex}
            node_idxs.update(ref.NodeId.NamespaceIndex for ref in node.get_references())
            for i in node_idxs:
                if i != 0 and i not in idxs:
                    idxs.append(i)
        return idxs

    def _add_idxs_from_uris(self, idxs, uris, ns_array):
        """
        Append to idxs the index of every uri found in ns_array
        Unknown uris are ignored; index 0 is skipped like in _get_ns_idxs_of_nodes
        Args:
            idxs: list of namespace indexes, extended in place
            uris: list of namespace uri strings
            ns_array: namespace array of the server
        """
        for uri in uris:
            if uri not in ns_array:
                continue
            i = ns_array.index(uri)
            if i != 0 and i not in idxs:
                idxs.append(i)

    def write_xml(self, xmlpath, pretty=True):
        """
        Write the XML etree in the exporter object to a file
        Args:
            xmlpath: string representing the path/file name
            pretty: if True, indent the tree before writing it

        Returns:
        """
        self.logger.info('Exporting XML file to %s', xmlpath)
        if pretty:
            self.indent(self.etree.getroot())
        self.etree.write(xmlpath, encoding='utf-8', xml_declaration=True)

    def dump_etree(self):
        """
        Dump etree to console for debugging
        Returns:
        """
        self.logger.info('Dumping XML etree to console')
        Et.dump(self.etree)

    def node_to_etree(self, node):
        """
        Add the necessary XML sub elements to the etree for exporting the node
        Node classes without a dedicated export method are only logged.
        Args:
            node: Node object which will be added to XML etree

        Returns:
        """
        node_class = node.get_node_class()

        if node_class is ua.NodeClass.Object:
            self.add_etree_object(node)
        elif node_class is ua.NodeClass.ObjectType:
            self.add_etree_object_type(node)
        elif node_class is ua.NodeClass.Variable:
            self.add_etree_variable(node)
        elif node_class is ua.NodeClass.VariableType:
            self.add_etree_variable_type(node)
        elif node_class is ua.NodeClass.ReferenceType:
            self.add_etree_reference_type(node)
        elif node_class is ua.NodeClass.DataType:
            self.add_etree_datatype(node)
        elif node_class is ua.NodeClass.Method:
            self.add_etree_method(node)
        else:
            self.logger.info("Exporting node class not implemented: %s ", node_class)

    @staticmethod
    def _to_text(value):
        """
        Return value decoded as utf-8 if it is a bytes object, unchanged otherwise
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def _add_sub_el(self, el, name, text):
        """
        Append a child element called name with the given text to el and return it
        """
        child_el = Et.SubElement(el, name)
        child_el.text = text
        return child_el

    def _node_to_string(self, nodeid):
        """
        Return the string form of a NodeId using the namespace index of the XML file
        Args:
            nodeid: a ua.NodeId, or an object with a nodeid attribute
        """
        if not isinstance(nodeid, ua.NodeId):
            nodeid = nodeid.nodeid

        if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
            # work on a copy, the NodeId of the caller must keep its index
            nodeid = copy(nodeid)
            nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
        return nodeid.to_string()

    def _bname_to_string(self, bname):
        """
        Return the string form of a browse name using the namespace index of the XML file
        """
        if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
            # work on a copy, the browse name of the caller must keep its index
            bname = copy(bname)
            bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
        return bname.to_string()

    def _add_node_common(self, nodetype, node):
        """
        Create the element of a node and fill what is common to all node classes
        Args:
            nodetype: tag of the element to create, for example "UAObject"
            node: Node object to export

        Returns:
            the new element, appended to the root of the etree
        """
        browsename = node.get_browse_name()
        nodeid = node.nodeid
        parent = node.get_parent()
        # texts are decoded only when they are bytes, str values are used as they are
        displayname = self._to_text(node.get_display_name().Text)
        desc = self._to_text(node.get_description().Text)
        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.attrib["NodeId"] = self._node_to_string(nodeid)
        node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
        if parent is not None:
            node_class = node.get_node_class()
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
                node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
        self._add_sub_el(node_el, 'DisplayName', displayname)
        if desc:
            self._add_sub_el(node_el, 'Description', desc)
        # FIXME: add WriteMask and UserWriteMask
        return node_el

    def add_etree_object(self, node):
        """
        Add a UA object element to the XML etree
        """
        obj_el = self._add_node_common("UAObject", node)
        var = node.get_attribute(ua.AttributeIds.EventNotifier)
        if var.Value.Value != 0:
            obj_el.attrib["EventNotifier"] = str(var.Value.Value)
        self._add_ref_els(obj_el, node)

    def add_etree_object_type(self, node):
        """
        Add a UA object type element to the XML etree
        """
        obj_el = self._add_node_common("UAObjectType", node)
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
        if abstract:
            obj_el.attrib["IsAbstract"] = 'true'
        self._add_ref_els(obj_el, node)

    def add_variable_common(self, node, el):
        """
        Write what variables and variable types share: DataType, ValueRank,
        ArrayDimensions and the Value element
        Args:
            node: Node object being exported
            el: element of the node
        """
        dtype = node.get_data_type()
        if dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames:
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype] = dtype_name
        else:
            dtype_name = dtype.to_string()
        rank = node.get_value_rank()
        if rank != -1:
            el.attrib["ValueRank"] = str(rank)
        dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
        if dim.Value.Value:
            el.attrib["ArrayDimensions"] = ",".join(str(i) for i in dim.Value.Value)
        el.attrib["DataType"] = dtype_name
        self.value_to_etree(el, dtype_name, dtype, node)

    def _get_variable_basetype(self, datatype):
        """
        Looks up the base datatype of the provided datatype.
        The base datatype is either:
        A primitive type (ns=0, i<=21) or a complex one (ns=0 i>21 and i<=30) like Enum and Struct.

        Args:
            datatype: NodeId of a datatype of a variable
        Returns:
            NodeId of datatype base or None in case base datatype can not be determined
        """
        if (datatype.NamespaceIndex == 0
                and isinstance(datatype, ua.NumericNodeId)
                and datatype.Identifier <= 30):
            return datatype
        supertypes = get_node_supertypes(self.server.get_node(datatype), includeitself=True, skipbase=True)
        for supertype in supertypes:
            nodeid = supertype.nodeid
            # only integer identifiers can be compared with the limit
            if (nodeid.NamespaceIndex == 0
                    and isinstance(nodeid.Identifier, int)
                    and nodeid.Identifier <= 30):
                return nodeid
        return None

    def add_etree_variable(self, node):
        """
        Add a UA variable element to the XML etree
        """
        var_el = self._add_node_common("UAVariable", node)
        self.add_variable_common(node, var_el)

        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["AccessLevel"] = str(accesslevel)
        if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)

        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
        if var.Value.Value:
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
        var = node.get_attribute(ua.AttributeIds.Historizing)
        if var.Value.Value:
            var_el.attrib["Historizing"] = 'true'
        self._add_ref_els(var_el, node)

    def add_etree_variable_type(self, node):
        """
        Add a UA variable type element to the XML etree
        """
        var_el = self._add_node_common("UAVariableType", node)
        self.add_variable_common(node, var_el)

        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
        if abstract.Value.Value:
            var_el.attrib["IsAbstract"] = "true"

        self._add_ref_els(var_el, node)

    def add_etree_method(self, node):
        """
        Add a UA method element to the XML etree
        Executable and UserExecutable are only written when they are False
        """
        obj_el = self._add_node_common("UAMethod", node)

        var = node.get_attribute(ua.AttributeIds.Executable)
        if var.Value.Value is False:
            obj_el.attrib["Executable"] = "false"
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
        if var.Value.Value is False:
            obj_el.attrib["UserExecutable"] = "false"
        self._add_ref_els(obj_el, node)

    def add_etree_reference_type(self, obj):
        """
        Add a UA reference type element to the XML etree
        """
        obj_el = self._add_node_common("UAReferenceType", obj)
        var = obj.get_attribute(ua.AttributeIds.InverseName)
        if var is not None and var.Value.Value is not None:
            self._add_sub_el(obj_el, 'InverseName', self._to_text(var.Value.Value.Text))
        self._add_ref_els(obj_el, obj)

    def add_etree_datatype(self, obj):
        """
        Add a UA data type element to the XML etree
        """
        obj_el = self._add_node_common("UADataType", obj)
        self._add_ref_els(obj_el, obj)

    def _add_namespace_uri_els(self, uris):
        """
        Insert a NamespaceUris element with one Uri child per uri as first child of the root
        """
        nuris_el = Et.Element('NamespaceUris')

        for uri in uris:
            self._add_sub_el(nuris_el, 'Uri', uri)

        self.etree.getroot().insert(0, nuris_el)

    def _add_alias_els(self):
        """
        Insert an Aliases element built from self.aliases as first child of the root
        """
        aliases_el = Et.Element('Aliases')

        for nodeid in sorted(self.aliases):
            name = self.aliases[nodeid]
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=name)
            ref_el.text = nodeid.to_string()

        self.etree.getroot().insert(0, aliases_el)

    def _add_ref_els(self, parent_el, obj):
        """
        Add a References element listing all references of obj to parent_el
        Every reference type met is also recorded in self.aliases
        """
        refs = obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')

        for ref in refs:
            ref_type = ref.ReferenceTypeId
            # names of o_ids.ObjectIdNames are looked up for namespace 0 only,
            # as add_variable_common does for data types
            if ref_type.NamespaceIndex == 0 and ref_type.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref_type.Identifier]
            else:
                ref_name = ref_type.to_string()
            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            ref_el.text = self._node_to_string(ref.NodeId)

            self.aliases[ref_type] = ref_name

    def member_to_etree(self, el, name, dtype, val):
        """
        Add the member called name of a structure to el
        Args:
            el: element of the structure
            name: name of the member, used as tag with the uax prefix
            dtype: NodeId of the data type of the member
            val: value of the member, a list or tuple is exported element by element
        """
        member_el = Et.SubElement(el, "uax:" + name)
        if isinstance(val, (list, tuple)):
            for item in val:
                self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, item)
        else:
            self._val_to_etree(member_el, dtype, val)

    def _val_to_etree(self, el, dtype, val):
        """
        Write a single value into el
        Args:
            el: element receiving the value
            dtype: NodeId of the data type, decides how NodeId and Guid values are wrapped
            val: value to write; None is written as an empty text
        """
        if isinstance(val, bool):
            # str() would give "True"/"False", the XML schema boolean form is lowercase
            el.text = 'true' if val else 'false'
        elif dtype == ua.NodeId(ua.ObjectIds.NodeId):
            id_el = Et.SubElement(el, "uax:Identifier")
            id_el.text = "" if val is None else val.to_string()
        elif dtype == ua.NodeId(ua.ObjectIds.Guid):
            id_el = Et.SubElement(el, "uax:String")
            id_el.text = "" if val is None else str(val)
        elif val is None:
            el.text = ""
        elif not hasattr(val, "ua_types"):
            if isinstance(val, bytes):
                el.text = val.decode("utf-8")
            else:
                el.text = str(val)
        else:
            # a structure: export every member listed in ua_types
            for member_name, vtype in val.ua_types.items():
                self.member_to_etree(el, member_name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, member_name))

    def value_to_etree(self, el, dtype_name, dtype, node):
        """
        Add a Value element holding the value of node to el, nothing is added if the value is None
        """
        var = node.get_data_value().Value
        if var.Value is not None:
            val_el = Et.SubElement(el, 'Value')
            self._value_to_etree(val_el, dtype_name, dtype, var.Value)

    def _value_to_etree(self, el, type_name, dtype, val):
        """
        Write val, scalar or list, below el
        Args:
            el: parent element
            type_name: name of the data type, used for the ListOf and structure tags
            dtype: NodeId of the data type
            val: value to write, None is ignored
        """
        if val is None:
            return

        if isinstance(val, (list, tuple)):
            if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
                elname = "uax:ListOf" + type_name
            else:  # this is an extensionObject:
                elname = "uax:ListOfExtensionObject"

            list_el = Et.SubElement(el, elname)
            for nval in val:
                self._value_to_etree(list_el, type_name, dtype, nval)
            return

        # the base type is only needed for scalar values
        dtype_base = self._get_variable_basetype(dtype)

        if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
            dtype_base = ua.NodeId(ua.ObjectIds.Int32)
            type_name = ua.ObjectIdNames[dtype_base.Identifier]

        # _get_variable_basetype may return None; such a type is handled like
        # any other type that is not one of the types with i<=21
        if dtype_base is not None and dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
            type_name = ua.ObjectIdNames[dtype_base.Identifier]
            val_el = Et.SubElement(el, "uax:" + type_name)
            # the element is named after the base type, so the value is written as that type
            self._val_to_etree(val_el, dtype_base, val)
        else:
            self._extobj_to_etree(el, type_name, dtype, val)

    def _extobj_to_etree(self, val_el, name, dtype, val):
        """
        Write val as an ExtensionObject element (TypeId and Body) below val_el
        Args:
            val_el: parent element
            name: tag of the structure element inside Body, without the uax prefix
            dtype: NodeId written as TypeId
            val: structure whose members are listed in its ua_types attribute
        """
        obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
        type_el = Et.SubElement(obj_el, "uax:TypeId")
        id_el = Et.SubElement(type_el, "uax:Identifier")
        id_el.text = dtype.to_string()
        body_el = Et.SubElement(obj_el, "uax:Body")
        struct_el = Et.SubElement(body_el, "uax:" + name)
        for member_name, vtype in val.ua_types.items():
            self.member_to_etree(struct_el, member_name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, member_name))

    def indent(self, elem, level=0):
        """
        copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
        it basically walks your tree and adds spaces and newlines so the tree is
        printed in a nice way
        """
        i = "\n" + level * "  "
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = i + "  "
            if not elem.tail or not elem.tail.strip():
                elem.tail = i
            for child in elem:
                self.indent(child, level + 1)
            # the tail of the last child places the closing tag of elem,
            # so it gets the indentation of elem and not the one of the children
            last_child = elem[-1]
            if not last_child.tail or not last_child.tail.strip():
                last_child.tail = i
        else:
            if level and (not elem.tail or not elem.tail.strip()):
                elem.tail = i
448