Completed
Pull Request — master (#490)
by Olivier
04:31
created

XmlImporter   F

Complexity

Total Complexity 116

Size/Duplication

Total Lines 418
Duplicated Lines 8.85 %

Test Coverage

Coverage 80.67%

Importance

Changes 6
Bugs 0 Features 1
Metric Value
c 6
b 0
f 1
dl 37
loc 418
ccs 263
cts 326
cp 0.8067
rs 1.5789
wmc 116

25 Methods

Rating   Name   Duplication   Size   Complexity  
D add_variable() 0 24 8
B to_nodeid() 9 14 6
A _add_node() 0 5 2
A _map_namespaces() 0 10 2
C _add_node_data() 0 18 8
A __init__() 0 6 1
A _add_references() 0 5 2
B add_variable_type() 0 20 7
A add_datatype() 0 13 3
A _add_refs() 0 13 3
B add_reference_type() 0 17 5
F _add_variable_value() 0 38 14
A _get_node() 0 13 4
A _map_aliases() 0 8 2
A _get_ext_class() 0 12 4
A add_object_type() 2 12 2
A _migrate_ns() 0 12 2
A add_object() 11 12 2
F _sort_nodes_by_parentid() 0 42 9
A import_xml() 0 23 3
A _make_ext_obj() 0 9 4
B make_objects() 0 13 5
A _get_val_type() 0 5 3
F _set_attr() 0 29 9
B add_method() 13 19 6

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complex Class

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like XmlImporter often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6 1
import uuid
7 1
from copy import copy
8
9 1
import opcua
10 1
from opcua import ua
11 1
from opcua.common import xmlparser
12 1
from opcua.ua.uaerrors import UaError
13
14 1
import sys
15
16 1
if sys.version_info.major > 2:
17
    unicode = str
18
19 1
class XmlImporter(object):
20
21 1
    def __init__(self, server):
22 1
        self.logger = logging.getLogger(__name__)
23 1
        self.parser = None
24 1
        self.server = server
25 1
        self.namespaces = {}
26 1
        self.aliases = {}
27
28 1
    def _map_namespaces(self, namespaces_uris):
29
        """
30
        creates a mapping between the namespaces in the xml file and in the server.
31
        if not present the namespace is registered.
32
        """
33 1
        namespaces = {}
34 1
        for ns_index, ns_uri in enumerate(namespaces_uris):
35 1
            ns_server_index = self.server.register_namespace(ns_uri)
36 1
            namespaces[ns_index + 1] = ns_server_index
37 1
        return namespaces
38
39 1
    def _map_aliases(self, aliases):
40
        """
41
        maps the import aliases to the correct namespaces        
42
        """
43 1
        aliases_mapped = {}
44 1
        for alias, node_id in aliases.items():
45 1
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
46 1
        return aliases_mapped
47
48 1
    def import_xml(self, xmlpath):
49
        """
50
        import xml and return added nodes
51
        """
52 1
        self.logger.info("Importing XML file %s", xmlpath)
53 1
        self.parser = xmlparser.XMLParser(xmlpath)
54
55 1
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
56 1
        self.aliases = self._map_aliases(self.parser.get_aliases())
57
58 1
        dnodes = self.parser.get_node_datas()
59 1
        dnodes = self.make_objects(dnodes)
60 1
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
61
62 1
        nodes = []
63 1
        for nodedata in nodes_parsed:  # self.parser:
64 1
            try:
65 1
                node = self._add_node_data(nodedata)
66
            except Exception:
67
                self.logger.warning("failure adding node %s", nodedata)
68
                raise
69 1
            nodes.append(node)
70 1
        return nodes
71
72 1
    def _add_node_data(self, nodedata):
73 1
        if nodedata.nodetype == 'UAObject':
74 1
            node = self.add_object(nodedata)
75 1
        elif nodedata.nodetype == 'UAObjectType':
76 1
            node = self.add_object_type(nodedata)
77 1
        elif nodedata.nodetype == 'UAVariable':
78 1
            node = self.add_variable(nodedata)
79 1
        elif nodedata.nodetype == 'UAVariableType':
80
            node = self.add_variable_type(nodedata)
81 1
        elif nodedata.nodetype == 'UAReferenceType':
82
            node = self.add_reference_type(nodedata)
83 1
        elif nodedata.nodetype == 'UADataType':
84 1
            node = self.add_datatype(nodedata)
85 1
        elif nodedata.nodetype == 'UAMethod':
86 1
            node = self.add_method(nodedata)
87
        else:
88
            self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
89 1
        return node
90
91 1
    def _add_node(self, node):
92 1
        if isinstance(self.server, opcua.server.server.Server):
93 1
            return self.server.iserver.isession.add_nodes([node])
94
        else:
95 1
            return self.server.uaclient.add_nodes([node])
96
97 1
    def _add_references(self, refs):
98 1
        if isinstance(self.server, opcua.server.server.Server):
99 1
            return self.server.iserver.isession.add_references(refs)
100
        else:
101 1
            return self.server.uaclient.add_references(refs)
102
103 1
    def make_objects(self, node_datas):
104 1
        new_nodes = []
105 1
        for ndata in node_datas:
106 1
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
107 1
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
108 1
            if ndata.parent:
109 1
                ndata.parent = ua.NodeId.from_string(ndata.parent)
110 1
            if ndata.parentlink:
111 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
112 1
            if ndata.typedef:
113 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
114 1
            new_nodes.append(ndata)
115 1
        return new_nodes
116
117 1
    def _migrate_ns(self, nodeid):
118
        """
119
        Check if the index of nodeid or browsename  given in the xml model file
120
        must be converted to a already existing namespace id based on the files
121
        namespace uri
122
123
        :returns: NodeId (str)
124
        """
125 1
        if nodeid.NamespaceIndex in self.namespaces:
126 1
            nodeid = copy(nodeid)
127 1
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
128 1
        return nodeid
129
130 1
    def _get_node(self, obj):
131 1
        node = ua.AddNodesItem()
132 1
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
133 1
        node.BrowseName = self._migrate_ns(obj.browsename)
134 1
        self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
135 1
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
136 1
        if obj.parent:
137 1
            node.ParentNodeId = self._migrate_ns(obj.parent)
138 1
        if obj.parentlink:
139 1
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
140 1
        if obj.typedef:
141 1
            node.TypeDefinition = self._migrate_ns(obj.typedef)
142 1
        return node
143
144 1
    def to_nodeid(self, nodeid):
145 1
        if isinstance(nodeid, ua.NodeId):
146
            return nodeid
147 1
        elif not nodeid:
148
            return ua.NodeId(ua.ObjectIds.String)
149 1 View Code Duplication
        elif "=" in nodeid:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
150 1
            return ua.NodeId.from_string(nodeid)
151 1
        elif hasattr(ua.ObjectIds, nodeid):
152 1
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
153
        else:
154 1
            if nodeid in self.aliases:
155 1
                return self.aliases[nodeid]
156
            else:
157
                return ua.NodeId(getattr(ua.ObjectIds, nodeid))
158
159 1
    def add_object(self, obj):
160 1
        node = self._get_node(obj)
161 1
        attrs = ua.ObjectAttributes()
162 1 View Code Duplication
        if obj.desc:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
163 1
            attrs.Description = ua.LocalizedText(obj.desc)
164 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
165 1
        attrs.EventNotifier = obj.eventnotifier
166 1
        node.NodeAttributes = attrs
167 1
        res = self._add_node(node)
168 1
        self._add_refs(obj)
169 1
        res[0].StatusCode.check()
170 1
        return res[0].AddedNodeId
171
172 1
    def add_object_type(self, obj):
173 1
        node = self._get_node(obj)
174 1
        attrs = ua.ObjectTypeAttributes()
175 1
        if obj.desc:
176
            attrs.Description = ua.LocalizedText(obj.desc)
177 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
178 1
        attrs.IsAbstract = obj.abstract
179 1
        node.NodeAttributes = attrs
180 1
        res = self._add_node(node)
181 1
        self._add_refs(obj)
182 1
        res[0].StatusCode.check()
183 1
        return res[0].AddedNodeId
184
185 1
    def add_variable(self, obj):
186 1
        node = self._get_node(obj)
187 1
        attrs = ua.VariableAttributes()
188 1
        if obj.desc:
189 1
            attrs.Description = ua.LocalizedText(obj.desc)
190 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
191 1
        attrs.DataType = self.to_nodeid(obj.datatype)
192 1
        if obj.value is not None:
193 1
            attrs.Value = self._add_variable_value(obj,)
194 1
        if obj.rank:
195 1
            attrs.ValueRank = obj.rank
196 1
        if obj.accesslevel:
197
            attrs.AccessLevel = obj.accesslevel
198 1
        if obj.useraccesslevel:
199
            attrs.UserAccessLevel = obj.useraccesslevel
200 1
        if obj.minsample:
201
            attrs.MinimumSamplingInterval = obj.minsample
202 1
        if obj.dimensions:
203 1
            attrs.ArrayDimensions = obj.dimensions
204 1
        node.NodeAttributes = attrs
205 1
        res = self._add_node(node)
206 1
        self._add_refs(obj)
207 1
        res[0].StatusCode.check()
208 1
        return res[0].AddedNodeId
209
210 1
    def _get_ext_class(self, name):
211 1
        if hasattr(ua, name):
212 1
            return  getattr(ua, name)
213
        elif name in self.aliases.keys():
214
            nodeid = self.aliases[name]
215
            class_type = ua.uatypes.get_extensionobject_class_type(nodeid)
216
            if class_type:
217
                return class_type
218
            else:
219
                raise Exception("Error no extension class registered ", name, nodeid)
220
        else:
221
            raise Exception("Error no alias found for extension class", name)
222
223 1
    def _make_ext_obj(self, obj):
224 1
        ext = self._get_ext_class(obj.objname)()
225 1
        for name, val in obj.body:
226 1
            if not isinstance(val, list):
227
                raise Exception("Error val should be a list, this is a python-opcua bug", name, type(val), val)
228
            else:
229 1
                for attname, v in val:
230 1
                    self._set_attr(ext, attname, v)
231 1
        return ext
232
233 1
    def _get_val_type(self, obj, attname):
234 1
        for name, uatype in obj.ua_types:
235 1
            if name == attname:
236 1
                return uatype
237
        raise UaError("Attribute '{}' defined in xml is not found in object '{}'".format(attname, ext))
238
239 1
    def _set_attr(self, obj, attname, val):
240
        # tow possible values:
241
        # either we get value directly
242
        # or a dict if it s an object or a list
243 1
        if isinstance(val, (str, unicode)):
244 1
            pval = xmlparser.ua_type_to_python(val, self._get_val_type(obj, attname))
245 1
            setattr(obj, attname, pval)
246
        else:
247
            # so we have either an object or a list...
248 1
            obj2 = getattr(obj, attname)
249 1
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
250 1
                for attname2, v2 in val:
251 1
                    if attname2 == "Identifier":
252 1
                        if hasattr(ua.ObjectIds, v2):
253
                            obj2 = ua.NodeId(getattr(ua.ObjectIds, v2))
254
                        else:
255 1
                            obj2 = ua.NodeId.from_string(v2)
256 1
                        setattr(obj, attname, obj2)
257 1
                        break
258 1
            elif not hasattr(obj2, "ua_types"):
259
                # we probably have a list
260 1
                my_list = []
261 1
                for vtype, v2 in val:
262 1
                    my_list.append(xmlparser.ua_type_to_python(v2, vtype))
263 1
                setattr(obj, attname, my_list)
264
            else:
265 1
                for attname2, v2 in val:
266 1
                    self._set_attr(obj2, attname2, v2)
267 1
                setattr(obj, attname, obj2)
268
269 1
    def _add_variable_value(self, obj):
270
        """
271
        Returns the value for a Variable based on the objects value type.
272
        """
273 1
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
274 1
        if obj.valuetype == 'ListOfExtensionObject':
275 1
            values = []
276 1
            for ext in obj.value:
277 1
                extobj = self._make_ext_obj(ext)
278 1
                values.append(extobj)
279 1
            return ua.Variant(values, ua.VariantType.ExtensionObject)
280 1
        elif obj.valuetype == 'ListOfGuid':
281 1
            return ua.Variant([
282
                uuid.UUID(guid) for guid in obj.value
283
            ], getattr(ua.VariantType, obj.valuetype[6:]))
284 1
        elif obj.valuetype.startswith("ListOf"):
285 1
            vtype = obj.valuetype[6:]
286 1
            if hasattr(ua.ua_binary.Primitives, vtype):
287 1
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
288
            else:
289 1
                return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
290 1
        elif obj.valuetype == 'ExtensionObject':
291 1
            extobj = self._make_ext_obj(obj.value)
292 1
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
293 1
        elif obj.valuetype == 'Guid':
294 1
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
295 1
        elif obj.valuetype == 'LocalizedText':
296 1
            ltext = ua.LocalizedText()
297 1
            for name, val in obj.value:
298 1
                if name == "Text":
299 1
                    ltext.Text = val
300
                else:
301 1
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
302 1
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
303 1
        elif obj.valuetype == 'NodeId':
304 1
            return ua.Variant(ua.NodeId.from_string(obj.value))
305
        else:
306 1
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
307
308 1
    def add_variable_type(self, obj):
309
        node = self._get_node(obj)
310
        attrs = ua.VariableTypeAttributes()
311
        if obj.desc:
312
            attrs.Description = ua.LocalizedText(obj.desc)
313
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
314
        attrs.DataType = self.to_nodeid(obj.datatype)
315
        if obj.value and len(obj.value) == 1:
316
            attrs.Value = obj.value[0]
317
        if obj.rank:
318
            attrs.ValueRank = obj.rank
319
        if obj.abstract:
320
            attrs.IsAbstract = obj.abstract
321
        if obj.dimensions:
322
            attrs.ArrayDimensions = obj.dimensions
323
        node.NodeAttributes = attrs
324
        res = self._add_node(node)
325
        self._add_refs(obj)
326
        res[0].StatusCode.check()
327
        return res[0].AddedNodeId
328
329 1
    def add_method(self, obj):
330 1
        node = self._get_node(obj)
331 1
        attrs = ua.MethodAttributes()
332 1
        if obj.desc:
333 1
            attrs.Description = ua.LocalizedText(obj.desc)
334 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
335 1 View Code Duplication
        if obj.accesslevel:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
336
            attrs.AccessLevel = obj.accesslevel
337 1
        if obj.useraccesslevel:
338
            attrs.UserAccessLevel = obj.useraccesslevel
339 1
        if obj.minsample:
340
            attrs.MinimumSamplingInterval = obj.minsample
341 1
        if obj.dimensions:
342
            attrs.ArrayDimensions = obj.dimensions
343 1
        node.NodeAttributes = attrs
344 1
        res = self._add_node(node)
345 1
        self._add_refs(obj)
346 1
        res[0].StatusCode.check()
347 1
        return res[0].AddedNodeId
348
349 1
    def add_reference_type(self, obj):
350
        node = self._get_node(obj)
351
        attrs = ua.ReferenceTypeAttributes()
352
        if obj.desc:
353
            attrs.Description = ua.LocalizedText(obj.desc)
354
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
355
        if obj. inversename:
356
            attrs.InverseName = ua.LocalizedText(obj.inversename)
357
        if obj.abstract:
358
            attrs.IsAbstract = obj.abstract
359
        if obj.symmetric:
360
            attrs.Symmetric = obj.symmetric
361
        node.NodeAttributes = attrs
362
        res = self._add_node(node)
363
        self._add_refs(obj)
364
        res[0].StatusCode.check()
365
        return res[0].AddedNodeId
366
367 1
    def add_datatype(self, obj):
368 1
        node = self._get_node(obj)
369 1
        attrs = ua.DataTypeAttributes()
370 1
        if obj.desc:
371 1
            attrs.Description = ua.LocalizedText(obj.desc)
372 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
373 1
        if obj.abstract:
374
            attrs.IsAbstract = obj.abstract
375 1
        node.NodeAttributes = attrs
376 1
        res = self._add_node(node)
377 1
        self._add_refs(obj)
378 1
        res[0].StatusCode.check()
379 1
        return res[0].AddedNodeId
380
381 1
    def _add_refs(self, obj):
382 1
        if not obj.refs:
383 1
            return
384 1
        refs = []
385 1
        for data in obj.refs:
386 1
            ref = ua.AddReferencesItem()
387 1
            ref.IsForward = True
388 1
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
389 1
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
390 1
            ref.TargetNodeClass = ua.NodeClass.DataType
391 1
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
392 1
            refs.append(ref)
393 1
        self._add_references(refs)
394
395 1
    def _sort_nodes_by_parentid(self, ndatas):
396
        """
397
        Sort the list of nodes according their parent node in order to respect
398
        the dependency between nodes.
399
400
        :param nodes: list of NodeDataObjects
401
        :returns: list of sorted nodes
402
        """
403 1
        _ndatas = list(ndatas)
404
        # list of node ids that are already sorted / inserted
405 1
        sorted_nodes_ids = []
406
        # list of sorted nodes (i.e. XML Elements)
407 1
        sorted_ndatas = []
408 1
        all_node_ids = [data.nodeid for data in ndatas]
409
        # list of namespace indexes that are relevant for this import
410
        # we can only respect ordering nodes for namespaces indexes that
411
        # are defined in the xml file itself. Thus we assume that all other
412
        # references namespaces are already known to the server and should
413
        # not create any dependency problems (like "NodeNotFound")
414 1
        while len(_ndatas) > 0:
415 1
            pop_nodes = []
416 1
            for ndata in _ndatas:
417
                # Insert nodes that
418
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
419
                #   (2) ns is not in list of relevant namespaces
420 1
                if ndata.nodeid.NamespaceIndex not in self.namespaces or \
421
                        ndata.parent is None or \
422
                        ndata.parent not in all_node_ids:
423 1
                    sorted_ndatas.append(ndata)
424 1
                    sorted_nodes_ids.append(ndata.nodeid)
425 1
                    pop_nodes.append(ndata)
426
                else:
427
                    # Check if the nodes parent is already in the list of
428
                    # inserted nodes
429 1
                    if ndata.parent in sorted_nodes_ids:
430 1
                        sorted_ndatas.append(ndata)
431 1
                        sorted_nodes_ids.append(ndata.nodeid)
432 1
                        pop_nodes.append(ndata)
433
            # Remove inserted nodes from the list
434 1
            for ndata in pop_nodes:
435 1
                _ndatas.pop(_ndatas.index(ndata))
436
        return sorted_ndatas
437