Completed
Pull Request — master (#326)
by Olivier
05:02
created

XmlExporter.__init__()   A

Complexity

Conditions 1

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 12
rs 9.4285
1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
from copy import copy
9
10
from opcua import ua
11
from opcua.ua import object_ids as o_ids
12
13
14
class XmlExporter(object):
    """
    Build a UANodeSet XML document from nodes of an OPC-UA address space.

    Typical use: create the exporter with a server, call build_etree() with
    the nodes to export, then write_xml() to serialise the result.

    Namespace indexes of the address space are remapped to the indexes used
    in the exported file (see _add_namespaces); every node id written to the
    document goes through _node_to_string so that the remapping is applied
    consistently.
    """

    def __init__(self, server):
        """
        Args:
            server: object providing get_namespace_array(); used to resolve
                namespace indexes to uris
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # alias name -> node id string, written as <Aliases> by _add_alias_els
        self.aliases = {}
        # address-space namespace index -> namespace index in the XML file.
        # Initialised here so that node_to_etree() and the *_to_string helpers
        # work even when build_etree() has not been called yet.
        self._addr_idx_to_xml_idx = {0: 0}

        # OrderedDict keeps the xmlns attributes in a fixed order
        node_set_attributes = OrderedDict()
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'

        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))

    def build_etree(self, node_list, uris=None):
        """
        Create an XML etree object from a list of nodes; custom namespace uris are optional
        Namespaces used by nodes are always exported for consistency.
        Args:
            node_list: list of Node objects for export
            uris: list of namespace uri strings

        Returns:
        """
        self.logger.info('Building XML etree')

        self._add_namespaces(node_list, uris)

        # add all nodes in the list to the XML etree
        for node in node_list:
            self.node_to_etree(node)

        # add aliases to the XML etree
        self._add_alias_els()

    def _add_namespaces(self, nodes, uris):
        """
        Compute the namespace index remapping and write <NamespaceUris>.

        Args:
            nodes: nodes to export; their non-zero namespace indexes are always exported
            uris: optional list of extra namespace uris to export; uris
                unknown to the server are ignored
        """
        # first get a list of all indexes used by nodes
        idxs = []
        for node in nodes:
            idx = node.nodeid.NamespaceIndex
            if idx != 0 and idx not in idxs:
                idxs.append(idx)
        ns_array = self.server.get_namespace_array()
        self.logger.debug("Namespace indexes used by nodes: %s", idxs)

        # now add index of provided uris if necessary
        if uris:
            for uri in uris:
                if uri in ns_array:
                    i = ns_array.index(uri)
                    # index 0 must keep mapping to 0 and is never listed in
                    # NamespaceUris, so it is skipped here
                    if i != 0 and i not in idxs:
                        idxs.append(i)
        self.logger.debug("Namespace indexes after adding requested uris: %s", idxs)

        # now create a dict of idx_in_address_space to idx_in_exported_file
        idxs.sort()
        self._addr_idx_to_xml_idx = {0: 0}
        ns_to_export = []
        for xml_idx, addr_idx in enumerate(idxs, start=1):
            if addr_idx >= len(ns_array):
                # idxs is sorted, so every remaining index is unknown as well
                self.logger.warning("Namespace index %s is not in the server namespace array", addr_idx)
                break
            self._addr_idx_to_xml_idx[addr_idx] = xml_idx
            ns_to_export.append(ns_array[addr_idx])
        self.logger.debug("Address space to XML namespace index map: %s", self._addr_idx_to_xml_idx)

        # write namespaces to xml
        self._add_namespace_uri_els(ns_to_export)

    def write_xml(self, xmlpath, pretty=True):
        """
        Write the XML etree in the exporter object to a file
        Args:
            xmlpath: string representing the path/file name
            pretty: when true, indent the tree in place before writing

        Returns:
        """
        self.logger.info('Exporting XML file to %s', xmlpath)
        if pretty:
            indent(self.etree.getroot())
        self.etree.write(xmlpath, encoding='utf-8', xml_declaration=True)

    def dump_etree(self):
        """
        Dump etree to console for debugging
        Returns:
        """
        self.logger.info('Dumping XML etree to console')
        Et.dump(self.etree)

    def node_to_etree(self, node):
        """
        Add the necessary XML sub elements to the etree for exporting the node
        Args:
            node: Node object which will be added to XML etree

        Returns:
        """
        node_class = node.get_node_class()
        self.logger.debug("Exporting: %s", node)

        if node_class is ua.NodeClass.Object:
            self.add_etree_object(node)
        elif node_class is ua.NodeClass.ObjectType:
            self.add_etree_object_type(node)
        elif node_class is ua.NodeClass.Variable:
            self.add_etree_variable(node)
        elif node_class is ua.NodeClass.VariableType:
            self.add_etree_variable_type(node)
        elif node_class is ua.NodeClass.ReferenceType:
            self.add_etree_reference_type(node)
        elif node_class is ua.NodeClass.DataType:
            self.add_etree_datatype(node)
        elif node_class is ua.NodeClass.Method:
            self.add_etree_method(node)
        else:
            self.logger.info("Exporting node class not implemented: %s ", node_class)

    def _add_sub_el(self, el, name, text):
        """Append a child element called name with the given text to el and return it."""
        child_el = Et.SubElement(el, name)
        child_el.text = text
        return child_el

    def _node_to_string(self, nodeid):
        """
        Return the string form of a node id with its namespace index remapped
        to the index used in the exported file.

        Args:
            nodeid: a ua.NodeId, or any object exposing one as its .nodeid attribute
        """
        if not isinstance(nodeid, ua.NodeId):
            nodeid = nodeid.nodeid

        if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
            # work on a copy so the node id of the address space is not modified
            nodeid = copy(nodeid)
            nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
        return nodeid.to_string()

    def _bname_to_string(self, bname):
        """Return the string form of a browse name with its namespace index remapped."""
        if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
            # work on a copy so the original browse name is not modified
            bname = copy(bname)
            bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
        return bname.to_string()

    def _add_node_common(self, nodetype, node):
        """
        Create the element for a node and fill in what all node classes share:
        NodeId, BrowseName, ParentNodeId (objects, variables and methods only),
        DisplayName and Description.

        Args:
            nodetype: tag of the element to create, e.g. "UAObject"
            node: Node object to export

        Returns:
            the newly created element, already attached to the document root
        """
        browsename = node.get_browse_name()
        nodeid = node.nodeid
        parent = node.get_parent()
        # NOTE(review): assumes LocalizedText.Text is a bytes object - confirm
        displayname = node.get_display_name().Text.decode('utf-8')
        desc = node.get_description().Text
        self.logger.debug("Adding common attributes of %s", node)
        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.attrib["NodeId"] = self._node_to_string(nodeid)
        node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
        if parent is not None:
            node_class = node.get_node_class()
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
                # remap the namespace index exactly like for NodeId above
                node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
        self._add_sub_el(node_el, 'DisplayName', displayname)
        if desc:
            self._add_sub_el(node_el, 'Description', desc.decode('utf-8'))
        # FIXME: add WriteMask and UserWriteMask
        return node_el

    def add_etree_object(self, node):
        """
        Add a UA object element to the XML etree
        """
        obj_el = self._add_node_common("UAObject", node)
        notifier = node.get_attribute(ua.AttributeIds.EventNotifier).Value.Value
        # only written when set and different from 0
        if notifier:
            obj_el.attrib["EventNotifier"] = str(notifier)
        self._add_ref_els(obj_el, node)

    def add_etree_object_type(self, node):
        """
        Add a UA object type element to the XML etree
        """
        obj_el = self._add_node_common("UAObjectType", node)
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
        if abstract:
            obj_el.attrib["IsAbstract"] = 'true'
        self._add_ref_els(obj_el, node)

    def add_variable_common(self, node, el):
        """
        Add what variables and variable types share: ValueRank, DataType and Value.

        Args:
            node: Node object being exported
            el: element of that node, as returned by _add_node_common
        """
        dtype = node.get_data_type()
        # symbolic names only exist for ids of namespace 0
        if dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames:
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype_name] = dtype.to_string()
        else:
            dtype_name = self._node_to_string(dtype)
        rank = node.get_value_rank()
        if rank != -1:
            el.attrib["ValueRank"] = str(rank)
        # FIXME: ArrayDimensions is not exported
        el.attrib["DataType"] = dtype_name
        value_to_etree(el, dtype_name, dtype, node)

    def add_etree_variable(self, node):
        """
        Add a UA variable element to the XML etree
        """
        var_el = self._add_node_common("UAVariable", node)
        self.add_variable_common(node, var_el)

        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["AccessLevel"] = str(accesslevel)
        if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)

        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
        if var.Value.Value:
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
        var = node.get_attribute(ua.AttributeIds.Historizing)
        if var.Value.Value:
            var_el.attrib["Historizing"] = 'true'
        self._add_ref_els(var_el, node)

    def add_etree_variable_type(self, node):
        """
        Add a UA variable type element to the XML etree
        """
        var_el = self._add_node_common("UAVariableType", node)
        self.add_variable_common(node, var_el)

        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
        if abstract.Value.Value:
            var_el.attrib["IsAbstract"] = "true"

        self._add_ref_els(var_el, node)

    def add_etree_method(self, node):
        """
        Add a UA method element to the XML etree
        """
        obj_el = self._add_node_common("UAMethod", node)

        # both attributes are only written when explicitly False
        var = node.get_attribute(ua.AttributeIds.Executable)
        if var.Value.Value is False:
            obj_el.attrib["Executable"] = "false"
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
        if var.Value.Value is False:
            obj_el.attrib["UserExecutable"] = "false"
        self._add_ref_els(obj_el, node)

    def add_etree_reference_type(self, obj):
        """
        Add a UA reference type element to the XML etree
        """
        obj_el = self._add_node_common("UAReferenceType", obj)
        var = obj.get_attribute(ua.AttributeIds.InverseName)
        if var is not None and var.Value.Value is not None:
            self._add_sub_el(obj_el, 'InverseName', var.Value.Value.Text.decode('utf-8'))
        self._add_ref_els(obj_el, obj)

    def add_etree_datatype(self, obj):
        """
        Add a UA data type element to the XML etree
        """
        obj_el = self._add_node_common("UADataType", obj)
        self._add_ref_els(obj_el, obj)

    def _add_namespace_uri_els(self, uris):
        """Insert a <NamespaceUris> element listing uris as first child of the root."""
        nuris_el = Et.Element('NamespaceUris')

        for uri in uris:
            self._add_sub_el(nuris_el, 'Uri', uri)

        self.etree.getroot().insert(0, nuris_el)

    def _add_alias_els(self):
        """Insert an <Aliases> element built from self.aliases as first child of the root."""
        aliases_el = Et.Element('Aliases')

        for k, v in self.aliases.items():
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=k)
            ref_el.text = v

        self.etree.getroot().insert(0, aliases_el)

    def _add_ref_els(self, parent_el, obj):
        """
        Add a <References> element with one <Reference> per reference of obj.

        Reference types of namespace 0 that have a symbolic name are written
        with that name and registered in self.aliases; all other reference
        types are written as a (namespace-remapped) node id string.

        Args:
            parent_el: element of the node being exported
            obj: Node object whose references are exported
        """
        refs = obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')

        for ref in refs:
            ref_type = ref.ReferenceTypeId
            if ref_type.NamespaceIndex == 0 and ref_type.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref_type.Identifier]
                # remember the alias; it is written later by _add_alias_els
                self.aliases[ref_name] = ref_type.to_string()
            else:
                ref_name = self._node_to_string(ref_type)
            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            # the target must use the namespace indexes of the exported file
            ref_el.text = self._node_to_string(ref.NodeId)
314
315
316
def member_to_etree(el, name, dtype, val):
    """
    Add one member of a structure as a "uax:<name>" child of el.

    A list or tuple value is exported item by item through _value_to_etree,
    using the name registered for dtype in ua.ObjectIdNames as type name;
    any other value is written by _val_to_etree.
    """
    member_el = Et.SubElement(el, "uax:" + name)
    if not isinstance(val, (list, tuple)):
        _val_to_etree(member_el, dtype, val)
        return
    for item in val:
        # the type name is looked up per item, so an empty sequence never touches the table
        _value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, item)
323
324
325
def _val_to_etree(el, dtype, val):
    """
    Write a single value into el.

    - NodeId values are written as text of a "uax:Identifier" child.
    - Values without a ua_types attribute are written as text of el itself
      (bytes are decoded as utf-8, everything else goes through str()).
    - Values with ua_types are treated as structures: every member listed
      in ua_types is exported through member_to_etree.

    None is written as an empty text, including for NodeId values.
    """
    if dtype == ua.NodeId(ua.ObjectIds.NodeId):
        id_el = Et.SubElement(el, "uax:Identifier")
        # None has no to_string(); export it as an empty identifier
        id_el.text = "" if val is None else val.to_string()
    elif val is None:
        el.text = ""
    elif not hasattr(val, "ua_types"):
        if isinstance(val, bytes):
            el.text = val.decode("utf-8")
        else:
            el.text = str(val)
    else:
        for name, vtype in val.ua_types.items():
            member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
339
340
341
def value_to_etree(el, dtype_name, dtype, node):
    """
    Export the current value of node as a 'Value' child of el.

    Nothing is added when the value read from the node is None.
    """
    variant = node.get_data_value().Value
    if variant.Value is None:
        return
    _value_to_etree(Et.SubElement(el, 'Value'), dtype_name, dtype, variant.Value)
346
347
348
def _value_to_etree(el, type_name, dtype, val):
    """
    Add val under el, recursing into lists and tuples.

    None and empty sequences add nothing. Other falsy values (0, False, 0.0,
    empty strings) are real values and are exported like any other scalar.

    Args:
        el: parent element
        type_name: name used to build the "uax:<type_name>" / "uax:ListOf<type_name>" tags
        dtype: node id of the data type; identifiers above 21 are handled as extension objects
        val: value to export
    """
    if val is None:
        return
    is_sequence = isinstance(val, (list, tuple))
    if is_sequence and not val:
        return

    is_extobj = dtype.Identifier > 21  # this is an extensionObject
    if is_sequence:
        elname = "uax:ListOfExtensionObject" if is_extobj else "uax:ListOf" + type_name
        list_el = Et.SubElement(el, elname)
        for nval in val:
            _value_to_etree(list_el, type_name, dtype, nval)
    elif is_extobj:
        _extobj_to_etree(el, type_name, dtype, val)
    else:
        val_el = Et.SubElement(el, "uax:" + type_name)
        _val_to_etree(val_el, dtype, val)
365
           
366
367
def _extobj_to_etree(val_el, name, dtype, val):
    """
    Add val as a "uax:ExtensionObject" child of val_el.

    The element gets a TypeId/Identifier holding dtype.to_string() and a
    Body containing a "uax:<name>" structure element, filled with one child
    per entry of val.ua_types (exported through member_to_etree).
    """
    ext_el = Et.SubElement(val_el, "uax:ExtensionObject")

    identifier_el = Et.SubElement(Et.SubElement(ext_el, "uax:TypeId"), "uax:Identifier")
    identifier_el.text = dtype.to_string()

    struct_el = Et.SubElement(Et.SubElement(ext_el, "uax:Body"), "uax:" + name)
    for member_name, member_type in val.ua_types.items():
        member_dtype = ua.NodeId(getattr(ua.ObjectIds, member_type))
        member_to_etree(struct_el, member_name, member_dtype, getattr(val, member_name))
376
377
378
def indent(elem, level=0):
    """
    Pretty-print helper: set the whitespace-only text and tail of elem and
    of all its descendants, in place, so that the serialised tree has one
    element per line with two spaces of indentation per nesting level.

    Text or tail that contains anything other than whitespace is left
    untouched. Adapted from the effbot.org ElementTree pretty-print recipe.
    """
    pad = "\n" + "  " * level
    children = list(elem)

    if not children:
        # a leaf only needs its tail set, and never at the top level
        if level and (not elem.tail or not elem.tail.strip()):
            elem.tail = pad
        return

    if not elem.text or not elem.text.strip():
        elem.text = pad + "  "
    if not elem.tail or not elem.tail.strip():
        elem.tail = pad

    for child in children:
        indent(child, level + 1)

    # the closing tag of elem follows the last child: bring it back to elem's own level
    last = children[-1]
    if not last.tail or not last.tail.strip():
        last.tail = pad
397