Test Failed
Pull Request — master (#490)
by Olivier
03:08
created

XmlExporter._val_to_etree()   F

Complexity

Conditions 11

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 132

Importance

Changes 2
Bugs 1 Features 0
Metric Value
cc 11
c 2
b 1
f 0
dl 0
loc 24
ccs 0
cts 21
cp 0
crap 132
rs 3.3409

How to fix   Complexity   

Complexity

Complex methods like XmlExporter._val_to_etree() are often a sign that the enclosing class does a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
from copy import copy
9
import base64
10
11
from opcua import ua
12
from opcua.ua import object_ids as o_ids
13
from opcua.common.ua_utils import get_base_data_type
14
15
16
class XmlExporter(object):
17
18
    ''' If it is required that for _extobj_to_etree members to the value should be written in a certain
19
        order it can be added to the dictionary below.    
20
    '''
21
    extobj_ordered_elements = {
22
        ua.NodeId(ua.ObjectIds.Argument) : ['Name',
23
                                            'DataType',
24
                                            'ValueRank',
25
                                            'ArrayDimensions',
26
                                            'Description']
27
        }
28
29
    def __init__(self, server):
30
        self.logger = logging.getLogger(__name__)
31
        self.server = server
32
        self.aliases = {}
33
        self._addr_idx_to_xml_idx = {}
34
35
        node_set_attributes = OrderedDict()
36
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
37
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
38
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
39
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
40
41
        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))
42
43
    def build_etree(self, node_list, uris=None):
        """
        Fill the XML etree of the exporter from a list of nodes

        Namespaces are registered first, then every node is exported and
        finally the aliases collected during the export are added.
        Args:
            node_list: list of Node objects for export
            uris: optional list of namespace uri strings to export in
                  addition to the namespaces used by the nodes

        Returns:
        """
        self.logger.info('Building XML etree')
        self._add_namespaces(node_list, uris)

        for exported_node in node_list:
            self.node_to_etree(exported_node)

        # aliases are only known once all nodes have been exported
        self._add_alias_els()
63
64
    def _add_namespaces(self, nodes, uris):
        """
        Compute the namespace index mapping for the export and write the
        NamespaceUris element
        Args:
            nodes: list of Node objects which will be exported
            uris: list of extra namespace uri strings, may be None or empty
        """
        used_idxs = self._get_ns_idxs_of_nodes(nodes)
        ns_array = self.server.get_namespace_array()

        # namespaces requested by uri are exported even if no node uses them
        if uris:
            self._add_idxs_from_uris(used_idxs, uris, ns_array)

        # dict of idx_in_address_space to idx_in_exported_file
        idx_map = self._make_idx_dict(used_idxs, ns_array)
        self._addr_idx_to_xml_idx = idx_map

        # namespace 0 is never listed in the exported namespace uris
        exported_uris = [ns_array[addr_idx] for addr_idx in sorted(idx_map) if addr_idx != 0]
        self._add_namespace_uri_els(exported_uris)
79
80
    def _make_idx_dict(self, idxs, ns_array):
81
        idxs.sort()
82
        addr_idx_to_xml_idx = {0: 0}
83
        for xml_idx, addr_idx in enumerate(idxs):
84
            if addr_idx >= len(ns_array):
85
                break
86
            addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
87
        return addr_idx_to_xml_idx
88
89
    def _get_ns_idxs_of_nodes(self, nodes):
90
        """
91
        get a list of all indexes used or references by nodes
92
        """
93
        idxs = []
94
        for node in nodes:
95
            node_idxs = [node.nodeid.NamespaceIndex]
96
            node_idxs.append(node.get_browse_name().NamespaceIndex)
97
            node_idxs.extend(ref.NodeId.NamespaceIndex for ref in node.get_references())
98
            node_idxs = list(set(node_idxs))  # remove duplicates
99
            for i in node_idxs:
100
                if i != 0 and i not in idxs:
101
                    idxs.append(i)
