Completed
Push — master ( c97137...1c6e5d )
by Olivier
03:49
created

XmlImporter.add_variable()   D

Complexity

Conditions 8

Size

Total Lines 24

Duplication

Lines 13
Ratio 54.17 %

Code Coverage

Tests 0
CRAP Score 72

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 8
dl 13
loc 24
ccs 0
cts 0
cp 0
crap 72
rs 4.3478
c 1
b 0
f 0
"""
Add nodes defined in an XML file to the address space.
The format is the one from the OPC UA specification.
"""
5 1
import logging
6
import sys
7
import re
8 1
9 1
from opcua import ua
10
from opcua.common import xmlparser
11
12 1
13
def ua_type_to_python(val, uatype):
    """
    Convert a value read as text into the Python value matching ``uatype``.

    :param val: the value as a string
    :param uatype: name of the ua type, e.g. "Int32", "UInt16", "String",
        "ByteString"
    :returns: ``int`` for names starting with "Int"/"UInt", ``val`` unchanged
        for "String", and utf8-encoded ``bytes`` (Python 3) or ``val``
        unchanged (Python 2) for the byte types
    :raises ValueError: if ``uatype`` is not one of the handled names
    """
    if uatype.startswith(("Int", "UInt")):
        return int(val)
    # Exact comparison: `uatype in ("String")` would be a substring test
    # against the string "String" and accept "", "S", "ring", ...
    if uatype == "String":
        return val
    if uatype in ("Bytes", "ByteString", "ByteArray"):
        if sys.version_info.major > 2:
            return bytes(val, 'utf8')
        # On Python 2 str already is a byte string
        return val
    raise ValueError("uatype not handled: {!r} for val {!r}".format(uatype, val))
25 1
26
27 1
def to_python(val, obj, attname):
    """
    Convert the text ``val`` to the Python type of attribute ``attname`` of
    ``obj``.

    The target type name is looked up in ``obj.ua_types[attname]`` and the
    conversion is delegated to ``ua_type_to_python``.

    :param val: the value as a string
    :param obj: ua object owning the attribute; must expose ``ua_types``
    :param attname: name of the attribute being set
    :raises RuntimeError: if asked to convert the "Identifier" of a
        ``ua.NodeId``; callers are expected to build the NodeId themselves
        (e.g. with ``ua.NodeId.from_string``) instead of going through here
    """
    if isinstance(obj, ua.NodeId) and attname == "Identifier":
        # A statement placed after this raise could never run, so the
        # previously unreachable `return ua.NodeId.from_string(val)` is gone.
        raise RuntimeError("Error we should parse a NodeId here")
    return ua_type_to_python(val, obj.ua_types[attname])
