XmlExporter._add_ref_els()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 15
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 13
nop 3
dl 0
loc 15
rs 9.75
c 0
b 0
f 0
1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5
import logging
6
import functools
7
from collections import OrderedDict
8
import xml.etree.ElementTree as Et
9
from copy import copy
10
import base64
11
12
from asyncua import ua
13
from ..ua import object_ids as o_ids
14
from .ua_utils import get_base_data_type
15
from asyncua.ua.uaerrors import UaError
16
17
18
class XmlExporter:
19
    """
20
    If it is required that for _extobj_to_etree members to the value should be written in a certain
21
    order it can be added to the dictionary below.
22
    """
23
    # Member export order per extension object type, keyed by the NodeId of the data type.
    extobj_ordered_elements = {
        ua.NodeId(ua.ObjectIds.Argument): [
            'Name',
            'DataType',
            'ValueRank',
            'ArrayDimensions',
            'Description',
        ],
    }
32
33
    def __init__(self, server):
34
        self.logger = logging.getLogger(__name__)
35
        self.server = server
36
        self.aliases = {}
37
        self._addr_idx_to_xml_idx = {}
38
39
        node_write_attributes = OrderedDict()
40
        node_write_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
41
        node_write_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
42
        node_write_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
43
        node_write_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
44
45
        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_write_attributes))
46
47
    async def build_etree(self, node_list, uris=None):
48
        """
49
        Create an XML etree object from a list of nodes; custom namespace uris are optional
50
        Namespaces used by nodes are always exported for consistency.
51
        Args:
52
            node_list: list of Node objects for export
53
            uris: list of namespace uri strings
54
55
        Returns:
56
        """
57
        self.logger.info('Building XML etree')
58
        await self._add_namespaces(node_list, uris)
59
        # add all nodes in the list to the XML etree
60
        for node in node_list:
61
            await self.node_to_etree(node)
62
        # add aliases to the XML etree
63
        self._add_alias_els()
64
65
    async def _add_namespaces(self, nodes, uris):
66
        idxs = await self._get_ns_idxs_of_nodes(nodes)
67
        ns_array = await self.server.get_namespace_array()
68
        # now add index of provided uris if necessary
69
        if uris:
70
            self._add_idxs_from_uris(idxs, uris, ns_array)
71
        # now create a dict of idx_in_address_space to idx_in_exported_file
72
        self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array)
73
        ns_to_export = [ns_array[i] for i in sorted(list(self._addr_idx_to_xml_idx.keys())) if i != 0]
74
        # write namespaces to xml
75
        self._add_namespace_uri_els(ns_to_export)
76
77
    def _make_idx_dict(self, idxs, ns_array):
78
        idxs.sort()
79
        addr_idx_to_xml_idx = {0: 0}
80
        for xml_idx, addr_idx in enumerate(idxs):
81
            if addr_idx >= len(ns_array):
82
                break
83
            addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
84
        return addr_idx_to_xml_idx
85
86
    async def _get_ns_idxs_of_nodes(self, nodes):
87
        """
88
        get a list of all indexes used or references by nodes
89
        """
90
        idxs = []
91
        for node in nodes:
92
            node_idxs = [node.nodeid.NamespaceIndex]
93
            try:
94
                node_idxs.append((await node.read_browse_name()).NamespaceIndex)
95
            except UaError:
96
                self.logger.exception("Error retrieving browse name of node %s", node)
97
                raise
98
99
            node_idxs.extend(ref.NodeId.NamespaceIndex for ref in await node.get_references())
100
            node_idxs = list(set(node_idxs))  # remove duplicates
101
            for i in node_idxs:
102
                if i != 0 and i not in idxs:
103
                    idxs.append(i)
104
        return idxs
105
106
    def _add_idxs_from_uris(self, idxs, uris, ns_array):
107
        for uri in uris:
108
            if uri in ns_array:
109
                i = ns_array.index(uri)
110
                if i not in idxs:
111
                    idxs.append(i)
