Completed
Push — master ( 5ca03b...453bfe )
by Olivier
03:43
created

XmlImporter._add_references()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 5
ccs 4
cts 4
cp 1
crap 2
rs 9.4285
1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6 1
import uuid
7 1
from copy import copy
8
9 1
import opcua
10 1
from opcua import ua
11 1
from opcua.common import xmlparser
12
13
14 1
class XmlImporter(object):
15
16 1
    def __init__(self, server):
17 1
        self.logger = logging.getLogger(__name__)
18 1
        self.parser = None
19 1
        self.server = server
20 1
        self.namespaces = {}
21 1
        self.aliases = {}
22
23 1
    def _map_namespaces(self, namespaces_uris):
24
        """
25
        creates a mapping between the namespaces in the xml file and in the server.
26
        if not present the namespace is registered.
27
        """
28 1
        namespaces = {}
29 1
        for ns_index, ns_uri in enumerate(namespaces_uris):
30 1
            ns_server_index = self.server.register_namespace(ns_uri)
31 1
            namespaces[ns_index + 1] = ns_server_index
32 1
        return namespaces
33
34 1
    def _map_aliases(self, aliases):
35
        """
36
        maps the import aliases to the correct namespaces        
37
        """
38 1
        aliases_mapped = {}
39 1
        for alias, node_id in aliases.items():
40 1
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
41 1
        return aliases_mapped
42
43 1
    def import_xml(self, xmlpath):
44
        """
45
        import xml and return added nodes
46
        """
47 1
        self.logger.info("Importing XML file %s", xmlpath)
48 1
        self.parser = xmlparser.XMLParser(xmlpath)
49
50 1
        dnodes = self.parser.get_node_datas()
51 1
        dnodes = self.make_objects(dnodes)
52
53 1
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
54 1
        self.aliases = self._map_aliases(self.parser.get_aliases())
55
56 1
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
57
58 1
        nodes = []
59 1
        for nodedata in nodes_parsed:  # self.parser:
60 1
            try:
61 1
                node = self._add_node_data(nodedata)
62
            except Exception:
63
                self.logger.warning("failure adding node %s", nodedata)
64
                raise
65 1
            nodes.append(node)
66 1
        return nodes
67
68 1
    def _add_node_data(self, nodedata):
69 1
        if nodedata.nodetype == 'UAObject':
70 1
            node = self.add_object(nodedata)
71 1
        elif nodedata.nodetype == 'UAObjectType':
72 1
            node = self.add_object_type(nodedata)
73 1
        elif nodedata.nodetype == 'UAVariable':
74 1
            node = self.add_variable(nodedata)
75 1
        elif nodedata.nodetype == 'UAVariableType':
76
            node = self.add_variable_type(nodedata)
77 1
        elif nodedata.nodetype == 'UAReferenceType':
78
            node = self.add_reference_type(nodedata)
79 1
        elif nodedata.nodetype == 'UADataType':
80 1
            node = self.add_datatype(nodedata)
81 1
        elif nodedata.nodetype == 'UAMethod':
82 1
            node = self.add_method(nodedata)
83
        else:
84
            self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
85 1
        return node
86
87 1
    def _add_node(self, node):
88 1
        if isinstance(self.server, opcua.server.server.Server):
89 1
            return self.server.iserver.isession.add_nodes([node])
90
        else:
91 1
            return self.server.uaclient.add_nodes([node])
92
93 1
    def _add_references(self, refs):
94 1
        if isinstance(self.server, opcua.server.server.Server):
95 1
            return self.server.iserver.isession.add_references(refs)
96
        else:
97 1
            return self.server.uaclient.add_references(refs)
98
    
99 1
    def make_objects(self, node_datas):
100 1
        new_nodes = []
101 1
        for ndata in node_datas:
102 1
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
103 1
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
104 1
            if ndata.parent:
105 1
                ndata.parent = ua.NodeId.from_string(ndata.parent)
106 1
            if ndata.parentlink:
107 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
108 1
            if ndata.typedef:
109 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
110 1
            new_nodes.append(ndata)
111 1
        return new_nodes
112
113 1
    def _migrate_ns(self, nodeid):