33
34
35
class XmlImporter(object):
    """
    Import the nodes described in an XML file into a server.

    Namespace indexes and aliases found in the XML file are mapped to the
    namespace indexes of the target server before the nodes are added.
    """

    def __init__(self, server):
        """
        :param server: object used to add nodes and references; it must
            provide ``add_nodes()`` and ``add_references()``
        """
        self.logger = logging.getLogger(__name__)
        self.parser = None
        self.server = server
        # xml namespace index -> (server namespace index, namespace uri)
        self.namespaces = {}
        # alias name -> node id string (already mapped to server namespaces)
        self.aliases = {}
        self._re_nodeid = re.compile(r"^ns=(?P<ns>\d+[^;]*);i=(?P<i>\d+)")

    def _map_namespaces(self, namespaces_uris, act_server):
        """
        creates a mapping between the namespaces in the xml file and in the server.
        if not present the namespace is registered.

        :param namespaces_uris: namespace uris in the order used by the xml
            file; the first one gets xml index 1
        :param act_server: object providing ``register_namespace(uri)``
        :returns: dict {xml index: (server index, uri)}
        """
        namespaces = {}
        for ns_index, ns_uri in enumerate(namespaces_uris, 1):
            ns_server_index = act_server.register_namespace(ns_uri)
            namespaces[ns_index] = (ns_server_index, ns_uri)
            # placeholders are required: extra args without them make the
            # logging module fail while formatting the record
            self.logger.info("namespace offset %s %s", ns_index, (ns_server_index, ns_uri))
        return namespaces

    def _map_aliases(self, aliases):
        """
        maps the import aliases to the correct namespaces

        :param aliases: dict {alias name: node id string from the xml file}
        :returns: dict {alias name: node id string for the server}
        """
        return dict((alias, self._get_node_id(node_id))
                    for alias, node_id in aliases.items())

    def import_xml(self, xmlpath, act_server):
        """
        import xml and return added nodes

        :param xmlpath: path of the xml file, handed to ``xmlparser.XMLParser``
        :param act_server: server used to register namespaces
        :returns: list with the node id of every node added
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath, act_server)

        # namespaces must be mapped first, aliases are resolved through them
        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces(), act_server)
        self.aliases = self._map_aliases(self.parser.get_aliases())

        # The ordering of nodes currently only works if namespaces are
        # defined in XML.
        # Also, it is recommended not to use node ids without namespace prefix!
        nodes_parsed = self._sort_nodes_by_parentid(self.parser)

        adders = {
            'UAObject': self.add_object,
            'UAObjectType': self.add_object_type,
            'UAVariable': self.add_variable,
            'UAVariableType': self.add_variable_type,
            'UAReferenceType': self.add_reference_type,
            'UADataType': self.add_datatype,
            'UAMethod': self.add_method,
        }
        nodes = []
        for nodedata in nodes_parsed:
            add = adders.get(nodedata.nodetype)
            if add is None:
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
                continue
            nodes.append(add(nodedata))
        return nodes

    def _split_node_id(self, value):
        """
        Split the fq node id into namespace and id part.

        Only ids of the form ``ns=<n>;i=<n>`` are split; anything else
        (including an empty value) is returned as ``(None, value)``.

        :returns: (namespace, id)
        """
        if not value:
            return (None, value)
        r_match = self._re_nodeid.search(value)
        if r_match:
            return r_match.groups()
        return (None, value)

    def _parse_bname(self, bname):
        """
        Parse a browsename and correct the namespace index.

        :param bname: browse name, optionally prefixed with ``<index>:``
        :returns: the browse name with the xml namespace index replaced by
            the server one, or ``bname`` unchanged when there is no prefix
            or the index is not a known xml namespace
        """
        if ':' in bname:
            # split only once: the name part may itself contain ':'
            browse_ns, browse_name = bname.split(':', 1)
            if browse_ns:
                ns_server = self.namespaces.get(int(browse_ns), None)
                if ns_server:
                    return '%d:%s' % (ns_server[0], browse_name)
        return bname

    def _get_node_id(self, value):
        """
        Check if the nodeid given in the xml model file must be converted
        to a already existing namespace id based on the files namespace uri

        :returns: NodeId (str)
        """
        node_ns, node_id = self._split_node_id(value)
        if node_ns:
            ns_server = self.namespaces.get(int(node_ns), None)
            if ns_server:
                return "ns={};i={}".format(ns_server[0], node_id)
        return value

    def _get_node(self, obj):
        """
        Build the ``ua.AddNodesItem`` for ``obj`` without its attributes.

        :param obj: parsed node data (nodeid, browsename, nodetype, parent,
            parentlink, typedef)
        :returns: ua.AddNodesItem
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = ua.NodeId.from_string(self._get_node_id(obj.nodeid))
        node.BrowseName = ua.QualifiedName.from_string(self._parse_bname(obj.browsename))
        # nodetype is e.g. "UAObject": drop the two leading characters to
        # get the ua.NodeClass member name
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent:
            node.ParentNodeId = ua.NodeId.from_string(self._get_node_id(obj.parent))
        if obj.parentlink:
            node.ReferenceTypeId = self.to_nodeid(self._get_node_id(obj.parentlink))
        if obj.typedef:
            node.TypeDefinition = ua.NodeId.from_string(self._get_node_id(obj.typedef))
        return node

    def to_nodeid(self, nodeid):
        """
        Convert a node id string, an ``ua.ObjectIds`` name or an alias to a
        ``ua.NodeId``.

        :param nodeid: string; empty/None gives ``ua.ObjectIds.String``
        :returns: ua.NodeId
        :raises AttributeError: if ``nodeid`` is neither a node id string,
            a name in ``ua.ObjectIds`` nor a known alias
        """
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.aliases:
            return ua.NodeId.from_string(self.aliases[nodeid])
        # Unknown name: this lookup cannot succeed (hasattr failed above),
        # it is kept so callers still get the AttributeError naming the id.
        return ua.NodeId(getattr(ua.ObjectIds, nodeid))

    def _new_attrs(self, attrs_class, obj):
        """Create ``attrs_class()`` with Description (if any) and DisplayName set."""
        attrs = attrs_class()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        return attrs

    def _submit_node(self, obj, node, attrs):
        """Attach ``attrs`` to ``node``, add it with its references, return the added node id."""
        node.NodeAttributes = attrs
        res = self.server.add_nodes([node])
        self._add_refs(obj)
        return res[0].AddedNodeId

    def add_object(self, obj):
        """Add an object node; returns the added node id."""
        node = self._get_node(obj)
        attrs = self._new_attrs(ua.ObjectAttributes, obj)
        attrs.EventNotifier = obj.eventnotifier
        return self._submit_node(obj, node, attrs)

    def add_object_type(self, obj):
        """Add an object type node; returns the added node id."""
        node = self._get_node(obj)
        attrs = self._new_attrs(ua.ObjectTypeAttributes, obj)
        attrs.IsAbstract = obj.abstract
        return self._submit_node(obj, node, attrs)

    def add_variable(self, obj):
        """Add a variable node; returns the added node id."""
        node = self._get_node(obj)
        attrs = self._new_attrs(ua.VariableAttributes, obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attrs.Value = self._add_variable_value(obj)
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit_node(obj, node, attrs)

    def _make_ext_obj(self, obj):
        """
        Build the ua object named ``obj.objname`` and fill it from ``obj.body``.

        ``obj.body`` is iterated as (name, val) pairs where ``val`` is itself
        an iterable of (attribute name, value) pairs. A value is either a
        string, converted with ``to_python``, or a nested iterable of pairs
        describing a NodeId, a list or a sub object.

        :returns: the populated ua object
        :raises Exception: if a body entry is a plain string
        """
        ext = getattr(ua, obj.objname)()
        for name, val in obj.body:
            if isinstance(val, str):
                raise Exception("Error val should a dict", name, val)
            for attname, v in val:
                # two possible values:
                # either we get value directly
                # or a dict if it s an object or a list
                if isinstance(v, str):
                    setattr(ext, attname, to_python(v, ext, attname))
                    continue
                # so we have either an object or a list...
                current = getattr(ext, attname)
                if isinstance(current, ua.NodeId):
                    for attname2, v2 in v:
                        if attname2 == "Identifier":
                            setattr(ext, attname, ua.NodeId.from_string(v2))
                            break
                elif not hasattr(current, "ua_types"):
                    # we probably have a list of (type name, value) pairs
                    setattr(ext, attname,
                            [ua_type_to_python(v2, vtype) for vtype, v2 in v])
                else:
                    for attname2, v2 in v:
                        setattr(current, attname2, to_python(v2, current, attname2))
                    setattr(ext, attname, current)
        return ext

    def _add_variable_value(self, obj):
        """
        Returns the value for a Variable based on the objects valuetype.

        Lists ("ListOf..." value types) are returned as plain Python lists,
        everything else is wrapped in a ``ua.Variant``.
        """
        if obj.valuetype == 'ListOfExtensionObject':
            return [self._make_ext_obj(ext) for ext in obj.value]
        if obj.valuetype.startswith("ListOf"):
            # strip the "ListOf" prefix to get the element type name
            vtype = obj.valuetype[6:]
            return [getattr(ua, vtype)(v) for v in obj.value]
        if obj.valuetype == 'ExtensionObject':
            extobj = self._make_ext_obj(obj.value)
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
        return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))

    def add_variable_type(self, obj):
        """Add a variable type node; returns the added node id."""
        node = self._get_node(obj)
        attrs = self._new_attrs(ua.VariableTypeAttributes, obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attrs.Value = obj.value[0]
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit_node(obj, node, attrs)

    def add_method(self, obj):
        """Add a method node; returns the added node id."""
        node = self._get_node(obj)
        attrs = self._new_attrs(ua.MethodAttributes, obj)
        # NOTE(review): the attributes below are the same ones add_variable
        # sets on VariableAttributes; confirm that MethodAttributes is
        # really meant to carry them.
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._submit_node(obj, node, attrs)

    def add_reference_type(self, obj):
        """Add a reference type node; returns the added node id."""
        node = self._get_node(obj)
        attrs = self._new_attrs(ua.ReferenceTypeAttributes, obj)
        if obj.inversename:
            attrs.InverseName = ua.LocalizedText(obj.inversename)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.symmetric:
            attrs.Symmetric = obj.symmetric
        return self._submit_node(obj, node, attrs)

    def add_datatype(self, obj):
        """Add a data type node; returns the added node id."""
        node = self._get_node(obj)
        attrs = self._new_attrs(ua.DataTypeAttributes, obj)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        return self._submit_node(obj, node, attrs)

    def _add_refs(self, obj):
        """
        Add the forward references listed in ``obj.refs`` to the server.
        Does nothing when ``obj.refs`` is empty.
        """
        if not obj.refs:
            return
        source_id = self._get_node_id(obj.nodeid)
        refs = []
        for data in obj.refs:
            ref = ua.AddReferencesItem()
            ref.IsForward = True
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
            ref.SourceNodeId = ua.NodeId.from_string(source_id)
            # NOTE(review): target node class is always DataType here,
            # whatever the target really is -- confirm this is intended
            ref.TargetNodeClass = ua.NodeClass.DataType
            ref.TargetNodeId = ua.NodeId.from_string(self._get_node_id(data.target))
            refs.append(ref)
        self.server.add_references(refs)

    # FIX: wrong order of node sorting .. need to find out what is wrong
    def _sort_nodes_by_parentid(self, nodes):
        """
        Sort the list of nodes according to their parent node in order to
        respect the dependency between nodes.

        Nodes whose parent cannot be resolved (it never shows up among the
        sorted nodes) are appended at the end, in their original order, and
        a warning is logged, so the sort always terminates.

        :param nodes: list of NodeDataObjects
        :returns: list of sorted nodes
        """
        pending = list(nodes)
        # ids of the nodes already placed in sorted_nodes
        placed_ids = set()
        # list of sorted nodes (i.e. XML Elements)
        sorted_nodes = []
        # namespace indexes that are relevant for this import
        # we can only respect ordering nodes for namespaces indexes that
        # are defined in the xml file itself. Thus we assume that all other
        # references namespaces are already known to the server and should
        # not create any dependency problems (like "NodeNotFound")
        relevant_namespaces = set(str(ns) for ns in self.namespaces)
        while pending:
            still_pending = []
            for node in pending:
                # Get the node and parent node namespace and id parts
                node_ns, _ = self._split_node_id(node.nodeid)
                parent_ns, parent_id = self._split_node_id(node.parent)
                # Put first the nodes that
                #   (1) have no parent / parent_ns is None (e.g. namespace 0)
                #   (2) ns is not in list of relevant namespaces
                if (parent_ns is None or parent_id is None or
                        (relevant_namespaces and node_ns not in relevant_namespaces)):
                    sorted_nodes.insert(0, node)
                elif node.parent in placed_ids:
                    # parent already placed: the node can follow it
                    sorted_nodes.append(node)
                else:
                    # parent not seen yet, retry in the next pass
                    still_pending.append(node)
                    continue
                placed_ids.add(node.nodeid)

            if len(still_pending) == len(pending):
                # A whole pass placed nothing, further passes would not
                # either: stop instead of looping forever.
                self.logger.warning(
                    "Parent not found for %d node(s), adding them unsorted",
                    len(still_pending))
                sorted_nodes.extend(still_pending)
                break
            pending = still_pending
        return sorted_nodes
408