Completed
Push — master ( 777e81...136b6f )
by Olivier
05:10
created

XmlImporter._split_node_id()   A

Complexity

Conditions 3

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.6511

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 13
ccs 7
cts 12
cp 0.5833
crap 3.6511
rs 9.4285
1
"""G
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
import sys
7
import re
8 1
import uuid
9 1
import dateutil.parser
10
from copy import copy
11
12 1
from opcua import ua
13
from opcua.common import xmlparser
14 1
15 1
16 1
def to_python(val, obj, attname):
17 1
    if isinstance(obj, ua.NodeId) and attname == "Identifier":
18
        raise RuntimeError("Error we should parse a NodeId here")
19 1
        return ua.NodeId.from_string(val)
20 1
    else:
21 1
        return xmlparser.ua_type_to_python(val, obj.ua_types[attname])
22 1
23 1
24 1
class XmlImporter(object):
25 1
26
    def __init__(self, server):
27 1
        self.logger = logging.getLogger(__name__)
28 1
        self.parser = None
29
        self.server = server
30
        self.namespaces = {}
31
        self.aliases = {}
32
33
    def _map_namespaces(self, namespaces_uris):
34
        """
35
        creates a mapping between the namespaces in the xml file and in the server.
36
        if not present the namespace is registered.
37
        """
38
        namespaces = {}
39
        for ns_index, ns_uri in enumerate(namespaces_uris):
40 1
            ns_server_index = self.server.register_namespace(ns_uri)
41 1
            namespaces[ns_index + 1] = ns_server_index
42 1
        return namespaces
43 1
44 1
    def _map_aliases(self, aliases):
45 1
        """
46 1
        maps the import aliases to the correct namespaces        
47 1
        """
48 1
        aliases_mapped = {}
49 1
        for alias, node_id in aliases.items():
50 1
            aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
51 1
        return aliases_mapped
52
53 1
    def import_xml(self, xmlpath):
54 1
        """
55
        import xml and return added nodes
56 1
        """
57
        self.logger.info("Importing XML file %s", xmlpath)
58
        self.parser = xmlparser.XMLParser(xmlpath)
59 1
60
        dnodes = self.parser.get_node_datas()
61 1
        dnodes = self.make_objects(dnodes)
62 1
63 1
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
64
        self.aliases = self._map_aliases(self.parser.get_aliases())
65
66 1
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
67
68 1
        nodes = []
69
        for nodedata in nodes_parsed:  # self.parser:
70 1
            if nodedata.nodetype == 'UAObject':
71 1
                node = self.add_object(nodedata)
72 1
            elif nodedata.nodetype == 'UAObjectType':
73 1
                node = self.add_object_type(nodedata)
74 1
            elif nodedata.nodetype == 'UAVariable':
75 1
                node = self.add_variable(nodedata)
76 1
            elif nodedata.nodetype == 'UAVariableType':
77 1
                node = self.add_variable_type(nodedata)
78 1
            elif nodedata.nodetype == 'UAReferenceType':
79 1
                node = self.add_reference_type(nodedata)
80
            elif nodedata.nodetype == 'UADataType':
81 1
                node = self.add_datatype(nodedata)
82
            elif nodedata.nodetype == 'UAMethod':
83
                node = self.add_method(nodedata)
84
            else:
85
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
86
                continue
87
            nodes.append(node)
88
        return nodes
89
    
90
    def make_objects(self, node_datas):
91
        new_nodes = []
92 1
        for ndata in node_datas:
93 1
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
94 1
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
95 1
            if ndata.parent:
96
                ndata.parent = ua.NodeId.from_string(ndata.parent)
97 1
            if ndata.parentlink:
98 1
                ndata.parentlink = self.to_nodeid(ndata.parentlink)
99
            if ndata.typedef:
100 1
                ndata.typedef = self.to_nodeid(ndata.typedef)
101 1
            new_nodes.append(ndata)
102 1
        return new_nodes
103 1
104 1
    def _migrate_ns(self, nodeid):
105
        """
106 1
        Check if the index of nodeid or browsename  given in the xml model file
107
        must be converted to a already existing namespace id based on the files
108 1
        namespace uri
109
110 1
        :returns: NodeId (str)
