Completed
Pull Request — master (#327)
by Olivier
03:38
created

XmlExporter.add_variable_common()   B

Complexity

Conditions 5

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 5
Bugs 1 Features 1
Metric Value
cc 5
dl 0
loc 15
rs 8.5454
c 5
b 1
f 1
1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
from copy import copy
9
10
from opcua import ua
11
from opcua.ua import object_ids as o_ids
12
13
14
class XmlExporter(object):
15
16
    def __init__(self, server):
17
        self.logger = logging.getLogger(__name__)
18
        self.server = server
19
        self.aliases = {}
20
        self._addr_idx_to_xml_idx = {}
21
22
        node_set_attributes = OrderedDict()
23
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
24
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
25
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
26
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
27
28
        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))
29
30
    def build_etree(self, node_list, uris=None):
31
        """
32
        Create an XML etree object from a list of nodes; custom namespace uris are optional
33
        Namespaces used by nodes are allways exported for consistency.
34
        Args:
35
            node_list: list of Node objects for export
36
            uris: list of namespace uri strings
37
38
        Returns:
39
        """
40
        self.logger.info('Building XML etree')
41
42
        self._add_namespaces(node_list, uris)
43
44
        # add all nodes in the list to the XML etree
45
        for node in node_list:
46
            self.node_to_etree(node)
47
48
        # add aliases to the XML etree
49
        self._add_alias_els()
50
51
    def _add_namespaces(self, nodes, uris):
52
        idxs = self._get_ns_idxs_of_nodes(nodes)
53
54
        ns_array = self.server.get_namespace_array()
55
56
        # now add index of provided uris if necessary
57
        if uris:
58
            self._add_idxs_from_uris(idxs, uris, ns_array)
59
60
        # now create a dict of idx_in_address_space to idx_in_exported_file
61
        self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array) 
62
        print("DICT", self._addr_idx_to_xml_idx)
63
        ns_to_export = [ns_array[i] for i in self._addr_idx_to_xml_idx.keys() if i != 0]
64
        print("NS_TO_EXPORT", ns_to_export)
65
66
        # write namespaces to xml
67
        self._add_namespace_uri_els(ns_to_export)
68
69
    def _make_idx_dict(self, idxs, ns_array):
70
        print("MAKE", idxs, ns_array)
71
        idxs.sort()
72
        addr_idx_to_xml_idx = {0: 0}
73
        for xml_idx, addr_idx in enumerate(idxs):
74
            if addr_idx >= len(ns_array):
75
                break
76
            addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
77
        print(" ADDR_TO_ XML", self._addr_idx_to_xml_idx)
78
        return addr_idx_to_xml_idx
79
80
    def _get_ns_idxs_of_nodes(self, nodes):
81
        """
82
        get a list of all indexes used or references by nodes
83
        """
84
        idxs = []
85
        for node in nodes:
86
            node_idxs = [node.nodeid.NamespaceIndex]
87
            node_idxs.append(node.get_browse_name().NamespaceIndex)
88
            node_idxs.extend(ref.NodeId.NamespaceIndex for ref in node.get_references())
89
            node_idxs = list(set(node_idxs))  # remove duplicates
90
            for i in node_idxs:
91
                if i != 0 and i not in idxs:
92
                    idxs.append(i)
93
        print("IDXS", idxs)
94
        return idxs
95
96
    def _add_idxs_from_uris(self, idxs, uris, ns_array):
97
        for uri in uris:
98
            if uri in ns_array:
99
                i = ns_array.index(uri)
100
                if i not in idxs:
101
                    idxs.append(i)
102
103
104
    def write_xml(self, xmlpath, pretty=True):
105
        """
106
        Write the XML etree in the exporter object to a file
107
        Args:
108
            xmlpath: string representing the path/file name
109
110
        Returns:
111
        """
112
        # try to write the XML etree to a file
113
        self.logger.info('Exporting XML file to %s', xmlpath)
114
        #from IPython import embed
115
        # embed()
116
        if pretty:
117
            indent(self.etree.getroot())
118
            self.etree.write(xmlpath,
119
                             encoding='utf-8',
120
                             xml_declaration=True
121
                            )
122
        else:
123
            self.etree.write(xmlpath,
124
                             encoding='utf-8',
125
                             xml_declaration=True
126
                            )
127
128
    def dump_etree(self):
129
        """
130
        Dump etree to console for debugging
131
        Returns:
132
        """
133
        self.logger.info('Dumping XML etree to console')
134
        Et.dump(self.etree)
135
136
    def node_to_etree(self, node):
137
        """
138
        Add the necessary XML sub elements to the etree for exporting the node
139
        Args:
140
            node: Node object which will be added to XML etree
141
142
        Returns:
143
        """
144
        node_class = node.get_node_class()
145
        print("Exporting: ", node)
146
147
        if node_class is ua.NodeClass.Object:
148
            self.add_etree_object(node)
149
        elif node_class is ua.NodeClass.ObjectType:
150
            self.add_etree_object_type(node)
151
        elif node_class is ua.NodeClass.Variable:
152
            self.add_etree_variable(node)
153
        elif node_class is ua.NodeClass.VariableType:
154
            self.add_etree_variable_type(node)
155
        elif node_class is ua.NodeClass.ReferenceType:
156
            self.add_etree_reference_type(node)
157
        elif node_class is ua.NodeClass.DataType:
158
            self.add_etree_datatype(node)
159
        elif node_class is ua.NodeClass.Method:
160
            self.add_etree_method(node)
161
        else:
162
            self.logger.info("Exporting node class not implemented: %s ", node_class)
163
164
    def _add_sub_el(self, el, name, text):
165
        child_el = Et.SubElement(el, name)
166
        child_el.text = text
167
        return child_el
168
169
    def _node_to_string(self, nodeid):
170
        if not isinstance(nodeid, ua.NodeId):
171
            nodeid = nodeid.nodeid
172
173
        if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
174
            nodeid = copy(nodeid)
175
            nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
176
        return nodeid.to_string()
177
178
    def _bname_to_string(self, bname):
179
        if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
180
            bname = copy(bname)
181
            bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
182
        return bname.to_string()
183
184
    def _add_node_common(self, nodetype, node):
185
        browsename = node.get_browse_name()
186
        nodeid = node.nodeid
187
        parent = node.get_parent()
188
        displayname = node.get_display_name().Text.decode('utf-8')
189
        desc = node.get_description().Text
190
        print("NODE COMMON", node)
191
        node_el = Et.SubElement(self.etree.getroot(), nodetype)
192
        node_el.attrib["NodeId"] = self._node_to_string(nodeid)
193
        node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
194
        if parent is not None:
195
            node_class = node.get_node_class()
196
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
197
                node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
198
        self._add_sub_el(node_el, 'DisplayName', displayname)
199
        if desc not in (None, ""):
200
            self._add_sub_el(node_el, 'Description', desc.decode('utf-8'))
201
        # FIXME: add WriteMask and UserWriteMask
202
        return node_el
203
204
    def add_etree_object(self, node):
205
        """
206
        Add a UA object element to the XML etree
207
        """
208
        obj_el = self._add_node_common("UAObject", node)
209
        var = node.get_attribute(ua.AttributeIds.EventNotifier)
210
        if var.Value.Value != 0:
211
            obj_el.attrib["EventNotifier"] = str(var.Value.Value)
212
        self._add_ref_els(obj_el, node)
213
214
    def add_etree_object_type(self, node):
215
        """
216
        Add a UA object type element to the XML etree
217
        """
218
        obj_el = self._add_node_common("UAObjectType", node)
219
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
220
        if abstract:
221
            obj_el.attrib["IsAbstract"] = 'true'
222
        self._add_ref_els(obj_el, node)
223
224
    def add_variable_common(self, node, el):
225
        dtype = node.get_data_type()
226
        if dtype.Identifier in o_ids.ObjectIdNames:
227
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
228
            self.aliases[dtype_name] = dtype.to_string()
229
        else:
230
            dtype_name = dtype.to_string()
231
        rank = node.get_value_rank()
232
        if rank != -1:
233
            el.attrib["ValueRank"] = str(rank)
234
        dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
235
        if dim.Value.Value:
236
            el.attrib["ArrayDimensions"] = ",".join([str(i) for i in dim.Value.Value])
237
        el.attrib["DataType"] = dtype_name
238
        value_to_etree(el, dtype_name, dtype, node)
239
240
    def add_etree_variable(self, node):
241
        """
242
        Add a UA variable element to the XML etree
243
        """
244
        var_el = self._add_node_common("UAVariable", node)
245
        self.add_variable_common(node, var_el)
246
247
        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
248
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value
249
250
        # We only write these values if they are different from defaults
251
        # Not sure where default is defined....
252
        if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
253
            var_el.attrib["AccessLevel"] = str(accesslevel)
254
        if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
255
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)
256
257
        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
258
        if var.Value.Value:
259
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
260
        var = node.get_attribute(ua.AttributeIds.Historizing)
261
        if var.Value.Value:
262
            var_el.attrib["Historizing"] = 'true'
263
        self._add_ref_els(var_el, node)
264
265
    def add_etree_variable_type(self, node):
266
        """
267
        Add a UA variable type element to the XML etree
268
        """
269
270
        var_el = self._add_node_common("UAVariableType", node)
271
        self.add_variable_common(node, var_el)
272
273
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
274
        if abstract.Value.Value:
275
            var_el.attrib["IsAbstract"] = "true"
276
277
        self._add_ref_els(var_el, node)
278
279
    def add_etree_method(self, node):
280
        obj_el = self._add_node_common("UAMethod", node)
281
282
        var = node.get_attribute(ua.AttributeIds.Executable)
283
        if var.Value.Value is False:
284
            obj_el.attrib["Executable"] = "false"
285
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
286
        if var.Value.Value is False:
287
            obj_el.attrib["UserExecutable"] = "false"
288
        self._add_ref_els(obj_el, node)
289
290
    def add_etree_reference_type(self, obj):
