Passed
Pull Request — master (#238)
by Olivier
02:38
created

asyncua.common.xmlimporter.XmlImporter._get_sdef()   B

Complexity

Conditions 8

Size

Total Lines 28
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 26
nop 3
dl 0
loc 28
rs 7.3333
c 0
b 0
f 0
1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5
import logging
6
import uuid
7
from typing import Coroutine, Union, Dict
8
from copy import copy
9
10
from asyncua import ua
11
from .xmlparser import XMLParser, ua_type_to_python
12
from ..ua.uaerrors import UaError
13
14
15
_logger = logging.getLogger(__name__)
16
17
18
class XmlImporter:
19
20
    def __init__(self, server):
21
        self.parser = None
22
        self.server = server
23
        self.namespaces: Dict[int, int] = {}  #Dict[IndexInXml, IndexInServer]
24
        self.aliases: Dict[str, ua.NodeId] = {}
25
        self.refs = None
26
27
    async def _map_namespaces(self, namespaces_uris):
28
        """
29
        creates a mapping between the namespaces in the xml file and in the server.
30
        if not present the namespace is registered.
31
        """
32
        namespaces = {}
33
        for ns_index, ns_uri in enumerate(namespaces_uris):
34
            ns_server_index = await self.server.register_namespace(ns_uri)
35
            namespaces[ns_index + 1] = ns_server_index
36
        return namespaces
37
38
    def _map_aliases(self, aliases: dict):
39
        """
40
        maps the import aliases to the correct namespaces
41
        """
42
        aliases_mapped = {}
43
        for alias, node_id in aliases.items():
44
            aliases_mapped[alias] = self._to_migrated_nodeid(node_id)
45
        return aliases_mapped
46
47
    async def import_xml(self, xmlpath=None, xmlstring=None):
        """
        Import an xml document and return the added nodes.

        The document (given as path or as string) is parsed, its namespaces
        and aliases are mapped, then every node is added in parent-first
        order. References that failed while the nodes were being added are
        retried once at the end; those still failing are only logged.

        :returns: list with the value returned by _add_node_data() per node
        :raises Exception: whatever _add_node_data() raised, after logging
            the offending node data
        """
        _logger.info("Importing XML file %s", xmlpath)
        self.parser = XMLParser()
        await self.parser.parse(xmlpath, xmlstring)
        self.namespaces = await self._map_namespaces(self.parser.get_used_namespaces())
        self.aliases = self._map_aliases(self.parser.get_aliases())
        self.refs = []
        converted = self.make_objects(self.parser.get_node_datas())
        added = []
        for nodedata in self._sort_nodes_by_parentid(converted):
            try:
                new_node = await self._add_node_data(nodedata)
            except Exception:
                _logger.warning("failure adding node %s", nodedata)
                raise
            else:
                added.append(new_node)
        # second chance for the references that could not be added earlier;
        # _add_references() refills self.refs with the ones failing again
        pending = self.refs
        self.refs = []
        await self._add_references(pending)
        if self.refs:
            _logger.warning("The following references could not be imported and are probably broken: %s", self.refs)
        return added
73
74
    async def _add_node_data(self, nodedata) -> "Node":
75
        if nodedata.nodetype == "UAObject":
76
            node = await self.add_object(nodedata)
77
        elif nodedata.nodetype == "UAObjectType":
78
            node = await self.add_object_type(nodedata)
79
        elif nodedata.nodetype == "UAVariable":
80
            node = await self.add_variable(nodedata)
81
        elif nodedata.nodetype == "UAVariableType":
82
            node = await self.add_variable_type(nodedata)
83
        elif nodedata.nodetype == "UAReferenceType":
84
            node = await self.add_reference_type(nodedata)
85
        elif nodedata.nodetype == "UADataType":
86
            node = await self.add_datatype(nodedata)
87
        elif nodedata.nodetype == "UAMethod":
88
            node = await self.add_method(nodedata)
89
        else:
90
            raise ValueError(f"Not implemented node type: {nodedata.nodetype} ")
91
        return node
92
93
    def _get_server(self):
94
        if hasattr(self.server, "iserver"):
95
            return self.server.iserver.isession
96
        else:
97
            return self.server.uaclient
98
99
    async def _add_references(self, refs):
100
        res = await self._get_server().add_references(refs)
101
102
        for sc, ref in zip(res, refs):
103
            if not sc.is_good():
104
                self.refs.append(ref)
105
106
    def make_objects(self, node_data):
107
        new_nodes = []
108
        for node_datum in node_data:
109
            node_datum.nodeid = self._to_migrated_nodeid(node_datum.nodeid)
110
            node_datum.browsename = ua.QualifiedName.from_string(node_datum.browsename)
111
            if node_datum.parent:
112
                node_datum.parent = self._to_migrated_nodeid(node_datum.parent)
113
            if node_datum.parentlink:
114
                node_datum.parentlink = self._to_migrated_nodeid(node_datum.parentlink)
115
            if node_datum.typedef:
116
                node_datum.typedef = self._to_migrated_nodeid(node_datum.typedef)
117
            if node_datum.datatype:
118
                node_datum.datatype = self._to_migrated_nodeid(node_datum.datatype)
119
            new_nodes.append(node_datum)
120
            for ref in node_datum.refs:
121
                ref.reftype = self._to_migrated_nodeid(ref.reftype)
122
                ref.target = self._to_migrated_nodeid(ref.target)
123
            for field in node_datum.definitions:
124
                if field.datatype:
125
                    field.datatype = self._to_migrated_nodeid(field.datatype)
126
        return new_nodes
127
128
    def _migrate_ns(self, nodeid: ua.NodeId) -> ua.NodeId:
129
        """
130
        Check if the index of nodeid or browsename  given in the xml model file
131
        must be converted to a already existing namespace id based on the files
132
        namespace uri
133
134
        :returns: NodeId (str)
135
        """
136
        if nodeid.NamespaceIndex in self.namespaces:
137
            nodeid = copy(nodeid)
138
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
139
        return nodeid
140
141
    def _get_node(self, obj):
        """
        Create the ua.AddNodesItem shared by all add_* methods.

        Sets the requested node id, browse name and node class (nodetype
        without its first two characters), the parent and its reference type
        when both are present, and the type definition when present.
        Node attributes are left to the caller.
        """
        item = ua.AddNodesItem()
        item.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
        item.BrowseName = self._migrate_ns(obj.browsename)
        _logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename,
                     obj.nodeid, item.BrowseName, item.RequestedNewNodeId)
        # e.g. "UAObject" -> ua.NodeClass.Object
        class_name = obj.nodetype[2:]
        item.NodeClass = getattr(ua.NodeClass, class_name)
        has_parent = obj.parent and obj.parentlink
        if has_parent:
            item.ParentNodeId = self._migrate_ns(obj.parent)
            item.ReferenceTypeId = self._migrate_ns(obj.parentlink)
        if obj.typedef:
            item.TypeDefinition = self._migrate_ns(obj.typedef)
        return item
