Completed
Pull Request — master (#311)
by
unknown
03:40
created

_extobj_argument_to_etree()   B

Complexity

Conditions 1

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 27
rs 8.8571
1
"""
2
from a list of nodes in the address space, build an XML file
3
format is the one from opc-ua specification
4
"""
5
import logging
6
from collections import OrderedDict
7
import xml.etree.ElementTree as Et
8
9
from opcua import ua
10
from opcua.ua import object_ids as o_ids
11
12
13
class XmlExporter(object):
14
15
    def __init__(self, server):
16
        self.logger = logging.getLogger(__name__)
17
        self.server = server
18
        self.aliases = {}
19
20
        node_set_attributes = OrderedDict()
21
        node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
22
        node_set_attributes['xmlns:uax'] = 'http://opcfoundation.org/UA/2008/02/Types.xsd'
23
        node_set_attributes['xmlns:xsd'] = 'http://www.w3.org/2001/XMLSchema'
24
        node_set_attributes['xmlns'] = 'http://opcfoundation.org/UA/2011/03/UANodeSet.xsd'
25
26
        self.etree = Et.ElementTree(Et.Element('UANodeSet', node_set_attributes))
27
28
    def build_etree(self, node_list, uris=None):
29
        """
30
        Create an XML etree object from a list of nodes; custom namespace uris are optional
31
        Args:
32
            node_list: list of Node objects for export
33
            uris: list of namespace uri strings
34
35
        Returns:
36
        """
37
        self.logger.info('Building XML etree')
38
39
        # add all nodes in the list to the XML etree
40
        for node in node_list:
41
            self.node_to_etree(node)
42
43
        # add aliases to the XML etree
44
        self._add_alias_els()
45
46
        if uris:
47
            # add namespace uris to the XML etree
48
            self._add_namespace_uri_els(uris)
49
50
    def write_xml(self, xmlpath, pretty=True):
51
        """
52
        Write the XML etree in the exporter object to a file
53
        Args:
54
            xmlpath: string representing the path/file name
55
56
        Returns:
57
        """
58
        # try to write the XML etree to a file
59
        self.logger.info('Exporting XML file to %s', xmlpath)
60
        #from IPython import embed
61
        #embed()
62
        if pretty:
63
            indent(self.etree.getroot())
64
            self.etree.write(xmlpath, 
65
                             short_empty_elements=False, 
66
                             encoding='utf-8', 
67
                             xml_declaration=True
68
                            )
69
        else:
70
            self.etree.write(xmlpath, 
71
                             short_empty_elements=False, 
72
                             encoding='utf-8', 
73
                             xml_declaration=True)
74
75
    def dump_etree(self):
76
        """
77
        Dump etree to console for debugging
78
        Returns:
79
        """
80
        self.logger.info('Dumping XML etree to console')
81
        Et.dump(self.etree)
82
83
    def node_to_etree(self, node):
84
        """
85
        Add the necessary XML sub elements to the etree for exporting the node
86
        Args:
87
            node: Node object which will be added to XML etree
88
89
        Returns:
90
        """
91
        node_class = node.get_node_class()
92
        print("Exporting: ", node)
93
94
        if node_class is ua.NodeClass.Object:
95
            self.add_etree_object(node)
96
        elif node_class is ua.NodeClass.ObjectType:
97
            self.add_etree_object_type(node)
98
        elif node_class is ua.NodeClass.Variable:
99
            self.add_etree_variable(node)
100
        elif node_class is ua.NodeClass.VariableType:
101
            self.add_etree_variable_type(node)
102
        elif node_class is ua.NodeClass.ReferenceType:
103
            self.add_etree_reference_type(node)
104
        elif node_class is ua.NodeClass.DataType:
105
            self.add_etree_datatype(node)
106
        elif node_class is ua.NodeClass.Method:
107
            self.add_etree_method(node)
108
        else:
109
            self.logger.info("Exporting node class not implemented: %s ", node_class)
110
111
    def _add_sub_el(self, el, name, text):
112
        child_el = Et.SubElement(el, name)
113
        child_el.text = text
114
        return child_el
115
116
    def _add_node_common(self, nodetype, node):
117
        browsename = node.get_browse_name().to_string()
118
        nodeid = node.nodeid.to_string()
119
        parent = node.get_parent()
120
        displayname = node.get_display_name().Text.decode('utf-8')
121
        desc = node.get_description().Text
122
        print("NODE COMMON", node)
123
        node_el = Et.SubElement(self.etree.getroot(), nodetype)
124
        node_el.attrib["NodeId"] = nodeid
125
        node_el.attrib["BrowseName"] = browsename
126
        if parent is not None:
127
            node_class = node.get_node_class()
128
            if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
129
                node_el.attrib["ParentNodeId"] = parent.nodeid.to_string()
130
        self._add_sub_el(node_el, 'DisplayName', displayname)
131
        if desc not in (None, ""):
132
            self._add_sub_el(node_el, 'Description', desc.decode('utf-8'))
133
        # FIXME: add WriteMask and UserWriteMask
134
        return node_el
135
136
    def add_etree_object(self, node):
137
        """
138
        Add a UA object element to the XML etree
139
        """
140
        obj_el = self._add_node_common("UAObject", node)
141
        var = node.get_attribute(ua.AttributeIds.EventNotifier)
142
        if var.Value.Value != 0:
143
            obj_el.attrib["EventNotifier"] = str(var.Value.Value)
144
        self._add_ref_els(obj_el, node)
145
146
    def add_etree_object_type(self, node):
147
        """
148
        Add a UA object type element to the XML etree
149
        """
150
        obj_el = self._add_node_common("UAObjectType", node)
151
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract).Value.Value
152
        if abstract:
153
            obj_el.attrib["IsAbstract"] = 'true'
154
        self._add_ref_els(obj_el, node)
155
156
    def add_variable_common(self, node, el):
157
        dtype = node.get_data_type()
158
159
        # FIXME hack because get_data_type() has issues
160
        if dtype.NamespaceIndex > 50:
161
            dtype.Identifier = dtype.NamespaceIndex
162
            dtype.NamespaceIndex = 0
163
164
        if dtype.Identifier in o_ids.ObjectIdNames:
165
            dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
166
            self.aliases[dtype_name] = dtype.to_string()
167
        else:
168
            dtype_name = dtype.to_string()
169
        rank = node.get_value_rank()
170
        if rank != -1:
171
            el.attrib["ValueRank"] = str(rank)
172
        #var = node.get_attribute(ua.AttributeIds.ArrayDimensions())
173
        #self._addobj_el.attrib["ArrayDimensions"] = str(var.Value.Value)
174
        el.attrib["DataType"] = dtype_name
175
        value_to_etree(el, dtype_name, dtype, node)
176
177
    def add_etree_variable(self, node):
178
        """
179
        Add a UA variable element to the XML etree
180
        """
181
        var_el = self._add_node_common("UAVariable", node)
182
        self.add_variable_common(node, var_el)
183
184
        accesslevel = node.get_attribute(ua.AttributeIds.AccessLevel).Value.Value
185
        useraccesslevel = node.get_attribute(ua.AttributeIds.UserAccessLevel).Value.Value
186
187
        # We only write these values if they are different from defaults
188
        # Not sure where default is defined....
189
        if accesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
190
            var_el.attrib["AccessLevel"] = str(accesslevel)
191
        if useraccesslevel not in (0, ua.AccessLevel.CurrentRead.mask):
192
            var_el.attrib["UserAccessLevel"] = str(useraccesslevel)
193
194
        var = node.get_attribute(ua.AttributeIds.MinimumSamplingInterval)
195
        if var.Value.Value:
196
            var_el.attrib["MinimumSamplingInterval"] = str(var.Value.Value)
197
        var = node.get_attribute(ua.AttributeIds.Historizing)
198
        if var.Value.Value:
199
            var_el.attrib["Historizing"] = 'true'
200
        self._add_ref_els(var_el, node)
201
202
    def add_etree_variable_type(self, node):
203
        """
204
        Add a UA variable type element to the XML etree
205
        """
206
207
        var_el = self._add_node_common("UAVariableType", node)
208
        self.add_variable_common(node, var_el)
209
210
        abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
211
        if abstract.Value.Value:
212
            var_el.attrib["IsAbstract"] = "true" 
213
214
        self._add_ref_els(var_el, node)
215
216
    def add_etree_method(self, node):
217
        obj_el = self._add_node_common("UAMethod", node)
218
219
        var = node.get_attribute(ua.AttributeIds.Executable)
220
        if var.Value.Value is False:
221
            obj_el.attrib["Executable"] = "false"
222
        var = node.get_attribute(ua.AttributeIds.UserExecutable)
223
        if var.Value.Value is False:
224
            obj_el.attrib["UserExecutable"] = "false"
225
        self._add_ref_els(obj_el, node)
226
227
    def add_etree_reference_type(self, obj):
228
        obj_el = self._add_node_common("UAReferenceType", obj)
229
        var = obj.get_attribute(ua.AttributeIds.InverseName)
230
        if var is not None and var.Value.Value is not None:
231
            self._add_sub_el(obj_el, 'InverseName', var.Value.Value.Text.decode('utf-8'))
232
        self._add_ref_els(obj_el, obj)
233
234
    def add_etree_datatype(self, obj):
235
        """
236
        Add a UA data type element to the XML etree
237
        """
238
        obj_el = self._add_node_common("UADataType", obj)
239
        self._add_ref_els(obj_el, obj)
240
241
    def _add_namespace_uri_els(self, uris):
242
        nuris_el = Et.Element('NamespaceUris')
243
244
        for uri in uris:
245
            self._add_sub_el(nuris_el, 'Uri', uri)
246
247
        self.etree.getroot().insert(0, nuris_el)
248
249
    def _add_alias_els(self):
250
        aliases_el = Et.Element('Aliases')
251
252
        for k, v in self.aliases.items():
253
            ref_el = Et.SubElement(aliases_el, 'Alias', Alias=k)
