Completed
Pull Request — master (#327)
by Olivier
03:40
created

XmlImporter._parse_bname()   A

Complexity

Conditions 4

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
c 1
b 0
f 0
dl 0
loc 11
ccs 0
cts 11
cp 0
crap 20
rs 9.2
1
"""G
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
import sys
7
from copy import copy
8 1
9 1
from opcua import ua
10
from opcua.common import xmlparser
11
12 1
13
def to_python(val, obj, attname):
    """
    Convert the XML value ``val`` into the python value expected for the
    attribute ``attname`` of ``obj``.

    The target type is read from ``obj.ua_types[attname]`` and the
    conversion itself is done by xmlparser.ua_type_to_python.

    :raises RuntimeError: when asked for the "Identifier" of a ua.NodeId;
        this function does not parse node ids
    """
    asked_for_nodeid_identifier = isinstance(obj, ua.NodeId) and attname == "Identifier"
    if asked_for_nodeid_identifier:
        raise RuntimeError("Error we should parse a NodeId here")
    return xmlparser.ua_type_to_python(val, obj.ua_types[attname])
19 1
20 1
21 1
class XmlImporter(object):
22 1
23 1
    def __init__(self, server):
24 1
        self.logger = logging.getLogger(__name__)
25 1
        self.parser = None
26
        self.server = server
27 1
        self.namespaces = {}
28 1
        self.aliases = {}
29
30
    def _map_namespaces(self, namespaces_uris):
31
        """
32
        creates a mapping between the namespaces in the xml file and in the server.
33
        if not present the namespace is registered.
34
        """
35
        print("original ns", self.server.get_namespace_array())
36
        print("xml ns", namespaces_uris)
37
        namespaces = {}
38
        for ns_index, ns_uri in enumerate(namespaces_uris):
39
            ns_server_index = self.server.register_namespace(ns_uri)
40 1
            namespaces[ns_index + 1] = (ns_server_index, ns_uri)
41 1
        print("new ns", self.server.get_namespace_array())
42 1
        print(namespaces)
43 1
        return namespaces
44 1
45 1
    def _map_aliases(self, aliases):
46 1
        """
47 1
        maps the import aliases to the correct namespaces        
48 1
        """
49 1
        aliases_mapped = {}
50 1
        for alias, node_id in aliases.items():
51 1
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
52
        return aliases_mapped
53 1
54 1
    def import_xml(self, xmlpath):
        """
        Import xml and return added nodes.

        The file is parsed, the parsed node data is converted with
        make_objects, the namespaces and aliases of the file are mapped onto
        the server, and the nodes are then added in the order computed by
        _sort_nodes_by_parentid.

        :param xmlpath: path handed to xmlparser.XMLParser
        :returns: list with the value returned by the add_* method of every
            imported node; nodes of an unsupported type are skipped with a
            warning
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath)

        dnodes = self.parser.get_node_datas()
        dnodes = self.make_objects(dnodes)

        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
        self.aliases = self._map_aliases(self.parser.get_aliases())

        nodes_parsed = self._sort_nodes_by_parentid(dnodes)

        # one handler per supported xml node type
        adders = {
            'UAObject': self.add_object,
            'UAObjectType': self.add_object_type,
            'UAVariable': self.add_variable,
            'UAVariableType': self.add_variable_type,
            'UAReferenceType': self.add_reference_type,
            'UADataType': self.add_datatype,
            'UAMethod': self.add_method,
        }

        nodes = []
        for nodedata in nodes_parsed:
            adder = adders.get(nodedata.nodetype)
            if adder is None:
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
                continue
            node = adder(nodedata)
            self.logger.debug("Added node %s", node)
            nodes.append(node)
        return nodes
91
    
92 1
    def make_objects(self, node_datas):
93 1
        new_nodes = []
94 1
        for ndata in node_datas:
95 1
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
96
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
97 1
            if ndata.parent:
98 1
                ndata.parent = ua.NodeId.from_string(ndata.parent)
99
            if ndata.parentlink:
100 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
101 1
            if ndata.typedef:
102 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
103 1
            new_nodes.append(ndata)
104 1
        return new_nodes
105
106 1
    def _migrate_ns(self, nodeid):
107
        """
108 1
        Check if the index of nodeid or browsename  given in the xml model file
109
        must be converted to a already existing namespace id based on the files
110 1
        namespace uri
111
112 1
        :returns: NodeId (str)
113 1
        """
114 1
        if nodeid.NamespaceIndex in self.namespaces:
115
            nodeid = copy(nodeid)
116 1
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex][0]
117
        return nodeid