154
155
    def _to_migrated_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
        """
        Convert the argument to a NodeId with _to_nodeid() and map its
        namespace index to the server with _migrate_ns().
        """
        return self._migrate_ns(self._to_nodeid(nodeid))
158
159
    def _to_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
        """
        Turn the different node id notations of the xml file into a NodeId.

        Handled in this order: an existing NodeId (returned as is), an empty
        value (NodeId of ua.ObjectIds.String), a string containing "="
        (parsed with NodeId.from_string), a name of ua.ObjectIds, and finally
        an alias name.

        :raises AttributeError: when the name is neither an attribute of
            ua.ObjectIds nor a known alias
        """
        if isinstance(nodeid, ua.NodeId):
            return nodeid
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        # names of ua.ObjectIds win over aliases of the same name
        if not hasattr(ua.ObjectIds, nodeid) and nodeid in self.aliases:
            return self.aliases[nodeid]
        # for an unknown name this getattr is what raises AttributeError
        return ua.NodeId(getattr(ua.ObjectIds, nodeid))
173
174
    async def add_object(self, obj):
        """
        Add an object node described by obj, then its references.

        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises for a failed add; note
            that the references are sent before the status is checked
        """
        item = self._get_node(obj)
        attributes = ua.ObjectAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.EventNotifier = obj.eventnotifier
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
186
187
    async def add_object_type(self, obj):
        """
        Add an object type node described by obj, then its references.

        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises for a failed add; note
            that the references are sent before the status is checked
        """
        item = self._get_node(obj)
        attributes = ua.ObjectTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.IsAbstract = obj.abstract
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
199
200
    async def add_variable(self, obj):
        """
        Add a variable node described by obj, then its references.

        The value is converted with _add_variable_value() when obj.value is
        not None; rank, access levels, minimum sampling interval and array
        dimensions are only copied when they are truthy.

        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises for a failed add; note
            that the references are sent before the status is checked
        """
        item = self._get_node(obj)
        attributes = ua.VariableAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = obj.datatype
        if obj.value is not None:
            attributes.Value = self._add_variable_value(obj)
        # (attribute of the parsed node, attribute of VariableAttributes)
        optional_fields = (
            ("rank", "ValueRank"),
            ("accesslevel", "AccessLevel"),
            ("useraccesslevel", "UserAccessLevel"),
            ("minsample", "MinimumSamplingInterval"),
            ("dimensions", "ArrayDimensions"),
        )
        for source_name, target_name in optional_fields:
            value = getattr(obj, source_name)
            if value:
                setattr(attributes, target_name, value)
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
224
225
    def _get_ext_class(self, name: str):
        """
        Find the class to instantiate for the extension object called name.

        An attribute of the ua module with that name is preferred; otherwise
        name must be an alias whose node id has a registered extension object
        class.

        :raises Exception: when name is no alias, or when no class is
            registered for the aliased node id
        """
        if hasattr(ua, name):
            return getattr(ua, name)
        if name not in self.aliases:
            raise Exception("Error no alias found for extension class", name)
        nodeid = self.aliases[name]
        class_type = ua.uatypes.get_extensionobject_class_type(nodeid)
        if not class_type:
            raise Exception("Error no extension class registered ", name, nodeid)
        return class_type
