Completed
Push — master ( 0e5e79...d552f1 )
by Olivier
05:52
created

XmlImporter.add_variable()   D

Complexity

Conditions 8

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 8.0046

Importance

Changes 0
Metric Value
cc 8
c 0
b 0
f 0
dl 0
loc 24
ccs 23
cts 24
cp 0.9583
crap 8.0046
rs 4.3478
1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6 1
import uuid
7 1
from copy import copy
8
9 1
import opcua
10 1
from opcua import ua
11 1
from opcua.common import xmlparser
12 1
from opcua.ua.uaerrors import UaError
13
14 1
import sys
15
16 1
if sys.version_info.major > 2:
17 1
    unicode = str
18
19 1
class XmlImporter(object):
20
21 1
    def __init__(self, server):
22 1
        self.logger = logging.getLogger(__name__)
23 1
        self.parser = None
24 1
        self.server = server
25 1
        self.namespaces = {}
26 1
        self.aliases = {}
27 1
        self.refs = None
28
29 1
    def _map_namespaces(self, namespaces_uris):
30
        """
31
        creates a mapping between the namespaces in the xml file and in the server.
32
        if not present the namespace is registered.
33
        """
34 1
        namespaces = {}
35 1
        for ns_index, ns_uri in enumerate(namespaces_uris):
36 1
            ns_server_index = self.server.register_namespace(ns_uri)
37 1
            namespaces[ns_index + 1] = ns_server_index
38 1
        return namespaces
39
40 1
    def _map_aliases(self, aliases):
41
        """
42
        maps the import aliases to the correct namespaces
43
        """
44 1
        aliases_mapped = {}
45 1
        for alias, node_id in aliases.items():
46 1
            aliases_mapped[alias] = self.to_nodeid(node_id)
47 1
        return aliases_mapped
48
49 1
    def import_xml(self, xmlpath=None, xmlstring=None):
50
        """
51
        import xml and return added nodes
52
        """
53 1
        self.logger.info("Importing XML file %s", xmlpath)
54 1
        self.parser = xmlparser.XMLParser(xmlpath, xmlstring)
55
56 1
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
57 1
        self.aliases = self._map_aliases(self.parser.get_aliases())
58 1
        self.refs = []
59
60 1
        dnodes = self.parser.get_node_datas()
61 1
        dnodes = self.make_objects(dnodes)
62 1
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
63
64 1
        nodes = []
65 1
        for nodedata in nodes_parsed:  # self.parser:
66 1
            try:
67 1
                node = self._add_node_data(nodedata)
68
            except Exception:
69
                self.logger.warning("failure adding node %s", nodedata)
70
                raise
71 1
            nodes.append(node)
72
73 1
        self.refs, remaining_refs = [], self.refs
74 1
        self._add_references(remaining_refs)
75 1
        if len(self.refs) != 0:
76
            self.logger.warning("The following references could not be imported and are probaly broken: %s", self.refs) 
77
78 1
        return nodes
79
80 1
    def _add_node_data(self, nodedata):
81 1
        if nodedata.nodetype == 'UAObject':
82 1
            node = self.add_object(nodedata)
83 1
        elif nodedata.nodetype == 'UAObjectType':
84 1
            node = self.add_object_type(nodedata)
85 1
        elif nodedata.nodetype == 'UAVariable':
86 1
            node = self.add_variable(nodedata)
87 1
        elif nodedata.nodetype == 'UAVariableType':
88
            node = self.add_variable_type(nodedata)
89 1
        elif nodedata.nodetype == 'UAReferenceType':
90
            node = self.add_reference_type(nodedata)
91 1
        elif nodedata.nodetype == 'UADataType':
92 1
            node = self.add_datatype(nodedata)
93 1
        elif nodedata.nodetype == 'UAMethod':
94 1
            node = self.add_method(nodedata)
95
        else:
96
            self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
97 1
        return node
98
99 1
    def _add_node(self, node):
100 1
        if isinstance(self.server, opcua.server.server.Server):
101 1
            return self.server.iserver.isession.add_nodes([node])
102
        else:
103 1
            return self.server.uaclient.add_nodes([node])
104
105 1
    def _add_references(self, refs):
106 1
        if isinstance(self.server, opcua.server.server.Server):
107 1
            res = self.server.iserver.isession.add_references(refs)
108
        else:
109 1
            res = self.server.uaclient.add_references(refs)
110
111 1
        for sc, ref in zip(res, refs):
112 1
            if not sc.is_good():
113 1
                self.refs.append(ref)
114
115 1
    def make_objects(self, node_datas):
116 1
        new_nodes = []
117 1
        for ndata in node_datas:
118 1
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
119 1
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
120 1
            if ndata.parent:
121 1
                ndata.parent = ua.NodeId.from_string(ndata.parent)
122 1
            if ndata.parentlink:
123 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
124 1
            if ndata.typedef:
125 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
126 1
            new_nodes.append(ndata)
127 1
        return new_nodes
128
129 1
    def _migrate_ns(self, nodeid):
130
        """
131
        Check if the index of nodeid or browsename  given in the xml model file
132
        must be converted to a already existing namespace id based on the files
133
        namespace uri
134
135
        :returns: NodeId (str)
136
        """
137 1
        if nodeid.NamespaceIndex in self.namespaces:
138 1
            nodeid = copy(nodeid)
139 1
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
140 1
        return nodeid
141
142 1
    def _get_node(self, obj):
143 1
        node = ua.AddNodesItem()
144 1
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
145 1
        node.BrowseName = self._migrate_ns(obj.browsename)
146 1
        self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
147 1
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
148 1
        if obj.parent and obj.parentlink:
149 1
            node.ParentNodeId = self._migrate_ns(obj.parent)
150 1
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
151 1
        if obj.typedef:
152 1
            node.TypeDefinition = self._migrate_ns(obj.typedef)
153 1
        return node
154
155 1
    def _to_nodeid(self, nodeid):
156 1
        if isinstance(nodeid, ua.NodeId):
157
            return nodeid
158 1
        elif not nodeid:
159
            return ua.NodeId(ua.ObjectIds.String)
160 1
        elif "=" in nodeid:
161 1
            return ua.NodeId.from_string(nodeid)
162 1
        elif hasattr(ua.ObjectIds, nodeid):
163 1
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
164
        else:
165 1
            if nodeid in self.aliases:
166 1
                return self.aliases[nodeid]
167
            else:
168
                return ua.NodeId(getattr(ua.ObjectIds, nodeid))
169
170 1
    def to_nodeid(self, nodeid):
171 1
        return self._migrate_ns(self._to_nodeid(nodeid))
172 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
173 1
    def add_object(self, obj):
174 1
        node = self._get_node(obj)
175 1
        attrs = ua.ObjectAttributes()
176 1
        if obj.desc:
177 1
            attrs.Description = ua.LocalizedText(obj.desc)
178 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
179 1
        attrs.EventNotifier = obj.eventnotifier
180 1
        node.NodeAttributes = attrs
181 1
        res = self._add_node(node)
182 1
        self._add_refs(obj)
183 1
        res[0].StatusCode.check()
184 1
        return res[0].AddedNodeId
185 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
186 1
    def add_object_type(self, obj):
187 1
        node = self._get_node(obj)
188 1
        attrs = ua.ObjectTypeAttributes()
189 1
        if obj.desc:
190
            attrs.Description = ua.LocalizedText(obj.desc)
191 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
192 1
        attrs.IsAbstract = obj.abstract
193 1
        node.NodeAttributes = attrs
194 1
        res = self._add_node(node)
195 1
        self._add_refs(obj)
196 1
        res[0].StatusCode.check()
197 1
        return res[0].AddedNodeId
198
199 1
    def add_variable(self, obj):
200 1
        node = self._get_node(obj)
201 1
        attrs = ua.VariableAttributes()
202 1
        if obj.desc:
203 1
            attrs.Description = ua.LocalizedText(obj.desc)
204 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
205 1
        attrs.DataType = self.to_nodeid(obj.datatype)
206 1
        if obj.value is not None:
207 1
            attrs.Value = self._add_variable_value(obj,)
208 1
        if obj.rank:
209 1
            attrs.ValueRank = obj.rank
210 1
        if obj.accesslevel:
211 1
            attrs.AccessLevel = obj.accesslevel
212 1
        if obj.useraccesslevel:
213 1
            attrs.UserAccessLevel = obj.useraccesslevel
214 1
        if obj.minsample:
215
            attrs.MinimumSamplingInterval = obj.minsample
216 1
        if obj.dimensions:
217 1
            attrs.ArrayDimensions = obj.dimensions
218 1
        node.NodeAttributes = attrs
219 1
        res = self._add_node(node)
220 1
        self._add_refs(obj)
221 1
        res[0].StatusCode.check()
222 1
        return res[0].AddedNodeId
223
224 1
    def _get_ext_class(self, name):
225 1
        if hasattr(ua, name):
226 1
            return  getattr(ua, name)
227
        elif name in self.aliases.keys():
228
            nodeid = self.aliases[name]
229
            class_type = ua.uatypes.get_extensionobject_class_type(nodeid)
230
            if class_type:
231
                return class_type
232
            else:
233
                raise Exception("Error no extension class registered ", name, nodeid)
