Completed
Pull Request — master (#302)
by Olivier
04:13
created

XmlExporter   B

Complexity

Total Complexity 41

Size/Duplication

Total Lines 230
Duplicated Lines 0 %

Importance

Changes 9
Bugs 1 Features 1
Metric Value
c 9
b 1
f 1
dl 0
loc 230
rs 8.2769
wmc 41

18 Methods

Rating   Name   Duplication   Size   Complexity  
A build_etree() 0 21 3
A __init__() 0 12 1
A write_xml() 0 21 3
D node_to_etree() 0 27 8
A dump_etree() 0 7 1
A add_etree_method() 0 3 1
A add_etree_variable_type() 0 13 2
A add_etree_datatype() 0 6 1
A _add_sub_el() 0 4 1
A add_etree_object_type() 0 9 2
A _add_namespace_uri_els() 0 7 2
A add_variable_common() 0 11 2
A _add_alias_els() 0 8 2
A add_etree_variable() 0 18 3
A _add_ref_els() 0 18 4
A _add_node_common() 0 17 3
A add_etree_reference() 0 3 1
A add_etree_object() 0 6 1

How to fix   Complexity   

Complex Class

Complex classes like XmlExporter often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
from xml.dom import minidom
9
10
from opcua import ua
11
from opcua.ua import object_ids as o_ids
12
13
14
class XmlExporter(object):
15
16
    def __init__(self, server):
17
        self.logger = logging.getLogger(__name__)
18
        self.server = server
19
        self.aliases = {}
20
21
        node_set_attributes = OrderedDict()
22
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
23
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
24
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
25
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
26
27
        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))
28
29
    def build_etree(self, node_list, uris=None):
30
        """
31
        Create an XML etree object from a list of nodes; custom namespace uris are optional
32
        Args:
33
            node_list: list of Node objects for export
34
            uris: list of namespace uri strings
35
36
        Returns:
37
        """
38
        self.logger.info('Building XML etree')
39
40
        # add all nodes in the list to the XML etree
41
        for node in node_list:
42
            self.node_to_etree(node)
43
44
        # add all required aliases to the XML etree; must be done after nodes are added
45
        self._add_alias_els()
46
47
        if uris:
48
            # add all namespace uris to the XML etree; must be done after aliases are added
49
            self._add_namespace_uri_els(uris)
50
51
    def write_xml(self, xmlpath, pretty=True):
52
        """
53
        Write the XML etree in the exporter object to a file
54
        Args:
55
            xmlpath: string representing the path/file name
56
57
        Returns:
58
        """
59
        # try to write the XML etree to a file
60
        self.logger.info('Exporting XML file to %s', xmlpath)
61
        #from IPython import embed
62
        #embed()
63
        if pretty:
64
            self.etree.write(xmlpath + "-not-pretty.xml", short_empty_elements=False)
65
            rough_string = Et.tostring(self.etree.getroot(), 'utf-8')
66
            reparsed = minidom.parseString(rough_string)
67
            pretty_string = reparsed.toprettyxml(indent="    ")
68
            with open(xmlpath, "wt") as f:
69
                f.write(pretty_string)
70
        else:
71
            self.etree.write(xmlpath, short_empty_elements=False)
72
73
    def dump_etree(self):
74
        """
75
        Dump etree to console for debugging
76
        Returns:
77
        """
78
        self.logger.info('Dumping XML etree to console')
79
        Et.dump(self.etree)
80
81
    def node_to_etree(self, node):
82
        """
83
        Add the necessary XML sub elements to the etree for exporting the node
84
        Args:
85
            node: Node object which will be added to XML etree
86
87
        Returns:
88
        """
89
        node_class = node.get_node_class()
90
        print("Exporting: ", node)
91
92
        if node_class is ua.NodeClass.Object:
93
            self.add_etree_object(node)
94
        elif node_class is ua.NodeClass.ObjectType:
95
            self.add_etree_object_type(node)
96
        elif node_class is ua.NodeClass.Variable:
97
            self.add_etree_variable(node)
98
        elif node_class is ua.NodeClass.VariableType:
99
            self.add_etree_variable_type(node)
100
        elif node_class is ua.NodeClass.ReferenceType:
101
            self.add_etree_reference(node)
102
        elif node_class is ua.NodeClass.DataType:
103
            self.add_etree_datatype(node)
104
        elif node_class is ua.NodeClass.Method:
105
            self.add_etree_method(node)
106
        else:
107
            self.logger.info("Exporting node class not implemented: %s ", node_class)
108
109
    def _add_sub_el(self, el, name, text):
110
        child_el = Et.SubElement(el, name)
111
        child_el.text = text
112
        return child_el
113
114
    def _add_node_common(self, nodetype, node):
115
        browsename = node.get_browse_name().to_string()
116
        nodeid = node.nodeid.to_string()
117
        parent = node.get_parent()
118
        displayname = node.get_display_name().Text.decode('utf-8')
119
        desc = node.get_description().Text
120
        print("NODE COMMON", node)
121
        node_el = Et.SubElement(self.etree.getroot(),
122
                                nodetype,
123
                                BrowseName=browsename,
124
                                NodeId=nodeid)
125
        if parent is not None:
126
            node_el.attrib["ParentNodeId"] = parent.nodeid.to_string()
127
        if desc not in (None, ""):
128
            self._add_sub_el(node_el, 'Description', desc.decode('utf-8'))
129
        self._add_sub_el(node_el, 'DisplayName', displayname)
130
        return node_el
