Passed
Push — master ( 7e6c7f...0b26fe )
by Olivier
04:39
created

XmlImporter._add_node_data()   C

Complexity

Conditions 8

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 8.3518

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 8
dl 0
loc 18
ccs 14
cts 17
cp 0.8235
crap 8.3518
rs 6.6666
c 1
b 0
f 0
1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6 1
import uuid
7 1
from copy import copy
8
9 1
import opcua
10 1
from opcua import ua
11 1
from opcua.common import xmlparser
12
13 1
import sys
14
15 1
if sys.version_info.major > 2:
16
    unicode = str
17
18 1
class XmlImporter(object):
19
20 1
    def __init__(self, server):
21 1
        self.logger = logging.getLogger(__name__)
22 1
        self.parser = None
23 1
        self.server = server
24 1
        self.namespaces = {}
25 1
        self.aliases = {}
26
27 1
    def _map_namespaces(self, namespaces_uris):
28
        """
29
        creates a mapping between the namespaces in the xml file and in the server.
30
        if not present the namespace is registered.
31
        """
32 1
        namespaces = {}
33 1
        for ns_index, ns_uri in enumerate(namespaces_uris):
34 1
            ns_server_index = self.server.register_namespace(ns_uri)
35 1
            namespaces[ns_index + 1] = ns_server_index
36 1
        return namespaces
37
38 1
    def _map_aliases(self, aliases):
39
        """
40
        maps the import aliases to the correct namespaces        
41
        """
42 1
        aliases_mapped = {}
43 1
        for alias, node_id in aliases.items():
44 1
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
45 1
        return aliases_mapped
46
47 1
    def import_xml(self, xmlpath):
48
        """
49
        import xml and return added nodes
50
        """
51 1
        self.logger.info("Importing XML file %s", xmlpath)
52 1
        self.parser = xmlparser.XMLParser(xmlpath)
53
54 1
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
55 1
        self.aliases = self._map_aliases(self.parser.get_aliases())
56
57 1
        dnodes = self.parser.get_node_datas()
58 1
        dnodes = self.make_objects(dnodes)
59 1
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
60
61 1
        nodes = []
62 1
        for nodedata in nodes_parsed:  # self.parser:
63 1
            try:
64 1
                node = self._add_node_data(nodedata)
65
            except Exception:
66
                self.logger.warning("failure adding node %s", nodedata)
67
                raise
68 1
            nodes.append(node)
69 1
        return nodes
70
71 1
    def _add_node_data(self, nodedata):
72 1
        if nodedata.nodetype == 'UAObject':
73 1
            node = self.add_object(nodedata)
74 1
        elif nodedata.nodetype == 'UAObjectType':
75 1
            node = self.add_object_type(nodedata)
76 1
        elif nodedata.nodetype == 'UAVariable':
77 1
            node = self.add_variable(nodedata)
78 1
        elif nodedata.nodetype == 'UAVariableType':
79
            node = self.add_variable_type(nodedata)
80 1
        elif nodedata.nodetype == 'UAReferenceType':
81
            node = self.add_reference_type(nodedata)
82 1
        elif nodedata.nodetype == 'UADataType':
83 1
            node = self.add_datatype(nodedata)
84 1
        elif nodedata.nodetype == 'UAMethod':
85 1
            node = self.add_method(nodedata)
86
        else:
87
            self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
88 1
        return node
89
90 1
    def _add_node(self, node):
91 1
        if isinstance(self.server, opcua.server.server.Server):
92 1
            return self.server.iserver.isession.add_nodes([node])
93
        else:
94 1
            return self.server.uaclient.add_nodes([node])
95
96 1
    def _add_references(self, refs):
97 1
        if isinstance(self.server, opcua.server.server.Server):
98 1
            return self.server.iserver.isession.add_references(refs)
99
        else:
100 1
            return self.server.uaclient.add_references(refs)
101
102 1
    def make_objects(self, node_datas):
103 1
        new_nodes = []
104 1
        for ndata in node_datas:
105 1
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
106 1
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
107 1
            if ndata.parent:
108 1
                ndata.parent = ua.NodeId.from_string(ndata.parent)
109 1
            if ndata.parentlink:
110 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
111 1
            if ndata.typedef:
112 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
113 1
            new_nodes.append(ndata)
114 1
        return new_nodes
115
116 1
    def _migrate_ns(self, nodeid):