234
        else:
235
            raise Exception("Error no alias found for extension class", name)
236
237 1
    def _make_ext_obj(self, obj):
238 1
        ext = self._get_ext_class(obj.objname)()
239 1
        for name, val in obj.body:
240 1
            if not isinstance(val, list):
241
                raise Exception("Error val should be a list, this is a python-opcua bug", name, type(val), val)
242
            else:
243 1
                for attname, v in val:
244 1
                    self._set_attr(ext, attname, v)
245 1
        return ext
246
247 1
    def _get_val_type(self, obj, attname):
248 1
        for name, uatype in obj.ua_types:
249 1
            if name == attname:
250 1
                return uatype
251
        raise UaError("Attribute '{}' defined in xml is not found in object '{}'".format(attname, ext))
252
253 1
    def _set_attr(self, obj, attname, val):
254
        # tow possible values:
255
        # either we get value directly
256
        # or a dict if it s an object or a list
257 1
        if isinstance(val, (str, unicode)):
258 1
            pval = xmlparser.ua_type_to_python(val, self._get_val_type(obj, attname))
259 1
            setattr(obj, attname, pval)
260
        else:
261
            # so we have either an object or a list...
262 1
            obj2 = getattr(obj, attname)
263 1
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
264 1
                for attname2, v2 in val:
265 1
                    if attname2 == "Identifier":
266 1
                        if hasattr(ua.ObjectIds, v2):
267
                            obj2 = ua.NodeId(getattr(ua.ObjectIds, v2))
268
                        else:
269 1
                            obj2 = ua.NodeId.from_string(v2)
270 1
                        setattr(obj, attname, self._migrate_ns(obj2))
271 1
                        break
272 1
            elif not hasattr(obj2, "ua_types"):
273
                # we probably have a list
274 1
                my_list = []
275 1
                for vtype, v2 in val:
276 1
                    my_list.append(xmlparser.ua_type_to_python(v2, vtype))
277 1
                setattr(obj, attname, my_list)
278
            else:
279 1
                for attname2, v2 in val:
280 1
                    self._set_attr(obj2, attname2, v2)
281 1
                setattr(obj, attname, obj2)
282
283 1
    def _add_variable_value(self, obj):
284
        """
285
        Returns the value for a Variable based on the objects value type.
286
        """
287 1
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
288 1
        if obj.valuetype == 'ListOfExtensionObject':
289 1
            values = []
290 1
            for ext in obj.value:
291 1
                extobj = self._make_ext_obj(ext)
292 1
                values.append(extobj)
293 1
            return ua.Variant(values, ua.VariantType.ExtensionObject)
294 1
        elif obj.valuetype == 'ListOfGuid':
295 1
            return ua.Variant([
296
                uuid.UUID(guid) for guid in obj.value
297
            ], getattr(ua.VariantType, obj.valuetype[6:]))
298 1
        elif obj.valuetype.startswith("ListOf"):
299 1
            vtype = obj.valuetype[6:]
300 1
            if hasattr(ua.ua_binary.Primitives, vtype):
301 1
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
302
            else:
303 1
                return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
304 1
        elif obj.valuetype == 'ExtensionObject':
305 1
            extobj = self._make_ext_obj(obj.value)
306 1
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
307 1
        elif obj.valuetype == 'Guid':
308 1
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
309 1
        elif obj.valuetype == 'LocalizedText':
310 1
            ltext = ua.LocalizedText()
311 1
            for name, val in obj.value:
312 1
                if name == "Text":
313 1
                    ltext.Text = val
314
                else:
315 1
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
316 1
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
317 1
        elif obj.valuetype == 'NodeId':
318 1
            return ua.Variant(ua.NodeId.from_string(obj.value))
319
        else:
320 1
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
321
322 1
    def add_variable_type(self, obj):
323
        node = self._get_node(obj)
324
        attrs = ua.VariableTypeAttributes()
325
        if obj.desc:
326
            attrs.Description = ua.LocalizedText(obj.desc)
327
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
328
        attrs.DataType = self.to_nodeid(obj.datatype)
329
        if obj.value and len(obj.value) == 1:
330
            attrs.Value = obj.value[0]
331
        if obj.rank:
332
            attrs.ValueRank = obj.rank
333
        if obj.abstract:
334
            attrs.IsAbstract = obj.abstract
335
        if obj.dimensions:
336
            attrs.ArrayDimensions = obj.dimensions
337
        node.NodeAttributes = attrs
338
        res = self._add_node(node)
339 View Code Duplication
        self._add_refs(obj)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
340
        res[0].StatusCode.check()
341
        return res[0].AddedNodeId
342
343 1
    def add_method(self, obj):