112
113
    async def write_xml(self, xmlpath, pretty=True):
        """
        Write the XML etree in the exporter object to a file

        Args:
            xmlpath: string representing the path/file name
            pretty: add spaces and newlines, to be more readable

        Returns:
        """
        self.logger.info('Exporting XML file to %s', xmlpath)
        if pretty:
            indent(self.etree.getroot())
        # the file write is wrapped in a callable and handed to the executor of the server loop
        write_file = functools.partial(self.etree.write, xmlpath, encoding='utf-8', xml_declaration=True)
        loop = self.server.loop
        await loop.run_in_executor(None, write_file)
127
128
    def dump_etree(self):
129
        """
130
        Dump etree to console for debugging
131
        Returns:
132
        """
133
        self.logger.info('Dumping XML etree to console')
134
        Et.dump(self.etree)
135
136
    async def node_to_etree(self, node):
        """
        Add the necessary XML sub elements to the etree for exporting the node.

        The node class is read from the node and the matching add_etree_*
        method is called. A node class without such a method is only logged.

        Args:
            node: Node object which will be added to XML etree

        Returns:
        """
        node_class = await node.read_node_class()

        exporters = (
            (ua.NodeClass.Object, self.add_etree_object),
            (ua.NodeClass.ObjectType, self.add_etree_object_type),
            (ua.NodeClass.Variable, self.add_etree_variable),
            (ua.NodeClass.VariableType, self.add_etree_variable_type),
            (ua.NodeClass.ReferenceType, self.add_etree_reference_type),
            (ua.NodeClass.DataType, self.add_etree_datatype),
            (ua.NodeClass.Method, self.add_etree_method),
        )
        for handled_class, exporter in exporters:
            if node_class is handled_class:
                await exporter(node)
                return
        self.logger.info("Exporting node class not implemented: %s ", node_class)
162
163
    def _add_sub_el(self, el, name, text):
164
        child_el = Et.SubElement(el, name)
165
        child_el.text = text
166
        return child_el
167
168
    def _node_to_string(self, nodeid):
        """
        Return the string form of a NodeId with its namespace index translated
        to the index used in the exported file.

        Anything that is not a ua.NodeId is expected to carry its NodeId in a
        nodeid attribute. A namespace index without a translation is written
        unchanged. The given NodeId itself is never modified.
        """
        if not isinstance(nodeid, ua.NodeId):
            nodeid = nodeid.nodeid

        ns_idx = nodeid.NamespaceIndex
        if ns_idx not in self._addr_idx_to_xml_idx:
            return nodeid.to_string()
        # work on a copy so the NodeId of the address space keeps its index
        translated = copy(nodeid)
        translated.NamespaceIndex = self._addr_idx_to_xml_idx[ns_idx]
        return translated.to_string()
176
177
    def _bname_to_string(self, bname):
178
        if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
179
            bname = copy(bname)
180
            bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
181
        return bname.to_string()
182
183
    async def _add_node_common(self, nodetype, node):
        """
        Create the XML element shared by all node classes and add it to the root.

        NodeId and BrowseName are always written. ParentNodeId is written for
        objects, variables and methods that have a parent. DisplayName is
        always added as sub element, Description only when it is not empty.

        Args:
            nodetype: tag of the element to create
            node: Node object to export

        Returns:
            the created element
        """
        browsename = await node.read_browse_name()
        nodeid = node.nodeid
        parent = await node.get_parent()
        displayname = (await node.read_display_name()).Text
        description = await node.read_description()
        description_text = description.Text if description else description

        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.attrib["NodeId"] = self._node_to_string(nodeid)
        node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
        if parent is not None:
            node_class = await node.read_node_class()
            classes_with_parent = (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method)
            if node_class in classes_with_parent:
                node_el.attrib["ParentNodeId"] = self._node_to_string(parent)
        self._add_sub_el(node_el, 'DisplayName', displayname)
        if description_text not in (None, ""):
            self._add_sub_el(node_el, 'Description', description_text)
        # FIXME: add WriteMask and UserWriteMask
        return node_el