117
        """
118
        Check if the index of nodeid or browsename  given in the xml model file
119
        must be converted to a already existing namespace id based on the files
120
        namespace uri
121
122
        :returns: NodeId (str)
123
        """
124 1
        if nodeid.NamespaceIndex in self.namespaces:
125 1
            nodeid = copy(nodeid)
126 1
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
127 1
        return nodeid
128
129 1
    def _get_node(self, obj):
130 1
        node = ua.AddNodesItem()
131 1
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
132 1
        node.BrowseName = self._migrate_ns(obj.browsename)
133 1
        self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
134 1
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
135 1
        if obj.parent:
136 1
            node.ParentNodeId = self._migrate_ns(obj.parent)
137 1
        if obj.parentlink:
138 1
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
139 1
        if obj.typedef:
140 1
            node.TypeDefinition = self._migrate_ns(obj.typedef)
141 1
        return node
142
143 1
    def to_nodeid(self, nodeid):
144 1
        if isinstance(nodeid, ua.NodeId):
145
            return nodeid
146 1
        elif not nodeid:
147
            return ua.NodeId(ua.ObjectIds.String)
148 1
        elif "=" in nodeid:
149 1 View Code Duplication
            return ua.NodeId.from_string(nodeid)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
150 1
        elif hasattr(ua.ObjectIds, nodeid):
151 1
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
152
        else:
153 1
            if nodeid in self.aliases:
154 1
                return self.aliases[nodeid]
155
            else:
156
                return ua.NodeId(getattr(ua.ObjectIds, nodeid))
157
158 1
    def add_object(self, obj):
159 1
        node = self._get_node(obj)
160 1
        attrs = ua.ObjectAttributes()
161 1
        if obj.desc:
162 1 View Code Duplication
            attrs.Description = ua.LocalizedText(obj.desc)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
163 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
164 1
        attrs.EventNotifier = obj.eventnotifier
165 1
        node.NodeAttributes = attrs
166 1
        res = self._add_node(node)
167 1
        self._add_refs(obj)
168 1
        res[0].StatusCode.check()
169 1
        return res[0].AddedNodeId
170
171 1
    def add_object_type(self, obj):
172 1
        node = self._get_node(obj)
173 1
        attrs = ua.ObjectTypeAttributes()
174 1
        if obj.desc:
175
            attrs.Description = ua.LocalizedText(obj.desc)
176 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
177 1
        attrs.IsAbstract = obj.abstract
178 1
        node.NodeAttributes = attrs
179 1
        res = self._add_node(node)
180 1
        self._add_refs(obj)
181 1
        res[0].StatusCode.check()
182 1
        return res[0].AddedNodeId
183
184 1
    def add_variable(self, obj):
185 1
        node = self._get_node(obj)
186 1
        attrs = ua.VariableAttributes()
187 1
        if obj.desc:
188 1
            attrs.Description = ua.LocalizedText(obj.desc)
189 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
190 1
        attrs.DataType = self.to_nodeid(obj.datatype)
191 1
        if obj.value is not None:
192 1
            attrs.Value = self._add_variable_value(obj,)
193 1
        if obj.rank:
194 1
            attrs.ValueRank = obj.rank
195 1
        if obj.accesslevel:
196
            attrs.AccessLevel = obj.accesslevel
197 1
        if obj.useraccesslevel:
198
            attrs.UserAccessLevel = obj.useraccesslevel
199 1
        if obj.minsample:
200
            attrs.MinimumSamplingInterval = obj.minsample
201 1
        if obj.dimensions:
202 1
            attrs.ArrayDimensions = obj.dimensions
203 1
        node.NodeAttributes = attrs
204 1
        res = self._add_node(node)
205 1
        self._add_refs(obj)
206 1
        res[0].StatusCode.check()
207 1
        return res[0].AddedNodeId
208
209 1
    def _get_ext_class(self, name):
210 1
        if hasattr(ua, name):
211 1
            return  getattr(ua, name)
212
        elif name in self.aliases.keys():
213
            nodeid = self.aliases[name]
214
            class_type = ua.uatypes.get_extensionobject_class_type(nodeid)
215
            if class_type:
216
                return class_type
217
            else:
218
                raise Exception("Error no extension class registered ", name, nodeid)
219
        else:
220
            raise Exception("Error no alias found for extension class", name)