291
        obj_el = self._add_node_common("UAReferenceType", obj)
292
        var = obj.get_attribute(ua.AttributeIds.InverseName)
293
        if var is not None and var.Value.Value is not None:
294
            self._add_sub_el(obj_el, 'InverseName', var.Value.Value.Text.decode('utf-8'))
295
        self._add_ref_els(obj_el, obj)
296
297
    def add_etree_datatype(self, obj):
298
        """
299
        Add a UA data type element to the XML etree
300
        """
301
        obj_el = self._add_node_common("UADataType", obj)
302
        self._add_ref_els(obj_el, obj)
303
304
    def _add_namespace_uri_els(self, uris):
305
        nuris_el = Et.Element('NamespaceUris')
306
307
        for uri in uris:
308
            self._add_sub_el(nuris_el, 'Uri', uri)
309
310
        self.etree.getroot().insert(0, nuris_el)
311
312
    def _add_alias_els(self):
313
        aliases_el = Et.Element('Aliases')
314
315
        for k, v in self.aliases.items():
316
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=k)
317
            ref_el.text = v
318
319
        self.etree.getroot().insert(0, aliases_el)
320
321
    def _add_ref_els(self, parent_el, obj):
322
        refs = obj.get_references()
323
        refs_el = Et.SubElement(parent_el, 'References')
324
325
        for ref in refs:
326
            if ref.ReferenceTypeId.Identifier in o_ids.ObjectIdNames:
327
                ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
328
            else:
329
                ref_name = ref.ReferenceTypeId.to_string()
330
            ref_el = Et.SubElement(refs_el, 'Reference')
331
            ref_el.attrib['ReferenceType'] = ref_name
332
            if not ref.IsForward:
333
                ref_el.attrib['IsForward'] = 'false'
334
            ref_el.text = self._node_to_string(ref.NodeId)
335
336
            self.aliases[ref_name] = ref.ReferenceTypeId.to_string()
337
338
339
def member_to_etree(el, name, dtype, val):
340
    member_el = Et.SubElement(el, "uax:" + name)
341
    if isinstance(val, (list, tuple)):
342
        for v in val:
343
            _value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
344
    else:
345
        _val_to_etree(member_el, dtype, val)
346
347
348
def _val_to_etree(el, dtype, val):
349
    if val is None:
350
        val = ""
351
    if dtype == ua.NodeId(ua.ObjectIds.NodeId):
352
        id_el = Et.SubElement(el, "uax:Identifier")
353
        id_el.text = val.to_string()
354
    elif not hasattr(val, "ua_types"):
355
        if isinstance(val, bytes):
356
            el.text = val.decode("utf-8")
357
        else:
358
            el.text = str(val)
359
    else:
360
        for name, vtype in val.ua_types.items():
361
            member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
362
363
364
def value_to_etree(el, dtype_name, dtype, node):
365
    var = node.get_data_value().Value
366
    if var.Value is not None:
367
        val_el = Et.SubElement(el, 'Value')
368
        _value_to_etree(val_el, dtype_name, dtype, var.Value)
369
370
371
def _value_to_etree(el, type_name, dtype, val):
372
    if not val:
373
        return
374
    if isinstance(val, (list, tuple)):
375
        if dtype.Identifier > 21:  # this is an extentionObject:
376
            elname = "uax:ListOfExtensionObject"
377
        else:
378
            elname = "uax:ListOf" + type_name
379
        list_el = Et.SubElement(el, elname)
380
        for nval in val:
381
            _value_to_etree(list_el, type_name, dtype, nval)
382
    else:
383
        if dtype.Identifier > 21:  # this is an extentionObject:
384
            _extobj_to_etree(el, type_name, dtype, val)
385
        else:
386
            val_el = Et.SubElement(el, "uax:" + type_name)
387
            _val_to_etree(val_el, dtype, val)
388
           
389
390
def _extobj_to_etree(val_el, name, dtype, val):
391
    obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
392
    type_el = Et.SubElement(obj_el, "uax:TypeId")
393
    id_el = Et.SubElement(type_el, "uax:Identifier")
394
    id_el.text = dtype.to_string()
395
    body_el = Et.SubElement(obj_el, "uax:Body")
396
    struct_el = Et.SubElement(body_el, "uax:" + name)
397
    for name, vtype in val.ua_types.items():
398
        member_to_etree(struct_el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
399
400
401
def indent(elem, level=0):
402
    '''
403
    copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
404
    it basically walks your tree and adds spaces and newlines so the tree is
405
    printed in a nice way
406
    '''
407
    i = "\n" + level * "  "
408
    if len(elem):
409
        if not elem.text or not elem.text.strip():
410
            elem.text = i + "  "
411
        if not elem.tail or not elem.tail.strip():
412
            elem.tail = i
413
        for elem in elem:
414
            indent(elem, level + 1)
415
        if not elem.tail or not elem.tail.strip():
416
            elem.tail = i
417
    else:
418
        if level and (not elem.tail or not elem.tail.strip()):
419
            elem.tail = i
420