111
        """
112 1
        if nodeid.NamespaceIndex in self.namespaces:
113 1
            nodeid = copy(nodeid)
114 1
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
115
        return nodeid
116 1
117
    def _get_node(self, obj):
118
        node = ua.AddNodesItem()
119
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
120
        node.BrowseName = self._migrate_ns(obj.browsename)
121
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
122
        if obj.parent:
123
            node.ParentNodeId = self._migrate_ns(obj.parent)
124
        if obj.parentlink:
125
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
126
        if obj.typedef:
127
            node.TypeDefinition = self._migrate_ns(obj.typedef)
128
        return node
129
130
    def to_nodeid(self, nodeid):
131
        if isinstance(nodeid, ua.NodeId):
132
            return nodeid
133
        elif not nodeid:
134
            return ua.NodeId(ua.ObjectIds.String)
135 1
        elif "=" in nodeid:
136
            return ua.NodeId.from_string(nodeid)
137
        elif hasattr(ua.ObjectIds, nodeid):
138
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
139
        else:
140
            if nodeid in self.aliases:
141
                return self.aliases[nodeid]
142
            else:
143
                return ua.NodeId(getattr(ua.ObjectIds, nodeid))
144
145 View Code Duplication
    def add_object(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
146
        node = self._get_node(obj)
147
        attrs = ua.ObjectAttributes()
148
        if obj.desc:
149
            attrs.Description = ua.LocalizedText(obj.desc)
150
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
151
        attrs.EventNotifier = obj.eventnotifier
152
        node.NodeAttributes = attrs
153 1
        res = self.server.iserver.isession.add_nodes([node])
154
        self._add_refs(obj)
155
        res[0].StatusCode.check()
156
        return res[0].AddedNodeId
157
158 View Code Duplication
    def add_object_type(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
159
        node = self._get_node(obj)
160
        attrs = ua.ObjectTypeAttributes()
161
        if obj.desc:
162
            attrs.Description = ua.LocalizedText(obj.desc)
163
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
164
        attrs.IsAbstract = obj.abstract
165
        node.NodeAttributes = attrs
166
        res = self.server.iserver.isession.add_nodes([node])
167
        self._add_refs(obj)
168
        res[0].StatusCode.check()
169 1
        return res[0].AddedNodeId
170
171
    def add_variable(self, obj):
172
        node = self._get_node(obj)
173
        attrs = ua.VariableAttributes()
174
        if obj.desc:
175
            attrs.Description = ua.LocalizedText(obj.desc)
176
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
177
        attrs.DataType = self.to_nodeid(obj.datatype)
178
        if obj.value is not None:
179
            attrs.Value = self._add_variable_value(obj,)
180
        if obj.rank:
181 1
            attrs.ValueRank = obj.rank
182 1
        if obj.accesslevel:
183 1
            attrs.AccessLevel = obj.accesslevel
184
        if obj.useraccesslevel:
185
            attrs.UserAccessLevel = obj.useraccesslevel
186
        if obj.minsample:
187
            attrs.MinimumSamplingInterval = obj.minsample
188
        if obj.dimensions:
189
            attrs.ArrayDimensions = obj.dimensions
190
        node.NodeAttributes = attrs
191
        res = self.server.iserver.isession.add_nodes([node])
192
        self._add_refs(obj)
193
        res[0].StatusCode.check()
194
        return res[0].AddedNodeId
195
196
    def _make_ext_obj(sefl, obj):
197
        ext = getattr(ua, obj.objname)()
198
        for name, val in obj.body:
199
            if type(val) is str:
200
                raise Exception("Error val should a dict", name, val)
201
            else:
202
                for attname, v in val:
203
                    # tow possible values:
204
                    # either we get value directly
205
                    # or a dict if it s an object or a list
206
                    if type(v) is str:
207
                        setattr(ext, attname, to_python(v, ext, attname))
208
                    else:
209
                        # so we have either an object or a list...
210
                        obj2 = getattr(ext, attname)
211
                        if isinstance(obj2, ua.NodeId):
212
                            for attname2, v2 in v:
213
                                if attname2 == "Identifier":
214
                                    obj2 = ua.NodeId.from_string(v2)
215
                                    setattr(ext, attname, obj2)
216
                                    break
217
                        elif not isinstance(obj2, ua.NodeId) and not hasattr(obj2, "ua_types"):
218
                            # we probably have a list
219
                            my_list = []
220
                            for vtype, v2 in v:
221
                                my_list.append(xmlparser.ua_type_to_python(v2, vtype))
222
                            setattr(ext, attname, my_list)
223
                        else:
224
                            for attname2, v2 in v:
225
                                setattr(obj2, attname2, to_python(v2, obj2, attname2))
226
                            setattr(ext, attname, obj2)
227
        return ext
228
229
    def _add_variable_value(self, obj):
230
        """
231
        Returns the value for a Variable based on the objects value type.
