Completed
Push — master ( 3728c0...8941cc )
by Olivier
03:39
created

XmlExporter.write_xml()   A

Complexity

Conditions 2

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 5
Bugs 1 Features 2
Metric Value
cc 2
c 5
b 1
f 2
dl 0
loc 22
rs 9.2
1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
9
from opcua import ua
10
from opcua.ua import object_ids as o_ids
11
12
13
class XmlExporter(object):
    """
    Build a UANodeSet XML etree from a list of nodes and write it to a file
    """

    def __init__(self, server):
        self.logger = logging.getLogger(__name__)
        self.server = server
        # alias name -> node id string; filled while nodes are exported and
        # written out by _add_alias_els()
        self.aliases = {}

        node_set_attributes = OrderedDict()
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'

        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))

    @staticmethod
    def _to_text(value):
        """
        Return value decoded from utf-8 when it is bytes, unchanged otherwise
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def build_etree(self, node_list, uris=None):
        """
        Create an XML etree object from a list of nodes; custom namespace uris are optional
        Args:
            node_list: list of Node objects for export
            uris: list of namespace uri strings

        Returns:
        """
        self.logger.info('Building XML etree')

        # add all nodes in the list to the XML etree
        for node in node_list:
            self.node_to_etree(node)

        # add aliases to the XML etree; they are collected while the nodes are
        # exported, so this must run after the loop above
        self._add_alias_els()

        if uris:
            # add namespace uris to the XML etree; inserted at index 0, so they
            # end up in front of the aliases
            self._add_namespace_uri_els(uris)

    def write_xml(self, xmlpath, pretty=True):
        """
        Write the XML etree in the exporter object to a file
        Args:
            xmlpath: string representing the path/file name
            pretty: when true, indent the tree in place before writing

        Returns:
        """
        self.logger.info('Exporting XML file to %s', xmlpath)
        if pretty:
            indent(self.etree.getroot())
        self.etree.write(xmlpath, encoding='utf-8', xml_declaration=True)

    def dump_etree(self):
        """
        Dump etree to console for debugging
        Returns:
        """
        self.logger.info('Dumping XML etree to console')
        Et.dump(self.etree)

    def node_to_etree(self, node):
        """
        Add the necessary XML sub elements to the etree for exporting the node
        Args:
            node: Node object which will be added to XML etree

        Returns:
        """
        node_class = node.get_node_class()
        self.logger.debug("Exporting: %s", node)

        # compare by value so a plain integer node class matches as well
        if node_class == ua.NodeClass.Object:
            self.add_etree_object(node)
        elif node_class == ua.NodeClass.ObjectType:
            self.add_etree_object_type(node)
        elif node_class == ua.NodeClass.Variable:
            self.add_etree_variable(node)
        elif node_class == ua.NodeClass.VariableType:
            self.add_etree_variable_type(node)
        elif node_class == ua.NodeClass.ReferenceType:
            self.add_etree_reference_type(node)
        elif node_class == ua.NodeClass.DataType:
            self.add_etree_datatype(node)
        elif node_class == ua.NodeClass.Method:
            self.add_etree_method(node)
        else:
            # the node is left out of the export, so make that visible
            self.logger.warning("Exporting node class not implemented: %s ", node_class)

    def _add_sub_el(self, el, name, text):
        """
        Append a child element called name with the given text to el and return it
        """
        child_el = Et.SubElement(el, name)
        child_el.text = text
        return child_el

    def _add_node_common(self, nodetype, node):
        """
        Create the element shared by all node classes and append it to the root
        Args:
            nodetype: tag of the new element, e.g. "UAObject"
            node: Node object being exported

        Returns:
            the new element, carrying NodeId, BrowseName, optional ParentNodeId,
            DisplayName and optional Description
        """
        browsename = node.get_browse_name().to_string()
        nodeid = node.nodeid.to_string()
        parent = node.get_parent()
        # the text may be bytes or str; only bytes need decoding
        displayname = self._to_text(node.get_display_name().Text)
        desc = node.get_description().Text
        self.logger.debug("Adding common attributes of %s", node)

        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.attrib["NodeId"] = nodeid
        node_el.attrib["BrowseName"] = browsename
        if parent is not None:
            node_class = node.get_node_class()
            # ParentNodeId is only written for objects, variables and methods
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
                node_el.attrib["ParentNodeId"] = parent.nodeid.to_string()
        self._add_sub_el(node_el, 'DisplayName', displayname)
        # skip None as well as empty text, whether it is str or bytes
        if desc:
            self._add_sub_el(node_el, 'Description', self._to_text(desc))
        # FIXME: add WriteMask and UserWriteMask
        return node_el

    def add_etree_object(self, node):
        """
        Add a UA object element to the XML etree
        """
        obj_el = self._add_node_common("UAObject", node)
        var = node.get_attribute(ua.AttributeIds.EventNotifier)
        # EventNotifier is only written when it is not 0
        if var.Value.Value != 0:
            obj_el.attrib["EventNotifier"] = str(var.Value.Value)
        self._add_ref_els(obj_el, node)

    def add_etree_object_type(self, node):
        """
        Add a UA object type element to the XML etree
        """
        obj_el = self._add_node_common("UAObjectType", node)
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
        if abstract:
            obj_el.attrib["IsAbstract"] = 'true'
        self._add_ref_els(obj_el, node)

    def add_variable_common(self, node, el):
        """
        Add the attributes and value shared by variables and variable types
        Args:
            node: Node object being exported
            el: element of that node, as returned by _add_node_common

        Returns:
        """
        dtype = node.get_data_type()
        # NOTE(review): the lookup uses only the identifier, not the namespace
        # index of the data type - confirm this is intended
        if dtype.Identifier in o_ids.ObjectIdNames:
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype_name] = dtype.to_string()
        else:
            dtype_name = dtype.to_string()
        rank = node.get_value_rank()
        # ValueRank is only written when it differs from -1
        if rank != -1:
            el.attrib["ValueRank"] = str(rank)
        # FIXME: ArrayDimensions is not exported
        el.attrib["DataType"] = dtype_name
        value_to_etree(el, dtype_name, dtype, node)

    def add_etree_variable(self, node):
        """
        Add a UA variable element to the XML etree
        """
        var_el = self._add_node_common("UAVariable", node)
        self.add_variable_common(node, var_el)

        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["AccessLevel"] = str(accesslevel)
        if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)

        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
        if var.Value.Value:
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
        var = node.get_attribute(ua.AttributeIds.Historizing)
        if var.Value.Value:
            var_el.attrib["Historizing"] = 'true'
        self._add_ref_els(var_el, node)

    def add_etree_variable_type(self, node):
        """
        Add a UA variable type element to the XML etree
        """
        var_el = self._add_node_common("UAVariableType", node)
        self.add_variable_common(node, var_el)

        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
        if abstract.Value.Value:
            var_el.attrib["IsAbstract"] = "true"

        self._add_ref_els(var_el, node)

    def add_etree_method(self, node):
        """
        Add a UA method element to the XML etree
        """
        obj_el = self._add_node_common("UAMethod", node)

        # both flags are only written when they are explicitly False
        var = node.get_attribute(ua.AttributeIds.Executable)
        if var.Value.Value is False:
            obj_el.attrib["Executable"] = "false"
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
        if var.Value.Value is False:
            obj_el.attrib["UserExecutable"] = "false"
        self._add_ref_els(obj_el, node)

    def add_etree_reference_type(self, obj):
        """
        Add a UA reference type element to the XML etree
        """
        obj_el = self._add_node_common("UAReferenceType", obj)
        var = obj.get_attribute(ua.AttributeIds.InverseName)
        if var is not None and var.Value.Value is not None:
            self._add_sub_el(obj_el, 'InverseName', self._to_text(var.Value.Value.Text))
        self._add_ref_els(obj_el, obj)

    def add_etree_datatype(self, obj):
        """
        Add a UA data type element to the XML etree
        """
        obj_el = self._add_node_common("UADataType", obj)
        self._add_ref_els(obj_el, obj)

    def _add_namespace_uri_els(self, uris):
        """
        Insert a NamespaceUris element with one Uri child per entry as first child of the root
        """
        nuris_el = Et.Element('NamespaceUris')

        for uri in uris:
            self._add_sub_el(nuris_el, 'Uri', uri)

        self.etree.getroot().insert(0, nuris_el)

    def _add_alias_els(self):
        """
        Insert an Aliases element built from self.aliases as first child of the root
        """
        aliases_el = Et.Element('Aliases')

        for k, v in self.aliases.items():
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=k)
            ref_el.text = v

        self.etree.getroot().insert(0, aliases_el)

    def _add_ref_els(self, parent_el, obj):
        """
        Add a References element listing all references of obj to parent_el
        Args:
            parent_el: element of the node being exported
            obj: Node object whose references are exported

        Returns:
        """
        refs = obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')

        for ref in refs:
            # use the well-known name when the identifier has one, the node id
            # string otherwise
            if ref.ReferenceTypeId.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
            else:
                ref_name = ref.ReferenceTypeId.to_string()
            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            # IsForward is only written for inverse references
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            ref_el.text = ref.NodeId.to_string()

            # every reference type used is recorded so it ends up in Aliases
            self.aliases[ref_name] = ref.ReferenceTypeId.to_string()
267
268
269
def member_to_etree(el, name, dtype, val):
    """
    Add one structure member as a "uax:<name>" child of el
    Args:
        el: parent element receiving the member
        name: member name, used as tag after the "uax:" prefix
        dtype: NodeId of the member's data type
        val: member value; a list or tuple is written item by item

    Returns:
    """
    member_el = Et.SubElement(el, "uax:" + name)
    if isinstance(val, (list, tuple)):
        for v in val:
            # same name table as the rest of this module uses
            _value_to_etree(member_el, o_ids.ObjectIdNames[dtype.Identifier], dtype, v)
    else:
        _val_to_etree(member_el, dtype, val)
276
277
278
def _val_to_etree(el, dtype, val):
    """
    Write a single (non-list) value into an existing element
    Args:
        el: element receiving the value
        dtype: NodeId of the value's data type
        val: value to write; None results in empty text

    Returns:
    """
    if val is None:
        # None has neither to_string() nor ua_types, so write empty text for
        # every data type instead of failing in the NodeId branch
        el.text = ""
        return
    if dtype == ua.NodeId(ua.ObjectIds.NodeId):
        # node ids are wrapped in an Identifier child
        id_el = Et.SubElement(el, "uax:Identifier")
        id_el.text = val.to_string()
    elif not hasattr(val, "ua_types"):
        el.text = str(val)
    else:
        # objects exposing ua_types are written member by member
        for name, vtype in val.ua_types.items():
            member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
289
290
291
def value_to_etree(el, dtype_name, dtype, node):
    """
    Add a Value child holding the node's current value to el
    Args:
        el: element of the variable being exported
        dtype_name: type name used for the value tags
        dtype: NodeId of the data type
        node: Node object whose data value is read

    Returns:
    """
    variant = node.get_data_value().Value
    if variant.Value is None:
        # nothing to export, so no Value element is created
        return
    _value_to_etree(Et.SubElement(el, 'Value'), dtype_name, dtype, variant.Value)
296
297
298
def _value_to_etree(el, type_name, dtype, val):
    """
    Add val to el, wrapped in the element matching its data type
    Args:
        el: parent element receiving the value
        type_name: type name used to build the "uax:" tags
        dtype: NodeId of the data type
        val: value to write; a list or tuple is written as "ListOf" element

    Returns:
    """
    # only a missing value is skipped; 0, False, "" and empty lists are
    # real values and must show up in the export
    if val is None:
        return
    # identifiers above 21 are treated as extension objects
    is_extension_object = dtype.Identifier > 21
    if isinstance(val, (list, tuple)):
        if is_extension_object:
            elname = "uax:ListOfExtensionObject"
        else:
            elname = "uax:ListOf" + type_name
        list_el = Et.SubElement(el, elname)
        for nval in val:
            _value_to_etree(list_el, type_name, dtype, nval)
    elif is_extension_object:
        _extobj_to_etree(el, type_name, dtype, val)
    else:
        val_el = Et.SubElement(el, "uax:" + type_name)
        _val_to_etree(val_el, dtype, val)
315
           
316
317
def _extobj_to_etree(val_el, name, dtype, val):
    """
    Add val as an ExtensionObject child of val_el
    Args:
        val_el: parent element receiving the extension object
        name: type name used as tag of the structure inside Body
        dtype: NodeId of the data type, written as TypeId/Identifier
        val: object whose ua_types members are written

    Returns:
    """
    ext_el = Et.SubElement(val_el, "uax:ExtensionObject")

    # TypeId/Identifier carries the data type node id
    typeid_el = Et.SubElement(ext_el, "uax:TypeId")
    Et.SubElement(typeid_el, "uax:Identifier").text = dtype.to_string()

    # Body/<name> carries one child per member listed in ua_types
    struct_el = Et.SubElement(Et.SubElement(ext_el, "uax:Body"), "uax:" + name)
    for member_name, member_type in val.ua_types.items():
        member_dtype = ua.NodeId(getattr(ua.ObjectIds, member_type))
        member_to_etree(struct_el, member_name, member_dtype, getattr(val, member_name))
326
327
328
def indent(elem, level=0):
    '''
    Pretty-print helper: walk the tree below elem and set whitespace-only
    text and tail values to newlines plus two spaces per nesting level.
    Text and tails holding anything but whitespace are left alone.
    Based on http://effbot.org/zone/element-lib.htm#prettyprint
    '''
    def is_blank(text):
        return not text or not text.strip()

    pad = "\n" + "  " * level

    if not len(elem):
        # leaf: only the tail is touched, and never for the root
        if level and is_blank(elem.tail):
            elem.tail = pad
        return

    if is_blank(elem.text):
        elem.text = pad + "  "
    if is_blank(elem.tail):
        elem.tail = pad

    last_child = None
    for child in elem:
        indent(child, level + 1)
        last_child = child

    # the last child's tail sits in front of the closing tag of elem, so it
    # goes back to the indentation of elem itself
    if is_blank(last_child.tail):
        last_child.tail = pad
347