344 1
        node = self._get_node(obj)
345 1
        attrs = ua.MethodAttributes()
346 1
        if obj.desc:
347 1
            attrs.Description = ua.LocalizedText(obj.desc)
348 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
349 1
        if obj.accesslevel:
350
            attrs.AccessLevel = obj.accesslevel
351 1
        if obj.useraccesslevel:
352
            attrs.UserAccessLevel = obj.useraccesslevel
353 1
        if obj.minsample:
354
            attrs.MinimumSamplingInterval = obj.minsample
355 1
        if obj.dimensions:
356
            attrs.ArrayDimensions = obj.dimensions
357 1
        node.NodeAttributes = attrs
358 1
        res = self._add_node(node)
359 1 View Code Duplication
        self._add_refs(obj)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
360 1
        res[0].StatusCode.check()
361 1
        return res[0].AddedNodeId
362
363 1
    def add_reference_type(self, obj):
364
        node = self._get_node(obj)
365
        attrs = ua.ReferenceTypeAttributes()
366
        if obj.desc:
367
            attrs.Description = ua.LocalizedText(obj.desc)
368
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
369
        if obj. inversename:
370
            attrs.InverseName = ua.LocalizedText(obj.inversename)
371
        if obj.abstract:
372
            attrs.IsAbstract = obj.abstract
373
        if obj.symmetric:
374
            attrs.Symmetric = obj.symmetric
375
        node.NodeAttributes = attrs
376
        res = self._add_node(node)
377
        self._add_refs(obj)
378
        res[0].StatusCode.check()
379
        return res[0].AddedNodeId
380 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
381 1
    def add_datatype(self, obj):
382 1
        node = self._get_node(obj)
383 1
        attrs = ua.DataTypeAttributes()
384 1
        if obj.desc:
385 1
            attrs.Description = ua.LocalizedText(obj.desc)
386 1
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
387 1
        if obj.abstract:
388
            attrs.IsAbstract = obj.abstract
389 1
        node.NodeAttributes = attrs
390 1
        res = self._add_node(node)
391 1
        self._add_refs(obj)
392 1
        res[0].StatusCode.check()
393 1
        return res[0].AddedNodeId
394
395 1
    def _add_refs(self, obj):
396 1
        if not obj.refs:
397
            return
398 1
        refs = []
399 1
        for data in obj.refs:
400 1
            ref = ua.AddReferencesItem()
401 1
            ref.IsForward = data.forward
402 1
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
403 1
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
404 1
            ref.TargetNodeClass = ua.NodeClass.DataType
405 1
            ref.TargetNodeId = self.to_nodeid(data.target)
406 1
            refs.append(ref)
407 1
        self._add_references(refs)
408
409 1
    def _sort_nodes_by_parentid(self, ndatas):
410
        """
411
        Sort the list of nodes according their parent node in order to respect
412
        the dependency between nodes.
413
414
        :param nodes: list of NodeDataObjects
415
        :returns: list of sorted nodes
416
        """
417 1
        _ndatas = list(ndatas)
418
        # list of node ids that are already sorted / inserted
419 1
        sorted_nodes_ids = []
420
        # list of sorted nodes (i.e. XML Elements)
421 1
        sorted_ndatas = []
422 1
        all_node_ids = [data.nodeid for data in ndatas]
423
        # list of namespace indexes that are relevant for this import
424
        # we can only respect ordering nodes for namespaces indexes that
425
        # are defined in the xml file itself. Thus we assume that all other
426
        # references namespaces are already known to the server and should
427
        # not create any dependency problems (like "NodeNotFound")
428 1
        while len(_ndatas) > 0:
429 1
            pop_nodes = []
430 1
            for ndata in _ndatas:
431
                # Insert nodes that
432
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
433
                #   (2) ns is not in list of relevant namespaces
434 1
                if ndata.nodeid.NamespaceIndex not in self.namespaces or \
435
                        ndata.parent is None or \
436
                        ndata.parent not in all_node_ids:
437 1
                    sorted_ndatas.append(ndata)
438 1
                    sorted_nodes_ids.append(ndata.nodeid)
439 1
                    pop_nodes.append(ndata)
440
                else:
441
                    # Check if the nodes parent is already in the list of
442
                    # inserted nodes
443 1
                    if ndata.parent in sorted_nodes_ids:
444 1
                        sorted_ndatas.append(ndata)
445 1
                        sorted_nodes_ids.append(ndata.nodeid)
446 1
                        pop_nodes.append(ndata)
447
            # Remove inserted nodes from the list
448 1
            for ndata in pop_nodes:
449 1
                _ndatas.pop(_ndatas.index(ndata))
450
        return sorted_ndatas
451