Passed
Pull Request — master (#358)
by
unknown
02:51
created

XmlImporter._get_add_node_item()   A

Complexity

Conditions 4

Size

Total Lines 13
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 13
nop 2
dl 0
loc 13
rs 9.75
c 0
b 0
f 0
1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5
import logging
6
import uuid
7
from typing import Coroutine, Union, Dict
8
from copy import copy
9
10
from asyncua import ua
11
from .xmlparser import XMLParser, ua_type_to_python
12
from ..ua.uaerrors import UaError
13
14
15
_logger = logging.getLogger(__name__)
16
17
18
class XmlImporter:
19
20
    def __init__(self, server):
21
        self.parser = None
22
        self.server = server
23
        self.namespaces: Dict[int, int] = {}  #Dict[IndexInXml, IndexInServer]
24
        self.aliases: Dict[str, ua.NodeId] = {}
25
        self.refs = None
26
27
    async def _map_namespaces(self, namespaces_uris):
28
        """
29
        creates a mapping between the namespaces in the xml file and in the server.
30
        if not present the namespace is registered.
31
        """
32
        namespaces = {}
33
        for ns_index, ns_uri in enumerate(namespaces_uris):
34
            ns_server_index = await self.server.register_namespace(ns_uri)
35
            namespaces[ns_index + 1] = ns_server_index
36
        return namespaces
37
38
    def _map_aliases(self, aliases: dict):
39
        """
40
        maps the import aliases to the correct namespaces
41
        """
42
        aliases_mapped = {}
43
        for alias, node_id in aliases.items():
44
            aliases_mapped[alias] = self._to_migrated_nodeid(node_id)
45
        return aliases_mapped
46
47
    async def _check_requirements(self, xmlpath):
48
        req_models = self.parser.list_required_models(xmlpath)
49
        server_model_list = []
50
        server_namespaces_node = await self.server.nodes.namespaces.get_children()  # get_node(ua.NodeId(ua.ObjectIds.Server_Namespaces))
51
        for model_node in server_namespaces_node:
52
            server_model_list.append({"ModelUri": await(await model_node.get_child("NamespaceUri")).read_value(),
53
                                      "Version": await(await model_node.get_child("NamespaceVersion")).read_value(),
54
                                      "PublicationDate": str(await(await model_node.get_child("NamespacePublicationDate")).read_value())})
55
56
        for model in server_model_list:
57
            for req_model in req_models:
58
                if model["ModelUri"] == req_model["ModelUri"]:
59
                    if model["Version"] >= req_model["Version"] and model["PublicationDate"] >= req_model["PublicationDate"]:
60
                        server_model_list.remove(model)
61
        if len(server_model_list):
62
            return False
63
        return True
64
65
    async def import_xml(self, xmlpath=None, xmlstring=None):
        """
        Import xml and return added nodes.

        Steps: check the required models, parse the input, map namespaces and aliases, convert
        the parsed node data, complete missing parents, sort by parent and add the nodes one by
        one. References that failed while the nodes were added are sent a second time at the end.

        :param xmlpath: forwarded to the requirement check and to ``parser.parse``
        :param xmlstring: forwarded to ``parser.parse``
        :raises ImportError: when ``_check_requirements`` reports unsatisfied models
        :returns: list with the result of ``_add_node_data`` for every node
        """
        _logger.info("Importing XML file %s", xmlpath)
        self.parser = XMLParser()
        requirements_met = await self._check_requirements(xmlpath)
        if not requirements_met:
            raise ImportError("Server doesn't satisfy required XML-Models. Import them first!")
        await self.parser.parse(xmlpath, xmlstring)

        used_namespaces = self.parser.get_used_namespaces()
        self.namespaces = await self._map_namespaces(used_namespaces)
        _logger.info("namespace map: %s", self.namespaces)
        self.aliases = self._map_aliases(self.parser.get_aliases())
        self.refs = []

        node_datas = self.make_objects(self.parser.get_node_datas())
        self._add_missing_parents(node_datas)

        added = []
        for nodedata in self._sort_nodes_by_parentid(node_datas):
            try:
                node = await self._add_node_data(nodedata)
            except Exception:
                _logger.warning("failure adding node %s", nodedata)
                raise
            else:
                added.append(node)

        # _add_references() stores failed references in self.refs: retry them once
        # now that all nodes exist, whatever fails again is reported
        pending_refs = self.refs
        self.refs = []
        await self._add_references(pending_refs)
        if self.refs:
            _logger.warning("The following references could not be imported and are probably broken: %s", self.refs)
        return added
95
96
    def _add_missing_parents(self, dnodes):
97
        missing = []
98
        childs = {}
99
        for nd in dnodes:
100
            if not nd.parent or nd.parent == nd.nodeid:
101
                missing.append(nd)
102
            for ref in nd.refs:
103
                if ref.forward:
104
                    if ref.reftype in [self.server.nodes.HasComponent.nodeid, self.server.nodes.HasProperty.nodeid, self.server.nodes.Organizes.nodeid]:
105
                        # if a node has several links, the last one will win
106
                        if ref.target in childs:
107
                            _logger.warning("overwriting parent target, shouldbe fixed", ref.target, nd.nodeid, ref.reftype, childs[ref.target])
108
                        childs[ref.target] = (nd.nodeid, ref.reftype)
109
        for nd in missing:
110
            if nd.nodeid in childs:
111
                target, reftype = childs[nd.nodeid]
112
                nd.parent = target
113
                nd.parentlink = reftype
114
                from IPython import embed
115
                embed()
116
117
    async def _add_node_data(self, nodedata) -> "Node":
118
        if nodedata.nodetype == "UAObject":
119
            node = await self.add_object(nodedata)
120
        elif nodedata.nodetype == "UAObjectType":
121
            node = await self.add_object_type(nodedata)
122
        elif nodedata.nodetype == "UAVariable":
123
            node = await self.add_variable(nodedata)
124
        elif nodedata.nodetype == "UAVariableType":
125
            node = await self.add_variable_type(nodedata)
126
        elif nodedata.nodetype == "UAReferenceType":
127
            node = await self.add_reference_type(nodedata)
128
        elif nodedata.nodetype == "UADataType":
129
            node = await self.add_datatype(nodedata)
130
        elif nodedata.nodetype == "UAMethod":
131
            node = await self.add_method(nodedata)
132
        else:
133
            raise ValueError(f"Not implemented node type: {nodedata.nodetype} ")
134
        return node
135
136
    def _get_server(self):
137
        if hasattr(self.server, "iserver"):
138
            return self.server.iserver.isession
139
        else:
140
            return self.server.uaclient
141
142
    async def _add_references(self, refs):
143
        res = await self._get_server().add_references(refs)
144
145
        for sc, ref in zip(res, refs):
146
            if not sc.is_good():
147
                self.refs.append(ref)
148
149
    def make_objects(self, node_data):
        """
        Convert the id fields of the parsed node data, in place.

        ``nodeid``, the non-empty ``parent``, ``parentlink``, ``typedef`` and ``datatype`` fields,
        the ``reftype`` and ``target`` of every reference and the non-empty ``datatype`` of every
        definition field go through ``_to_migrated_nodeid``; ``browsename`` is parsed with
        ``ua.QualifiedName.from_string``.

        :returns: new list holding the (same, modified) node data objects
        """
        optional_id_fields = ("parent", "parentlink", "typedef", "datatype")
        converted = []
        for datum in node_data:
            datum.nodeid = self._to_migrated_nodeid(datum.nodeid)
            datum.browsename = ua.QualifiedName.from_string(datum.browsename)
            for field_name in optional_id_fields:
                current = getattr(datum, field_name)
                if current:
                    setattr(datum, field_name, self._to_migrated_nodeid(current))
            for ref in datum.refs:
                ref.reftype = self._to_migrated_nodeid(ref.reftype)
                ref.target = self._to_migrated_nodeid(ref.target)
            for definition in datum.definitions:
                if definition.datatype:
                    definition.datatype = self._to_migrated_nodeid(definition.datatype)
            converted.append(datum)
        return converted
170
171
    def _migrate_ns(self, nodeid: ua.NodeId) -> ua.NodeId:
        """
        Translate the namespace index of ``nodeid`` from the xml file to the server.

        When the index is a key of ``self.namespaces`` a shallow copy carrying the mapped
        index is returned and the argument is left untouched; otherwise the argument itself
        is returned. Only ``NamespaceIndex`` is accessed.

        :returns: object of the same type as ``nodeid``
        """
        if nodeid.NamespaceIndex not in self.namespaces:
            return nodeid
        migrated = copy(nodeid)
        migrated.NamespaceIndex = self.namespaces[migrated.NamespaceIndex]
        return migrated
183
184
    def _get_add_node_item(self, obj):
        """
        Build the ``ua.AddNodesItem`` shared by all ``add_*`` methods.

        ``obj.nodeid``, ``obj.parent``, ``obj.parentlink`` and ``obj.typedef`` are used as they
        are: make_objects() has already translated them to server namespace indexes, and the
        references and data type of the same object are used without further translation too.
        Translating them a second time here shifted the ids again whenever a server index is
        also an index of the xml file (e.g. a namespace map of {1: 2, 2: 3}), so the node was
        created under a different id than the one its references point to.
        Only the browse name, which make_objects() merely parses, still carries the xml
        namespace index and is translated here.

        :param obj: node data object prepared by make_objects()
        :returns: ua.AddNodesItem without NodeAttributes
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = obj.nodeid
        node.BrowseName = self._migrate_ns(obj.browsename)
        _logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename,
                     obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
        # nodetype is e.g. "UAObject": drop the "UA" prefix to get the NodeClass name
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent and obj.parentlink:
            node.ParentNodeId = obj.parent
            node.ReferenceTypeId = obj.parentlink
        if obj.typedef:
            node.TypeDefinition = obj.typedef
        return node
197
198
    def _to_migrated_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
        """
        Resolve ``nodeid`` with ``_to_nodeid`` and translate its namespace index with ``_migrate_ns``.
        """
        return self._migrate_ns(self._to_nodeid(nodeid))
201
202
    def _to_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
        """
        Turn the different node id notations into a ``ua.NodeId``.

        Resolution order:
        - a ``ua.NodeId`` is returned unchanged
        - an empty value gives ``ua.NodeId(ua.ObjectIds.String)``
        - text containing "=" is parsed with ``ua.NodeId.from_string``
        - a name of ``ua.ObjectIds`` gives the matching standard id
        - a name found in ``self.aliases`` gives the alias target

        :raises AttributeError: when the name is neither in ``ua.ObjectIds`` nor an alias
        """
        if isinstance(nodeid, ua.NodeId):
            return nodeid
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if not hasattr(ua.ObjectIds, nodeid) and nodeid in self.aliases:
            return self.aliases[nodeid]
        # known ObjectIds name, or unknown name: getattr raises AttributeError for the latter
        return ua.NodeId(getattr(ua.ObjectIds, nodeid))
216
217
    async def add_object(self, obj):
        """
        Add an Object node described by ``obj``, then its references.

        The status of the add-nodes result is checked after the references were sent.

        :returns: ``AddedNodeId`` of the add-nodes result
        """
        item = self._get_add_node_item(obj)
        object_attrs = ua.ObjectAttributes()
        if obj.desc:
            object_attrs.Description = ua.LocalizedText(obj.desc)
        object_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        object_attrs.EventNotifier = obj.eventnotifier
        item.NodeAttributes = object_attrs
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
229
230
    async def add_object_type(self, obj):
        """
        Add an ObjectType node described by ``obj``, then its references.

        The status of the add-nodes result is checked after the references were sent.

        :returns: ``AddedNodeId`` of the add-nodes result
        """
        item = self._get_add_node_item(obj)
        type_attrs = ua.ObjectTypeAttributes()
        if obj.desc:
            type_attrs.Description = ua.LocalizedText(obj.desc)
        type_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        type_attrs.IsAbstract = obj.abstract
        item.NodeAttributes = type_attrs
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
242
243
    async def add_variable(self, obj):
        """
        Add a Variable node described by ``obj``, then its references.

        The value is only set when ``obj.value`` is not None; rank, access levels, minimum
        sampling interval and array dimensions are only set when they are truthy.
        The status of the add-nodes result is checked after the references were sent.

        :returns: ``AddedNodeId`` of the add-nodes result
        """
        item = self._get_add_node_item(obj)
        var_attrs = ua.VariableAttributes()
        if obj.desc:
            var_attrs.Description = ua.LocalizedText(obj.desc)
        var_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        var_attrs.DataType = obj.datatype
        if obj.value is not None:
            var_attrs.Value = self._add_variable_value(obj)
        # (field of obj, attribute to set) pairs copied only when the field is truthy
        optional_fields = (
            ("rank", "ValueRank"),
            ("accesslevel", "AccessLevel"),
            ("useraccesslevel", "UserAccessLevel"),
            ("minsample", "MinimumSamplingInterval"),
            ("dimensions", "ArrayDimensions"),
        )
        for source_name, target_name in optional_fields:
            field_value = getattr(obj, source_name)
            if field_value:
                setattr(var_attrs, target_name, field_value)
        item.NodeAttributes = var_attrs
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
267
268
    def _get_ext_class(self, name: str):
        """
        Find the class used to build the extension object called ``name``.

        An attribute of the ``ua`` module with that name wins; otherwise ``name`` has to be an
        alias whose node id has a class registered in ``ua.uatypes``.

        :raises Exception: when ``name`` is no alias or no class is registered for its node id
        """
        if hasattr(ua, name):
            return getattr(ua, name)
        if name not in self.aliases:
            raise Exception("Error no alias found for extension class", name)
        nodeid = self.aliases[name]
        ext_class = ua.uatypes.get_extensionobject_class_type(nodeid)
        if not ext_class:
            raise Exception("Error no extension class registered ", name, nodeid)
        return ext_class
280
281
    def _make_ext_obj(self, obj):
        """
        Instantiate the extension object class for ``obj.objname`` and fill it from ``obj.body``.

        ``obj.body`` is iterated as (name, list of (attribute name, value)) pairs; every
        attribute is applied with ``_set_attr``.

        :raises Exception: when a body entry is not a list
        """
        instance = self._get_ext_class(obj.objname)()
        for name, val in obj.body:
            if not isinstance(val, list):
                raise Exception("Error val should be a list, this is a python-asyncua bug", name, type(val), val)
            for attribute_name, attribute_value in val:
                self._set_attr(instance, attribute_name, attribute_value)
        return instance
290
291
    def _get_val_type(self, obj, attname: str):
292
        for name, uatype in obj.ua_types:
293
            if name == attname:
294
                return uatype
295
        raise UaError(f"Attribute '{attname}' defined in xml is not found in object '{obj}'")
296
297
    def _set_attr(self, obj, attname: str, val):
        """
        Set attribute ``attname`` of ``obj`` from a parsed xml value.

        Two possible kinds of value: either the value directly (a string), or a sequence of
        pairs when the attribute is an object or a list.
        """
        if isinstance(val, str):
            uatype = self._get_val_type(obj, attname)
            setattr(obj, attname, ua_type_to_python(val, uatype))
            return

        # so we have either an object or a list...
        current = getattr(obj, attname)

        if isinstance(current, ua.NodeId):
            # NodeId representation does not follow common rules!!
            # only the first "Identifier" entry is used
            for key, raw_id in val:
                if key != "Identifier":
                    continue
                if hasattr(ua.ObjectIds, raw_id):
                    parsed = ua.NodeId(getattr(ua.ObjectIds, raw_id))
                else:
                    parsed = ua.NodeId.from_string(raw_id)
                setattr(obj, attname, self._migrate_ns(parsed))
                return
            return

        if hasattr(current, "ua_types"):
            # nested object: fill its attributes recursively
            for nested_name, nested_val in val:
                self._set_attr(current, nested_name, nested_val)
            setattr(obj, attname, current)
            return

        # we probably have a list of (type, value) pairs
        setattr(obj, attname, [ua_type_to_python(item, item_type) for item_type, item in val])
326
327
    def _add_variable_value(self, obj):
        """
        Build the ``ua.Variant`` for a Variable from ``obj.value`` according to ``obj.valuetype``.

        Extension objects, Guids, LocalizedTexts, NodeIds and their "ListOf" forms get a dedicated
        conversion; every other type is passed to ``ua.Variant`` with the VariantType of that name.
        """
        _logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
        valuetype = obj.valuetype

        if valuetype == 'ListOfExtensionObject':
            ext_objects = [self._make_ext_obj(ext) for ext in obj.value]
            return ua.Variant(ext_objects, ua.VariantType.ExtensionObject)

        if valuetype == 'ListOfGuid':
            guids = [uuid.UUID(guid) for guid in obj.value]
            return ua.Variant(guids, getattr(ua.VariantType, valuetype[6:]))

        if valuetype.startswith("ListOf"):
            # strip the "ListOf" prefix to get the element type name
            element_type = valuetype[6:]
            if hasattr(ua.ua_binary.Primitives, element_type):
                return ua.Variant(obj.value, getattr(ua.VariantType, element_type))
            if element_type == "LocalizedText":
                texts = [
                    getattr(ua, element_type)(text=item["Text"], locale=item["Locale"])
                    for item in obj.value
                ]
                return ua.Variant(texts)
            return ua.Variant([getattr(ua, element_type)(v) for v in obj.value])

        if valuetype == 'ExtensionObject':
            ext_object = self._make_ext_obj(obj.value)
            return ua.Variant(ext_object, getattr(ua.VariantType, valuetype))

        if valuetype == 'Guid':
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, valuetype))

        if valuetype == 'LocalizedText':
            localized = ua.LocalizedText()
            for name, val in obj.value:
                if name == "Text":
                    localized.Text = val
                elif name == "Locale":
                    localized.Locale = val
                else:
                    _logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
            return ua.Variant(localized, ua.VariantType.LocalizedText)

        if valuetype == 'NodeId':
            return ua.Variant(ua.NodeId.from_string(obj.value))

        return ua.Variant(obj.value, getattr(ua.VariantType, valuetype))
369
370
    async def add_variable_type(self, obj):
        """
        Add a VariableType node described by ``obj``, then its references.

        The value is only set when ``obj.value`` holds exactly one element; rank, abstract flag
        and array dimensions are only set when they are truthy.
        The status of the add-nodes result is checked after the references were sent.

        :returns: ``AddedNodeId`` of the add-nodes result
        """
        item = self._get_add_node_item(obj)
        type_attrs = ua.VariableTypeAttributes()
        if obj.desc:
            type_attrs.Description = ua.LocalizedText(obj.desc)
        type_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        type_attrs.DataType = obj.datatype
        if obj.value and len(obj.value) == 1:
            type_attrs.Value = obj.value[0]
        # (field of obj, attribute to set) pairs copied only when the field is truthy
        optional_fields = (
            ("rank", "ValueRank"),
            ("abstract", "IsAbstract"),
            ("dimensions", "ArrayDimensions"),
        )
        for source_name, target_name in optional_fields:
            field_value = getattr(obj, source_name)
            if field_value:
                setattr(type_attrs, target_name, field_value)
        item.NodeAttributes = type_attrs
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
390
391
    async def add_method(self, obj):
        """
        Add a Method node described by ``obj``, then its references.

        Access levels, minimum sampling interval and array dimensions are copied onto the
        attributes object only when they are truthy.
        The status of the add-nodes result is checked after the references were sent.

        :returns: ``AddedNodeId`` of the add-nodes result
        """
        item = self._get_add_node_item(obj)
        method_attrs = ua.MethodAttributes()
        if obj.desc:
            method_attrs.Description = ua.LocalizedText(obj.desc)
        method_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        # (field of obj, attribute to set) pairs copied only when the field is truthy
        optional_fields = (
            ("accesslevel", "AccessLevel"),
            ("useraccesslevel", "UserAccessLevel"),
            ("minsample", "MinimumSamplingInterval"),
            ("dimensions", "ArrayDimensions"),
        )
        for source_name, target_name in optional_fields:
            field_value = getattr(obj, source_name)
            if field_value:
                setattr(method_attrs, target_name, field_value)
        item.NodeAttributes = method_attrs
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
410
411
    async def add_reference_type(self, obj):
        """
        Add a ReferenceType node described by ``obj``, then its references.

        Inverse name, abstract flag and symmetric flag are only set when they are truthy.
        The status of the add-nodes result is checked after the references were sent.

        :returns: ``AddedNodeId`` of the add-nodes result
        """
        item = self._get_add_node_item(obj)
        ref_attrs = ua.ReferenceTypeAttributes()
        if obj.desc:
            ref_attrs.Description = ua.LocalizedText(obj.desc)
        ref_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.inversename:
            ref_attrs.InverseName = ua.LocalizedText(obj.inversename)
        for source_name, target_name in (("abstract", "IsAbstract"), ("symmetric", "Symmetric")):
            flag = getattr(obj, source_name)
            if flag:
                setattr(ref_attrs, target_name, flag)
        item.NodeAttributes = ref_attrs
        results = await self._get_server().add_nodes([item])
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
428
429
    async def add_datatype(self, obj):
        """
        Add a DataType node described by ``obj``, then its references.

        When ``obj`` has definitions, a DataTypeDefinition is attached: an enum definition for a
        direct child of the enum data type, a structure definition for a direct child of the
        base structure type or for a node whose parent path contains the base structure type.
        Otherwise a warning is logged and no definition is set.
        Unlike the other ``add_*`` methods the status is checked before the references are sent.

        :returns: ``AddedNodeId`` of the add-nodes result
        """
        item = self._get_add_node_item(obj)
        dt_attrs = ua.DataTypeAttributes()
        if obj.desc:
            dt_attrs.Description = ua.LocalizedText(obj.desc)
        dt_attrs.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.abstract:
            dt_attrs.IsAbstract = obj.abstract
        if obj.definitions:
            if obj.parent == self.server.nodes.enum_data_type.nodeid:
                dt_attrs.DataTypeDefinition = self._get_edef(item, obj)
            elif obj.parent == self.server.nodes.base_structure_type.nodeid:
                dt_attrs.DataTypeDefinition = self._get_sdef(item, obj)
            else:
                # not a direct child: look for the structure base type further up
                path = await self.server.get_node(obj.parent).get_path()
                if self.server.nodes.base_structure_type in path:
                    dt_attrs.DataTypeDefinition = self._get_sdef(item, obj)
                else:
                    _logger.warning("%s has datatypedefinition and path %s but we could not find out if this is a struct", obj, path)
        item.NodeAttributes = dt_attrs
        results = await self._get_server().add_nodes([item])
        outcome = results[0]
        outcome.StatusCode.check()
        await self._add_refs(obj)
        return outcome.AddedNodeId
456
457
    async def _add_refs(self, obj):
        """
        Send the references of ``obj`` with ``_add_references``; nothing is sent without references.

        Every reference becomes a ``ua.AddReferencesItem`` with ``obj.nodeid`` as source.
        """
        if not obj.refs:
            return

        def _to_item(data):
            # one AddReferencesItem per parsed reference
            item = ua.AddReferencesItem()
            item.IsForward = data.forward
            item.ReferenceTypeId = data.reftype
            item.SourceNodeId = obj.nodeid
            item.TargetNodeId = data.target
            return item

        await self._add_references([_to_item(data) for data in obj.refs])
469
470
    def _get_edef(self, node, obj):
        """
        Build the ``ua.EnumDefinition`` for ``obj`` from its definition fields.

        The display name of a field falls back to the field name when ``dname`` is empty.

        :param node: not used
        :returns: the enum definition, or None when ``obj`` has no definitions
        """
        if not obj.definitions:
            return None
        enum_def = ua.EnumDefinition()
        if obj.parent:
            enum_def.BaseDataType = obj.parent
        for field in obj.definitions:
            enum_field = ua.EnumField()
            enum_field.Name = field.name
            enum_field.DisplayName = ua.LocalizedText(text=field.dname or field.name)
            enum_field.Value = field.value
            enum_field.Description = ua.LocalizedText(text=field.desc)
            enum_def.Fields.append(enum_field)
        return enum_def
487
488
    def _get_sdef(self, node, obj):
        """
        Build the ``ua.StructureDefinition`` for ``obj`` from its definition fields.

        The target of the first HasEncoding reference becomes the default encoding id. The
        structure type is StructureWithOptionalFields as soon as one field is optional,
        Structure otherwise.

        :param node: not used
        :returns: the structure definition, or None when ``obj`` has no definitions
        """
        if not obj.definitions:
            return None
        struct_def = ua.StructureDefinition()
        if obj.parent:
            struct_def.BaseDataType = obj.parent

        # looks like binary encoding is the first one... can someone confirm?
        encoding_ref = next(
            (data for data in obj.refs if data.reftype == self.server.nodes.HasEncoding.nodeid),
            None,
        )
        if encoding_ref is not None:
            struct_def.DefaultEncodingId = encoding_ref.target

        has_optional = False
        for field in obj.definitions:
            struct_field = ua.StructureField()
            struct_field.Name = field.name
            struct_field.DataType = field.datatype
            struct_field.ValueRank = field.valuerank
            struct_field.IsOptional = field.optional
            if struct_field.IsOptional:
                has_optional = True
            struct_field.ArrayDimensions = field.arraydim
            struct_field.Description = ua.LocalizedText(text=field.desc)
            struct_def.Fields.append(struct_field)

        struct_def.StructureType = (
            ua.StructureType.StructureWithOptionalFields if has_optional else ua.StructureType.Structure
        )
        return struct_def
516
517
    def _sort_nodes_by_parentid(self, ndatas):
518
        """
519
        Sort the list of nodes according their parent node in order to respect
520
        the dependency between nodes.
521
522
        :param nodes: list of NodeDataObjects
523
        :returns: list of sorted nodes
524
        """
525
526
        sorted_ndatas = []
527
        sorted_nodes_ids = []
528
        all_node_ids = [data.nodeid for data in ndatas]
529
        while ndatas:
530
            for ndata in ndatas[:]:
531
                if ndata.nodeid.NamespaceIndex not in self.namespaces.values() or \
532
                        ndata.parent is None or \
533
                        ndata.parent not in all_node_ids:
534
                    sorted_ndatas.append(ndata)
535
                    sorted_nodes_ids.append(ndata.nodeid)
536
                    ndatas.remove(ndata)
537
                else:
538
                    # Check if the nodes parent is already in the list of
539
                    # inserted nodes
540
                    if ndata.parent in sorted_nodes_ids:
541
                        sorted_ndatas.append(ndata)
542
                        sorted_nodes_ids.append(ndata.nodeid)
543
                        ndatas.remove(ndata)
544
        return sorted_ndatas
545