Completed
Pull Request — master (#327)
by Olivier
06:13 queued 02:31
created

XmlImporter._add_variable_value()   C

Complexity

Conditions 7

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
dl 0
loc 21
rs 6.4705
c 1
b 0
f 0
ccs 0
cts 0
cp 0
crap 56
1
"""G
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
import sys
7
from copy import copy
8 1
9 1
from opcua import ua
10
from opcua.common import xmlparser
11
12 1
13
def to_python(val, obj, attname):
14 1
    if isinstance(obj, ua.NodeId) and attname == "Identifier":
15 1
        raise RuntimeError("Error we should parse a NodeId here")
16 1
        return ua.NodeId.from_string(val)
17 1
    else:
18
        return xmlparser.ua_type_to_python(val, obj.ua_types[attname])
19 1
20 1
21 1
class XmlImporter(object):
22 1
23 1
    def __init__(self, server):
24 1
        self.logger = logging.getLogger(__name__)
25 1
        self.parser = None
26
        self.server = server
27 1
        self.namespaces = {}
28 1
        self.aliases = {}
29
30
    def _map_namespaces(self, namespaces_uris):
31
        """
32
        creates a mapping between the namespaces in the xml file and in the server.
33
        if not present the namespace is registered.
34
        """
35
        namespaces = {}
36
        for ns_index, ns_uri in enumerate(namespaces_uris):
37
            ns_server_index = self.server.register_namespace(ns_uri)
38
            namespaces[ns_index + 1] = (ns_server_index, ns_uri)
39
        return namespaces
40 1
41 1
    def _map_aliases(self, aliases):
42 1
        """
43 1
        maps the import aliases to the correct namespaces        
44 1
        """
45 1
        aliases_mapped = {}
46 1
        for alias, node_id in aliases.items():
47 1
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
48 1
        return aliases_mapped
49 1
50 1
    def import_xml(self, xmlpath):
51 1
        """
52
        import xml and return added nodes
53 1
        """
54 1
        self.logger.info("Importing XML file %s", xmlpath)
55
        self.parser = xmlparser.XMLParser(xmlpath)
56 1
57
        dnodes = self.parser.get_node_datas()
58
        dnodes = self.make_objects(dnodes)
59 1
60
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
61 1
        self.aliases = self._map_aliases(self.parser.get_aliases())
62 1
63 1
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
64
65
        nodes = []
66 1
        for nodedata in nodes_parsed:  # self.parser:
67
            if nodedata.nodetype == 'UAObject':
68 1
                node = self.add_object(nodedata)
69
            elif nodedata.nodetype == 'UAObjectType':
70 1
                node = self.add_object_type(nodedata)
71 1
            elif nodedata.nodetype == 'UAVariable':
72 1
                node = self.add_variable(nodedata)
73 1
            elif nodedata.nodetype == 'UAVariableType':
74 1
                node = self.add_variable_type(nodedata)
75 1
            elif nodedata.nodetype == 'UAReferenceType':
76 1
                node = self.add_reference_type(nodedata)
77 1
            elif nodedata.nodetype == 'UADataType':
78 1
                node = self.add_datatype(nodedata)
79 1
            elif nodedata.nodetype == 'UAMethod':
80
                node = self.add_method(nodedata)
81 1
            else:
82
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
83
                continue
84
            nodes.append(node)
85
        return nodes
86
    
87
    def make_objects(self, node_datas):
88
        new_nodes = []
89
        for ndata in node_datas:
90
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
91
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
92 1
            if ndata.parent:
93 1
                ndata.parent = ua.NodeId.from_string(ndata.parent)
94 1
            if ndata.parentlink:
95 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
96
            if ndata.typedef:
97 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
98 1
            new_nodes.append(ndata)
99
        return new_nodes
100 1
101 1
    def _migrate_ns(self, nodeid):
102 1
        """
103 1
        Check if the index of nodeid or browsename  given in the xml model file
104 1
        must be converted to a already existing namespace id based on the files
105
        namespace uri
106 1
107
        :returns: NodeId (str)
108 1
        """
109
        if nodeid.NamespaceIndex in self.namespaces:
110 1
            nodeid = copy(nodeid)
111
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex][0]
112 1
        return nodeid
113 1
114 1
    def _get_node(self, obj):
115
        node = ua.AddNodesItem()
116 1
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
117
        node.BrowseName = self._migrate_ns(obj.browsename)
118
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
119
        if obj.parent:
120
            node.ParentNodeId = self._migrate_ns(obj.parent)
121
        if obj.parentlink:
122
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
123
        if obj.typedef:
124
            node.TypeDefinition = self._migrate_ns(obj.typedef)
125
        return node
126
127
    def to_nodeid(self, nodeid):
128
        if isinstance(nodeid, ua.NodeId):
129
            return nodeid
130
        elif not nodeid:
131
            return ua.NodeId(ua.ObjectIds.String)
132
        elif "=" in nodeid:
133
            return ua.NodeId.from_string(nodeid)
134
        elif hasattr(ua.ObjectIds, nodeid):
135 1
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
136
        else:
137
            if nodeid in self.aliases:
138
                return self.aliases[nodeid]
139
            else:
140
                return ua.NodeId(getattr(ua.ObjectIds, nodeid))
141
142
    def add_object(self, obj):
143
        node = self._get_node(obj)
144
        attrs = ua.ObjectAttributes()
145
        if obj.desc:
146 View Code Duplication
            attrs.Description = ua.LocalizedText(obj.desc)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
147
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
148
        attrs.EventNotifier = obj.eventnotifier
149
        node.NodeAttributes = attrs
150
        res = self.server.iserver.isession.add_nodes([node])
151
        self._add_refs(obj)
152
        res[0].StatusCode.check()
153 1
        return res[0].AddedNodeId
154
155
    def add_object_type(self, obj):
156
        node = self._get_node(obj)
157
        attrs = ua.ObjectTypeAttributes()
158
        if obj.desc:
159 View Code Duplication
            attrs.Description = ua.LocalizedText(obj.desc)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
160
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
161
        attrs.IsAbstract = obj.abstract
162
        node.NodeAttributes = attrs
163
        res = self.server.iserver.isession.add_nodes([node])
164
        self._add_refs(obj)
165
        res[0].StatusCode.check()
166
        return res[0].AddedNodeId
167
168
    def add_variable(self, obj):
169 1
        node = self._get_node(obj)
170
        attrs = ua.VariableAttributes()
171
        if obj.desc:
172
            attrs.Description = ua.LocalizedText(obj.desc)
173
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
174
        attrs.DataType = self.to_nodeid(obj.datatype)
175
        if obj.value is not None:
176
            attrs.Value = self._add_variable_value(obj,)
177
        if obj.rank:
178
            attrs.ValueRank = obj.rank
179
        if obj.accesslevel:
180
            attrs.AccessLevel = obj.accesslevel
181 1
        if obj.useraccesslevel:
182 1
            attrs.UserAccessLevel = obj.useraccesslevel
183 1
        if obj.minsample:
184
            attrs.MinimumSamplingInterval = obj.minsample
185
        if obj.dimensions:
186
            attrs.ArrayDimensions = obj.dimensions
187
        node.NodeAttributes = attrs
188
        res = self.server.iserver.isession.add_nodes([node])
189
        self._add_refs(obj)
190
        res[0].StatusCode.check()
191
        return res[0].AddedNodeId
192
193
    def _make_ext_obj(sefl, obj):
194
        ext = getattr(ua, obj.objname)()
195
        for name, val in obj.body:
196
            if type(val) is str:
197
                raise Exception("Error val should a dict", name, val)
198
            else:
199
                for attname, v in val:
200
                    # tow possible values:
201
                    # either we get value directly
202
                    # or a dict if it s an object or a list
203
                    if type(v) is str:
204
                        setattr(ext, attname, to_python(v, ext, attname))
205
                    else:
206
                        # so we habve either an object or a list...
207
                        obj2 = getattr(ext, attname)
208
                        if isinstance(obj2, ua.NodeId):
209
                            for attname2, v2 in v:
210
                                if attname2 == "Identifier":
211
                                    obj2 = ua.NodeId.from_string(v2)
212
                                    setattr(ext, attname, obj2)
213
                                    break
214
                        elif not isinstance(obj2, ua.NodeId) and not hasattr(obj2, "ua_types"):
215
                            # we probably have a list
216
                            my_list = []
217
                            for vtype, v2 in v:
218
                                my_list.append(xmlparser.ua_type_to_python(v2, vtype))
219
                            setattr(ext, attname, my_list)
220
                        else:
221
                            for attname2, v2 in v:
222
                                setattr(obj2, attname2, to_python(v2, obj2, attname2))
223
                            setattr(ext, attname, obj2)
224
        return ext
225
226
    def _add_variable_value(self, obj):
227
        """
228
        Returns the value for a Variable based on the objects valuetype. 
229
        """
230
        if obj.valuetype == 'ListOfExtensionObject':
231
            values = []
232
            for ext in obj.value:
233
                extobj = self._make_ext_obj(ext)
234
                values.append(extobj)
235
            return values
236
        elif obj.valuetype.startswith("ListOf"):
237
            vtype = obj.valuetype[6:]
238
            if hasattr(ua.ua_binary.Primitives, vtype):
239
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
240
            else:
241
                return [getattr(ua, vtype)(v) for v in obj.value]
242
        elif obj.valuetype == 'ExtensionObject':
243
            extobj = self._make_ext_obj(obj.value)
244
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
245
        else:
246
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
247
248
    def add_variable_type(self, obj):
249
        node = self._get_node(obj)
250
        attrs = ua.VariableTypeAttributes()
251
        if obj.desc:
252
            attrs.Description = ua.LocalizedText(obj.desc)
253
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
254 View Code Duplication
        attrs.DataType = self.to_nodeid(obj.datatype)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
255
        if obj.value and len(obj.value) == 1:
256
            attrs.Value = obj.value[0]
257
        if obj.rank:
258
            attrs.ValueRank = obj.rank
259
        if obj.abstract:
260
            attrs.IsAbstract = obj.abstract
261
        if obj.dimensions:
262
            attrs.ArrayDimensions = obj.dimensions
263
        node.NodeAttributes = attrs
264
        res = self.server.iserver.isession.add_nodes([node])
265
        self._add_refs(obj)
266
        res[0].StatusCode.check()
267
        return res[0].AddedNodeId
268
269
    def add_method(self, obj):
270
        node = self._get_node(obj)
271
        attrs = ua.MethodAttributes()
272
        if obj.desc:
273
            attrs.Description = ua.LocalizedText(obj.desc)
274
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
275 View Code Duplication
        if obj.accesslevel:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
276
            attrs.AccessLevel = obj.accesslevel
277
        if obj.useraccesslevel:
278
            attrs.UserAccessLevel = obj.useraccesslevel
279
        if obj.minsample:
280
            attrs.MinimumSamplingInterval = obj.minsample
281
        if obj.dimensions:
282
            attrs.ArrayDimensions = obj.dimensions
283
        node.NodeAttributes = attrs
284
        res = self.server.iserver.isession.add_nodes([node])
285
        self._add_refs(obj)
286
        res[0].StatusCode.check()
287
        return res[0].AddedNodeId
288
289
    def add_reference_type(self, obj):
290
        node = self._get_node(obj)
291
        attrs = ua.ReferenceTypeAttributes()
292
        if obj.desc:
293
            attrs.Description = ua.LocalizedText(obj.desc)
294
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
295
        if obj. inversename:
296
            attrs.InverseName = ua.LocalizedText(obj.inversename)
297
        if obj.abstract:
298
            attrs.IsAbstract = obj.abstract
299
        if obj.symmetric:
300
            attrs.Symmetric = obj.symmetric
301
        node.NodeAttributes = attrs
302
        res = self.server.iserver.isession.add_nodes([node])
303
        self._add_refs(obj)
304
        res[0].StatusCode.check()
305
        return res[0].AddedNodeId
306
307
    def add_datatype(self, obj):
308
        node = self._get_node(obj)
309
        attrs = ua.DataTypeAttributes()
310
        if obj.desc:
311
            attrs.Description = ua.LocalizedText(obj.desc)
312
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
313 View Code Duplication
        if obj.abstract:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
314
            attrs.IsAbstract = obj.abstract
315
        node.NodeAttributes = attrs
316
        res = self.server.iserver.isession.add_nodes([node])
317
        self._add_refs(obj)
318
        res[0].StatusCode.check()
319
        return res[0].AddedNodeId
320
321
    def _add_refs(self, obj):
322
        if not obj.refs:
323
            return
324
        refs = []
325
        for data in obj.refs:
326
            ref = ua.AddReferencesItem()
327
            ref.IsForward = True
328
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
329
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
330
            ref.TargetNodeClass = ua.NodeClass.DataType
331
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
332
            refs.append(ref)
333
        self.server.iserver.isession.add_references(refs)
334
335
    def _sort_nodes_by_parentid(self, ndatas):
336
        """
337
        Sort the list of nodes according theire parent node in order to respect
338
        the depency between nodes.
339
340
        :param nodes: list of NodeDataObjects
341
        :returns: list of sorted nodes
342
        """
343
        _ndatas = list(ndatas)
344
        # list of node ids that are already sorted / inserted
345
        sorted_nodes_ids = []
346
        # list of sorted nodes (i.e. XML Elements)
347
        sorted_ndatas = []
348
        all_node_ids = [data.nodeid for data in ndatas]
349
        # list of namespace indexes that are relevant for this import
350
        # we can only respect ordering nodes for namespaces indexes that
351
        # are defined in the xml file itself. Thus we assume that all other
352
        # references namespaces are already known to the server and should
353
        # not create any dependency problems (like "NodeNotFound")
354
        relevant_namespaces = [str(ns) for ns in self.namespaces.keys()]
355
        while len(_ndatas) > 0:
356
            pop_nodes = []
357
            for ndata in _ndatas:
358
                # Insert nodes that
359
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
360
                #   (2) ns is not in list of relevant namespaces
361
                if ndata.nodeid.NamespaceIndex not in relevant_namespaces or \
362
                        ndata.parent is None or \
363
                        ndata.parent not in all_node_ids:
364
                    sorted_ndatas.append(ndata)
365
                    sorted_nodes_ids.append(ndata.nodeid)
366
                    pop_nodes.append(ndata)
367
                else:
368
                    # Check if the nodes parent is already in the list of
369
                    # inserted nodes
370
                    if ndata.parent in sorted_nodes_ids:
371
                        sorted_ndatas.append(ndata)
372
                        sorted_nodes_ids.append(ndata.nodeid)
373
                        pop_nodes.append(ndata)
374
375
            # Remove inserted nodes from the list
376
            for ndata in pop_nodes:
377
                _ndatas.pop(_ndatas.index(ndata))
378
        return sorted_ndatas
379