254
            ref_el.text = v
255
256
        self.etree.getroot().insert(0, aliases_el)
257
258
    def _add_ref_els(self, parent_el, obj):
259
        refs = obj.get_references()
260
        refs_el = Et.SubElement(parent_el, 'References')
261
262
        for ref in refs:
263
            if ref.ReferenceTypeId.Identifier in o_ids.ObjectIdNames:
264
                ref_name = o_ids.ObjectIdNames[ref.ReferenceTypeId.Identifier]
265
            else:
266
                ref_name = ref.ReferenceTypeId.to_string()
267
            ref_el = Et.SubElement(refs_el, 'Reference')
268
            ref_el.attrib['ReferenceType'] = ref_name
269
            if not ref.IsForward:
270
                ref_el.attrib['IsForward'] = 'false'
271
            ref_el.text = ref.NodeId.to_string()
272
273
            self.aliases[ref_name] = ref.ReferenceTypeId.to_string()
274
275
276
def value_to_etree(el, dtype_name, dtype, node):
277
    var = node.get_data_value().Value
278
    if var.Value is not None:
279
        val_el = Et.SubElement(el, 'Value')
280
        _value_to_etree(val_el, dtype_name, dtype, var.Value)
281
282
283
def _value_to_etree(el, dtype_name, dtype, val):
284
    if isinstance(val, (list, tuple)):
285
        if type(dtype.Identifier) is int and dtype.Identifier > 21:  # this is a list of ExtensionObjects:
286
            list_el = Et.SubElement(el, "uax:ListOfExtensionObject")  # FIXME should this string be hardcoded?
287
            for nval in val:
288
                _extobj_to_etree(list_el, dtype_name, dtype, nval)
289
        else:
290
            list_el = Et.SubElement(el, "uax:ListOf" + dtype_name)
291
            for nval in val:
292
                _value_to_etree(list_el, dtype_name, dtype, nval)
293
    else:
294
        if dtype.Identifier is int and dtype.Identifier > 21:  # this is an ExtensionObject:
295
            _extobj_to_etree(el, dtype_name, dtype, val)
296
        else:
297
            val_el = Et.SubElement(el, "uax:" + dtype_name)
298
            val_el.text = str(val)
299
300
301
def _extobj_to_etree(val_el, dtype_name, dtype, val):
302
    obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
303
    type_el = Et.SubElement(obj_el, "uax:TypeId")
304
    id_el = Et.SubElement(type_el, "uax:Identifier")
305
    id_el.text = "i=" + str(dtype.Identifier)  # val.TypeId.to_string()
306
    body_el = Et.SubElement(obj_el, "uax:Body")
307
    if dtype.Identifier == 296:
308
        struct_el = _extobj_argument_to_etree(body_el, dtype_name, dtype, val)
309
    else:
310
        # TODO implement other Extension Objects here
311
        print("Exporting extension object not implemented: %s ", dtype_name)  # FIXME send to self.logger.info
312
313
    # FIXME: finish
314
315
316
def _extobj_argument_to_etree(body_el, dtype_name, dtype, val):
317
    """
318
    Export structure for UA extension object Argument (i=296)
319
    :param body_el: Body XML element
320
    :param dtype_name: DataType name in string format
321
    :param dtype: DataType as node id
322
    :param val: ua.Argument extension object
323
    :return: Extension Object structure XML element
324
    """
325
    struct_el = Et.SubElement(body_el, "uax:" + dtype_name)
326
327
    ex_name_el = Et.SubElement(struct_el, "uax:Name")
328
    ex_name_el.text = val.Name
329
330
    ex_type_el = Et.SubElement(struct_el, "uax:DataType")
331
    ex_id_el = Et.SubElement(ex_type_el, "uax:Identifier")
332
    ex_id_el.text = "i=" + str(val.DataType.Identifier.Identifier)
333
334
    ex_rank_el = Et.SubElement(struct_el, "uax:ValueRank")
335
    ex_rank_el.text = str(val.ValueRank)
336
337
    ex_dimen_el = Et.SubElement(struct_el, "uax:ArrayDimensions")
338
    ex_dimen_el.text = ""  # FIXME should parse val.ArrayDimensions list
339
340
    ex_desc_el = Et.SubElement(struct_el, "uax:Description")
341
    ex_desc_el.text = val.Description.Text.decode('utf-8')
342
    return struct_el
343
   
344
345
def indent(elem, level=0):
346
    """
347
    copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
348
    it basically walks your tree and adds spaces and newlines so the tree is
349
    printed in a nice way
350
    """
351
    i = "\n" + level*"  "
352
    if len(elem):
353
        if not elem.text or not elem.text.strip():
354
            elem.text = i + "  "
355
        if not elem.tail or not elem.tail.strip():
356
            elem.tail = i
357
        for elem in elem:
358
            indent(elem, level+1)
359
        if not elem.tail or not elem.tail.strip():
360
            elem.tail = i
361
    else:
362
        if level and (not elem.tail or not elem.tail.strip()):
363
            elem.tail = i
364