237
238
    def _make_ext_obj(self, obj):
        """
        Instantiate the extension object class named obj.objname and fill it
        from obj.body, a sequence of (name, list of (attribute, value)) pairs.

        :raises Exception: when the second element of a body entry is not a list
        """
        instance = self._get_ext_class(obj.objname)()
        for name, val in obj.body:
            if not isinstance(val, list):
                raise Exception("Error val should be a list, this is a python-asyncua bug", name, type(val), val)
            for attribute, raw_value in val:
                self._set_attr(instance, attribute, raw_value)
        return instance
247
248
    def _get_val_type(self, obj, attname: str):
249
        for name, uatype in obj.ua_types:
250
            if name == attname:
251
                return uatype
252
        raise UaError(f"Attribute '{attname}' defined in xml is not found in object '{obj}'")
253
254
    def _set_attr(self, obj, attname: str, val):
        """
        Set attribute attname of obj from the parsed xml value val.

        :param obj: object being filled; must provide ua_types for string values
        :param attname: name of the attribute to set
        :param val: either a string, converted according to the declared ua
            type of the attribute, or an iterable of pairs describing a
            NodeId, a list of values or a nested object
        """
        # two possible values:
        # either we get the value directly (a string)
        # or a sequence of pairs if it is an object or a list
        if isinstance(val, str):
            pval = ua_type_to_python(val, self._get_val_type(obj, attname))
            setattr(obj, attname, pval)
        else:
            # so we have either an object or a list...
            # the current value of the attribute tells which one
            obj2 = getattr(obj, attname)
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
                # only the first "Identifier" entry is used; without one the
                # attribute is left untouched
                for attname2, v2 in val:
                    if attname2 == "Identifier":
                        if hasattr(ua.ObjectIds, v2):
                            obj2 = ua.NodeId(getattr(ua.ObjectIds, v2))
                        else:
                            obj2 = ua.NodeId.from_string(v2)
                        setattr(obj, attname, self._migrate_ns(obj2))
                        break
            elif not hasattr(obj2, "ua_types"):
                # we probably have a list: each pair is (type name, raw value)
                my_list = []
                for vtype, v2 in val:
                    my_list.append(ua_type_to_python(v2, vtype))
                setattr(obj, attname, my_list)
            else:
                # nested object: fill its attributes recursively
                for attname2, v2 in val:
                    self._set_attr(obj2, attname2, v2)
                setattr(obj, attname, obj2)