203
204
    async def add_etree_object(self, node):
        """
        Add a UA object element to the XML etree.

        The EventNotifier attribute is only written when it is not 0.
        """
        obj_el = await self._add_node_common("UAObject", node)
        notifier = await node.read_attribute(ua.AttributeIds.EventNotifier)
        notifier_value = notifier.Value.Value
        if notifier_value != 0:
            obj_el.attrib["EventNotifier"] = str(notifier_value)
        await self._add_ref_els(obj_el, node)
213
214
    async def add_etree_object_type(self, node):
        """
        Add a UA object type element to the XML etree.

        The IsAbstract attribute is only written for abstract types.
        """
        type_el = await self._add_node_common("UAObjectType", node)
        is_abstract = await node.read_attribute(ua.AttributeIds.IsAbstract)
        if is_abstract.Value.Value:
            type_el.attrib["IsAbstract"] = 'true'
        await self._add_ref_els(type_el, node)
223
224
    async def add_variable_common(self, node, el):
        """
        Write what variables and variable types have in common to el.

        This covers the DataType attribute, ValueRank when it is not -1,
        ArrayDimensions when there are any, and the Value sub element. A data
        type of namespace 0 with a known name is written by name and
        registered as alias, any other data type is written as NodeId string.

        Args:
            node: Node object to export
            el: element of the node in the XML etree
        """
        dtype = await node.read_data_type()
        has_standard_name = dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames
        if has_standard_name:
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype] = dtype_name
        else:
            dtype_name = self._node_to_string(dtype)

        rank = await node.read_value_rank()
        if rank != -1:
            el.attrib["ValueRank"] = str(int(rank))

        dim = await node.read_attribute(ua.AttributeIds.ArrayDimensions)
        dimensions = dim.Value.Value
        if dimensions:
            el.attrib["ArrayDimensions"] = ",".join(str(size) for size in dimensions)

        el.attrib["DataType"] = dtype_name
        await self.value_to_etree(el, dtype_name, dtype, node)
239
240
    async def add_etree_variable(self, node):
        """
        Add a UA variable element to the XML etree.

        AccessLevel, UserAccessLevel, MinimumSamplingInterval and Historizing
        are only written when they differ from what is treated as default here.
        """
        var_el = await self._add_node_common("UAVariable", node)
        await self._add_ref_els(var_el, node)
        await self.add_variable_common(node, var_el)

        accesslevel = (await node.read_attribute(ua.AttributeIds.AccessLevel)).Value.Value
        useraccesslevel = (await node.read_attribute(ua.AttributeIds.UserAccessLevel)).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        levels = (("AccessLevel", accesslevel), ("UserAccessLevel", useraccesslevel))
        for attr_name, level in levels:
            if level not in (0, ua.AccessLevel.CurrentRead.mask):
                var_el.attrib[attr_name] = str(level)

        sampling = await node.read_attribute(ua.AttributeIds.MinimumSamplingInterval)
        if sampling.Value.Value:
            var_el.attrib["MinimumSamplingInterval"] = str(sampling.Value.Value)

        historizing = await node.read_attribute(ua.AttributeIds.Historizing)
        if historizing.Value.Value:
            var_el.attrib["Historizing"] = 'true'
264
265
    async def add_etree_variable_type(self, node):
        """
        Add a UA variable type element to the XML etree.

        The IsAbstract attribute is only written for abstract types.
        """
        type_el = await self._add_node_common("UAVariableType", node)
        await self.add_variable_common(node, type_el)
        is_abstract = (await node.read_attribute(ua.AttributeIds.IsAbstract)).Value.Value
        if is_abstract:
            type_el.attrib["IsAbstract"] = "true"
        await self._add_ref_els(type_el, node)
275
276
    async def add_etree_method(self, node):
        """
        Add a UA method element to the XML etree.

        Executable and UserExecutable are only written when they are False.
        """
        method_el = await self._add_node_common("UAMethod", node)
        flags = (
            (ua.AttributeIds.Executable, "Executable"),
            (ua.AttributeIds.UserExecutable, "UserExecutable"),
        )
        for attribute_id, attr_name in flags:
            flag = await node.read_attribute(attribute_id)
            # an attribute without value must not be exported as "false"
            if flag.Value.Value is False:
                method_el.attrib[attr_name] = "false"
        await self._add_ref_els(method_el, node)