221
222 1
    def _make_ext_obj(self, obj):
223 1
        ext = self._get_ext_class(obj.objname)()
224 1
        for name, val in obj.body:
225 1
            if isinstance(val, str):
226
                raise Exception("Error val should a dict", name, val)
227
            else:
228 1
                for attname, v in val:
229 1
                    self._set_attr(ext, attname, v)
230 1
        return ext
231
232 1
    def _set_attr(self, obj, attname, val):
233
        # tow possible values:
234
        # either we get value directly
235
        # or a dict if it s an object or a list
236 1
        if isinstance(val, (str, unicode)):
237 1
            pval = xmlparser.ua_type_to_python(val, obj.ua_types[attname])
238 1
            setattr(obj, attname, pval)
239
        else:
240
            # so we have either an object or a list...
241 1
            obj2 = getattr(obj, attname)
242 1
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
243 1
                for attname2, v2 in val:
244 1
                    if attname2 == "Identifier":
245 1
                        obj2 = ua.NodeId.from_string(v2)
246 1
                        setattr(obj, attname, obj2)
247 1
                        break
248 1
            elif not isinstance(obj2, ua.NodeId) and not hasattr(obj2, "ua_types"):
249
                # we probably have a list
250 1
                my_list = []
251 1
                for vtype, v2 in val:
252 1
                    my_list.append(xmlparser.ua_type_to_python(v2, vtype))
253 1
                setattr(obj, attname, my_list)
254
            else:
255 1
                for attname2, v2 in val:
256 1
                    self._set_attr(obj2, attname2, v2)
257 1
                setattr(obj, attname, obj2)
258
259 1
    def _add_variable_value(self, obj):
260
        """
261
        Returns the value for a Variable based on the objects value type.
262
        """
263 1
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
264 1
        if obj.valuetype == 'ListOfExtensionObject':
265 1
            values = []
266 1
            for ext in obj.value:
267 1
                extobj = self._make_ext_obj(ext)
268 1
                values.append(extobj)
269 1
            return ua.Variant(values, ua.VariantType.ExtensionObject)
270 1
        elif obj.valuetype == 'ListOfGuid':
271 1
            return ua.Variant([
272
                uuid.UUID(guid) for guid in obj.value
273
            ], getattr(ua.VariantType, obj.valuetype[6:]))
274 1
        elif obj.valuetype.startswith("ListOf"):
275 1
            vtype = obj.valuetype[6:]
276 1
            if hasattr(ua.ua_binary.Primitives, vtype):
277 1
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
278
            else:
279 1
                return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
280 1
        elif obj.valuetype == 'ExtensionObject':
281 1
            extobj = self._make_ext_obj(obj.value)
282 1
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
283 1
        elif obj.valuetype == 'Guid':
284 1
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
285 1
        elif obj.valuetype == 'LocalizedText':
286 1
            ltext = ua.LocalizedText()
287 1
            for name, val in obj.value:
288 1
                if name == "Text":
289 1
                    ltext.Text = val.encode("utf-8")
290
                else:
291
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
292 1
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
293 1
        elif obj.valuetype == 'NodeId':
294 1
            return ua.Variant(ua.NodeId.from_string(obj.value))
295
        else:
296 1
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
297
298 1
    def add_variable_type(self, obj):
299
        node = self._get_node(obj)
300
        attrs = ua.VariableTypeAttributes()
301
        if obj.desc:
302
            attrs.Description = ua.LocalizedText(obj.desc)
303
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
304
        attrs.DataType = self.to_nodeid(obj.datatype)
305
        if obj.value and len(obj.value) == 1:
306
            attrs.Value = obj.value[0]
307
        if obj.rank:
308
            attrs.ValueRank = obj.rank
309
        if obj.abstract:
310
            attrs.IsAbstract = obj.abstract
311
        if obj.dimensions:
312
            attrs.ArrayDimensions = obj.dimensions
313
        node.NodeAttributes = attrs
314
        res = self._add_node(node)
315
        self._add_refs(obj)
316
        res[0].StatusCode.check()
317
        return res[0].AddedNodeId
318
319 1
    def add_method(self, obj):
320 1
        node = self._get_node(obj)
321 1
        attrs = ua.MethodAttributes()
322 1
        if obj.desc:
323 1
            attrs.Description = ua.LocalizedText(obj.desc)
324 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
325 1
        if obj.accesslevel:
326
            attrs.AccessLevel = obj.accesslevel
327 1
        if obj.useraccesslevel:
328
            attrs.UserAccessLevel = obj.useraccesslevel
329 1
        if obj.minsample:
330
            attrs.MinimumSamplingInterval = obj.minsample
331 1
        if obj.dimensions:
332
            attrs.ArrayDimensions = obj.dimensions
333 1
        node.NodeAttributes = attrs
334 1
        res = self._add_node(node)
335 1 View Code Duplication
        self._add_refs(obj)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
336 1
        res[0].StatusCode.check()
337 1
        return res[0].AddedNodeId
338
339 1
    def add_reference_type(self, obj):
340
        node = self._get_node(obj)
341
        attrs = ua.ReferenceTypeAttributes()
342
        if obj.desc:
343
            attrs.Description = ua.LocalizedText(obj.desc)
344
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
345
        if obj. inversename:
346
            attrs.InverseName = ua.LocalizedText(obj.inversename)
347
        if obj.abstract:
348
            attrs.IsAbstract = obj.abstract
349
        if obj.symmetric:
350
            attrs.Symmetric = obj.symmetric
351
        node.NodeAttributes = attrs
352
        res = self._add_node(node)
353
        self._add_refs(obj)
354
        res[0].StatusCode.check()
355
        return res[0].AddedNodeId
356
357 1
    def add_datatype(self, obj):
358 1
        node = self._get_node(obj)
359 1
        attrs = ua.DataTypeAttributes()
360 1
        if obj.desc:
361 1
            attrs.Description = ua.LocalizedText(obj.desc)
362 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
363 1
        if obj.abstract:
364
            attrs.IsAbstract = obj.abstract
365 1
        node.NodeAttributes = attrs
366 1
        res = self._add_node(node)
367 1
        self._add_refs(obj)
368 1
        res[0].StatusCode.check()
369 1
        return res[0].AddedNodeId
370
371 1
    def _add_refs(self, obj):
372 1
        if not obj.refs:
373 1
            return
374 1
        refs = []
375 1
        for data in obj.refs:
376 1
            ref = ua.AddReferencesItem()
377 1
            ref.IsForward = True
378 1
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
379 1
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
380 1
            ref.TargetNodeClass = ua.NodeClass.DataType
381 1
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
382 1
            refs.append(ref)
383 1
        self._add_references(refs)
384
385 1
    def _sort_nodes_by_parentid(self, ndatas):
386
        """
387
        Sort the list of nodes according their parent node in order to respect
388
        the dependency between nodes.
389
390
        :param nodes: list of NodeDataObjects
391
        :returns: list of sorted nodes
392
        """
393 1
        _ndatas = list(ndatas)
394
        # list of node ids that are already sorted / inserted
395 1
        sorted_nodes_ids = []
396
        # list of sorted nodes (i.e. XML Elements)
397 1
        sorted_ndatas = []
398 1
        all_node_ids = [data.nodeid for data in ndatas]
399
        # list of namespace indexes that are relevant for this import
400
        # we can only respect ordering nodes for namespaces indexes that
401
        # are defined in the xml file itself. Thus we assume that all other
402
        # references namespaces are already known to the server and should
403
        # not create any dependency problems (like "NodeNotFound")
404 1
        while len(_ndatas) > 0:
405 1
            pop_nodes = []
406 1
            for ndata in _ndatas:
407
                # Insert nodes that
408
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
409
                #   (2) ns is not in list of relevant namespaces
410 1
                if ndata.nodeid.NamespaceIndex not in self.namespaces or \
411
                        ndata.parent is None or \
412
                        ndata.parent not in all_node_ids:
413 1
                    sorted_ndatas.append(ndata)
414 1
                    sorted_nodes_ids.append(ndata.nodeid)
415 1
                    pop_nodes.append(ndata)
416
                else:
417
                    # Check if the nodes parent is already in the list of
418
                    # inserted nodes
419 1
                    if ndata.parent in sorted_nodes_ids:
420 1
                        sorted_ndatas.append(ndata)
421 1
                        sorted_nodes_ids.append(ndata.nodeid)
422 1
                        pop_nodes.append(ndata)
423
            # Remove inserted nodes from the list
424 1
            for ndata in pop_nodes:
425 1
                _ndatas.pop(_ndatas.index(ndata))
426
        return sorted_ndatas
427