Completed
Push — master ( 3f750a...cf2066 )
by Olivier
02:32
created

_val_to_etree()   B

Complexity

Conditions 5

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 11
rs 8.5454
c 0
b 0
f 0
1
"""
2
From a list of nodes in the address space, build an XML file.
3
The format is the one defined by the OPC UA specification.
4
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
9
from opcua import ua
10
from opcua.ua import object_ids as o_ids
11
12
13
class XmlExporter(object):
    """
    Export a list of nodes to an XML document rooted at a 'UANodeSet' element.

    The document is accumulated in ``self.etree``. Data type and reference
    type names met while exporting are collected in ``self.aliases`` and
    written as an 'Aliases' element by build_etree().
    """

    def __init__(self, server):
        """
        Create an exporter holding an empty 'UANodeSet' document

        Args:
            server: object stored as ``self.server``
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        self.aliases = {}

        # OrderedDict keeps the namespace declarations in a stable order
        node_set_attributes = OrderedDict()
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'

        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))

    def build_etree(self, node_list, uris=None):
        """
        Create an XML etree object from a list of nodes; custom namespace uris are optional

        Args:
            node_list: list of Node objects for export
            uris: list of namespace uri strings

        Returns:
            None; the elements are added to ``self.etree``
        """
        self.logger.info('Building XML etree')

        # add all nodes in the list to the XML etree
        for node in node_list:
            self.node_to_etree(node)

        # add aliases to the XML etree
        self._add_alias_els()

        if uris:
            # add namespace uris to the XML etree; inserted last so that
            # 'NamespaceUris' ends up in front of 'Aliases'
            self._add_namespace_uri_els(uris)

    def write_xml(self, xmlpath, pretty=True):
        """
        Write the XML etree in the exporter object to a file

        Args:
            xmlpath: string representing the path/file name
            pretty: when true, the tree is re-indented in place with indent()
                before it is written

        Returns:
            None
        """
        self.logger.info('Exporting XML file to %s', xmlpath)
        if pretty:
            indent(self.etree.getroot())
        # the write call is the same whether or not the tree was indented
        self.etree.write(xmlpath,
                         short_empty_elements=False,
                         encoding='utf-8',
                         xml_declaration=True
                         )

    def dump_etree(self):
        """
        Dump etree to console for debugging

        Returns:
            None
        """
        self.logger.info('Dumping XML etree to console')
        Et.dump(self.etree)

    def node_to_etree(self, node):
        """
        Add the necessary XML sub elements to the etree for exporting the node

        Args:
            node: Node object which will be added to XML etree

        Returns:
            None; node classes without an exporter are only logged
        """
        node_class = node.get_node_class()
        self.logger.debug("Exporting: %s", node)

        if node_class is ua.NodeClass.Object:
            self.add_etree_object(node)
        elif node_class is ua.NodeClass.ObjectType:
            self.add_etree_object_type(node)
        elif node_class is ua.NodeClass.Variable:
            self.add_etree_variable(node)
        elif node_class is ua.NodeClass.VariableType:
            self.add_etree_variable_type(node)
        elif node_class is ua.NodeClass.ReferenceType:
            self.add_etree_reference_type(node)
        elif node_class is ua.NodeClass.DataType:
            self.add_etree_datatype(node)
        elif node_class is ua.NodeClass.Method:
            self.add_etree_method(node)
        else:
            self.logger.info("Exporting node class not implemented: %s ", node_class)

    @staticmethod
    def _to_text(value):
        """
        Return value as text: bytes are decoded as UTF-8, anything else
        (including None) is returned unchanged
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def _add_sub_el(self, el, name, text):
        """
        Append a child element called name with the given text to el and return it
        """
        child_el = Et.SubElement(el, name)
        child_el.text = text
        return child_el

    def _add_node_common(self, nodetype, node):
        """
        Create the element shared by all node classes and append it to the root

        Sets the NodeId and BrowseName attributes, ParentNodeId for objects,
        variables and methods that have a parent, and the DisplayName and
        (when not empty) Description sub elements.

        Args:
            nodetype: tag name of the element to create, e.g. "UAObject"
            node: Node object being exported

        Returns:
            the newly created element
        """
        browsename = node.get_browse_name().to_string()
        nodeid = node.nodeid.to_string()
        parent = node.get_parent()
        # Text may be bytes or str depending on the library version
        displayname = self._to_text(node.get_display_name().Text)
        desc = self._to_text(node.get_description().Text)
        self.logger.debug("NODE COMMON %s", node)
        node_el = Et.SubElement(self.etree.getroot(), nodetype)
        node_el.attrib["NodeId"] = nodeid
        node_el.attrib["BrowseName"] = browsename
        if parent is not None:
            node_class = node.get_node_class()
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
                node_el.attrib["ParentNodeId"] = parent.nodeid.to_string()
        self._add_sub_el(node_el, 'DisplayName', displayname)
        if desc:
            self._add_sub_el(node_el, 'Description', desc)
        # FIXME: add WriteMask and UserWriteMask
        return node_el

    def add_etree_object(self, node):
        """
        Add a UA object element to the XML etree
        """
        obj_el = self._add_node_common("UAObject", node)
        var = node.get_attribute(ua.AttributeIds.EventNotifier)
        # only written when it differs from 0
        if var.Value.Value != 0:
            obj_el.attrib["EventNotifier"] = str(var.Value.Value)
        self._add_ref_els(obj_el, node)

    def add_etree_object_type(self, node):
        """
        Add a UA object type element to the XML etree
        """
        obj_el = self._add_node_common("UAObjectType", node)
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
        if abstract:
            obj_el.attrib["IsAbstract"] = 'true'
        self._add_ref_els(obj_el, node)

    def add_variable_common(self, node, el):
        """
        Add the attributes shared by variables and variable types to el

        Sets DataType, ValueRank (when it is not -1) and the 'Value' sub
        element. Data types found in o_ids.ObjectIdNames are written by name
        and registered in ``self.aliases``; others are written as node id
        strings.

        Args:
            node: Node object being exported
            el: element created for the node
        """
        dtype = node.get_data_type()
        if dtype.Identifier in o_ids.ObjectIdNames:
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
            self.aliases[dtype_name] = dtype.to_string()
        else:
            dtype_name = dtype.to_string()
        rank = node.get_value_rank()
        if rank != -1:
            el.attrib["ValueRank"] = str(rank)
        # FIXME: the ArrayDimensions attribute is not exported
        el.attrib["DataType"] = dtype_name
        value_to_etree(el, dtype_name, dtype, node)

    def add_etree_variable(self, node):
        """
        Add a UA variable element to the XML etree
        """
        var_el = self._add_node_common("UAVariable", node)
        self.add_variable_common(node, var_el)

        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value

        # We only write these values if they are different from defaults
        # Not sure where default is defined....
        if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["AccessLevel"] = str(accesslevel)
        if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)

        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
        if var.Value.Value:
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
        var = node.get_attribute(ua.AttributeIds.Historizing)
        if var.Value.Value:
            var_el.attrib["Historizing"] = 'true'
        self._add_ref_els(var_el, node)

    def add_etree_variable_type(self, node):
        """
        Add a UA variable type element to the XML etree
        """
        var_el = self._add_node_common("UAVariableType", node)
        self.add_variable_common(node, var_el)

        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
        if abstract.Value.Value:
            var_el.attrib["IsAbstract"] = "true"

        self._add_ref_els(var_el, node)

    def add_etree_method(self, node):
        """
        Add a UA method element to the XML etree
        """
        obj_el = self._add_node_common("UAMethod", node)

        # both flags are only written when they are explicitly False
        var = node.get_attribute(ua.AttributeIds.Executable)
        if var.Value.Value is False:
            obj_el.attrib["Executable"] = "false"
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
        if var.Value.Value is False:
            obj_el.attrib["UserExecutable"] = "false"
        self._add_ref_els(obj_el, node)

    def add_etree_reference_type(self, obj):
        """
        Add a UA reference type element to the XML etree
        """
        obj_el = self._add_node_common("UAReferenceType", obj)
        var = obj.get_attribute(ua.AttributeIds.InverseName)
        if var is not None and var.Value.Value is not None:
            self._add_sub_el(obj_el, 'InverseName', self._to_text(var.Value.Value.Text))
        self._add_ref_els(obj_el, obj)

    def add_etree_datatype(self, obj):
        """
        Add a UA data type element to the XML etree
        """
        obj_el = self._add_node_common("UADataType", obj)
        self._add_ref_els(obj_el, obj)

    def _add_namespace_uri_els(self, uris):
        """
        Insert a 'NamespaceUris' element with one 'Uri' child per entry of
        uris as first child of the root
        """
        nuris_el = Et.Element('NamespaceUris')

        for uri in uris:
            self._add_sub_el(nuris_el, 'Uri', uri)

        self.etree.getroot().insert(0, nuris_el)

    def _add_alias_els(self):
        """
        Insert an 'Aliases' element with one 'Alias' child per entry of
        ``self.aliases`` as first child of the root
        """
        aliases_el = Et.Element('Aliases')

        for k, v in self.aliases.items():
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=k)
            ref_el.text = v

        self.etree.getroot().insert(0, aliases_el)

    def _add_ref_els(self, parent_el, obj):
        """
        Add a 'References' sub element listing every reference of obj

        Reference types found in o_ids.ObjectIdNames are written by name,
        others as node id strings; each one is registered in ``self.aliases``.

        Args:
            parent_el: element created for the node
            obj: Node object whose references are exported
        """
        refs = obj.get_references()
        refs_el = Et.SubElement(parent_el, 'References')

        for ref in refs:
            if ref.ReferenceTypeId.Identifier in o_ids.ObjectIdNames:
                ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
            else:
                ref_name = ref.ReferenceTypeId.to_string()
            ref_el = Et.SubElement(refs_el, 'Reference')
            ref_el.attrib['ReferenceType'] = ref_name
            # IsForward is only written for inverse references
            if not ref.IsForward:
                ref_el.attrib['IsForward'] = 'false'
            ref_el.text = ref.NodeId.to_string()

            self.aliases[ref_name] = ref.ReferenceTypeId.to_string()
269
270
271
def member_to_etree(el, name, dtype, val):
    """
    Add one member of a structure to el as a "uax:<name>" sub element

    Args:
        el: parent element
        name: member name, used as tag name after the "uax:" prefix
        dtype: node id of the member's data type
        val: member value; every item of a list or tuple is exported in turn
    """
    member_el = Et.SubElement(el, "uax:" + name)
    if isinstance(val, (list, tuple)):
        # Resolve the type name once, through the same o_ids table and with
        # the same node-id-string fallback as XmlExporter.add_variable_common
        if dtype.Identifier in o_ids.ObjectIdNames:
            type_name = o_ids.ObjectIdNames[dtype.Identifier]
        else:
            type_name = dtype.to_string()
        for v in val:
            _value_to_etree(member_el, type_name, dtype, v)
    else:
        _val_to_etree(member_el, dtype, val)
278
279
280
def _val_to_etree(el, dtype, val):
    """
    Write a single value into el

    NodeId values get a "uax:Identifier" sub element, objects exposing
    ``ua_types`` are exported member by member, and every other value becomes
    the text of el. None is written as an empty string.

    Args:
        el: element receiving the value
        dtype: node id of the value's data type
        val: value to write
    """
    if val is None:
        val = ""
    if dtype == ua.NodeId(ua.ObjectIds.NodeId):
        id_el = Et.SubElement(el, "uax:Identifier")
        # a missing value was replaced by "" above and has no to_string()
        id_el.text = val.to_string() if hasattr(val, "to_string") else str(val)
    elif isinstance(val, bool):
        # XML Schema booleans are lower case; str() would give "True"/"False"
        el.text = "true" if val else "false"
    elif not hasattr(val, "ua_types"):
        el.text = str(val)
    else:
        for name, vtype in val.ua_types.items():
            member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
291
292
293
def value_to_etree(el, dtype_name, dtype, node):
    """
    Add a 'Value' sub element holding the node's current value to el

    Nothing is added when the node's value is None.

    Args:
        el: element created for the node
        dtype_name: name used to build the value's tag names
        dtype: node id of the node's data type
        node: Node object whose value is exported
    """
    variant = node.get_data_value().Value
    if variant.Value is None:
        return
    value_el = Et.SubElement(el, 'Value')
    _value_to_etree(value_el, dtype_name, dtype, variant.Value)
298
299
300
def _value_to_etree(el, type_name, dtype, val):
    """
    Add val to el, wrapped in the element matching its type

    Lists and tuples become a "uax:ListOf..." element whose items are added
    recursively. A single value becomes either an extension object or a
    "uax:<type_name>" element.

    Only None is skipped: falsy values such as 0, False, "" or an empty list
    are real values and are exported.

    Args:
        el: parent element
        type_name: name used to build the tag names
        dtype: node id of the data type
        val: value to add
    """
    if val is None:
        return
    # identifiers above 21 are treated as extension objects
    # NOTE(review): presumably 1-21 are the built-in types - confirm
    is_ext_obj = dtype.Identifier > 21
    if isinstance(val, (list, tuple)):
        if is_ext_obj:
            elname = "uax:ListOfExtensionObject"
        else:
            elname = "uax:ListOf" + type_name
        list_el = Et.SubElement(el, elname)
        for nval in val:
            _value_to_etree(list_el, type_name, dtype, nval)
    elif is_ext_obj:
        _extobj_to_etree(el, type_name, dtype, val)
    else:
        val_el = Et.SubElement(el, "uax:" + type_name)
        _val_to_etree(val_el, dtype, val)
317
           
318
319
def _extobj_to_etree(val_el, name, dtype, val):
    """
    Add val to val_el as a "uax:ExtensionObject" element

    The element holds a "uax:TypeId"/"uax:Identifier" pair with the node id
    string of dtype and a "uax:Body" containing a "uax:<name>" element with
    one sub element per entry of ``val.ua_types``.

    Args:
        val_el: parent element
        name: structure name, used as tag name after the "uax:" prefix
        dtype: node id of the structure's data type
        val: object exposing ``ua_types``
    """
    ext_el = Et.SubElement(val_el, "uax:ExtensionObject")
    type_id_el = Et.SubElement(ext_el, "uax:TypeId")
    identifier_el = Et.SubElement(type_id_el, "uax:Identifier")
    identifier_el.text = dtype.to_string()
    body_el = Et.SubElement(ext_el, "uax:Body")
    struct_el = Et.SubElement(body_el, "uax:" + name)
    for member_name, member_type in val.ua_types.items():
        member_dtype = ua.NodeId(getattr(ua.ObjectIds, member_type))
        member_to_etree(struct_el, member_name, member_dtype, getattr(val, member_name))
328
329
330
def indent(elem, level=0, space="  "):
    '''
    adapted from http://effbot.org/zone/element-lib.htm#prettyprint
    it basically walks your tree and adds spaces and newlines so the tree is
    printed in a nice way

    The tree is modified in place. Text and tails that contain something
    other than whitespace are left untouched.

    Args:
        elem: element whose subtree is indented
        level: nesting depth of elem, 0 for the root
        space: string used for one level of indentation
    '''
    pad = "\n" + level * space
    if len(elem):
        if not elem.text or not elem.text.strip():
            elem.text = pad + space
        if not elem.tail or not elem.tail.strip():
            elem.tail = pad
        for child in elem:
            indent(child, level + 1, space)
        # the last child is followed by the parent's closing tag, which sits
        # one level less deep than the children
        last_child = elem[-1]
        if not last_child.tail or not last_child.tail.strip():
            last_child.tail = pad
    else:
        if level and (not elem.tail or not elem.tail.strip()):
            elem.tail = pad
349