Completed
Push — master ( 5318bd...777e81 )
by Olivier
04:55
created

XmlImporter.add_object()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 10
Ratio 90.91 %

Code Coverage

Tests 3
CRAP Score 3.372

Importance

Changes 0
Metric Value
cc 2
dl 10
loc 11
ccs 3
cts 10
cp 0.3
crap 3.372
rs 9.4285
c 0
b 0
f 0
1
"""G
2
add node defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5 1
import logging
6
import sys
7
import re
8 1
import uuid
9 1
import dateutil.parser
10
11
from opcua import ua
12 1
from opcua.common import xmlparser
13
14 1
15 1
def ua_type_to_python(val, uatype):
    """
    Convert the textual value ``val`` into the Python type matching ``uatype``.

    :param val: the value as a string
    :param uatype: name of the ua type ("Int32", "UInt16", "Double", "Float",
                   "String", "Bytes", "ByteString", "ByteArray", ...)
    :returns: int, float, str or bytes depending on ``uatype``
    :raises Exception: if ``uatype`` is not one of the handled type names
    """
    if uatype.startswith(("Int", "UInt")):
        return int(val)
    elif uatype in ("Double", "Float"):
        return float(val)
    elif uatype == "String":
        # compare for equality: `uatype in ("String")` is a substring test
        # against the string "String" and would accept "", "S", "ring", ...
        return val
    elif uatype in ("Bytes", "ByteString", "ByteArray"):
        if sys.version_info.major > 2:
            # on python 3 the text read from xml must be encoded to get bytes
            return bytes(val, 'utf8')
        else:
            # on python 2 str already is a byte string
            return val
    else:
        raise Exception("uatype nopt handled", uatype, " for val ", val)
29
30
31
def to_python(val, obj, attname):
    """
    Convert the string ``val`` to the python type of attribute ``attname`` of ``obj``.

    The target type name is looked up in ``obj.ua_types[attname]`` and the
    conversion is done by ua_type_to_python.

    :param val: the value as a string
    :param obj: object owning the attribute, must provide a ``ua_types`` mapping
    :param attname: name of the attribute of ``obj`` the value is meant for
    :returns: the converted value
    :raises RuntimeError: if asked to convert the Identifier of a NodeId;
                          callers are expected to parse NodeIds themselves
    """
    if isinstance(obj, ua.NodeId) and attname == "Identifier":
        # the original had an unreachable `return ua.NodeId.from_string(val)`
        # after this raise; it has been removed, behaviour is unchanged
        raise RuntimeError("Error we should parse a NodeId here")
    return ua_type_to_python(val, obj.ua_types[attname])
37
38
39
class XmlImporter(object):
    """
    Import the nodes described in an XML file into a server address space.

    The ``server`` object given to the constructor must provide ``add_nodes``
    and ``add_references``; the ``act_server`` object given to import_xml must
    provide ``register_namespace``.
    """

    # maps the node type found in the XML to the name of the method creating it
    _NODE_HANDLERS = {
        'UAObject': 'add_object',
        'UAObjectType': 'add_object_type',
        'UAVariable': 'add_variable',
        'UAVariableType': 'add_variable_type',
        'UAReferenceType': 'add_reference_type',
        'UADataType': 'add_datatype',
        'UAMethod': 'add_method',
    }

    def __init__(self, server):
        self.logger = logging.getLogger(__name__)
        self.parser = None
        self.server = server
        # xml namespace index -> (server namespace index, namespace uri)
        self.namespaces = {}
        # alias name -> node id string (already mapped to server namespaces)
        self.aliases = {}
        self._re_nodeid = re.compile(r"^ns=(?P<ns>\d+[^;]*);i=(?P<i>\d+)")

    def _map_namespaces(self, namespaces_uris, act_server):
        """
        creates a mapping between the namespaces in the xml file and in the server.
        if not present the namespace is registered.

        :param namespaces_uris: iterable of namespace uris, in the order of the xml file
        :param act_server: object providing register_namespace(uri)
        :returns: dict {xml index (starting at 1): (server index, uri)}
        """
        namespaces = {}
        for ns_index, ns_uri in enumerate(namespaces_uris, 1):
            ns_server_index = act_server.register_namespace(ns_uri)
            namespaces[ns_index] = (ns_server_index, ns_uri)
            # logging needs one placeholder per argument, otherwise the
            # record fails to format when it is emitted
            self.logger.info("namespace offset %s %s", ns_index, (ns_server_index, ns_uri))
        return namespaces

    def _map_aliases(self, aliases):
        """
        maps the import aliases to the correct namespaces

        :param aliases: dict {alias: node id string as found in the xml}
        :returns: dict {alias: node id string with server namespace index}
        """
        aliases_mapped = {}
        for alias, node_id in aliases.items():
            aliases_mapped[alias] = self._get_node_id(node_id)
        return aliases_mapped

    def import_xml(self, xmlpath, act_server):
        """
        import xml and return added nodes

        :param xmlpath: path of the xml file, handed to xmlparser.XMLParser
        :param act_server: object used to register namespaces
        :returns: list of the node ids returned by the add_* methods
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath, act_server)

        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces(), act_server)
        self.aliases = self._map_aliases(self.parser.get_aliases())

        # The ordering of nodes currently only works if namespaces are defined in XML.
        # Also, it is recommended not to use node ids without namespace prefix!
        nodes_parsed = self._sort_nodes_by_parentid(self.parser)

        nodes = []
        for nodedata in nodes_parsed:
            handler_name = self._NODE_HANDLERS.get(nodedata.nodetype)
            if handler_name is None:
                self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
                continue
            # resolved through getattr so that subclasses can override add_*
            nodes.append(getattr(self, handler_name)(nodedata))
        return nodes

    def _split_node_id(self, value):
        """
        Split the fq node id into namespace and id part.

        :returns: (namespace, id) as strings if value looks like "ns=X;i=Y",
                  (None, value) otherwise
        """
        if not value:
            return (None, value)
        r_match = self._re_nodeid.search(value)
        if r_match:
            return r_match.groups()

        return (None, value)

    def _parse_bname(self, bname):
        """
        Parse a browsename and correct the namespace index.

        :param bname: browse name, optionally prefixed with "<xml ns index>:"
        :returns: browse name prefixed with the server namespace index, or
                  bname unchanged if it has no known numeric namespace prefix
        """
        # split on the first colon only, the name itself may contain colons
        browse_ns, sep, browse_name = bname.partition(':')
        if not sep or not browse_ns:
            return bname
        try:
            ns_index = int(browse_ns)
        except ValueError:
            # prefix is not a namespace index, leave the name untouched
            return bname
        ns_server = self.namespaces.get(ns_index, None)
        if ns_server:
            return '%d:%s' % (ns_server[0], browse_name)
        return bname

    def _get_node_id(self, value):
        """
        Check if the nodeid given in the xml model file must be converted
        to a already existing namespace id based on the files namespace uri

        :returns: NodeId (str)
        """
        node_ns, node_id = self._split_node_id(value)
        if node_ns:
            ns_server = self.namespaces.get(int(node_ns), None)
            if ns_server:
                return "ns={};i={}".format(ns_server[0], node_id)
        return value

    def _get_node(self, obj):
        """
        Create the AddNodesItem common to all node types from parsed node data.

        :param obj: parsed node data (nodeid, browsename, nodetype, parent,
                    parentlink, typedef)
        :returns: ua.AddNodesItem without NodeAttributes
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = ua.NodeId.from_string(self._get_node_id(obj.nodeid))
        node.BrowseName = ua.QualifiedName.from_string(self._parse_bname(obj.browsename))
        # nodetype is e.g. "UAObject": drop the 2 first characters to get the NodeClass name
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent:
            node.ParentNodeId = ua.NodeId.from_string(self._get_node_id(obj.parent))
        if obj.parentlink:
            node.ReferenceTypeId = self.to_nodeid(self._get_node_id(obj.parentlink))
        if obj.typedef:
            node.TypeDefinition = ua.NodeId.from_string(self._get_node_id(obj.typedef))
        return node

    def to_nodeid(self, nodeid):
        """
        Convert a node id string, an ObjectIds name or an alias to a NodeId.

        :param nodeid: string like "ns=1;i=5" or "i=5", name of an attribute
                       of ua.ObjectIds, alias known to the importer, or an
                       empty value
        :returns: ua.NodeId; the String object id if nodeid is empty
        :raises AttributeError: if nodeid is neither a node id string, an
                                ObjectIds name nor a known alias
        """
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        elif "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        elif hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        elif nodeid in self.aliases:
            return ua.NodeId.from_string(self.aliases[nodeid])
        else:
            # unknown name: getattr raises AttributeError as hasattr failed above
            return ua.NodeId.from_string("i={}".format(getattr(ua.ObjectIds, nodeid)))

    def _init_attrs(self, attrs, obj):
        """
        Set the Description (if any) and DisplayName shared by all node types.
        """
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        return attrs

    def _commit_node(self, node, attrs, obj):
        """
        Attach attrs to node, add it to the server, then add the references of obj.

        :returns: the AddedNodeId of the first result returned by the server
        """
        node.NodeAttributes = attrs
        res = self.server.add_nodes([node])
        self._add_refs(obj)
        return res[0].AddedNodeId

    def add_object(self, obj):
        """
        Add an object node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = self._init_attrs(ua.ObjectAttributes(), obj)
        attrs.EventNotifier = obj.eventnotifier
        return self._commit_node(node, attrs, obj)

    def add_object_type(self, obj):
        """
        Add an object type node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = self._init_attrs(ua.ObjectTypeAttributes(), obj)
        attrs.IsAbstract = obj.abstract
        return self._commit_node(node, attrs, obj)

    def add_variable(self, obj):
        """
        Add a variable node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = self._init_attrs(ua.VariableAttributes(), obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attrs.Value = self._add_variable_value(obj)
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._commit_node(node, attrs, obj)

    def _make_ext_obj(self, obj):
        """
        Build an extension object from its parsed xml representation.

        :param obj: parsed data with ``objname`` (name of a class of the ua
                    module) and ``body`` (iterable of (name, content) pairs
                    where content is an iterable of (attribute, value) pairs)
        :returns: instance of the ua class named obj.objname
        :raises Exception: if the content of a body element is a plain string
        """
        ext = getattr(ua, obj.objname)()
        for name, val in obj.body:
            if isinstance(val, str):
                raise Exception("Error val should a dict", name, val)
            for attname, v in val:
                # two possible values:
                # either we get value directly
                # or a list of pairs if it is an object or a list
                if isinstance(v, str):
                    setattr(ext, attname, to_python(v, ext, attname))
                    continue
                # so we have either an object or a list...
                obj2 = getattr(ext, attname)
                if isinstance(obj2, ua.NodeId):
                    # only the Identifier is of interest, it holds the full node id string
                    for attname2, v2 in v:
                        if attname2 == "Identifier":
                            setattr(ext, attname, ua.NodeId.from_string(v2))
                            break
                elif not hasattr(obj2, "ua_types"):
                    # we probably have a list of (type name, value) pairs
                    setattr(ext, attname, [ua_type_to_python(v2, vtype) for vtype, v2 in v])
                else:
                    # nested ua structure: fill its attributes one by one
                    for attname2, v2 in v:
                        setattr(obj2, attname2, to_python(v2, obj2, attname2))
                    setattr(ext, attname, obj2)
        return ext

    def _add_variable_value(self, obj):
        """
        Returns the value for a Variable based on the objects value type.

        :returns: a list for "ListOf..." value types, a ua.Variant otherwise
        """
        if obj.valuetype == 'ListOfExtensionObject':
            return [self._make_ext_obj(ext) for ext in obj.value]
        elif obj.valuetype.startswith("ListOf"):
            # the element type name follows the "ListOf" prefix
            vtype = obj.valuetype[6:]
            return [getattr(ua, vtype)(v) for v in obj.value]

        # looked up only now: "ListOf..." names are handled above without it
        variant_type = getattr(ua.VariantType, obj.valuetype)
        if obj.valuetype == 'ExtensionObject':
            value = self._make_ext_obj(obj.value)
        elif obj.valuetype == 'DateTime':
            value = dateutil.parser.parse(obj.value)
        elif obj.valuetype == 'Guid':
            value = uuid.UUID(obj.value)
        else:
            value = obj.value
        return ua.Variant(value, variant_type)

    def add_variable_type(self, obj):
        """
        Add a variable type node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = self._init_attrs(ua.VariableTypeAttributes(), obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attrs.Value = obj.value[0]
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._commit_node(node, attrs, obj)

    def add_method(self, obj):
        """
        Add a method node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = self._init_attrs(ua.MethodAttributes(), obj)
        # NOTE(review): these four attributes look copied from add_variable,
        # confirm they are meaningful for MethodAttributes
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._commit_node(node, attrs, obj)

    def add_reference_type(self, obj):
        """
        Add a reference type node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = self._init_attrs(ua.ReferenceTypeAttributes(), obj)
        if obj.inversename:
            attrs.InverseName = ua.LocalizedText(obj.inversename)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.symmetric:
            attrs.Symmetric = obj.symmetric
        return self._commit_node(node, attrs, obj)

    def add_datatype(self, obj):
        """
        Add a data type node and its references, return the added node id.
        """
        node = self._get_node(obj)
        attrs = self._init_attrs(ua.DataTypeAttributes(), obj)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        return self._commit_node(node, attrs, obj)

    def _add_refs(self, obj):
        """
        Add the forward references listed in obj.refs to the server.

        Does nothing if obj has no references.
        """
        if not obj.refs:
            return
        refs = []
        for data in obj.refs:
            ref = ua.AddReferencesItem()
            ref.IsForward = True
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
            ref.SourceNodeId = ua.NodeId.from_string(self._get_node_id(obj.nodeid))
            # NOTE(review): target class is always set to DataType, whatever the target is
            ref.TargetNodeClass = ua.NodeClass.DataType
            ref.TargetNodeId = ua.NodeId.from_string(self._get_node_id(data.target))
            refs.append(ref)
        self.server.add_references(refs)

    # FIX: wrong order of node sorting .. need to find out what is wrong
    def _sort_nodes_by_parentid(self, nodes):
        """
        Sort the list of nodes according their parent node in order to respect
        the dependency between nodes.

        Nodes whose parent is not part of this import come first (in reverse
        order of appearance), followed by the other nodes, each one after its
        parent. Nodes whose parent is never found are put at the end and a
        warning is logged.

        :param nodes: list of NodeDataObjects
        :returns: list of sorted nodes
        """
        pending = list(nodes)
        # nodes which do not depend on another node of the file
        independent = []
        # nodes placed after their parent
        dependent = []
        # ids of the nodes already placed, a set gives constant time lookup
        placed_ids = set()
        # list of namespace indexes that are relevant for this import
        # we can only respect ordering nodes for namespaces indexes that
        # are defined in the xml file itself. Thus we assume that all other
        # references namespaces are already known to the server and should
        # not create any dependency problems (like "NodeNotFound")
        relevant_namespaces = set(str(ns) for ns in self.namespaces)
        while pending:
            postponed = []
            for node in pending:
                # Get the node and parent node namespace and id parts
                node_ns, _ = self._split_node_id(node.nodeid)
                parent_ns, parent_id = self._split_node_id(node.parent)
                # Nodes are independent if
                #   (1) they have no parent / parent_ns is None (e.g. namespace 0)
                #   (2) their ns is not in list of relevant namespaces
                if (parent_ns is None or parent_id is None or
                        (relevant_namespaces and node_ns not in relevant_namespaces)):
                    independent.append(node)
                elif node.parent in placed_ids:
                    dependent.append(node)
                else:
                    # parent not placed yet, try again in the next pass
                    postponed.append(node)
                    continue
                placed_ids.add(node.nodeid)

            if len(postponed) == len(pending):
                # no node could be placed in this pass: the missing parents
                # are not in the file, so looping again would never end
                for node in postponed:
                    self.logger.warning("Parent %s of node %s not found while sorting, adding node at the end",
                                        node.parent, node.nodeid)
                dependent.extend(postponed)
                break
            pending = postponed

        # independent nodes used to be inserted at index 0 one after the
        # other, which gives them in reverse order in front of the others
        independent.reverse()
        return independent + dependent
413