283
284
    def _add_variable_value(self, obj):
        """
        Build the ua.Variant holding the value of a variable.

        The conversion depends on obj.valuetype: extension objects are built
        with _make_ext_obj(), guids become uuid.UUID, localized texts are
        assembled from their Text / Locale parts, node ids are parsed from
        their string form; "ListOf..." types get the same treatment per item.
        Any other type is passed to ua.Variant unchanged with the VariantType
        of the same name.
        """
        valuetype = obj.valuetype
        _logger.debug("Setting value with type %s and value %s", valuetype, obj.value)

        if valuetype == 'ListOfExtensionObject':
            extobjs = [self._make_ext_obj(ext) for ext in obj.value]
            return ua.Variant(extobjs, ua.VariantType.ExtensionObject)

        if valuetype == 'ListOfGuid':
            guids = [uuid.UUID(guid) for guid in obj.value]
            return ua.Variant(guids, getattr(ua.VariantType, valuetype[6:]))

        if valuetype.startswith("ListOf"):
            # name of the item type, i.e. valuetype without "ListOf"
            item_type = valuetype[6:]
            if hasattr(ua.ua_binary.Primitives, item_type):
                return ua.Variant(obj.value, getattr(ua.VariantType, item_type))
            if item_type == "LocalizedText":
                return ua.Variant([
                    getattr(ua, item_type)(text=item["Text"], locale=item["Locale"])
                    for item in obj.value
                ])
            return ua.Variant([getattr(ua, item_type)(v) for v in obj.value])

        if valuetype == 'ExtensionObject':
            extobj = self._make_ext_obj(obj.value)
            return ua.Variant(extobj, getattr(ua.VariantType, valuetype))

        if valuetype == 'Guid':
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, valuetype))

        if valuetype == 'LocalizedText':
            ltext = ua.LocalizedText()
            for name, val in obj.value:
                if name == "Text":
                    ltext.Text = val
                elif name == "Locale":
                    ltext.Locale = val
                else:
                    _logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
            return ua.Variant(ltext, ua.VariantType.LocalizedText)

        if valuetype == 'NodeId':
            return ua.Variant(ua.NodeId.from_string(obj.value))

        return ua.Variant(obj.value, getattr(ua.VariantType, valuetype))
326
327
    async def add_variable_type(self, obj):
        """
        Add a variable type node described by obj, then its references.

        A value is only set when obj.value holds exactly one element; rank,
        abstract flag and array dimensions are only copied when truthy.

        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises for a failed add; note
            that the references are sent before the status is checked
        """
        item = self._get_node(obj)
        attributes = ua.VariableTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = obj.datatype
        if obj.value and len(obj.value) == 1:
            attributes.Value = obj.value[0]
        # (attribute of the parsed node, attribute of VariableTypeAttributes)
        optional_fields = (
            ("rank", "ValueRank"),
            ("abstract", "IsAbstract"),
            ("dimensions", "ArrayDimensions"),
        )
        for source_name, target_name in optional_fields:
            value = getattr(obj, source_name)
            if value:
                setattr(attributes, target_name, value)
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
347
348
    async def add_method(self, obj):
        """
        Add a method node described by obj, then its references.

        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises for a failed add; note
            that the references are sent before the status is checked
        """
        item = self._get_node(obj)
        attributes = ua.MethodAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        # NOTE(review): the attribute names below are the ones add_variable()
        # sets on VariableAttributes -- confirm that ua.MethodAttributes
        # really defines them
        optional_fields = (
            ("accesslevel", "AccessLevel"),
            ("useraccesslevel", "UserAccessLevel"),
            ("minsample", "MinimumSamplingInterval"),
            ("dimensions", "ArrayDimensions"),
        )
        for source_name, target_name in optional_fields:
            value = getattr(obj, source_name)
            if value:
                setattr(attributes, target_name, value)
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
367
368
    async def add_reference_type(self, obj):
        """
        Add a reference type node described by obj, then its references.

        Description, inverse name, abstract flag and symmetric flag are only
        set when the parsed value is truthy.

        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises for a failed add; note
            that the references are sent before the status is checked
        """
        item = self._get_node(obj)
        attributes = ua.ReferenceTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.inversename:
            attributes.InverseName = ua.LocalizedText(obj.inversename)
        for source_name, target_name in (("abstract", "IsAbstract"), ("symmetric", "Symmetric")):
            flag = getattr(obj, source_name)
            if flag:
                setattr(attributes, target_name, flag)
        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