285
286
    async def add_etree_reference_type(self, obj):
        """
        Add a UA reference type element to the XML etree.

        The InverseName sub element is only added when the attribute holds a text.
        """
        ref_type_el = await self._add_node_common("UAReferenceType", obj)
        await self._add_ref_els(ref_type_el, obj)
        inverse = await obj.read_attribute(ua.AttributeIds.InverseName)
        if inverse is None or inverse.Value.Value is None:
            return
        inverse_text = inverse.Value.Value.Text
        if inverse_text is not None:
            self._add_sub_el(ref_type_el, 'InverseName', inverse_text)
292
293
    async def add_etree_datatype(self, obj):
        """
        Add a UA data type element with its references to the XML etree
        """
        datatype_el = await self._add_node_common("UADataType", obj)
        await self._add_ref_els(datatype_el, obj)
299
300
    def _add_namespace_uri_els(self, uris):
301
        nuris_el = Et.Element('NamespaceUris')
302
        for uri in uris:
303
            self._add_sub_el(nuris_el, 'Uri', uri)
304
        self.etree.getroot().insert(0, nuris_el)
305
306
    def _add_alias_els(self):
307
        aliases_el = Et.Element('Aliases')
308
        ordered_keys = list(self.aliases.keys())
309
        ordered_keys.sort()
310
        for nodeid in ordered_keys:
311
            name = self.aliases[nodeid]
312
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=name)
313
            ref_el.text = nodeid.to_string()
314
        # insert behind the namespace element
315
        self.etree.getroot().insert(1, aliases_el)
316
317
    async def _add_ref_els(self, parent_el, obj):
        """
        Add a References element with one Reference per reference of obj.

        A reference type of namespace 0 with a known name is written by name
        and registered as alias. Any other reference type is written as NodeId
        string with its namespace index translated to the index of the
        exported file; no alias is registered for it, since the alias text is
        written without that translation. The IsForward attribute is only
        written for inverse references.

        Args:
            parent_el: element of the node in the XML etree
            obj: Node object whose references are exported
        """
        refs = await obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')
        for ref in refs:
            ref_type = ref.ReferenceTypeId
            # names only exist for namespace 0, a custom type may reuse the same numeric identifier
            if ref_type.NamespaceIndex == 0 and ref_type.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref_type.Identifier]
                self.aliases[ref_type] = ref_name
            else:
                ref_name = self._node_to_string(ref_type)
            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            ref_el.text = self._node_to_string(ref.NodeId)
332
333
    async def member_to_etree(self, el, name, dtype, val):
334
        member_el = Et.SubElement(el, "uax:" + name)
335
        if isinstance(val, (list, tuple)):
336
            for v in val:
337
                await self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
338
        else:
339
            await self._val_to_etree(member_el, dtype, val)
340
341
    async def _val_to_etree(self, el, dtype, val):
        """
        Write a single value of the given data type into el.

        NodeId and Guid values get a sub element, booleans are written as
        true/false and byte strings base64 encoded. A value with ua_types is
        written member by member, anything else as text.

        Args:
            el: element receiving the value
            dtype: NodeId of the data type
            val: value to write
        """
        if dtype == ua.NodeId(ua.ObjectIds.NodeId):
            id_el = Et.SubElement(el, "uax:Identifier")
            id_el.text = val.to_string()
            return
        if dtype == ua.NodeId(ua.ObjectIds.Guid):
            guid_el = Et.SubElement(el, "uax:String")
            guid_el.text = str(val)
            return
        if dtype == ua.NodeId(ua.ObjectIds.Boolean):
            el.text = 'true' if val else 'false'
            return
        if dtype == ua.NodeId(ua.ObjectIds.ByteString):
            raw = b"" if val is None else val
            el.text = base64.b64encode(raw).decode("utf-8")
            return
        if hasattr(val, "ua_types"):
            for name, vtype in val.ua_types:
                await self.member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
            return
        if isinstance(val, bytes):
            # FIXME: should we also encode this (localized text I guess) using base64??
            el.text = val.decode("utf-8")
        elif val is not None:
            el.text = str(val)