232
        """
233
        if obj.valuetype == 'ListOfExtensionObject':
234
            values = []
235
            for ext in obj.value:
236
                extobj = self._make_ext_obj(ext)
237
                values.append(extobj)
238
            return values
239
        elif obj.valuetype.startswith("ListOf"):
240
            vtype = obj.valuetype[6:]
241
            if hasattr(ua.ua_binary.Primitives, vtype):
242
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
243
            else:
244
                return [getattr(ua, vtype)(v) for v in obj.value]
245
        elif obj.valuetype == 'ExtensionObject':
246
            extobj = self._make_ext_obj(obj.value)
247
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
248
        elif obj.valuetype == 'DateTime':
249
            return ua.Variant(dateutil.parser.parse(obj.value), getattr(ua.VariantType, obj.valuetype))
250
        elif obj.valuetype == 'Guid':
251
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
252
        else:
253
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
254
255 View Code Duplication
    def add_variable_type(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
256
        node = self._get_node(obj)
257
        attrs = ua.VariableTypeAttributes()
258
        if obj.desc:
259
            attrs.Description = ua.LocalizedText(obj.desc)
260
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
261
        attrs.DataType = self.to_nodeid(obj.datatype)
262
        if obj.value and len(obj.value) == 1:
263
            attrs.Value = obj.value[0]
264
        if obj.rank:
265
            attrs.ValueRank = obj.rank
266
        if obj.abstract:
267
            attrs.IsAbstract = obj.abstract
268
        if obj.dimensions:
269
            attrs.ArrayDimensions = obj.dimensions
270
        node.NodeAttributes = attrs
271
        res = self.server.iserver.isession.add_nodes([node])
272
        self._add_refs(obj)
273
        res[0].StatusCode.check()
274
        return res[0].AddedNodeId
275
276 View Code Duplication
    def add_method(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
277
        node = self._get_node(obj)
278
        attrs = ua.MethodAttributes()
279
        if obj.desc:
280
            attrs.Description = ua.LocalizedText(obj.desc)
281
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
282
        if obj.accesslevel:
283
            attrs.AccessLevel = obj.accesslevel
284
        if obj.useraccesslevel:
285
            attrs.UserAccessLevel = obj.useraccesslevel
286
        if obj.minsample:
287
            attrs.MinimumSamplingInterval = obj.minsample
288
        if obj.dimensions:
289
            attrs.ArrayDimensions = obj.dimensions
290
        node.NodeAttributes = attrs
291
        res = self.server.iserver.isession.add_nodes([node])
292
        self._add_refs(obj)
293
        res[0].StatusCode.check()
294
        return res[0].AddedNodeId
295
296
    def add_reference_type(self, obj):
297
        node = self._get_node(obj)
298
        attrs = ua.ReferenceTypeAttributes()
299
        if obj.desc:
300
            attrs.Description = ua.LocalizedText(obj.desc)
301
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
302
        if obj. inversename:
303
            attrs.InverseName = ua.LocalizedText(obj.inversename)
304
        if obj.abstract:
305
            attrs.IsAbstract = obj.abstract
306
        if obj.symmetric:
307
            attrs.Symmetric = obj.symmetric
308
        node.NodeAttributes = attrs
309
        res = self.server.iserver.isession.add_nodes([node])
310
        self._add_refs(obj)
311
        res[0].StatusCode.check()
312
        return res[0].AddedNodeId
313
314 View Code Duplication
    def add_datatype(self, obj):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
315
        node = self._get_node(obj)
316
        attrs = ua.DataTypeAttributes()
317
        if obj.desc:
318
            attrs.Description = ua.LocalizedText(obj.desc)
319
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
320
        if obj.abstract:
321
            attrs.IsAbstract = obj.abstract
322
        node.NodeAttributes = attrs
323
        res = self.server.iserver.isession.add_nodes([node])
324
        self._add_refs(obj)
325
        res[0].StatusCode.check()
326
        return res[0].AddedNodeId
327
328
    def _add_refs(self, obj):
329
        if not obj.refs:
330
            return
331
        refs = []
332
        for data in obj.refs:
333
            ref = ua.AddReferencesItem()
334
            ref.IsForward = True
335
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
336
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
337
            ref.TargetNodeClass = ua.NodeClass.DataType
338
            ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
339
            refs.append(ref)
340
        self.server.iserver.isession.add_references(refs)
341
342
    def _sort_nodes_by_parentid(self, ndatas):
343
        """
344
        Sort the list of nodes according their parent node in order to respect
345
        the dependency between nodes.
346
347
        :param nodes: list of NodeDataObjects
348
        :returns: list of sorted nodes
349
        """
350
        _ndatas = list(ndatas)
351
        # list of node ids that are already sorted / inserted
352
        sorted_nodes_ids = []
353
        # list of sorted nodes (i.e. XML Elements)
354
        sorted_ndatas = []
355
        all_node_ids = [data.nodeid for data in ndatas]
356
        # list of namespace indexes that are relevant for this import
357
        # we can only respect ordering nodes for namespaces indexes that
358
        # are defined in the xml file itself. Thus we assume that all other
359
        # references namespaces are already known to the server and should
360
        # not create any dependency problems (like "NodeNotFound")
361
        while len(_ndatas) > 0:
362
            pop_nodes = []
363
            for ndata in _ndatas:
364
                # Insert nodes that
365
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
366
                #   (2) ns is not in list of relevant namespaces
367
                if ndata.nodeid.NamespaceIndex not in self.namespaces or \
368
                        ndata.parent is None or \
369
                        ndata.parent not in all_node_ids:
370
                    sorted_ndatas.append(ndata)
371
                    sorted_nodes_ids.append(ndata.nodeid)
372
                    pop_nodes.append(ndata)
373
                else:
374
                    # Check if the nodes parent is already in the list of
375
                    # inserted nodes
376
                    if ndata.parent in sorted_nodes_ids:
377
                        sorted_ndatas.append(ndata)
378
                        sorted_nodes_ids.append(ndata.nodeid)
379
                        pop_nodes.append(ndata)
380
            # Remove inserted nodes from the list
381
            for ndata in pop_nodes:
382
                _ndatas.pop(_ndatas.index(ndata))
383
        return sorted_ndatas
384