Completed
Pull Request — master (#379)
by
unknown
04:12
created

XmlImporter._add_variable_value()   F

Complexity

Conditions 14

Size

Total Lines 38

Duplication

Lines 12
Ratio 31.58 %

Code Coverage

Tests 29
CRAP Score 14.0072

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 14
c 1
b 0
f 0
dl 12
loc 38
ccs 29
cts 30
cp 0.9667
crap 14.0072
rs 2.7581

How to fix   Complexity   

Complexity

Complex classes like XmlImporter._add_variable_value() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6 1
import uuid
7 1
from copy import copy
8
9 1
import dateutil.parser
10
11 1
from opcua import ua
12 1
from opcua.common import xmlparser
13
14
15 1
class XmlImporter(object):
16
17 1
    def __init__(self, server):
18 1
        self.logger = logging.getLogger(__name__)
19 1
        self.parser = None
20 1
        self.server = server
21 1
        self.namespaces = {}
22 1
        self.aliases = {}
23
24 1
    def _map_namespaces(self, namespaces_uris):
25
        """
26
        creates a mapping between the namespaces in the xml file and in the server.
27
        if not present the namespace is registered.
28
        """
29 1
        namespaces = {}
30 1
        for ns_index, ns_uri in enumerate(namespaces_uris):
31 1
            ns_server_index = self.server.register_namespace(ns_uri)
32 1
            namespaces[ns_index + 1] = ns_server_index
33 1
        return namespaces
34
35 1
    def _map_aliases(self, aliases):
36
        """
37
        maps the import aliases to the correct namespaces        
38
        """
39 1
        aliases_mapped = {}
40 1
        for alias, node_id in aliases.items():
41 1
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
42 1
        return aliases_mapped
43
44 1
    def import_xml(self, xmlpath):
45
        """
46
        import xml and return added nodes
47
        """
48 1
        self.logger.info("Importing XML file %s", xmlpath)
49 1
        self.parser = xmlparser.XMLParser(xmlpath)
50
51 1
        dnodes = self.parser.get_node_datas()
52 1
        dnodes = self.make_objects(dnodes)
53
54 1
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
55 1
        self.aliases = self._map_aliases(self.parser.get_aliases())
56
57 1
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
58
59 1
        nodes = []
60 1
        for nodedata in nodes_parsed:  # self.parser:
61 1
            if nodedata.nodetype == 'UAObject':
62 1
                node = self.add_object(nodedata)
63 1
            elif nodedata.nodetype == 'UAObjectType':
64 1
                node = self.add_object_type(nodedata)
65 1
            elif nodedata.nodetype == 'UAVariable':
66 1
                node = self.add_variable(nodedata)
67 1
            elif nodedata.nodetype == 'UAVariableType':
68
                node = self.add_variable_type(nodedata)
69 1
            elif nodedata.nodetype == 'UAReferenceType':
70
                node = self.add_reference_type(nodedata)
71 1
            elif nodedata.nodetype == 'UADataType':
72 1
                node = self.add_datatype(nodedata)
73 1
            elif nodedata.nodetype == 'UAMethod':
74 1
                node = self.add_method(nodedata)
75
            else:
76
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
77
                continue
78 1
            nodes.append(node)
79 1
        return nodes
80
81 1
    def make_objects(self, node_datas):
82 1
        new_nodes = []
83 1
        for ndata in node_datas:
84 1
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
85 1
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
86 1
            if ndata.parent:
87 1
                ndata.parent = ua.NodeId.from_string(ndata.parent)
88 1
            if ndata.parentlink:
89 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
90 1
            if ndata.typedef:
91 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
92 1
            new_nodes.append(ndata)
93 1
        return new_nodes
94
95 1
    def _migrate_ns(self, nodeid):
96
        """
97
        Check if the index of nodeid or browsename  given in the xml model file
98
        must be converted to a already existing namespace id based on the files
99
        namespace uri
100
101
        :returns: NodeId (str)
102
        """
103 1
        if nodeid.NamespaceIndex in self.namespaces:
104 1
            nodeid = copy(nodeid)
105 1
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
106 1
        return nodeid
107
108 1
    def _get_node(self, obj):
109 1
        node = ua.AddNodesItem()
110 1
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
111 1
        node.BrowseName = self._migrate_ns(obj.browsename)
112 1
        self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
113 1
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
114 1
        if obj.parent:
115 1
            node.ParentNodeId = self._migrate_ns(obj.parent)
116 1
        if obj.parentlink:
117 1
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
118 1
        if obj.typedef:
119 1
            node.TypeDefinition = self._migrate_ns(obj.typedef)
120 1
        return node
121
122 1
    def to_nodeid(self, nodeid):
123 1
        if isinstance(nodeid, ua.NodeId):
124
            return nodeid
125 1
        elif not nodeid:
126
            return ua.NodeId(ua.ObjectIds.String)
127 1
        elif "=" in nodeid:
128 1
            return ua.NodeId.from_string(nodeid)
129 1
        elif hasattr(ua.ObjectIds, nodeid):
130 1
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
131
        else:
132 1
            if nodeid in self.aliases:
133 1
                return self.aliases[nodeid]
134
            else:
135
                return ua.NodeId(getattr(ua.ObjectIds, nodeid))
136
137 1 View Code Duplication
    def add_object(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
138 1
        node = self._get_node(obj)
139 1
        attrs = ua.ObjectAttributes()
140 1
        if obj.desc:
141 1
            attrs.Description = ua.LocalizedText(obj.desc)
142 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
143 1
        attrs.EventNotifier = obj.eventnotifier
144 1
        node.NodeAttributes = attrs
145 1
        res = self.server.iserver.isession.add_nodes([node])
146 1
        self._add_refs(obj)
147 1
        res[0].StatusCode.check()
148 1
        return res[0].AddedNodeId
149
150 1 View Code Duplication
    def add_object_type(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
151 1
        node = self._get_node(obj)
152 1
        attrs = ua.ObjectTypeAttributes()
153 1
        if obj.desc:
154
            attrs.Description = ua.LocalizedText(obj.desc)
155 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
156 1
        attrs.IsAbstract = obj.abstract
157 1
        node.NodeAttributes = attrs
158 1
        res = self.server.iserver.isession.add_nodes([node])
159 1
        self._add_refs(obj)
160 1
        res[0].StatusCode.check()
161 1
        return res[0].AddedNodeId
162
163 1
    def add_variable(self, obj):
164 1
        node = self._get_node(obj)
165 1
        attrs = ua.VariableAttributes()
166 1
        if obj.desc:
167 1
            attrs.Description = ua.LocalizedText(obj.desc)
168 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
169 1
        attrs.DataType = self.to_nodeid(obj.datatype)
170 1
        if obj.value is not None:
171 1
            attrs.Value = self._add_variable_value(obj,)
172 1
        if obj.rank:
173 1
            attrs.ValueRank = obj.rank
174 1
        if obj.accesslevel:
175
            attrs.AccessLevel = obj.accesslevel
176
        else:
177 1
            attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
178 1
        if obj.useraccesslevel:
179
            attrs.UserAccessLevel = obj.useraccesslevel
180
        else:
181 1
            attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
182 1
        if obj.minsample:
183
            attrs.MinimumSamplingInterval = obj.minsample
184 1
        if obj.dimensions:
185 1
            attrs.ArrayDimensions = obj.dimensions
186 1
        node.NodeAttributes = attrs
187 1
        res = self.server.iserver.isession.add_nodes([node])
188 1
        self._add_refs(obj)
189 1
        res[0].StatusCode.check()
190 1
        return res[0].AddedNodeId
191
192 1
    def _make_ext_obj(self, obj):
193 1
        ext = getattr(ua, obj.objname)()
194 1
        for name, val in obj.body:
195 1
            if isinstance(val, str):
196
                raise Exception("Error val should a dict", name, val)
197
            else:
198 1
                for attname, v in val:
199 1
                    self._set_attr(ext, attname, v)
200 1
        return ext
201
202 1
    def _set_attr(self, obj, attname, val):
203
        # tow possible values:
204
        # either we get value directly
205
        # or a dict if it s an object or a list
206 1
        if isinstance(val, str):
207 1
            pval = xmlparser.ua_type_to_python(val, obj.ua_types[attname])
208 1
            setattr(obj, attname, pval)
209
        else:
210
            # so we have either an object or a list...
211 1
            obj2 = getattr(obj, attname)
212 1
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
213 1
                for attname2, v2 in val:
214 1
                    if attname2 == "Identifier":
215 1
                        obj2 = ua.NodeId.from_string(v2)
216 1
                        setattr(obj, attname, obj2)
217 1
                        break
218 1
            elif not isinstance(obj2, ua.NodeId) and not hasattr(obj2, "ua_types"):
219
                # we probably have a list
220 1
                my_list = []
221 1
                for vtype, v2 in val:
222 1
                    my_list.append(xmlparser.ua_type_to_python(v2, vtype))
223 1
                setattr(obj, attname, my_list)
224
            else:
225 1
                for attname2, v2 in val:
226 1
                    self._set_attr(obj2, attname2, v2)
227 1
                setattr(obj, attname, obj2)
228
229 1
    def _add_variable_value(self, obj):
230
        """
231
        Returns the value for a Variable based on the objects value type.
232
        """
233 1
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
234 1
        if obj.valuetype == 'ListOfExtensionObject':
235 1
            values = []
236 1
            for ext in obj.value:
237 1
                extobj = self._make_ext_obj(ext)
238 1
                values.append(extobj)
239 1
            return values
240 1
        elif obj.valuetype == 'ListOfGuid':
241 1
            return ua.Variant([
242
                uuid.UUID(guid) for guid in obj.value
243
            ], getattr(ua.VariantType, obj.valuetype[6:]))
244 1
        elif obj.valuetype.startswith("ListOf"):
245 1
            vtype = obj.valuetype[6:]
246 1
            if hasattr(ua.ua_binary.Primitives, vtype):
247 1
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
248
            else:
249 1
                return [getattr(ua, vtype)(v) for v in obj.value]
250 1
        elif obj.valuetype == 'ExtensionObject':
251 1
            extobj = self._make_ext_obj(obj.value)
252 1
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
253 1
        elif obj.valuetype == 'Guid':
254 1
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
255 1 View Code Duplication
        elif obj.valuetype == 'LocalizedText':
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
256 1
            ltext = ua.LocalizedText()
257 1
            for name, val in obj.value:
258 1
                if name == "Text":
259 1
                    ltext.Text = val.encode("utf-8")
260
                else:
261
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
262 1
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
263 1
        elif obj.valuetype == 'NodeId':
264 1
            return ua.Variant(ua.NodeId.from_string(obj.value))
265
        else:
266 1
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
267
268 1
    def add_variable_type(self, obj):
269
        node = self._get_node(obj)
270
        attrs = ua.VariableTypeAttributes()
271
        if obj.desc:
272
            attrs.Description = ua.LocalizedText(obj.desc)
273
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
274
        attrs.DataType = self.to_nodeid(obj.datatype)
275
        if obj.value and len(obj.value) == 1:
276 View Code Duplication
            attrs.Value = obj.value[0]
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
277
        if obj.rank:
278
            attrs.ValueRank = obj.rank
279
        if obj.abstract:
280
            attrs.IsAbstract = obj.abstract
281
        if obj.dimensions:
282
            attrs.ArrayDimensions = obj.dimensions
283
        node.NodeAttributes = attrs
284
        res = self.server.iserver.isession.add_nodes([node])
285
        self._add_refs(obj)
286
        res[0].StatusCode.check()
287
        return res[0].AddedNodeId
288
289 1
    def add_method(self, obj):
290 1
        node = self._get_node(obj)
291 1
        attrs = ua.MethodAttributes()
292 1
        if obj.desc:
293 1
            attrs.Description = ua.LocalizedText(obj.desc)
294 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
295 1
        if obj.accesslevel:
296
            attrs.AccessLevel = obj.accesslevel
297 1
        if obj.useraccesslevel:
298
            attrs.UserAccessLevel = obj.useraccesslevel
299 1
        if obj.minsample:
300
            attrs.MinimumSamplingInterval = obj.minsample
301 1
        if obj.dimensions:
302
            attrs.ArrayDimensions = obj.dimensions
303 1
        node.NodeAttributes = attrs
304 1
        res = self.server.iserver.isession.add_nodes([node])
305 1
        self._add_refs(obj)
306 1
        res[0].StatusCode.check()
307 1
        return res[0].AddedNodeId
308
309 1
    def add_reference_type(self, obj):
310
        node = self._get_node(obj)
311
        attrs = ua.ReferenceTypeAttributes()
312
        if obj.desc:
313
            attrs.Description = ua.LocalizedText(obj.desc)
314
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
315
        if obj. inversename:
316
            attrs.InverseName = ua.LocalizedText(obj.inversename)
317
        if obj.abstract:
318
            attrs.IsAbstract = obj.abstract
319
        if obj.symmetric:
320
            attrs.Symmetric = obj.symmetric
321 View Code Duplication
        node.NodeAttributes = attrs
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
322
        res = self.server.iserver.isession.add_nodes([node])
323
        self._add_refs(obj)
324
        res[0].StatusCode.check()
325
        return res[0].AddedNodeId
326
327 1
    def add_datatype(self, obj):
328 1
        node = self._get_node(obj)
329 1
        attrs = ua.DataTypeAttributes()
330 1
        if obj.desc:
331 1
            attrs.Description = ua.LocalizedText(obj.desc)
332 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
333 1
        if obj.abstract:
334
            attrs.IsAbstract = obj.abstract
335 1
        node.NodeAttributes = attrs
336 1
        res = self.server.iserver.isession.add_nodes([node])
337 1
        self._add_refs(obj)
338 1
        res[0].StatusCode.check()
339 1
        return res[0].AddedNodeId
340
341 1
    def _add_refs(self, obj):
342 1
        if not obj.refs:
343 1
            return
344 1
        refs = []
345 1
        for data in obj.refs:
346 1
            ref = ua.AddReferencesItem()
347 1
            ref.IsForward = True
348 1
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
349 1
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
350 1
            ref.TargetNodeClass = ua.NodeClass.DataType
351 1
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
352 1
            refs.append(ref)
353 1
        self.server.iserver.isession.add_references(refs)
354
355 1
    def _sort_nodes_by_parentid(self, ndatas):
356
        """
357
        Sort the list of nodes according their parent node in order to respect
358
        the dependency between nodes.
359
360
        :param nodes: list of NodeDataObjects
361
        :returns: list of sorted nodes
362
        """
363 1
        _ndatas = list(ndatas)
364
        # list of node ids that are already sorted / inserted
365 1
        sorted_nodes_ids = []
366
        # list of sorted nodes (i.e. XML Elements)
367 1
        sorted_ndatas = []
368 1
        all_node_ids = [data.nodeid for data in ndatas]
369
        # list of namespace indexes that are relevant for this import
370
        # we can only respect ordering nodes for namespaces indexes that
371
        # are defined in the xml file itself. Thus we assume that all other
372
        # references namespaces are already known to the server and should
373
        # not create any dependency problems (like "NodeNotFound")
374 1
        while len(_ndatas) > 0:
375 1
            pop_nodes = []
376 1
            for ndata in _ndatas:
377
                # Insert nodes that
378
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
379
                #   (2) ns is not in list of relevant namespaces
380 1
                if ndata.nodeid.NamespaceIndex not in self.namespaces or \
381
                        ndata.parent is None or \
382
                        ndata.parent not in all_node_ids:
383 1
                    sorted_ndatas.append(ndata)
384 1
                    sorted_nodes_ids.append(ndata.nodeid)
385 1
                    pop_nodes.append(ndata)
386
                else:
387
                    # Check if the nodes parent is already in the list of
388
                    # inserted nodes
389 1
                    if ndata.parent in sorted_nodes_ids:
390 1
                        sorted_ndatas.append(ndata)
391 1
                        sorted_nodes_ids.append(ndata.nodeid)
392 1
                        pop_nodes.append(ndata)
393
            # Remove inserted nodes from the list
394 1
            for ndata in pop_nodes:
395 1
                _ndatas.pop(_ndatas.index(ndata))
396
        return sorted_ndatas
397