102
        return idxs
103
104
    def _add_idxs_from_uris(self, idxs, uris, ns_array):
105
        for uri in uris:
106
            if uri in ns_array:
107
                i = ns_array.index(uri)
108
                if i not in idxs:
109
                    idxs.append(i)
110
111
112
    def write_xml(self, xmlpath, pretty=True):
113
        """
114
        Write the XML etree in the exporter object to a file
115
        Args:
116
            xmlpath: string representing the path/file name
117
118
        Returns:
119
        """
120
        # try to write the XML etree to a file
121
        self.logger.info('Exporting XML file to %s', xmlpath)
122
        # from IPython import embed
123
        # embed()
124
        if pretty:
125
            self.indent(self.etree.getroot())
126
            self.etree.write(xmlpath,
127
                             encoding='utf-8',
128
                             xml_declaration=True
129
                            )
130
        else:
131
            self.etree.write(xmlpath,
132
                             encoding='utf-8',
133
                             xml_declaration=True
134
                            )
135
136
    def dump_etree(self):
137
        """
138
        Dump etree to console for debugging
139
        Returns:
140
        """
141
        self.logger.info('Dumping XML etree to console')
142
        Et.dump(self.etree)
143
144
    def node_to_etree(self, node):
        """
        Export one node by calling the add_etree_* method of its node class
        Args:
            node: Node object which will be added to XML etree

        Returns:
        """
        node_class = node.get_node_class()

        # (node class, export method) pairs, compared by identity below
        exporters = (
            (ua.NodeClass.Object, self.add_etree_object),
            (ua.NodeClass.ObjectType, self.add_etree_object_type),
            (ua.NodeClass.Variable, self.add_etree_variable),
            (ua.NodeClass.VariableType, self.add_etree_variable_type),
            (ua.NodeClass.ReferenceType, self.add_etree_reference_type),
            (ua.NodeClass.DataType, self.add_etree_datatype),
            (ua.NodeClass.Method, self.add_etree_method),
        )
        for known_class, export in exporters:
            if node_class is known_class:
                export(node)
                return

        self.logger.info("Exporting node class not implemented: %s ", node_class)
170
171
    def _add_sub_el(self, el, name, text):
172
        child_el = Et.SubElement(el, name)
173
        child_el.text = text
174
        return child_el
175
176
    def _node_to_string(self, nodeid):
        """
        Return the string form of a node id, using the namespace index of the
        exported file when the namespace is part of the export

        nodeid may be a ua.NodeId or any object with a nodeid attribute.
        """
        if not isinstance(nodeid, ua.NodeId):
            nodeid = nodeid.nodeid

        ns_map = self._addr_idx_to_xml_idx
        if nodeid.NamespaceIndex not in ns_map:
            return nodeid.to_string()

        # work on a copy, the node id of the caller must keep its index
        remapped = copy(nodeid)
        remapped.NamespaceIndex = ns_map[nodeid.NamespaceIndex]
        return remapped.to_string()
184
185
    def _bname_to_string(self, bname):
186
        if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
187
            bname = copy(bname)
188
            bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
189
        return bname.to_string()
190
191
    def _add_node_common(self, nodetype, node):
        """
        Create the element of a node below the root and fill in what all node
        classes share: NodeId, BrowseName, ParentNodeId, DisplayName, Description
        Args:
            nodetype: tag of the element to create, for example "UAObject"
            node: Node object to export

        Returns: the created element
        """
        browsename = node.get_browse_name()
        nodeid = node.nodeid
        parent = node.get_parent()
        displayname = node.get_display_name().Text
        desc = node.get_description().Text

        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.set("NodeId", self._node_to_string(nodeid))
        node_el.set("BrowseName", self._bname_to_string(browsename))

        # ParentNodeId is only written for objects, variables and methods
        parent_classes = (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method)
        if parent is not None and node.get_node_class() in parent_classes:
            node_el.set("ParentNodeId", self._node_to_string(parent))

        self._add_sub_el(node_el, 'DisplayName', displayname)
        if desc not in (None, ""):
            self._add_sub_el(node_el, 'Description', desc)
        # FIXME: add WriteMask and UserWriteMask
        return node_el
