Completed
Push — master ( d47619...3daa2d )
by Olivier
04:11
created

XmlImporter.import_xml()   A

Complexity

Conditions 3

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 3.0494

Importance

Changes 5
Bugs 1 Features 0
Metric Value
cc 3
c 5
b 1
f 0
dl 0
loc 23
ccs 14
cts 17
cp 0.8235
crap 3.0494
rs 9.0856
"""
Add nodes defined in an XML file to the address space.
The format is the one from the OPC-UA specification.
"""
5 1
import logging
6 1
import uuid
7 1
from copy import copy
8
9 1
import opcua
10 1
from opcua import ua
11 1
from opcua.common import xmlparser
12
13
14 1
class XmlImporter(object):
    """
    Import the nodes described in an OPC-UA XML file into an address space.

    ``server`` is either an ``opcua.server.server.Server`` instance, in which
    case nodes are added through ``server.iserver.isession``, or any other
    object exposing ``uaclient``, in which case nodes are added through it.
    """

    # Maps the node type found in the xml file to the name of the method
    # adding it. Method names (not functions) are stored so that methods
    # overridden in a subclass are the ones being called.
    _NODE_ADDERS = {
        'UAObject': 'add_object',
        'UAObjectType': 'add_object_type',
        'UAVariable': 'add_variable',
        'UAVariableType': 'add_variable_type',
        'UAReferenceType': 'add_reference_type',
        'UADataType': 'add_datatype',
        'UAMethod': 'add_method',
    }

    def __init__(self, server):
        self.logger = logging.getLogger(__name__)
        self.parser = None
        self.server = server
        # xml file namespace index -> server namespace index
        self.namespaces = {}
        # alias name -> node id
        self.aliases = {}

    def _map_namespaces(self, namespaces_uris):
        """
        creates a mapping between the namespaces in the xml file and in the server.
        if not present the namespace is registered.

        :param namespaces_uris: namespace uris in the order used by the file
        :returns: dict mapping file index (first uri is index 1) to server index
        """
        namespaces = {}
        for ns_index, ns_uri in enumerate(namespaces_uris, start=1):
            namespaces[ns_index] = self.server.register_namespace(ns_uri)
        return namespaces

    def _map_aliases(self, aliases):
        """
        maps the import aliases to the correct namespaces

        :param aliases: dict of alias name -> node id as found in the file
        :returns: dict of alias name -> node id with migrated namespace index
        """
        aliases_mapped = {}
        for alias, node_id in aliases.items():
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
        return aliases_mapped

    def import_xml(self, xmlpath):
        """
        import xml and return added nodes

        Node types that are not supported are logged and left out of the
        returned list. Any failure while adding a node is logged and re-raised.

        :param xmlpath: path of the xml file, handed to xmlparser.XMLParser
        :returns: list of the node ids returned for the added nodes
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath)

        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
        self.aliases = self._map_aliases(self.parser.get_aliases())

        dnodes = self.make_objects(self.parser.get_node_datas())
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)

        nodes = []
        for nodedata in nodes_parsed:
            try:
                node = self._add_node_data(nodedata)
            except Exception:
                self.logger.warning("failure adding node %s", nodedata)
                raise
            # None means the node type is not supported: nothing was added
            if node is not None:
                nodes.append(node)
        return nodes

    def _add_node_data(self, nodedata):
        """
        Add one parsed node using the method matching its node type.

        :returns: the id of the added node, or None when the node type is
            not implemented (a warning is logged in that case)
        """
        adder_name = self._NODE_ADDERS.get(nodedata.nodetype)
        if adder_name is None:
            self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
            return None
        return getattr(self, adder_name)(nodedata)

    def _get_session(self):
        """
        Return the object used to send add_nodes / add_references requests.
        """
        if isinstance(self.server, opcua.server.server.Server):
            return self.server.iserver.isession
        return self.server.uaclient

    def _add_node(self, node):
        """
        Send a single AddNodesItem and return the list of results.
        """
        return self._get_session().add_nodes([node])

    def _add_references(self, refs):
        """
        Send a list of AddReferencesItem and return the results.
        """
        return self._get_session().add_references(refs)

    def make_objects(self, node_datas):
        """
        Convert the string ids of the parsed nodes into ua objects.

        The node data objects are modified in place; the returned list
        contains the same objects in the same order.
        """
        new_nodes = []
        for ndata in node_datas:
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
            if ndata.parent:
                ndata.parent = ua.NodeId.from_string(ndata.parent)
            if ndata.parentlink:
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
            if ndata.typedef:
                ndata.typedef = self.to_nodeid(ndata.typedef)
            new_nodes.append(ndata)
        return new_nodes

    def _migrate_ns(self, nodeid):
        """
        Check if the index of nodeid or browsename given in the xml model file
        must be converted to a already existing namespace id based on the files
        namespace uri

        :param nodeid: any object with a NamespaceIndex attribute
        :returns: a copy with the server namespace index when the index is
            one of the mapped namespaces, otherwise the object itself
        """
        if nodeid.NamespaceIndex in self.namespaces:
            # work on a copy so that the parsed data keeps the file index
            nodeid = copy(nodeid)
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
        return nodeid

    def _get_node(self, obj):
        """
        Create the AddNodesItem shared by all node types for a parsed node.
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
        node.BrowseName = self._migrate_ns(obj.browsename)
        self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
        # node types are named 'UA<NodeClass>', e.g. 'UAObject' -> NodeClass.Object
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent:
            node.ParentNodeId = self._migrate_ns(obj.parent)
        if obj.parentlink:
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
        if obj.typedef:
            node.TypeDefinition = self._migrate_ns(obj.typedef)
        return node

    def to_nodeid(self, nodeid):
        """
        Convert the different id notations found in the xml file to a NodeId.

        Accepted inputs, tried in this order: a NodeId (returned as is), an
        empty value (the String data type id is returned), a string containing
        '=' (parsed with NodeId.from_string), the name of an attribute of
        ua.ObjectIds, or an alias of the imported file.

        :raises AttributeError: when the name is neither a known id nor an alias
        """
        if isinstance(nodeid, ua.NodeId):
            return nodeid
        elif not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        elif "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        elif hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        elif nodeid in self.aliases:
            return self.aliases[nodeid]
        else:
            # the name is unknown at this point: this lookup raises AttributeError
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))

    def _set_common_attrs(self, attrs, obj):
        """
        Fill the description and display name shared by all attribute types.
        """
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        return attrs

    def _commit_node(self, node, attrs, obj):
        """
        Attach attrs to node, add the node and its references.

        :returns: the id of the added node
        :raises: whatever StatusCode.check() raises when adding the node failed
        """
        node.NodeAttributes = attrs
        res = self._add_node(node)
        # references are sent before the status is checked, so they are
        # attempted even when adding the node itself failed
        self._add_refs(obj)
        res[0].StatusCode.check()
        return res[0].AddedNodeId

    def add_object(self, obj):
        """
        Add an object node and return its node id.
        """
        node = self._get_node(obj)
        attrs = self._set_common_attrs(ua.ObjectAttributes(), obj)
        attrs.EventNotifier = obj.eventnotifier
        return self._commit_node(node, attrs, obj)

    def add_object_type(self, obj):
        """
        Add an object type node and return its node id.
        """
        node = self._get_node(obj)
        attrs = self._set_common_attrs(ua.ObjectTypeAttributes(), obj)
        attrs.IsAbstract = obj.abstract
        return self._commit_node(node, attrs, obj)

    def add_variable(self, obj):
        """
        Add a variable node and return its node id.
        """
        node = self._get_node(obj)
        attrs = self._set_common_attrs(ua.VariableAttributes(), obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attrs.Value = self._add_variable_value(obj)
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._commit_node(node, attrs, obj)

    def _make_ext_obj(self, obj):
        """
        Build the ua structure named obj.objname from its parsed body.

        The body is iterated as (name, value) pairs where each value is itself
        a sequence of (attribute name, attribute value) pairs.
        """
        ext = getattr(ua, obj.objname)()
        for name, val in obj.body:
            if isinstance(val, str):
                raise Exception("Error val should a dict", name, val)
            for attname, v in val:
                self._set_attr(ext, attname, v)
        return ext

    def _set_attr(self, obj, attname, val):
        """
        Set attribute attname of obj from its parsed representation.

        val is either the value as a string, or a sequence of pairs when the
        attribute is a NodeId, a list or a nested ua structure.
        """
        if isinstance(val, str):
            pval = xmlparser.ua_type_to_python(val, obj.ua_types[attname])
            setattr(obj, attname, pval)
            return

        # so we have either an object or a list...
        obj2 = getattr(obj, attname)
        if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
            for attname2, v2 in val:
                if attname2 == "Identifier":
                    setattr(obj, attname, ua.NodeId.from_string(v2))
                    break
        elif not hasattr(obj2, "ua_types"):
            # we probably have a list
            my_list = [xmlparser.ua_type_to_python(v2, vtype) for vtype, v2 in val]
            setattr(obj, attname, my_list)
        else:
            for attname2, v2 in val:
                self._set_attr(obj2, attname2, v2)
            setattr(obj, attname, obj2)

    def _add_variable_value(self, obj):
        """
        Returns the value for a Variable based on the objects value type.

        :returns: a ua.Variant built from obj.value according to obj.valuetype
        """
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
        if obj.valuetype == 'ListOfExtensionObject':
            values = [self._make_ext_obj(ext) for ext in obj.value]
            return ua.Variant(values, ua.VariantType.ExtensionObject)
        elif obj.valuetype == 'ListOfGuid':
            return ua.Variant([uuid.UUID(guid) for guid in obj.value], ua.VariantType.Guid)
        elif obj.valuetype.startswith("ListOf"):
            # strip the 'ListOf' prefix to get the type of the elements
            vtype = obj.valuetype[6:]
            if hasattr(ua.ua_binary.Primitives, vtype):
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
            else:
                return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
        elif obj.valuetype == 'ExtensionObject':
            extobj = self._make_ext_obj(obj.value)
            return ua.Variant(extobj, ua.VariantType.ExtensionObject)
        elif obj.valuetype == 'Guid':
            return ua.Variant(uuid.UUID(obj.value), ua.VariantType.Guid)
        elif obj.valuetype == 'LocalizedText':
            ltext = ua.LocalizedText()
            for name, val in obj.value:
                if name == "Text":
                    ltext.Text = val.encode("utf-8")
                else:
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
        elif obj.valuetype == 'NodeId':
            return ua.Variant(ua.NodeId.from_string(obj.value))
        else:
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))

    def add_variable_type(self, obj):
        """
        Add a variable type node and return its node id.
        """
        node = self._get_node(obj)
        attrs = self._set_common_attrs(ua.VariableTypeAttributes(), obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        # a value is only used when it holds exactly one element
        if obj.value and len(obj.value) == 1:
            attrs.Value = obj.value[0]
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._commit_node(node, attrs, obj)

    def add_method(self, obj):
        """
        Add a method node and return its node id.
        """
        node = self._get_node(obj)
        attrs = self._set_common_attrs(ua.MethodAttributes(), obj)
        # NOTE(review): the four attributes below are the ones set by
        # add_variable - confirm that ua.MethodAttributes really carries them
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._commit_node(node, attrs, obj)

    def add_reference_type(self, obj):
        """
        Add a reference type node and return its node id.
        """
        node = self._get_node(obj)
        attrs = self._set_common_attrs(ua.ReferenceTypeAttributes(), obj)
        if obj.inversename:
            attrs.InverseName = ua.LocalizedText(obj.inversename)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.symmetric:
            attrs.Symmetric = obj.symmetric
        return self._commit_node(node, attrs, obj)

    def add_datatype(self, obj):
        """
        Add a data type node and return its node id.
        """
        node = self._get_node(obj)
        attrs = self._set_common_attrs(ua.DataTypeAttributes(), obj)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        return self._commit_node(node, attrs, obj)

    def _add_refs(self, obj):
        """
        Add the forward references listed in obj.refs, if any.
        """
        if not obj.refs:
            return
        refs = []
        for data in obj.refs:
            ref = ua.AddReferencesItem()
            ref.IsForward = True
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
            # NOTE(review): the target class is DataType whatever the target
            # is - confirm this is intended
            ref.TargetNodeClass = ua.NodeClass.DataType
            # NOTE(review): a target resolved through an alias was already
            # migrated by _map_aliases and is migrated again here - confirm
            # aliases never point into a mapped namespace
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
            refs.append(ref)
        self._add_references(refs)

    def _sort_nodes_by_parentid(self, ndatas):
        """
        Sort the list of nodes according their parent node in order to respect
        the dependency between nodes.

        A node is placed as soon as one of these holds: its namespace index is
        not one of the namespaces defined by the file, it has no parent, its
        parent is not part of the file, or its parent is already placed.
        Nodes whose parents can never be placed (parent links forming a cycle,
        or a node being its own parent) are appended last, in their original
        order, with a warning, instead of looping forever.

        :param ndatas: list of NodeDataObjects
        :returns: list of sorted nodes
        """
        pending = list(ndatas)
        # node ids that are already sorted / inserted
        sorted_nodes_ids = []
        sorted_ndatas = []
        # lists (not sets) are used for membership tests so that ids only
        # need to support equality
        all_node_ids = [data.nodeid for data in pending]
        # We can only respect the ordering of nodes for namespace indexes that
        # are defined in the xml file itself. Thus we assume that all other
        # referenced namespaces are already known to the server and should
        # not create any dependency problems (like "NodeNotFound")
        while pending:
            deferred = []
            for ndata in pending:
                if ndata.nodeid.NamespaceIndex not in self.namespaces or \
                        ndata.parent is None or \
                        ndata.parent not in all_node_ids or \
                        ndata.parent in sorted_nodes_ids:
                    sorted_ndatas.append(ndata)
                    sorted_nodes_ids.append(ndata.nodeid)
                else:
                    deferred.append(ndata)
            if len(deferred) == len(pending):
                # no node could be placed during this pass: the remaining
                # parents depend on each other and will never be placed
                self.logger.warning(
                    "Could not order %s nodes by parent (cyclic parent references), "
                    "keeping their original order", len(deferred))
                sorted_ndatas.extend(deferred)
                break
            pending = deferred
        return sorted_ndatas
410