118
119
    def _get_node(self, obj):
        """
        Build the ua.AddNodesItem shared by all node classes.

        Node id, browse name, parent, reference type and type definition are
        all passed through _migrate_ns. Parent, reference type and type
        definition are only set when the parsed value is truthy.

        :param obj: parsed node data (reads nodeid, browsename, nodetype,
            parent, parentlink, typedef)
        :returns: ua.AddNodesItem without NodeAttributes; callers fill those
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
        self.logger.debug(
            "Adding node %s (identifier %s)",
            node.RequestedNewNodeId,
            node.RequestedNewNodeId.Identifier,
        )
        node.BrowseName = self._migrate_ns(obj.browsename)
        # nodetype looks like 'UAObject': drop the first two characters to
        # get the attribute name looked up on ua.NodeClass
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent:
            node.ParentNodeId = self._migrate_ns(obj.parent)
        if obj.parentlink:
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
        if obj.typedef:
            node.TypeDefinition = self._migrate_ns(obj.typedef)
        return node
132
133
    def to_nodeid(self, nodeid):
        """
        Turn the different id notations into a ua.NodeId.

        Checked in this order:
          - already a ua.NodeId: returned as is
          - falsy value: ua.NodeId(ua.ObjectIds.String)
          - contains "=": parsed with ua.NodeId.from_string
          - attribute name of ua.ObjectIds: node id built from that attribute
          - key of self.aliases: the aliased node id

        :raises AttributeError: when none of the above matches
        """
        if isinstance(nodeid, ua.NodeId):
            return nodeid
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.aliases:
            return self.aliases[nodeid]
        # hasattr() above was False, so this lookup raises AttributeError
        # for the unknown name
        return ua.NodeId(getattr(ua.ObjectIds, nodeid))
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
147
148
    def add_object(self, obj):
        """
        Add an Object node built from the parsed data ``obj`` to the server.

        :param obj: parsed node data (reads desc, displayname, eventnotifier
            plus everything _get_node and _add_refs read)
        :returns: AddedNodeId of the first add_nodes result
        :raises: whatever StatusCode.check() raises for that result
        """
        item = self._get_node(obj)
        attributes = ua.ObjectAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.EventNotifier = obj.eventnotifier
        item.NodeAttributes = attributes

        session = self.server.iserver.isession
        results = session.add_nodes([item])
        # references are added before the status of the node is checked
        self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
160
161
    def add_object_type(self, obj):
        """
        Add an ObjectType node built from the parsed data ``obj``.

        :param obj: parsed node data (reads desc, displayname, abstract plus
            everything _get_node and _add_refs read)
        :returns: AddedNodeId of the first add_nodes result
        :raises: whatever StatusCode.check() raises for that result
        """
        item = self._get_node(obj)
        type_attrs = ua.ObjectTypeAttributes()
        if obj.desc:
            type_attrs.Description = ua.LocalizedText(obj.desc)
        type_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        type_attrs.IsAbstract = obj.abstract
        item.NodeAttributes = type_attrs

        results = self.server.iserver.isession.add_nodes([item])
        # references are added before the status of the node is checked
        self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
173
174
    def add_variable(self, obj):
        """
        Add a Variable node built from the parsed data ``obj`` to the server.

        DisplayName and DataType are always set. The value is set whenever
        ``obj.value`` is not None (built by _add_variable_value). Description,
        ValueRank, AccessLevel, UserAccessLevel, MinimumSamplingInterval and
        ArrayDimensions are only set when the parsed value is truthy.

        :param obj: parsed node data
        :returns: AddedNodeId of the first add_nodes result
        :raises: whatever StatusCode.check() raises for that result
        """
        node = self._get_node(obj)
        attrs = ua.VariableAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        attrs.DataType = self.to_nodeid(obj.datatype)
        self.logger.debug("Variable value from xml: %s", obj.value)
        if obj.value is not None:
            attrs.Value = self._add_variable_value(obj)
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        node.NodeAttributes = attrs
        res = self.server.iserver.isession.add_nodes([node])
        # references are added before the status of the node is checked
        self._add_refs(obj)
        res[0].StatusCode.check()
        return res[0].AddedNodeId
199
200
    def _make_ext_obj(self, obj):
        """
        Build a ua extension object from its parsed xml description.

        ``obj.objname`` is the name of the class looked up on the ``ua``
        module and instantiated without arguments. ``obj.body`` is iterated
        as ``(name, val)`` pairs and every ``val`` as ``(attribute name,
        value)`` pairs. Each value is either a string, converted with
        to_python, or a nested sequence of pairs describing a NodeId, a list
        or a sub-object.

        :param obj: parsed extension object (reads objname and body)
        :returns: the populated ua object
        :raises Exception: when a ``val`` of the body is a plain string
        """
        ext = getattr(ua, obj.objname)()
        for name, val in obj.body:
            if isinstance(val, str):
                raise Exception("Error val should a dict", name, val)
            for attname, v in val:
                # two possible values:
                # either we get the value directly
                # or nested pairs if it is an object or a list
                if isinstance(v, str):
                    setattr(ext, attname, to_python(v, ext, attname))
                    continue
                # so we have either an object or a list; the current value
                # of the attribute tells us which
                obj2 = getattr(ext, attname)
                if isinstance(obj2, ua.NodeId):
                    # only the first "Identifier" entry is used; without one
                    # the attribute keeps its current value
                    for attname2, v2 in v:
                        if attname2 == "Identifier":
                            setattr(ext, attname, ua.NodeId.from_string(v2))
                            break
                elif not hasattr(obj2, "ua_types"):
                    # we probably have a list of (type, value) pairs
                    my_list = [xmlparser.ua_type_to_python(v2, vtype) for vtype, v2 in v]
                    setattr(ext, attname, my_list)
                else:
                    # sub-object: fill its attributes one by one
                    for attname2, v2 in v:
                        setattr(obj2, attname2, to_python(v2, obj2, attname2))
                    setattr(ext, attname, obj2)
        return ext
232
233
    def _add_variable_value(self, obj):
        """
        Return the value for a Variable based on the object's valuetype.

        :param obj: parsed node data (reads valuetype and value)
        :returns: depends on ``obj.valuetype``:
            - 'ListOfExtensionObject': plain list of objects built by
              _make_ext_obj
            - 'ListOf<T>': ua.Variant typed ua.VariantType.<T> when <T> is an
              attribute of ua.ua_binary.Primitives, otherwise a plain list
              of ``ua.<T>(v)``
            - 'ExtensionObject': ua.Variant wrapping the object built by
              _make_ext_obj
            - anything else: ua.Variant typed ua.VariantType.<valuetype>
        """
        self.logger.debug("Building value for %s of type %s", obj, obj.valuetype)
        if obj.valuetype == 'ListOfExtensionObject':
            return [self._make_ext_obj(ext) for ext in obj.value]
        if obj.valuetype.startswith("ListOf"):
            vtype = obj.valuetype[len("ListOf"):]
            if hasattr(ua.ua_binary.Primitives, vtype):
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
            # the class is looked up per element, so an empty value never
            # triggers the lookup
            return [getattr(ua, vtype)(v) for v in obj.value]
        if obj.valuetype == 'ExtensionObject':
            value = self._make_ext_obj(obj.value)
        else:
            value = obj.value
        return ua.Variant(value, getattr(ua.VariantType, obj.valuetype))
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
255
256
    def add_variable_type(self, obj):
        """
        Add a VariableType node built from the parsed data ``obj``.

        DisplayName and DataType are always set. Value is only set when
        ``obj.value`` is truthy and has exactly one element (that element is
        used). Description, ValueRank, IsAbstract and ArrayDimensions are
        only set when the parsed value is truthy.

        :param obj: parsed node data
        :returns: AddedNodeId of the first add_nodes result
        :raises: whatever StatusCode.check() raises for that result
        """
        item = self._get_node(obj)
        type_attrs = ua.VariableTypeAttributes()
        if obj.desc:
            type_attrs.Description = ua.LocalizedText(obj.desc)
        type_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        type_attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            type_attrs.Value = obj.value[0]
        # (parsed field, ua attribute) pairs copied only when truthy
        optional = (
            ("rank", "ValueRank"),
            ("abstract", "IsAbstract"),
            ("dimensions", "ArrayDimensions"),
        )
        for source, target in optional:
            parsed = getattr(obj, source)
            if parsed:
                setattr(type_attrs, target, parsed)
        item.NodeAttributes = type_attrs

        results = self.server.iserver.isession.add_nodes([item])
        # references are added before the status of the node is checked
        self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
276
277
    def add_method(self, obj):
        """
        Add a Method node built from the parsed data ``obj`` to the server.

        DisplayName is always set. Description, AccessLevel, UserAccessLevel,
        MinimumSamplingInterval and ArrayDimensions are only set when the
        parsed value is truthy.

        :param obj: parsed node data
        :returns: AddedNodeId of the first add_nodes result
        :raises: whatever StatusCode.check() raises for that result
        """
        item = self._get_node(obj)
        method_attrs = ua.MethodAttributes()
        if obj.desc:
            method_attrs.Description = ua.LocalizedText(obj.desc)
        method_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        # NOTE(review): these are the same attribute names add_variable sets
        # on VariableAttributes - confirm they are meaningful on
        # MethodAttributes
        optional = (
            ("accesslevel", "AccessLevel"),
            ("useraccesslevel", "UserAccessLevel"),
            ("minsample", "MinimumSamplingInterval"),
            ("dimensions", "ArrayDimensions"),
        )
        for source, target in optional:
            parsed = getattr(obj, source)
            if parsed:
                setattr(method_attrs, target, parsed)
        item.NodeAttributes = method_attrs

        results = self.server.iserver.isession.add_nodes([item])
        # references are added before the status of the node is checked
        self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
296
297
    def add_reference_type(self, obj):
        """
        Add a ReferenceType node built from the parsed data ``obj``.

        DisplayName is always set. Description, InverseName, IsAbstract and
        Symmetric are only set when the parsed value is truthy.

        :param obj: parsed node data
        :returns: AddedNodeId of the first add_nodes result
        :raises: whatever StatusCode.check() raises for that result
        """
        item = self._get_node(obj)
        ref_attrs = ua.ReferenceTypeAttributes()
        if obj.desc:
            ref_attrs.Description = ua.LocalizedText(obj.desc)
        ref_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.inversename:
            ref_attrs.InverseName = ua.LocalizedText(obj.inversename)
        # plain flags are copied as they are, but only when truthy
        for source, target in (("abstract", "IsAbstract"), ("symmetric", "Symmetric")):
            parsed = getattr(obj, source)
            if parsed:
                setattr(ref_attrs, target, parsed)
        item.NodeAttributes = ref_attrs

        results = self.server.iserver.isession.add_nodes([item])
        # references are added before the status of the node is checked
        self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
314
315
    def add_datatype(self, obj):
        """
        Add a DataType node built from the parsed data ``obj`` to the server.

        DisplayName is always set. Description and IsAbstract are only set
        when the parsed value is truthy.

        :param obj: parsed node data
        :returns: AddedNodeId of the first add_nodes result
        :raises: whatever StatusCode.check() raises for that result
        """
        item = self._get_node(obj)
        datatype_attrs = ua.DataTypeAttributes()
        if obj.desc:
            datatype_attrs.Description = ua.LocalizedText(obj.desc)
        datatype_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.abstract:
            datatype_attrs.IsAbstract = obj.abstract
        item.NodeAttributes = datatype_attrs

        results = self.server.iserver.isession.add_nodes([item])
        # references are added before the status of the node is checked
        self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
328
329
    def _add_refs(self, obj):
330
        if not obj.refs:
331
            return
332
        refs = []
333
        for data in obj.refs:
334
            ref = ua.AddReferencesItem()
335
            ref.IsForward = True
336
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
337
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
338
            ref.TargetNodeClass = ua.NodeClass.DataType
339
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
340
            refs.append(ref)
341
        self.server.iserver.isession.add_references(refs)
342
343
    def _sort_nodes_by_parentid(self, ndatas):
344
        """