114
        """
115
        Check if the index of nodeid or browsename  given in the xml model file
116
        must be converted to a already existing namespace id based on the files
117
        namespace uri
118
119
        :returns: NodeId (str)
120
        """
121 1
        if nodeid.NamespaceIndex in self.namespaces:
122 1
            nodeid = copy(nodeid)
123 1
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
124 1
        return nodeid
125
126 1
    def _get_node(self, obj):
127 1
        node = ua.AddNodesItem()
128 1
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
129 1
        node.BrowseName = self._migrate_ns(obj.browsename)
130 1
        self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
131 1
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
132 1
        if obj.parent:
133 1
            node.ParentNodeId = self._migrate_ns(obj.parent)
134 1
        if obj.parentlink:
135 1
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
136 1
        if obj.typedef:
137 1
            node.TypeDefinition = self._migrate_ns(obj.typedef)
138 1
        return node
139
140 1
    def to_nodeid(self, nodeid):
141 1
        if isinstance(nodeid, ua.NodeId):
142
            return nodeid
143 1
        elif not nodeid:
144
            return ua.NodeId(ua.ObjectIds.String)
145 1
        elif "=" in nodeid:
146 1
            return ua.NodeId.from_string(nodeid)
147 1
        elif hasattr(ua.ObjectIds, nodeid):
148 1
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
149 View Code Duplication
        else:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
150 1
            if nodeid in self.aliases:
151 1
                return self.aliases[nodeid]
152
            else:
153
                return ua.NodeId(getattr(ua.ObjectIds, nodeid))
154
155 1
    def add_object(self, obj):
156 1
        node = self._get_node(obj)
157 1
        attrs = ua.ObjectAttributes()
158 1
        if obj.desc:
159 1
            attrs.Description = ua.LocalizedText(obj.desc)
160 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
161 1
        attrs.EventNotifier = obj.eventnotifier
162 1 View Code Duplication
        node.NodeAttributes = attrs
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
163 1
        res = self._add_node(node)
164 1
        self._add_refs(obj)
165 1
        res[0].StatusCode.check()
166 1
        return res[0].AddedNodeId
167
168 1
    def add_object_type(self, obj):
169 1
        node = self._get_node(obj)
170 1
        attrs = ua.ObjectTypeAttributes()
171 1
        if obj.desc:
172
            attrs.Description = ua.LocalizedText(obj.desc)
173 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
174 1
        attrs.IsAbstract = obj.abstract
175 1
        node.NodeAttributes = attrs
176 1
        res = self._add_node(node)
177 1
        self._add_refs(obj)
178 1
        res[0].StatusCode.check()
179 1
        return res[0].AddedNodeId
180
181 1
    def add_variable(self, obj):
182 1
        node = self._get_node(obj)
183 1
        attrs = ua.VariableAttributes()
184 1
        if obj.desc:
185 1
            attrs.Description = ua.LocalizedText(obj.desc)
186 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
187 1
        attrs.DataType = self.to_nodeid(obj.datatype)
188 1
        if obj.value is not None:
189 1
            attrs.Value = self._add_variable_value(obj,)
190 1
        if obj.rank:
191 1
            attrs.ValueRank = obj.rank
192 1
        if obj.accesslevel:
193
            attrs.AccessLevel = obj.accesslevel
194 1
        if obj.useraccesslevel:
195
            attrs.UserAccessLevel = obj.useraccesslevel
196 1
        if obj.minsample:
197
            attrs.MinimumSamplingInterval = obj.minsample
198 1
        if obj.dimensions:
199 1
            attrs.ArrayDimensions = obj.dimensions
200 1
        node.NodeAttributes = attrs
201 1
        res = self._add_node(node)
202 1
        self._add_refs(obj)
203 1
        res[0].StatusCode.check()
204 1
        return res[0].AddedNodeId
205
206 1
    def _make_ext_obj(self, obj):
207 1
        ext = getattr(ua, obj.objname)()
208 1
        for name, val in obj.body:
209 1
            if isinstance(val, str):
210
                raise Exception("Error val should a dict", name, val)
211
            else:
212 1
                for attname, v in val:
213 1
                    self._set_attr(ext, attname, v)
214 1
        return ext
