Passed
Pull Request — master (#238)
by Olivier
02:17
created

asyncua.common.xmlimporter.XmlImporter._get_edef()   A

Complexity

Conditions 5

Size

Total Lines 17
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 16
nop 3
dl 0
loc 17
rs 9.1333
c 0
b 0
f 0
1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5
import logging
6
import uuid
7
from typing import Coroutine, Union, Dict
8
from copy import copy
9
10
from asyncua import ua
11
from .xmlparser import XMLParser, ua_type_to_python
12
from ..ua.uaerrors import UaError
13
14
15
class XmlImporter:
16
17
    def __init__(self, server):
        """
        Create an importer bound to *server*.

        The parser and the list of pending references start as None; the namespace
        and alias mappings start empty. import_xml fills all of them in.
        """
        self.server = server
        self.parser = None
        self.refs = None
        self.aliases: Dict[str, ua.NodeId] = {}
        self.namespaces: Dict[int, int] = {}
        self.logger = logging.getLogger(__name__)
24
25
    async def _map_namespaces(self, namespaces_uris):
26
        """
27
        creates a mapping between the namespaces in the xml file and in the server.
28
        if not present the namespace is registered.
29
        """
30
        namespaces = {}
31
        for ns_index, ns_uri in enumerate(namespaces_uris):
32
            ns_server_index = await self.server.register_namespace(ns_uri)
33
            namespaces[ns_index + 1] = ns_server_index
34
        return namespaces
35
36
    def _map_aliases(self, aliases: dict):
37
        """
38
        maps the import aliases to the correct namespaces
39
        """
40
        aliases_mapped = {}
41
        for alias, node_id in aliases.items():
42
            aliases_mapped[alias] = self.to_nodeid(node_id)
43
        return aliases_mapped
44
45
    async def import_xml(self, xmlpath=None, xmlstring=None):
        """
        Import an xml nodeset and return the added nodes.

        The file is parsed, its namespaces and aliases are mapped, and the nodes are
        added in the order computed by _sort_nodes_by_parentid. References rejected
        while the nodes were being added are submitted a second time at the end;
        those rejected again are only reported in a warning.

        :param xmlpath: forwarded to the parser
        :param xmlstring: forwarded to the parser
        :returns: list with the value returned by _add_node_data for each node
        """
        self.logger.info("Importing XML file %s", xmlpath)
        parser = XMLParser()
        self.parser = parser
        await parser.parse(xmlpath, xmlstring)
        self.namespaces = await self._map_namespaces(parser.get_used_namespaces())
        self.aliases = self._map_aliases(parser.get_aliases())
        self.refs = []
        node_datas = self.make_objects(parser.get_node_datas())
        added = []
        for nodedata in self._sort_nodes_by_parentid(node_datas):  # self.parser:
            try:
                node = await self._add_node_data(nodedata)
            except Exception:
                self.logger.warning("failure adding node %s", nodedata)
                raise
            else:
                added.append(node)
        # second attempt for the references that failed before all nodes existed
        retry_refs = self.refs
        self.refs = []
        await self._add_references(retry_refs)
        if self.refs:
            self.logger.warning("The following references could not be imported and are probably broken: %s", self.refs)
        return added
71
72
    async def _add_node_data(self, nodedata) -> "Node":
73
        if nodedata.nodetype == "UAObject":
74
            node = await self.add_object(nodedata)
75
        elif nodedata.nodetype == "UAObjectType":
76
            node = await self.add_object_type(nodedata)
77
        elif nodedata.nodetype == "UAVariable":
78
            node = await self.add_variable(nodedata)
79
        elif nodedata.nodetype == "UAVariableType":
80
            node = await self.add_variable_type(nodedata)
81
        elif nodedata.nodetype == "UAReferenceType":
82
            node = await self.add_reference_type(nodedata)
83
        elif nodedata.nodetype == "UADataType":
84
            node = await self.add_datatype(nodedata)
85
        elif nodedata.nodetype == "UAMethod":
86
            node = await self.add_method(nodedata)
87
        else:
88
            raise ValueError(f"Not implemented node type: {nodedata.nodetype} ")
89
        return node
90
91
    def _get_server(self):
92
        if hasattr(self.server, "iserver"):
93
            return self.server.iserver.isession
94
        else:
95
            return self.server.uaclient
96
97
    async def _add_references(self, refs):
98
        res = await self._get_server().add_references(refs)
99
100
        for sc, ref in zip(res, refs):
101
            if not sc.is_good():
102
                self.refs.append(ref)
103
104
    def make_objects(self, node_data):
        """
        Replace the string identifiers of the parsed node data by ua objects.

        nodeid and parent become ua.NodeId, browsename becomes ua.QualifiedName,
        parentlink and typedef are resolved with _to_nodeid. The items are modified
        in place.

        :param node_data: iterable of parsed node data
        :returns: new list holding the same (converted) items
        """
        converted = []
        for item in node_data:
            item.nodeid = ua.NodeId.from_string(item.nodeid)
            item.browsename = ua.QualifiedName.from_string(item.browsename)
            if item.parent:
                item.parent = ua.NodeId.from_string(item.parent)
            # these two may be aliases or standard names, so they go through _to_nodeid
            for field in ("parentlink", "typedef"):
                raw = getattr(item, field)
                if raw:
                    setattr(item, field, self._to_nodeid(raw))
            converted.append(item)
        return converted
117
118
    def _migrate_ns(self, nodeid: ua.NodeId) -> ua.NodeId:
        """
        Convert the namespace index used in the xml model file into the index
        registered on the server for the same namespace uri.

        When the index has no entry in self.namespaces the argument is returned
        as is. Otherwise a shallow copy carrying the mapped index is returned and
        the argument is not modified. _get_node also passes browse names through
        this method.

        :returns: the argument itself, or a migrated copy of it
        """
        file_index = nodeid.NamespaceIndex
        if file_index not in self.namespaces:
            return nodeid
        migrated = copy(nodeid)
        migrated.NamespaceIndex = self.namespaces[file_index]
        return migrated
130
131
    def _get_node(self, obj):
        """
        Create the AddNodesItem common to all node classes.

        Node id, browse name, parent, parent reference type and type definition are
        passed through _migrate_ns. Parent and reference type are only set when
        both are present on obj.

        :param obj: parsed node data
        :returns: ua.AddNodesItem without NodeAttributes
        """
        item = ua.AddNodesItem()
        item.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
        item.BrowseName = self._migrate_ns(obj.browsename)
        self.logger.info(
            "Importing xml node (%s, %s) as (%s %s)",
            obj.browsename, obj.nodeid, item.BrowseName, item.RequestedNewNodeId,
        )
        # strip the two leading characters of the xml element name (e.g. "UAObject")
        # to obtain the name looked up on ua.NodeClass
        class_name = obj.nodetype[2:]
        item.NodeClass = getattr(ua.NodeClass, class_name)
        if obj.parent and obj.parentlink:
            item.ParentNodeId = self._migrate_ns(obj.parent)
            item.ReferenceTypeId = self._migrate_ns(obj.parentlink)
        if obj.typedef:
            item.TypeDefinition = self._migrate_ns(obj.typedef)
        return item
144
145
    def _to_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
        """
        Turn the different node id notations of the xml file into a ua.NodeId.

        Resolution order: a NodeId is returned unchanged; an empty value gives the
        NodeId of ua.ObjectIds.String; a string containing "=" is parsed with
        NodeId.from_string; a name known to ua.ObjectIds is looked up there; any
        other name is looked up in self.aliases. No namespace migration is done here.

        :raises AttributeError: when the name is neither in ua.ObjectIds nor an alias
        """
        if isinstance(nodeid, ua.NodeId):
            return nodeid
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if not hasattr(ua.ObjectIds, nodeid) and nodeid in self.aliases:
            return self.aliases[nodeid]
        # either a known ObjectIds name, or an unknown name for which getattr raises
        return ua.NodeId(getattr(ua.ObjectIds, nodeid))
159
160
    def to_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
        """Resolve nodeid with _to_nodeid, then map its namespace index with _migrate_ns."""
        resolved = self._to_nodeid(nodeid)
        return self._migrate_ns(resolved)
162
163
    async def add_object(self, obj):
        """
        Add the Object node described by obj to the server.

        The references of obj are submitted before the status of the node
        creation is checked.

        :param obj: parsed node data
        :returns: AddedNodeId of the server's result
        """
        item = self._get_node(obj)
        attributes = ua.ObjectAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.EventNotifier = obj.eventnotifier
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
175
176
    async def add_object_type(self, obj):
        """
        Add the ObjectType node described by obj to the server.

        The references of obj are submitted before the status of the node
        creation is checked.

        :param obj: parsed node data
        :returns: AddedNodeId of the server's result
        """
        item = self._get_node(obj)
        attributes = ua.ObjectTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.IsAbstract = obj.abstract
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
188
189
    async def add_variable(self, obj):
        """
        Add the Variable node described by obj to the server.

        The value is built with _add_variable_value when obj.value is not None.
        Rank, access levels, minimum sampling interval and array dimensions are
        only copied when they are truthy. The references of obj are submitted
        before the status of the node creation is checked.

        :param obj: parsed node data
        :returns: AddedNodeId of the server's result
        """
        item = self._get_node(obj)
        attributes = ua.VariableAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attributes.Value = self._add_variable_value(obj)
        optional_attributes = (
            ("ValueRank", obj.rank),
            ("AccessLevel", obj.accesslevel),
            ("UserAccessLevel", obj.useraccesslevel),
            ("MinimumSamplingInterval", obj.minsample),
            ("ArrayDimensions", obj.dimensions),
        )
        for attribute_name, parsed in optional_attributes:
            if parsed:
                setattr(attributes, attribute_name, parsed)
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
213
214
    def _get_ext_class(self, name: str):
        """
        Find the class used to instantiate the extension object called name.

        The ua module is searched first; otherwise name must be an alias whose
        node id has a class registered in ua.uatypes.

        :returns: the class (not an instance)
        :raises Exception: when name is no alias, or no class is registered for it
        """
        if hasattr(ua, name):
            return getattr(ua, name)
        if name not in self.aliases:
            raise Exception("Error no alias found for extension class", name)
        nodeid = self.aliases[name]
        registered = ua.uatypes.get_extensionobject_class_type(nodeid)
        if not registered:
            raise Exception("Error no extension class registered ", name, nodeid)
        return registered
226
227
    def _make_ext_obj(self, obj):
228
        ext = self._get_ext_class(obj.objname)()
229
        for name, val in obj.body:
230
            if not isinstance(val, list):
231
                raise Exception("Error val should be a list, this is a python-asyncua bug", name, type(val), val)
232
            else:
233
                for attname, v in val:
234
                    self._set_attr(ext, attname, v)
235
        return ext
236
237
    def _get_val_type(self, obj, attname: str):
        """
        Return the ua type declared for attribute attname in obj.ua_types.

        :param obj: object exposing ua_types as an iterable of (name, type) pairs
        :raises UaError: when no entry is called attname
        """
        not_found = object()
        declared = next((uatype for name, uatype in obj.ua_types if name == attname), not_found)
        if declared is not_found:
            raise UaError(f"Attribute '{attname}' defined in xml is not found in object '{obj}'")
        return declared
242
243
    def _set_attr(self, obj, attname: str, val):
        """
        Set attribute attname of obj from the value parsed out of the xml file.

        :param obj: object being populated
        :param attname: name of the attribute to set
        :param val: either a string, or an iterable of pairs describing a nested
            object or a list
        """
        # two possible values:
        # either we get the value directly as a string,
        # or an iterable of pairs if it is an object or a list
        if isinstance(val, str):
            # plain value: convert it using the type declared for this attribute
            pval = ua_type_to_python(val, self._get_val_type(obj, attname))
            setattr(obj, attname, pval)
        else:
            # so we have either an object or a list...
            # the current value of the attribute tells which one it is
            obj2 = getattr(obj, attname)
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
                # only the first "Identifier" entry is used; if there is none the
                # attribute is left untouched
                for attname2, v2 in val:
                    if attname2 == "Identifier":
                        if hasattr(ua.ObjectIds, v2):
                            obj2 = ua.NodeId(getattr(ua.ObjectIds, v2))
                        else:
                            obj2 = ua.NodeId.from_string(v2)
                        setattr(obj, attname, self._migrate_ns(obj2))
                        break
            elif not hasattr(obj2, "ua_types"):
                # we probably have a list
                # each pair is (type, raw value); every item is converted on its own
                my_list = []
                for vtype, v2 in val:
                    my_list.append(ua_type_to_python(v2, vtype))
                setattr(obj, attname, my_list)
            else:
                # nested object: fill in its attributes recursively, then store it back
                for attname2, v2 in val:
                    self._set_attr(obj2, attname2, v2)
                setattr(obj, attname, obj2)
272
273
    def _add_variable_value(self, obj):
        """
        Returns the value for a Variable based on the objects value type.

        :param obj: parsed node data; valuetype selects the conversion applied to value
        :returns: ua.Variant wrapping the converted value
        """
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
        # the two specific list types must be tested before the generic "ListOf" branch
        if obj.valuetype == 'ListOfExtensionObject':
            values = []
            for ext in obj.value:
                extobj = self._make_ext_obj(ext)
                values.append(extobj)
            return ua.Variant(values, ua.VariantType.ExtensionObject)
        elif obj.valuetype == 'ListOfGuid':
            # valuetype[6:] strips the "ListOf" prefix
            return ua.Variant([
                uuid.UUID(guid) for guid in obj.value
            ], getattr(ua.VariantType, obj.valuetype[6:]))
        elif obj.valuetype.startswith("ListOf"):
            vtype = obj.valuetype[6:]
            if hasattr(ua.ua_binary.Primitives, vtype):
                # primitive element type: the parsed list is used as is
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
            elif vtype == "LocalizedText":
                # here the items are indexed by the keys "Text" and "Locale"
                return ua.Variant([getattr(ua, vtype)(text=item["Text"], locale=item["Locale"]) for item in obj.value])
            else:
                # any other element type: build each item with the ua class of that name
                return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
        elif obj.valuetype == 'ExtensionObject':
            extobj = self._make_ext_obj(obj.value)
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
        elif obj.valuetype == 'Guid':
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
        elif obj.valuetype == 'LocalizedText':
            # here the value is an iterable of (name, value) pairs
            ltext = ua.LocalizedText()
            for name, val in obj.value:
                if name == "Text":
                    ltext.Text = val
                elif name == "Locale":
                    ltext.Locale = val
                else:
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
        elif obj.valuetype == 'NodeId':
            # NOTE(review): the node id is parsed as written, without _migrate_ns - confirm this is intended
            return ua.Variant(ua.NodeId.from_string(obj.value))
        else:
            # any other type: valuetype is used directly as the VariantType name
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
315
316
    async def add_variable_type(self, obj):
        """
        Add the VariableType node described by obj to the server.

        The value is only set when obj.value holds exactly one element; rank,
        abstract flag and array dimensions are only copied when they are truthy.
        The references of obj are submitted before the status of the node
        creation is checked.

        :param obj: parsed node data
        :returns: AddedNodeId of the server's result
        """
        item = self._get_node(obj)
        attributes = ua.VariableTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attributes.Value = obj.value[0]
        optional_attributes = (
            ("ValueRank", obj.rank),
            ("IsAbstract", obj.abstract),
            ("ArrayDimensions", obj.dimensions),
        )
        for attribute_name, parsed in optional_attributes:
            if parsed:
                setattr(attributes, attribute_name, parsed)
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
336
337
    async def add_method(self, obj):
        """
        Add the Method node described by obj to the server.

        Access levels, minimum sampling interval and array dimensions are copied
        onto the attributes object when they are truthy. The references of obj are
        submitted before the status of the node creation is checked.

        :param obj: parsed node data
        :returns: AddedNodeId of the server's result
        """
        item = self._get_node(obj)
        attributes = ua.MethodAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        optional_attributes = (
            ("AccessLevel", obj.accesslevel),
            ("UserAccessLevel", obj.useraccesslevel),
            ("MinimumSamplingInterval", obj.minsample),
            ("ArrayDimensions", obj.dimensions),
        )
        for attribute_name, parsed in optional_attributes:
            if parsed:
                setattr(attributes, attribute_name, parsed)
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
356
357
    async def add_reference_type(self, obj):
        """
        Add the ReferenceType node described by obj to the server.

        Inverse name, abstract flag and symmetric flag are only set when they are
        truthy. The references of obj are submitted before the status of the node
        creation is checked.

        :param obj: parsed node data
        :returns: AddedNodeId of the server's result
        """
        item = self._get_node(obj)
        attributes = ua.ReferenceTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.inversename:
            attributes.InverseName = ua.LocalizedText(obj.inversename)
        for attribute_name, parsed in (("IsAbstract", obj.abstract), ("Symmetric", obj.symmetric)):
            if parsed:
                setattr(attributes, attribute_name, parsed)
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
374
375
    async def add_datatype(self, obj):
        """
        Add the DataType node described by obj to the server.

        When obj carries definitions, a DataTypeDefinition is attached: an enum
        definition if the parent is NodeId(29), a structure definition if the
        parent is NodeId(22). Any other data type is added without a definition,
        which is reported at debug level only.

        Unlike the other add_* methods, the status of the node creation is checked
        before the references of obj are submitted.

        :param obj: parsed node data
        :returns: AddedNodeId of the server's result
        """
        node = self._get_node(obj)
        attrs = ua.DataTypeAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.definitions and obj.parent == ua.NodeId(29):
            # direct child of i=29: handled as an enumeration
            attrs.DataTypeDefinition = self._get_edef(node, obj)
        elif obj.definitions and obj.parent == ua.NodeId(22):
            # direct child of i=22: handled as a structure
            # FIXME need better check for subclasses!!
            attrs.DataTypeDefinition = self._get_sdef(node, obj)
        else:
            # library code must not write to stdout; keep the information for debugging
            self.logger.debug("No definition imported for data type %s (parent %s)", obj, obj.parent)
        node.NodeAttributes = attrs
        res = await self._get_server().add_nodes([node])
        res[0].StatusCode.check()
        await self._add_refs(obj)
        return res[0].AddedNodeId
396
397
    async def _add_refs(self, obj):
        """
        Submit the references listed on obj, with obj's node as their source.

        Reference type and target are resolved with to_nodeid, the source node id
        is migrated with _migrate_ns. Nothing is sent when obj has no references.

        :param obj: parsed node data
        """
        if not obj.refs:
            return
        items = []
        for parsed_ref in obj.refs:
            item = ua.AddReferencesItem()
            item.IsForward = parsed_ref.forward
            item.ReferenceTypeId = self.to_nodeid(parsed_ref.reftype)
            item.SourceNodeId = self._migrate_ns(obj.nodeid)
            item.TargetNodeId = self.to_nodeid(parsed_ref.target)
            items.append(item)
        await self._add_references(items)
409
410
    def _get_edef(self, node, obj):
        """
        Build the EnumDefinition of a data type from its parsed definition fields.

        :param node: not used
        :param obj: parsed node data; definitions and parent are read
        :returns: ua.EnumDefinition, or None when obj has no definitions
        """
        if not obj.definitions:
            return None
        definition = ua.EnumDefinition()
        if obj.parent:
            definition.BaseDataType = obj.parent
        for field in obj.definitions:
            entry = ua.EnumField()
            entry.Name = field.name
            # the field name doubles as display name when none is given
            entry.DisplayName = ua.LocalizedText(text=field.dname or field.name)
            entry.Value = field.value
            entry.Description = ua.LocalizedText(text=field.desc)
            definition.Fields.append(entry)
        return definition
427
428
    def _get_sdef(self, node, obj):
        """
        Build the StructureDefinition of a data type from its parsed definition fields.

        The target of the first "HasEncoding" reference of obj becomes the default
        encoding id. The structure type is StructureWithOptionalFields as soon as
        one field is optional, Structure otherwise.

        :param node: not used
        :param obj: parsed node data; definitions, parent and refs are read
        :returns: ua.StructureDefinition, or None when obj has no definitions
        """
        if not obj.definitions:
            return None
        definition = ua.StructureDefinition()
        if obj.parent:
            definition.BaseDataType = self.to_nodeid(obj.parent)
        # looks like the binary encoding is the first one... can someone confirm?
        encoding_ref = next((ref for ref in obj.refs if ref.reftype == "HasEncoding"), None)
        if encoding_ref is not None:
            definition.DefaultEncodingId = self.to_nodeid(encoding_ref.target)
        has_optional_field = False
        for field in obj.definitions:
            member = ua.StructureField()
            member.Name = field.name
            member.DataType = self.to_nodeid(field.datatype)
            member.ValueRank = field.valuerank
            member.IsOptional = field.optional
            if member.IsOptional:
                has_optional_field = True
            member.ArrayDimensions = field.arraydim
            member.Description = ua.LocalizedText(text=field.desc)
            definition.Fields.append(member)
        definition.StructureType = (
            ua.StructureType.StructureWithOptionalFields if has_optional_field else ua.StructureType.Structure
        )
        return definition
456
457
    def _sort_nodes_by_parentid(self, ndatas):
458
        """
459
        Sort the list of nodes according their parent node in order to respect
460
        the dependency between nodes.
461
462
        :param nodes: list of NodeDataObjects
463
        :returns: list of sorted nodes
464
        """
465
466
        sorted_ndatas = []
467
        sorted_nodes_ids = []
468
        all_node_ids = [data.nodeid for data in ndatas]
469
        while ndatas:
470
            for ndata in ndatas[:]:
471
                if ndata.nodeid.NamespaceIndex not in self.namespaces or \
472
                        ndata.parent is None or \
473
                        ndata.parent not in all_node_ids:
474
                    sorted_ndatas.append(ndata)
475
                    sorted_nodes_ids.append(ndata.nodeid)
476
                    ndatas.remove(ndata)
477
                else:
478
                    # Check if the nodes parent is already in the list of
479
                    # inserted nodes
480
                    if ndata.parent in sorted_nodes_ids:
481
                        sorted_ndatas.append(ndata)
482
                        sorted_nodes_ids.append(ndata.nodeid)
483
                        ndatas.remove(ndata)
484
        return sorted_ndatas
485