345
        Sort the list of nodes according theire parent node in order to respect
346
        the depency between nodes.
347
348
        :param nodes: list of NodeDataObjects
349
        :returns: list of sorted nodes
350
        """
351
        _ndatas = list(ndatas)
352
        # list of node ids that are already sorted / inserted
353
        sorted_nodes_ids = []
354
        # list of sorted nodes (i.e. XML Elements)
355
        sorted_ndatas = []
356
        all_node_ids = [data.nodeid for data in ndatas]
357
        # list of namespace indexes that are relevant for this import
358
        # we can only respect ordering nodes for namespaces indexes that
359
        # are defined in the xml file itself. Thus we assume that all other
360
        # references namespaces are already known to the server and should
361
        # not create any dependency problems (like "NodeNotFound")
362
        relevant_namespaces = [str(ns) for ns in self.namespaces.keys()]
363
        while len(_ndatas) > 0:
364
            pop_nodes = []
365
            print(_ndatas)
366
            for ndata in _ndatas:
367
                # Insert nodes that
368
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
369
                #   (2) ns is not in list of relevant namespaces
370
                if ndata.nodeid.NamespaceIndex not in relevant_namespaces or \
371
                        ndata.parent is None or \
372
                        ndata.parent not in all_node_ids:
373
                    sorted_ndatas.append(ndata)
374
                    sorted_nodes_ids.append(ndata.nodeid)
375
                    pop_nodes.append(ndata)
376
                else:
377
                    # Check if the nodes parent is already in the list of
378
                    # inserted nodes
379
                    if ndata.parent in sorted_nodes_ids:
380
                        sorted_ndatas.append(ndata)
381
                        sorted_nodes_ids.append(ndata.nodeid)
382
                        pop_nodes.append(ndata)
383
384
            # Remove inserted nodes from the list
385
            for ndata in pop_nodes:
386
                _ndatas.pop(_ndatas.index(ndata))
387
        print("SORTD", sorted_ndatas)
388
        return sorted_ndatas
389