Completed
Pull Request — master (#327)
by Olivier
03:39
created

XmlImporter.import_xml()   F

Complexity

Conditions 9

Size

Total Lines 36

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 11.7194

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 9
dl 0
loc 36
ccs 21
cts 31
cp 0.6774
crap 11.7194
rs 3
c 3
b 1
f 0
1
"""G
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
import sys
7
8 1
from opcua import ua
9 1
from opcua.common import xmlparser
10
11
12 1
def ua_type_to_python(val, uatype):
13
    if uatype.startswith("Int") or uatype.startswith("UInt"):
14 1
        return int(val)
15 1
    elif uatype in ("String"):
16 1
        return val
17 1
    elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
18
        if sys.version_info.major > 2:
19 1
            return bytes(val, 'utf8')
20 1
        else:
21 1
            return val
22 1
    else:
23 1
        raise Exception("uatype nopt handled", uatype, " for val ", val)
24 1
25 1
26
def to_python(val, obj, attname):
27 1
    if isinstance(obj, ua.NodeId) and attname == "Identifier":
28 1
        raise RuntimeError("Error we should parse a NodeId here")
29
        return ua.NodeId.from_string(val)
30
    else:
31
        return ua_type_to_python(val, obj.ua_types[attname])
32
33
34
class XmlImporter(object):
35
36
    def __init__(self, server):
37
        self.logger = logging.getLogger(__name__)
38
        self.parser = None
39
        self.server = server
40 1
        self.namespaces = {}
41 1
        self.aliases = {}
42 1
43 1
    def _map_namespaces(self, namespaces_uris):
44 1
        """
45 1
        creates a mapping between the namespaces in the xml file and in the server.
46 1
        if not present the namespace is registered.
47 1
        """
48 1
        print("original ns", self.server.get_namespace_array())
49 1
        print("xml ns", namespaces_uris)
50 1
        namespaces = {}
51 1
        for ns_index, ns_uri in enumerate(namespaces_uris):
52
            ns_server_index = self.server.register_namespace(ns_uri)
53 1
            namespaces[ns_index + 1] = (ns_server_index, ns_uri)
54 1
        print("new ns", self.server.get_namespace_array())
55
        print(namespaces)
56 1
        return namespaces
57
58
    def _map_aliases(self, aliases):
59 1
        """
60
        maps the import aliases to the correct namespaces        
61 1
        """
62 1
        aliases_mapped = {}
63 1
        for alias, node_id in aliases.items():
64
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
65
        return aliases_mapped
66 1
67
    def import_xml(self, xmlpath):
68 1
        """
69
        import xml and return added nodes
70 1
        """
71 1
        self.logger.info("Importing XML file %s", xmlpath)
72 1
        self.parser = xmlparser.XMLParser(xmlpath)
73 1
74 1
        dnodes = self.parser.get_node_datas()
75 1
        dnodes = self.make_objects(dnodes)
76 1
77 1
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
78 1
        self.aliases = self._map_aliases(self.parser.get_aliases())
79 1
80
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
81 1
82
        nodes = []
83
        for nodedata in nodes_parsed:  # self.parser:
84
            if nodedata.nodetype == 'UAObject':
85
                node = self.add_object(nodedata)
86
            elif nodedata.nodetype == 'UAObjectType':
87
                node = self.add_object_type(nodedata)
88
            elif nodedata.nodetype == 'UAVariable':
89
                node = self.add_variable(nodedata)
90
            elif nodedata.nodetype == 'UAVariableType':
91
                node = self.add_variable_type(nodedata)
92 1
            elif nodedata.nodetype == 'UAReferenceType':
93 1
                node = self.add_reference_type(nodedata)
94 1
            elif nodedata.nodetype == 'UADataType':
95 1
                node = self.add_datatype(nodedata)
96
            elif nodedata.nodetype == 'UAMethod':
97 1
                node = self.add_method(nodedata)
98 1
            else:
99
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
100 1
                continue
101 1
            nodes.append(node)
102 1
        return nodes
103 1
    
104 1
    def make_objects(self, node_datas):
105
        new_nodes = []
106 1
        for ndata in node_datas:
107
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
108 1
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
109
            if ndata.parent:
110 1
                ndata.parent = ua.NodeId.from_string(ndata.parent)
111
            if ndata.parentlink:
112 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
113 1
            if ndata.typedef:
114 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
115
            new_nodes.append(ndata)
116 1
        return new_nodes
117
118
    def _migrate_ns(self, nodeid):
119
        """
120
        Check if the index of nodeid or browsename  given in the xml model file
121
        must be converted to a already existing namespace id based on the files
122
        namespace uri
123
124
        :returns: NodeId (str)
125
        """
126
        if nodeid.NamespaceIndex in self.namespaces:
127
            print("Migrating", nodeid, " to ", end="")
128
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex][0]
129
            print(nodeid)
130
        return nodeid
131
132
    def _get_node(self, obj):
133
        node = ua.AddNodesItem()
134
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
135 1
        node.BrowseName = self._migrate_ns(obj.browsename)
136
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
137
        if obj.parent:
138
            node.ParentNodeId = self._migrate_ns(obj.parent)
139
        if obj.parentlink:
140
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
141
        if obj.typedef:
142
            node.TypeDefinition = self._migrate_ns(obj.typedef)
143
        return node
144
145
    def to_nodeid(self, nodeid):
146
        if not nodeid:
147
            return ua.NodeId(ua.ObjectIds.String)
148
        elif "=" in nodeid:
149
            return ua.NodeId.from_string(nodeid)
150
        elif hasattr(ua.ObjectIds, nodeid):
151
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
152
        else:
153 1
            if nodeid in self.aliases:
154
                nodeid = self.aliases[nodeid]
155
            else:
156
                nodeid = "i={}".format(getattr(ua.ObjectIds, nodeid))
157
            return ua.NodeId.from_string(nodeid)
158
159 View Code Duplication
    def add_object(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
160
        node = self._get_node(obj)
161
        attrs = ua.ObjectAttributes()
162
        if obj.desc:
163
            attrs.Description = ua.LocalizedText(obj.desc)
164
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
165
        attrs.EventNotifier = obj.eventnotifier
166
        node.NodeAttributes = attrs
167
        res = self.server.iserver.isession.add_nodes([node])
168
        self._add_refs(obj)
169 1
        return res[0].AddedNodeId
170
171 View Code Duplication
    def add_object_type(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
172
        node = self._get_node(obj)
173
        attrs = ua.ObjectTypeAttributes()
174
        if obj.desc:
175
            attrs.Description = ua.LocalizedText(obj.desc)
176
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
177
        attrs.IsAbstract = obj.abstract
178
        node.NodeAttributes = attrs
179
        res = self.server.iserver.isession.add_nodes([node])
180
        self._add_refs(obj)
181 1
        return res[0].AddedNodeId
182 1
183 1
    def add_variable(self, obj):
184
        node = self._get_node(obj)
185
        attrs = ua.VariableAttributes()
186
        if obj.desc:
187
            attrs.Description = ua.LocalizedText(obj.desc)
188
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
189
        attrs.DataType = self.to_nodeid(obj.datatype)
190
        # if obj.value and len(obj.value) == 1:
191
        if obj.value is not None:
192
            attrs.Value = self._add_variable_value(obj,)
193
        if obj.rank:
194
            attrs.ValueRank = obj.rank
195
        if obj.accesslevel:
196
            attrs.AccessLevel = obj.accesslevel
197
        if obj.useraccesslevel:
198
            attrs.UserAccessLevel = obj.useraccesslevel
199
        if obj.minsample:
200
            attrs.MinimumSamplingInterval = obj.minsample
201
        if obj.dimensions:
202
            attrs.ArrayDimensions = obj.dimensions
203
        node.NodeAttributes = attrs
204
        res = self.server.iserver.isession.add_nodes([node])
205
        self._add_refs(obj)
206
        return res[0].AddedNodeId
207
208
    def _make_ext_obj(sefl, obj):
209
        ext = getattr(ua, obj.objname)()
210
        for name, val in obj.body:
211
            if type(val) is str:
212
                raise Exception("Error val should a dict", name, val)
213
            else:
214
                for attname, v in val:
215
                    # tow possible values:
216
                    # either we get value directly
217
                    # or a dict if it s an object or a list
218
                    if type(v) is str:
219
                        setattr(ext, attname, to_python(v, ext, attname))
220
                    else:
221
                        # so we habve either an object or a list...
222
                        obj2 = getattr(ext, attname)
223
                        if isinstance(obj2, ua.NodeId):
224
                            for attname2, v2 in v:
225
                                if attname2 == "Identifier":
226
                                    obj2 = ua.NodeId.from_string(v2)
227
                                    setattr(ext, attname, obj2)
228
                                    break
229
                        elif not isinstance(obj2, ua.NodeId) and not hasattr(obj2, "ua_types"):
230
                            # we probably have a list
231
                            my_list = []
232
                            for vtype, v2 in v:
233
                                my_list.append(ua_type_to_python(v2, vtype))
234
                            setattr(ext, attname, my_list)
235
                        else:
236
                            for attname2, v2 in v:
237
                                setattr(obj2, attname2, to_python(v2, obj2, attname2))
238
                            setattr(ext, attname, obj2)
239
        return ext
240
241
    def _add_variable_value(self, obj):
242
        """
243
        Returns the value for a Variable based on the objects valuetype. 
244
        """
245
        if obj.valuetype == 'ListOfExtensionObject':
246
            values = []
247
            for ext in obj.value:
248
                extobj = self._make_ext_obj(ext)
249
                values.append(extobj)
250
            return values
251
        elif obj.valuetype.startswith("ListOf"):
252
            vtype = obj.valuetype[6:]
253
            return [getattr(ua, vtype)(v) for v in obj.value]
254
        elif obj.valuetype == 'ExtensionObject':
255
            extobj = self._make_ext_obj(obj.value)
256
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
257
        else:
258
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
259
260 View Code Duplication
    def add_variable_type(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
261
        node = self._get_node(obj)
262
        attrs = ua.VariableTypeAttributes()
263
        if obj.desc:
264
            attrs.Description = ua.LocalizedText(obj.desc)
265
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
266
        attrs.DataType = self.to_nodeid(obj.datatype)
267
        if obj.value and len(obj.value) == 1:
268
            attrs.Value = obj.value[0]
269
        if obj.rank:
270
            attrs.ValueRank = obj.rank
271
        if obj.abstract:
272
            attrs.IsAbstract = obj.abstract
273
        if obj.dimensions:
274
            attrs.ArrayDimensions = obj.dimensions
275
        node.NodeAttributes = attrs
276
        res = self.server.iserver.isession.add_nodes([node])
277
        self._add_refs(obj)
278
        return res[0].AddedNodeId
279
280 View Code Duplication
    def add_method(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
281
        node = self._get_node(obj)
282
        attrs = ua.MethodAttributes()
283
        if obj.desc:
284
            attrs.Description = ua.LocalizedText(obj.desc)
285
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
286
        if obj.accesslevel:
287
            attrs.AccessLevel = obj.accesslevel
288
        if obj.useraccesslevel:
289
            attrs.UserAccessLevel = obj.useraccesslevel
290
        if obj.minsample:
291
            attrs.MinimumSamplingInterval = obj.minsample
292
        if obj.dimensions:
293
            attrs.ArrayDimensions = obj.dimensions
294
        node.NodeAttributes = attrs
295
        res = self.server.iserver.isession.add_nodes([node])
296
        self._add_refs(obj)
297
        return res[0].AddedNodeId
298
299
    def add_reference_type(self, obj):
300
        node = self._get_node(obj)
301
        attrs = ua.ReferenceTypeAttributes()
302
        if obj.desc:
303
            attrs.Description = ua.LocalizedText(obj.desc)
304
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
305
        if obj. inversename:
306
            attrs.InverseName = ua.LocalizedText(obj.inversename)
307
        if obj.abstract:
308
            attrs.IsAbstract = obj.abstract
309
        if obj.symmetric:
310
            attrs.Symmetric = obj.symmetric
311
        node.NodeAttributes = attrs
312
        res = self.server.iserver.isession.add_nodes([node])
313
        self._add_refs(obj)
314
        return res[0].AddedNodeId
315
316 View Code Duplication
    def add_datatype(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
317
        node = self._get_node(obj)
318
        attrs = ua.DataTypeAttributes()
319
        if obj.desc:
320
            attrs.Description = ua.LocalizedText(obj.desc)
321
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
322
        if obj.abstract:
323
            attrs.IsAbstract = obj.abstract
324
        node.NodeAttributes = attrs
325
        res = self.server.iserver.isession.add_nodes([node])
326
        self._add_refs(obj)
327
        return res[0].AddedNodeId
328
329
    def _add_refs(self, obj):
330
        if not obj.refs:
331
            return
332
        refs = []
333
        for data in obj.refs:
334
            ref = ua.AddReferencesItem()
335
            ref.IsForward = True
336
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
337
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
338
            ref.TargetNodeClass = ua.NodeClass.DataType
339
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
340
            refs.append(ref)
341
        self.server.iserver.isession.add_references(refs)
342
343
    # FIX: wrong order of node sorting .. need to find out what is wrong
344
    def _sort_nodes_by_parentid(self, ndatas):
345
        """
346
        Sort the list of nodes according theire parent node in order to respect
347
        the depency between nodes.
348
349
        :param nodes: list of NodeDataObjects
350
        :returns: list of sorted nodes
351
        """
352
        _ndatas = list(ndatas)
353
        # list of node ids that are already sorted / inserted
354
        sorted_nodes_ids = []
355
        # list of sorted nodes (i.e. XML Elements)
356
        sorted_ndatas = []
357
        all_node_ids = [data.nodeid for data in ndatas]
358
        # list of namespace indexes that are relevant for this import
359
        # we can only respect ordering nodes for namespaces indexes that
360
        # are defined in the xml file itself. Thus we assume that all other
361
        # references namespaces are already known to the server and should
362
        # not create any dependency problems (like "NodeNotFound")
363
        relevant_namespaces = [str(ns) for ns in self.namespaces.keys()]
364
        while len(_ndatas) > 0:
365
            pop_nodes = []
366
            print(_ndatas)
367
            for ndata in _ndatas:
368
                # Insert nodes that
369
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
370
                #   (2) ns is not in list of relevant namespaces
371
                if ndata.nodeid.NamespaceIndex not in relevant_namespaces or \
372
                        ndata.parent is None or \
373
                        ndata.parent not in all_node_ids:
374
                    sorted_ndatas.append(ndata)
375
                    sorted_nodes_ids.append(ndata.nodeid)
376
                    pop_nodes.append(ndata)
377
                else:
378
                    # Check if the nodes parent is already in the list of
379
                    # inserted nodes
380
                    if ndata.parent in sorted_nodes_ids:
381
                        sorted_ndatas.append(ndata)
382
                        sorted_nodes_ids.append(ndata.nodeid)
383
                        pop_nodes.append(ndata)
384
385
            # Remove inserted nodes from the list
386
            for ndata in pop_nodes:
387
                _ndatas.pop(_ndatas.index(ndata))
388
        return sorted_ndatas
389