131
132
    def add_etree_object(self, obj):
133
        """
134
        Add a UA object element to the XML etree
135
        """
136
        obj_el = self._add_node_common("UAObject", obj)
137
        self._add_ref_els(obj_el, obj)
138
139
    def add_etree_object_type(self, node):
140
        """
141
        Add a UA object type element to the XML etree
142
        """
143
        obj_el = self._add_node_common("UAObjectType", node)
144
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
145
        if abstract:
146
            obj_el.attrib["IsAbstract"] = str(abstract)
147
        self._add_ref_els(obj_el, node)
148
149
    def add_variable_common(self, node, el):
150
        dtype = node.get_data_type()
151
        if dtype.Identifier in o_ids.ObjectIdNames:
152
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
153
            self.aliases[dtype_name] = dtype.to_string()
154
        else:
155
            dtype_name = dtype.to_string()
156
        rank = node.get_value_rank()
157
        el.attrib["DataType"] = dtype_name
158
        el.attrib["ValueRank"] = str(int(rank))
159
        value_to_etree(el, dtype_name, dtype, node)
160
161
    def add_etree_variable(self, node):
162
        """
163
        Add a UA variable element to the XML etree
164
        """
165
        var_el = self._add_node_common("UAVariable", node)
166
        self.add_variable_common(node, var_el)
167
168
        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
169
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value
170
171
        # We only write these values if they are different from defaults
172
        # default must of course mange the one in manage_nodes.py
173
        # and other OPC UA servers
174
        if accesslevel != ua.AccessLevel.CurrentRead.mask:
175
            var_el.attrib["AccessLevel"] = str(accesslevel)
176
        if useraccesslevel != ua.AccessLevel.CurrentRead.mask:
177
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)
178
        self._add_ref_els(var_el, node)
179
180
    def add_etree_variable_type(self, node):
181
        """
182
        Add a UA variable type element to the XML etree
183
        """
184
185
        var_el = self._add_node_common("UAVariable", node)
186
        self.add_variable_common(node, var_el)
187
188
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
189
        if abstract:
190
            var_el.attrib["IsAbstract"] = abstract
191
192
        self._add_ref_els(var_el, node)
193
194
    def add_etree_method(self, obj):
195
        obj_el = self._add_node_common("UAMethod", obj)
196
        self._add_ref_els(obj_el, obj)
197
198
    def add_etree_reference(self, obj):
199
        obj_el = self._add_node_common("UAReference", obj)
200
        self._add_ref_els(obj_el, obj)
201
202
    def add_etree_datatype(self, obj):
203
        """
204
        Add a UA data type element to the XML etree
205
        """
206
        obj_el = self._add_node_common("UADataType", obj)
207
        self._add_ref_els(obj_el, obj)
208
209
    def _add_namespace_uri_els(self, uris):
210
        nuris_el = Et.Element('NamespaceUris')
211
212
        for uri in uris:
213
            self._add_sub_el(nuris_el, 'Uri', uri)
214
215
        self.etree.getroot().insert(0, nuris_el)
216
217
    def _add_alias_els(self):
218
        aliases_el = Et.Element('Aliases')
219
220
        for k, v in self.aliases.items():
221
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=k)
222
            ref_el.text = v
223
224
        self.etree.getroot().insert(0, aliases_el)
225
226
    def _add_ref_els(self, parent_el, obj):
227
        refs = obj.get_references()
228
        refs_el = Et.SubElement(parent_el, 'References')
229
230
        for ref in refs:
231
            if ref.ReferenceTypeId.Identifier in o_ids.ObjectIdNames:
232
                ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
233
            else:
234
                ref_name = ref.ReferenceTypeId.to_string()
235
            ref_forward = str(ref.IsForward).lower()
236
            ref_nodeid = ref.NodeId.to_string()
237
            ref_el = Et.SubElement(refs_el, 'Reference', ReferenceType=ref_name)
238
            if not ref_forward:
239
                ref_el.attrib['IsForward'] = ref_forward
240
            ref_el.text = ref_nodeid
241
242
            # add any references that gets used to aliases dict; this gets handled later
243
            self.aliases[ref_name] = ref_nodeid
244
245
246
def value_to_etree(el, dtype_name, dtype, node):
247
    var = node.get_data_value().Value
248
    val_el = Et.SubElement(el, 'Value')
249
    _value_to_etree(val_el, dtype_name, dtype, var.Value)
250
251
252
def _value_to_etree(el, dtype_name, dtype, val):
253
    if isinstance(val, (list, tuple)):
254
        list_el = Et.SubElement(el, "uax:ListOf" + dtype_name)
255
        for nval in val:
256
            _value_to_etree(list_el, dtype_name, dtype, nval)
257
    else:
258
        if dtype.Identifier is int and dtype.Identifier > 21:  # this is an extentionObject:
259
            _extobj_to_etree(el, dtype_name, dtype)
260
        else:
261
            val_el = Et.SubElement(el, "uax:" + dtype_name)
262
            val_el.text = str(val)
263
264
265
def _extobj_to_etree(val_el, dtype_name, dtype, val):
266
    obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
267
    type_el = Et.SubElement(obj_el, "uax:TypeId")
268
    id_el = Et.SubElement(type_el, "uax:Identifier")
269
    id_el.text = val.TypeId.to_string()
270
    body_el = Et.SubElement(obj_el, "uax:Body")
271
    struct_el = Et.SubElement(body_el, "uax:" + dtype_name)
272
    # FIXME: finish
273
    
274
275