209
210
    def add_etree_object(self, node):
        """
        Add a UAObject element for node to the XML etree

        EventNotifier is only written when it differs from 0.
        """
        obj_el = self._add_node_common("UAObject", node)
        notifier = node.get_attribute(ua.AttributeIds.EventNotifier).Value.Value
        if notifier != 0:
            obj_el.set("EventNotifier", str(notifier))
        self._add_ref_els(obj_el, node)
219
220
    def add_etree_object_type(self, node):
        """
        Add a UAObjectType element for node to the XML etree

        IsAbstract is only written when the type is abstract.
        """
        type_el = self._add_node_common("UAObjectType", node)
        if node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value:
            type_el.set("IsAbstract", 'true')
        self._add_ref_els(type_el, node)
229
230
    def add_variable_common(self, node, el):
        """
        Add what UAVariable and UAVariableType elements share: the DataType,
        ValueRank and ArrayDimensions attributes and the Value sub element

        Data types of namespace 0 with a known name are written by name and
        registered as alias. Any other data type is written as node id string
        using the namespace index of the exported file.
        Args:
            node: Node object of the variable or variable type
            el: XML element of that node
        """
        dtype = node.get_data_type()
        if dtype.NamespaceIndex == 0 and dtype.Identifier in o_ids.ObjectIdNames:
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype] = dtype_name
        else:
            # the index of the address space does not match the exported
            # NamespaceUris table, remap it like every other node id of the file
            dtype_name = self._node_to_string(dtype)

        # ValueRank is only written when it is not -1
        rank = node.get_value_rank()
        if rank != -1:
            el.attrib["ValueRank"] = str(int(rank))

        dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
        if dim.Value.Value:
            el.attrib["ArrayDimensions"] = ",".join([str(i) for i in dim.Value.Value])

        el.attrib["DataType"] = dtype_name
        self.value_to_etree(el, dtype_name, dtype, node)
245
246
    def add_etree_variable(self, node):
        """
        Add a UAVariable element for node to the XML etree

        AccessLevel, UserAccessLevel, MinimumSamplingInterval and Historizing
        are only written when they differ from the values treated as default.
        """
        var_el = self._add_node_common("UAVariable", node)
        self._add_ref_els(var_el, node)
        self.add_variable_common(node, var_el)

        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        default_levels = (0, ua.AccessLevel.CurrentRead.mask)
        if accesslevel not in default_levels:
            var_el.set("AccessLevel", str(accesslevel))
        if useraccesslevel not in default_levels:
            var_el.set("UserAccessLevel", str(useraccesslevel))

        sampling = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval).Value.Value
        if sampling:
            var_el.set("MinimumSamplingInterval", str(sampling))

        if node.get_attribute(ua.AttributeIds.Historizing).Value.Value:
            var_el.set("Historizing", 'true')
270
271
    def add_etree_variable_type(self, node):
        """
        Add a UAVariableType element for node to the XML etree

        IsAbstract is only written when the type is abstract.
        """
        type_el = self._add_node_common("UAVariableType", node)
        self.add_variable_common(node, type_el)

        if node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value:
            type_el.set("IsAbstract", "true")

        self._add_ref_els(type_el, node)
284
285
    def add_etree_method(self, node):
        """
        Add a UAMethod element for node to the XML etree

        Executable and UserExecutable are only written when they are False.
        """
        method_el = self._add_node_common("UAMethod", node)

        executable = node.get_attribute(ua.AttributeIds.Executable).Value.Value
        if executable is False:
            method_el.set("Executable", "false")

        user_executable = node.get_attribute(ua.AttributeIds.UserExecutable).Value.Value
        if user_executable is False:
            method_el.set("UserExecutable", "false")

        self._add_ref_els(method_el, node)