215
216 1
    def _set_attr(self, obj, attname, val):
217
        # tow possible values:
218
        # either we get value directly
219
        # or a dict if it s an object or a list
220 1
        if isinstance(val, str):
221 1
            pval = xmlparser.ua_type_to_python(val, obj.ua_types[attname])
222 1
            setattr(obj, attname, pval)
223
        else:
224
            # so we have either an object or a list...
225 1
            obj2 = getattr(obj, attname)
226 1
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
227 1
                for attname2, v2 in val:
228 1
                    if attname2 == "Identifier":
229 1
                        obj2 = ua.NodeId.from_string(v2)
230 1
                        setattr(obj, attname, obj2)
231 1
                        break
232 1
            elif not isinstance(obj2, ua.NodeId) and not hasattr(obj2, "ua_types"):
233
                # we probably have a list
234 1
                my_list = []
235 1
                for vtype, v2 in val:
236 1
                    my_list.append(xmlparser.ua_type_to_python(v2, vtype))
237 1
                setattr(obj, attname, my_list)
238
            else:
239 1
                for attname2, v2 in val:
240 1
                    self._set_attr(obj2, attname2, v2)
241 1
                setattr(obj, attname, obj2)
242
243 1
    def _add_variable_value(self, obj):
244
        """
245
        Returns the value for a Variable based on the objects value type.
246
        """
247 1
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
248 1
        if obj.valuetype == 'ListOfExtensionObject':
249 1
            values = []
250 1
            for ext in obj.value:
251 1
                extobj = self._make_ext_obj(ext)
252 1
                values.append(extobj)
253 1
            return ua.Variant(values, ua.VariantType.ExtensionObject)
254 1
        elif obj.valuetype == 'ListOfGuid':
255 1
            return ua.Variant([
256
                uuid.UUID(guid) for guid in obj.value
257
            ], getattr(ua.VariantType, obj.valuetype[6:]))
258 1
        elif obj.valuetype.startswith("ListOf"):
259 1
            vtype = obj.valuetype[6:]
260 1
            if hasattr(ua.ua_binary.Primitives, vtype):
261 1
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
262
            else:
263 1
                return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
264 1
        elif obj.valuetype == 'ExtensionObject':
265 1
            extobj = self._make_ext_obj(obj.value)
266 1
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
267 1
        elif obj.valuetype == 'Guid':
268 1
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
269 1
        elif obj.valuetype == 'LocalizedText':
270 1
            ltext = ua.LocalizedText()
271 1
            for name, val in obj.value:
272 1
                if name == "Text":
273 1
                    ltext.Text = val.encode("utf-8")
274
                else:
275
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
276 1
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
277 1
        elif obj.valuetype == 'NodeId':
278 1
            return ua.Variant(ua.NodeId.from_string(obj.value))
279
        else:
280 1
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
281
282 1
    def add_variable_type(self, obj):
283
        node = self._get_node(obj)
284
        attrs = ua.VariableTypeAttributes()
285
        if obj.desc:
286
            attrs.Description = ua.LocalizedText(obj.desc)
287
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
288
        attrs.DataType = self.to_nodeid(obj.datatype)
289
        if obj.value and len(obj.value) == 1:
290
            attrs.Value = obj.value[0]
291
        if obj.rank:
292
            attrs.ValueRank = obj.rank
293
        if obj.abstract:
294
            attrs.IsAbstract = obj.abstract
295
        if obj.dimensions:
296
            attrs.ArrayDimensions = obj.dimensions
297 View Code Duplication
        node.NodeAttributes = attrs
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
298
        res = self._add_node(node)
299
        self._add_refs(obj)
300
        res[0].StatusCode.check()
301
        return res[0].AddedNodeId
302
303 1
    def add_method(self, obj):
304 1
        node = self._get_node(obj)
305 1
        attrs = ua.MethodAttributes()
306 1
        if obj.desc:
307 1
            attrs.Description = ua.LocalizedText(obj.desc)
308 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
309 1
        if obj.accesslevel:
310
            attrs.AccessLevel = obj.accesslevel
311 1
        if obj.useraccesslevel:
312
            attrs.UserAccessLevel = obj.useraccesslevel
313 1
        if obj.minsample:
314
            attrs.MinimumSamplingInterval = obj.minsample
315 1
        if obj.dimensions:
316
            attrs.ArrayDimensions = obj.dimensions
317 1 View Code Duplication
        node.NodeAttributes = attrs
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
318 1
        res = self._add_node(node)
319 1
        self._add_refs(obj)
320 1
        res[0].StatusCode.check()
321 1
        return res[0].AddedNodeId
322
323 1
    def add_reference_type(self, obj):
324
        node = self._get_node(obj)
325
        attrs = ua.ReferenceTypeAttributes()
326
        if obj.desc:
327
            attrs.Description = ua.LocalizedText(obj.desc)
328
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
329
        if obj. inversename:
330
            attrs.InverseName = ua.LocalizedText(obj.inversename)
331
        if obj.abstract:
332
            attrs.IsAbstract = obj.abstract
333
        if obj.symmetric:
334
            attrs.Symmetric = obj.symmetric
335 View Code Duplication
        node.NodeAttributes = attrs
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
336
        res = self._add_node(node)
337
        self._add_refs(obj)
338
        res[0].StatusCode.check()
339
        return res[0].AddedNodeId
340
341 1
    def add_datatype(self, obj):
342 1
        node = self._get_node(obj)
343 1
        attrs = ua.DataTypeAttributes()
344 1
        if obj.desc:
345 1
            attrs.Description = ua.LocalizedText(obj.desc)
346 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
347 1
        if obj.abstract:
348
            attrs.IsAbstract = obj.abstract
349 1
        node.NodeAttributes = attrs
350 1
        res = self._add_node(node)
351 1
        self._add_refs(obj)
352 1
        res[0].StatusCode.check()
353 1
        return res[0].AddedNodeId
354
355 1
    def _add_refs(self, obj):
356 1
        if not obj.refs:
357 1
            return
358 1
        refs = []
359 1
        for data in obj.refs:
360 1
            ref = ua.AddReferencesItem()
361 1
            ref.IsForward = True
362 1
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
363 1
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
364 1
            ref.TargetNodeClass = ua.NodeClass.DataType
365 1
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
366 1
            refs.append(ref)
367 1
        self._add_references(refs)
368
369 1
    def _sort_nodes_by_parentid(self, ndatas):
370
        """
371
        Sort the list of nodes according their parent node in order to respect
372
        the dependency between nodes.
373
374
        :param nodes: list of NodeDataObjects
375
        :returns: list of sorted nodes
376
        """
377 1
        _ndatas = list(ndatas)
378
        # list of node ids that are already sorted / inserted
379 1
        sorted_nodes_ids = []
380
        # list of sorted nodes (i.e. XML Elements)
381 1
        sorted_ndatas = []
382 1
        all_node_ids = [data.nodeid for data in ndatas]
383
        # list of namespace indexes that are relevant for this import
384
        # we can only respect ordering nodes for namespaces indexes that
385
        # are defined in the xml file itself. Thus we assume that all other
386
        # references namespaces are already known to the server and should
387
        # not create any dependency problems (like "NodeNotFound")
388 1
        while len(_ndatas) > 0:
389 1
            pop_nodes = []
390 1
            for ndata in _ndatas:
391
                # Insert nodes that
392
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
393
                #   (2) ns is not in list of relevant namespaces
394 1
                if ndata.nodeid.NamespaceIndex not in self.namespaces or \
395
                        ndata.parent is None or \
396
                        ndata.parent not in all_node_ids:
397 1
                    sorted_ndatas.append(ndata)
398 1
                    sorted_nodes_ids.append(ndata.nodeid)
399 1
                    pop_nodes.append(ndata)
400
                else:
401
                    # Check if the nodes parent is already in the list of
402
                    # inserted nodes
403 1
                    if ndata.parent in sorted_nodes_ids:
404 1
                        sorted_ndatas.append(ndata)
405 1
                        sorted_nodes_ids.append(ndata.nodeid)
406 1
                        pop_nodes.append(ndata)
407
            # Remove inserted nodes from the list
408 1
            for ndata in pop_nodes:
409 1
                _ndatas.pop(_ndatas.index(ndata))
410
        return sorted_ndatas
411