365
366
    async def value_to_etree(self, el, dtype_name, dtype, node):
367
        var = (await node.read_data_value()).Value
368
        if var.Value is not None:
369
            val_el = Et.SubElement(el, 'Value')
370
            await self._value_to_etree(val_el, dtype_name, dtype, var.Value)
371
372
    async def _value_to_etree(self, el, type_name, dtype, val):
373
        if val is None:
374
            return
375
376
        if isinstance(val, (list, tuple)):
377
            if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
378
                elname = "uax:ListOf" + type_name
379
            else:  # this is an extentionObject:
380
                elname = "uax:ListOfExtensionObject"
381
382
            list_el = Et.SubElement(el, elname)
383
            for nval in val:
384
                await self._value_to_etree(list_el, type_name, dtype, nval)
385
        else:
386
            dtype_base = await get_base_data_type(self.server.get_node(dtype))
387
            dtype_base = dtype_base.nodeid
388
389
            if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
390
                dtype_base = ua.NodeId(ua.ObjectIds.Int32)
391
                type_name = ua.ObjectIdNames[dtype_base.Identifier]
392
393
            if dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
394
                type_name = ua.ObjectIdNames[dtype_base.Identifier]
395
                val_el = Et.SubElement(el, "uax:" + type_name)
396
                await self._val_to_etree(val_el, dtype_base, val)
397
            else:
398
                await self._extobj_to_etree(el, type_name, dtype, val)
399
400
    async def _extobj_to_etree(self, val_el, name, dtype, val):
401
        obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
402
        type_el = Et.SubElement(obj_el, "uax:TypeId")
403
        id_el = Et.SubElement(type_el, "uax:Identifier")
404
        id_el.text = dtype.to_string()
405
        body_el = Et.SubElement(obj_el, "uax:Body")
406
        struct_el = Et.SubElement(body_el, "uax:" + name)
407
        for name, vtype in val.ua_types:
408
            # FIXME; what happend if we have a custom type which is not part of ObjectIds???
409
            if vtype.startswith("ListOf"):
410
                vtype = vtype[6:]
411
            await self.member_to_etree(struct_el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
412
            # self.member_to_etree(struct_el, name, extension_object_typeids[vtype], getattr(val, name))
413
        # for name in self._get_member_order(dtype, val):
414
        # self.member_to_etree(struct_el, name, ua.NodeId(getattr(ua.ObjectIds, val.ua_types[name])), getattr(val, name))
415
416
    def _get_member_order(self, dtype, val):
417
        """
418
        If an dtype has an entry in XmlExporter.extobj_ordered_elements return the export order of the elements
419
        else return the unordered members.
420
        """
421
        if dtype not in XmlExporter.extobj_ordered_elements.keys():
422
            return val.ua_types.keys()
423
        else:
424
            member_keys = [name for name in XmlExporter.extobj_ordered_elements[dtype] if
425
                           name in val.ua_types.keys() and getattr(val, name) is not None]
426
427
        return member_keys
428
429
430 View Code Duplication
def indent(elem, level=0):
    """
    Pretty print helper, originally copied from
    http://effbot.org/zone/element-lib.htm#prettyprint

    It walks the tree below elem and puts newlines and spaces into the text
    and tail of the elements, so the tree is printed in a nice way. Text and
    tails that hold more than whitespace are left alone.
    """
    pad = "\n" + level * "  "
    if not len(elem):
        # a leaf only needs its tail set, and the root of the walk not even that
        if level and (not elem.tail or not elem.tail.strip()):
            elem.tail = pad
        return

    if not elem.text or not elem.text.strip():
        elem.text = pad + "  "
    if not elem.tail or not elem.tail.strip():
        elem.tail = pad
    child = None
    for child in elem:
        indent(child, level + 1)
    # the tail of the last child puts the closing tag of elem at the level of elem
    if not child.tail or not child.tail.strip():
        child.tail = pad
449