295
296
    def add_etree_reference_type(self, obj):
        """
        Add a UAReferenceType element for obj to the XML etree

        InverseName is only written when the attribute has a text.
        """
        ref_type_el = self._add_node_common("UAReferenceType", obj)
        self._add_ref_els(ref_type_el, obj)

        inverse = obj.get_attribute(ua.AttributeIds.InverseName)
        if inverse is None:
            return
        inverse_name = inverse.Value.Value
        if inverse_name is None or inverse_name.Text is None:
            return
        self._add_sub_el(ref_type_el, 'InverseName', inverse_name.Text)
302
303
    def add_etree_datatype(self, obj):
        """
        Add a UADataType element for obj, with its references, to the XML etree
        """
        datatype_el = self._add_node_common("UADataType", obj)
        self._add_ref_els(datatype_el, obj)
309
310
    def _add_namespace_uri_els(self, uris):
311
        nuris_el = Et.Element('NamespaceUris')
312
313
        for uri in uris:
314
            self._add_sub_el(nuris_el, 'Uri', uri)
315
316
        self.etree.getroot().insert(0, nuris_el)
317
318
    def _add_alias_els(self):
319
        aliases_el = Et.Element('Aliases')
320
321
        ordered_keys = list(self.aliases.keys())
322
        ordered_keys.sort()
323
        for nodeid in ordered_keys:
324
            name = self.aliases[nodeid]
325
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=name)
326
            ref_el.text = nodeid.to_string()
327
328
        # insert behind the namespace element
329
        self.etree.getroot().insert(1, aliases_el)
330
331
    def _add_ref_els(self, parent_el, obj):
        """
        Add a References element with one Reference per reference of obj
        to parent_el

        Reference types of namespace 0 with a known name are written by name
        and registered as alias. Any other reference type is written as node
        id string using the namespace index of the exported file, and gets no
        alias.
        Args:
            parent_el: XML element of the exported node
            obj: Node object whose references are exported
        """
        refs = obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')

        for ref in refs:
            ref_type = ref.ReferenceTypeId
            # identifiers are only unique inside one namespace: without the
            # namespace check a custom reference type with a colliding
            # numeric id would be exported under the standard name
            if ref_type.NamespaceIndex == 0 and ref_type.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref_type.Identifier]
                self.aliases[ref_type] = ref_name
            else:
                ref_name = self._node_to_string(ref_type)

            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            # IsForward is only written for inverse references
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            ref_el.text = self._node_to_string(ref.NodeId)
347
348
349
    def member_to_etree(self, el, name, dtype, val):
        """
        Add the member name of a structure as "uax:" + name sub element of el

        A list or tuple value is exported item by item through
        _value_to_etree, any other value through _val_to_etree.
        """
        member_el = Et.SubElement(el, "uax:" + name)
        if not isinstance(val, (list, tuple)):
            self._val_to_etree(member_el, dtype, val)
            return
        for item in val:
            self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, item)
356
357
358
    def _val_to_etree(self, el, dtype, val):
        """
        Write a single value of data type dtype into the element el

        NodeId and Guid values get a sub element, Boolean and ByteString
        values are converted to text, objects with ua_types are exported
        member by member and anything else is written as its string form.
        """
        if dtype == ua.NodeId(ua.ObjectIds.NodeId):
            id_el = Et.SubElement(el, "uax:Identifier")
            id_el.text = val.to_string()
            return

        if dtype == ua.NodeId(ua.ObjectIds.Guid):
            guid_el = Et.SubElement(el, "uax:String")
            guid_el.text = str(val)
            return

        if dtype == ua.NodeId(ua.ObjectIds.Boolean):
            el.text = 'true' if val else 'false'
            return

        if dtype == ua.NodeId(ua.ObjectIds.ByteString):
            # a missing byte string is exported as empty text
            raw = b"" if val is None else val
            el.text = base64.b64encode(raw).decode("utf-8")
            return

        if hasattr(val, "ua_types"):
            # structure: export every declared member
            for name, vtype in val.ua_types.items():
                self.member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
            return

        if isinstance(val, bytes):
            # FIXME: should we also encode this (localized text I guess) using base64??
            el.text = val.decode("utf-8")
        elif val is not None:
            el.text = str(val)