385
386
    async def add_datatype(self, obj):
        """
        Add a data type node described by obj, then its references.

        When obj carries definitions, a DataTypeDefinition is attached: an
        enum definition if the parent is the enum data type node, a structure
        definition if the parent is the base structure type node or has it in
        its path. Otherwise a warning is logged and no definition is set.

        Unlike the other add_* methods, the status of the add is checked
        before the references are sent.

        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises for a failed add
        """
        node = self._get_node(obj)
        attrs = ua.DataTypeAttributes()
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if not obj.definitions:
            # report through the module logger instead of printing to stdout,
            # like the rest of this class does
            _logger.debug("%s (parent %s) has no data type definition", obj, obj.parent)
        elif obj.parent == self.server.nodes.enum_data_type:
            attrs.DataTypeDefinition = self._get_edef(node, obj)
        elif obj.parent == self.server.nodes.base_structure_type:
            attrs.DataTypeDefinition = self._get_sdef(node, obj)
        else:
            # not a direct child of a known base type: look for the base
            # structure type among the ancestors
            parent_node = self.server.get_node(obj.parent)
            path = await parent_node.get_path()
            if self.server.nodes.base_structure_type in path:
                attrs.DataTypeDefinition = self._get_sdef(node, obj)
            else:
                _logger.warning("%s has datatypedefinition and path %s but we could not find out if this is a struct", obj, path)
        node.NodeAttributes = attrs
        res = await self._get_server().add_nodes([node])
        res[0].StatusCode.check()
        await self._add_refs(obj)
        return res[0].AddedNodeId
413
414
    async def _add_refs(self, obj):
415
        if not obj.refs:
416
            return
417
        refs = []
418
        for data in obj.refs:
419
            ref = ua.AddReferencesItem()
420
            ref.IsForward = data.forward
421
            ref.ReferenceTypeId = data.reftype
422
            ref.SourceNodeId = obj.nodeid
423
            ref.TargetNodeId = data.target
424
            refs.append(ref)
425
        await self._add_references(refs)
426
427
    def _get_edef(self, node, obj):
428
        if not obj.definitions:
429
            return None
430
        edef = ua.EnumDefinition()
431
        if obj.parent:
432
            edef.BaseDataType = obj.parent
433
        for field in obj.definitions:
434
            f = ua.EnumField()
435
            f.Name = field.name
436
            if field.dname:
437
                f.DisplayName = ua.LocalizedText(text=field.dname)
438
            else:
439
                f.DisplayName = ua.LocalizedText(text=field.name)
440
            f.Value = field.value
441
            f.Description = ua.LocalizedText(text=field.desc)
442
            edef.Fields.append(f)
443
        return edef
444
445
    def _get_sdef(self, node, obj):
446
        if not obj.definitions:
447
            return None
448
        sdef = ua.StructureDefinition()
449
        if obj.parent:
450
            sdef.BaseDataType = obj.parent
451
        for data in obj.refs:
452
            if data.reftype == "HasEncoding":
453
                # looks likebinary encodingisthe firt one...can someone confirm?
454
                sdef.DefaultEncodingId = data.target
455
                break
456
        optional = False
457
        for field in obj.definitions:
458
            f = ua.StructureField()
459
            f.Name = field.name
460
            f.DataType = field.datatype
461
            f.ValueRank = field.valuerank
462
            f.IsOptional = field.optional
463
            if f.IsOptional:
464
                optional = True
465
            f.ArrayDimensions = field.arraydim
466
            f.Description = ua.LocalizedText(text=field.desc)
467
            sdef.Fields.append(f)
468
        if optional:
469
            sdef.StructureType = ua.StructureType.StructureWithOptionalFields
470
        else:
471
            sdef.StructureType = ua.StructureType.Structure
472
        return sdef
473
474
    def _sort_nodes_by_parentid(self, ndatas):
475
        """
476
        Sort the list of nodes according their parent node in order to respect
477
        the dependency between nodes.
478
479
        :param nodes: list of NodeDataObjects
480
        :returns: list of sorted nodes
481
        """
482
483
        sorted_ndatas = []
484
        sorted_nodes_ids = []
485
        all_node_ids = [data.nodeid for data in ndatas]
486
        while ndatas:
487
            for ndata in ndatas[:]:
488
                if ndata.nodeid.NamespaceIndex not in self.namespaces.values() or \
489
                        ndata.parent is None or \
490
                        ndata.parent not in all_node_ids:
491
                    sorted_ndatas.append(ndata)
492
                    sorted_nodes_ids.append(ndata.nodeid)
493
                    ndatas.remove(ndata)
494
                else:
495
                    # Check if the nodes parent is already in the list of
496
                    # inserted nodes
497
                    if ndata.parent in sorted_nodes_ids:
498
                        sorted_ndatas.append(ndata)
499
                        sorted_nodes_ids.append(ndata.nodeid)
500
                        ndatas.remove(ndata)
501
        return sorted_ndatas
502