382
383
384
    def value_to_etree(self, el, dtype_name, dtype, node):
        """
        Add the current value of node as Value sub element of el; nothing is
        added when the value is None
        """
        variant = node.get_data_value().Value
        if variant.Value is None:
            return
        value_el = Et.SubElement(el, 'Value')
        self._value_to_etree(value_el, dtype_name, dtype, variant.Value)
389
390
391
    def _value_to_etree(self, el, type_name, dtype, val):
392
        if val is None:
393
            return
394
395
        if isinstance(val, (list, tuple)):
396
            if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
397
                elname = "uax:ListOf" + type_name
398
            else:  # this is an extentionObject:
399
                elname = "uax:ListOfExtensionObject"
400
401
            list_el = Et.SubElement(el, elname)
402
            for nval in val:
403
                self._value_to_etree(list_el, type_name, dtype, nval)
404
        else:
405
            dtype_base = get_base_data_type(self.server.get_node(dtype))
406
            dtype_base = dtype_base.nodeid
407
408
            if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
409
                dtype_base = ua.NodeId(ua.ObjectIds.Int32)
410
                type_name = ua.ObjectIdNames[dtype_base.Identifier]
411
412
            if dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
413
                type_name = ua.ObjectIdNames[dtype_base.Identifier]
414
                val_el = Et.SubElement(el, "uax:" + type_name)
415
                self._val_to_etree(val_el, dtype_base, val)
416
            else:
417
                self._extobj_to_etree(el, type_name, dtype, val)
418
419
420
    def _extobj_to_etree(self, val_el, name, dtype, val):
        """
        Add val as uax:ExtensionObject below val_el: a TypeId holding the
        string form of dtype and a Body holding a "uax:" + name element with
        the members returned by _get_member_order
        """
        obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
        type_el = Et.SubElement(obj_el, "uax:TypeId")
        Et.SubElement(type_el, "uax:Identifier").text = dtype.to_string()

        body_el = Et.SubElement(obj_el, "uax:Body")
        struct_el = Et.SubElement(body_el, "uax:" + name)
        for member in self._get_member_order(dtype, val):
            member_dtype = ua.NodeId(getattr(ua.ObjectIds, val.ua_types[member]))
            self.member_to_etree(struct_el, member, member_dtype, getattr(val, member))
429
430
    def _get_member_order(self, dtype, val):
431
        '''
432
        If an dtype has an entry in XmlExporter.extobj_ordered_elements return the export order of the elements 
433
        else return the unordered members.
434
        '''
435
        if dtype not in XmlExporter.extobj_ordered_elements.keys():
436
            return val.ua_types.keys()
437
        else:
438
            member_keys = [name for name in XmlExporter.extobj_ordered_elements[dtype] if name in val.ua_types.keys() and getattr(val, name) is not None ]
439
440
        return member_keys
441
442
    def indent(self, elem, level=0):
443
        """
444
        copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
445
        it basically walks your tree and adds spaces and newlines so the tree is
446
        printed in a nice way
447
        """
448
        i = "\n" + level * "  "
449
        if len(elem):
450
            if not elem.text or not elem.text.strip():
451
                elem.text = i + "  "
452
            if not elem.tail or not elem.tail.strip():
453
                elem.tail = i
454
            for elem in elem:
455
                self.indent(elem, level + 1)
456
            if not elem.tail or not elem.tail.strip():
457
                elem.tail = i
458
        else:
459
            if level and (not elem.tail or not elem.tail.strip()):
460
                elem.tail = i
461