Passed
Pull Request — master (#358)
by
unknown
03:50 queued 01:08
created

XmlImporter._check_requirements()   C

Complexity

Conditions 9

Size

Total Lines 15
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 15
nop 2
dl 0
loc 15
rs 6.6666
c 0
b 0
f 0
1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5
import logging
6
import uuid
7
from typing import Coroutine, Union, Dict
8
from copy import copy
9
10
from asyncua import ua
11
from .xmlparser import XMLParser, ua_type_to_python
12
from ..ua.uaerrors import UaError
13
14
15
_logger = logging.getLogger(__name__)
16
17
18
class XmlImporter:
19
20
    def __init__(self, server):
21
        self.parser = None
22
        self.server = server
23
        self.namespaces: Dict[int, int] = {}  #Dict[IndexInXml, IndexInServer]
24
        self.aliases: Dict[str, ua.NodeId] = {}
25
        self.refs = None
26
27
    async def _map_namespaces(self, namespaces_uris):
28
        """
29
        creates a mapping between the namespaces in the xml file and in the server.
30
        if not present the namespace is registered.
31
        """
32
        namespaces = {}
33
        for ns_index, ns_uri in enumerate(namespaces_uris):
34
            ns_server_index = await self.server.register_namespace(ns_uri)
35
            namespaces[ns_index + 1] = ns_server_index
36
        return namespaces
37
38
    def _map_aliases(self, aliases: dict):
39
        """
40
        maps the import aliases to the correct namespaces
41
        """
42
        aliases_mapped = {}
43
        for alias, node_id in aliases.items():
44
            aliases_mapped[alias] = self._to_migrated_nodeid(node_id)
45
        return aliases_mapped
46
47
48
    async def _get_existing_model_in_namespace(self):
49
        server_model_list = []
50
        server_namespaces_node = await self.server.nodes.namespaces.get_children()  # get_node(ua.NodeId(ua.ObjectIds.Server_Namespaces))
51
        for model_node in server_namespaces_node:
52
            server_model_list.append({"ModelUri": await(await model_node.get_child("NamespaceUri")).read_value(),
53
                                      "Version": await(await model_node.get_child("NamespaceVersion")).read_value(),
54
                                      "PublicationDate": str(await(await model_node.get_child("NamespacePublicationDate")).read_value())})
55
        return server_model_list
56
57
    async def _check_requirements(self, xmlpath):
58
        req_models = self.parser.list_required_models(xmlpath)
59
        if req_models == []:
60
            return None
61
        server_model_list = await self._get_existing_model_in_namespace()
62
        for model in server_model_list:
63
            for req_model in req_models:
64
                if model["ModelUri"] == req_model["ModelUri"]:
65
                    if model["Version"] >= req_model["Version"] and model["PublicationDate"] >= req_model["PublicationDate"]:
66
                        server_model_list.remove(model)
67
        if len(server_model_list):
68
            for missing_model in server_model_list:
69
                _logger.warning("Model is missing: ", missing_model)
70
            raise ImportError("Server doesn't satisfy required XML-Models. Import them first!")
71
        return None
72
73
    async def import_xml(self, xmlpath=None, xmlstring=None):
74
        """
75
        import xml and return added nodes
76
        """
77
        _logger.info("Importing XML file %s", xmlpath)
78
        self.parser = XMLParser()
79
        await self._check_requirements(xmlpath)
80
        await self.parser.parse(xmlpath, xmlstring)
81
        self.namespaces = await self._map_namespaces(self.parser.get_used_namespaces())
82
        _logger.info("namespace map: %s", self.namespaces)
83
        self.aliases = self._map_aliases(self.parser.get_aliases())
84
        self.refs = []
85
        dnodes = self.parser.get_node_datas()
86
        dnodes = self.make_objects(dnodes)
87
        self._add_missing_parents(dnodes)
88
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)
89
        nodes = []
90
        for nodedata in nodes_parsed:  # self.parser:
91
            try:
92
                node = await self._add_node_data(nodedata)
93
            except Exception:
94
                _logger.warning("failure adding node %s", nodedata)
95
                raise
96
            nodes.append(node)
97
        self.refs, remaining_refs = [], self.refs
98
        await self._add_references(remaining_refs)
99
        if len(self.refs):
100
            _logger.warning("The following references could not be imported and are probably broken: %s", self.refs)
101
        return nodes
102
103
    def _add_missing_parents(self, dnodes):
104
        missing = []
105
        childs = {}
106
        for nd in dnodes:
107
            if not nd.parent or nd.parent == nd.nodeid:
108
                missing.append(nd)
109
            for ref in nd.refs:
110
                if ref.forward:
111
                    if ref.reftype in [self.server.nodes.HasComponent.nodeid, self.server.nodes.HasProperty.nodeid, self.server.nodes.Organizes.nodeid]:
112
                        # if a node has several links, the last one will win
113
                        if ref.target in childs:
114
                            _logger.warning("overwriting parent target, shouldbe fixed", ref.target, nd.nodeid, ref.reftype, childs[ref.target])
115
                        childs[ref.target] = (nd.nodeid, ref.reftype)
116
        for nd in missing:
117
            if nd.nodeid in childs:
118
                target, reftype = childs[nd.nodeid]
119
                nd.parent = target
120
                nd.parentlink = reftype
121
                from IPython import embed
122
                embed()
123
124
    async def _add_node_data(self, nodedata) -> "Node":
125
        if nodedata.nodetype == "UAObject":
126
            node = await self.add_object(nodedata)
127
        elif nodedata.nodetype == "UAObjectType":
128
            node = await self.add_object_type(nodedata)
129
        elif nodedata.nodetype == "UAVariable":
130
            node = await self.add_variable(nodedata)
131
        elif nodedata.nodetype == "UAVariableType":
132
            node = await self.add_variable_type(nodedata)
133
        elif nodedata.nodetype == "UAReferenceType":
134
            node = await self.add_reference_type(nodedata)
135
        elif nodedata.nodetype == "UADataType":
136
            node = await self.add_datatype(nodedata)
137
        elif nodedata.nodetype == "UAMethod":
138
            node = await self.add_method(nodedata)
139
        else:
140
            raise ValueError(f"Not implemented node type: {nodedata.nodetype} ")
141
        return node
142
143
    def _get_server(self):
144
        if hasattr(self.server, "iserver"):
145
            return self.server.iserver.isession
146
        else:
147
            return self.server.uaclient
148
149
    async def _add_references(self, refs):
150
        res = await self._get_server().add_references(refs)
151
152
        for sc, ref in zip(res, refs):
153
            if not sc.is_good():
154
                self.refs.append(ref)
155
156
    def make_objects(self, node_data):
157
        new_nodes = []
158
        for node_datum in node_data:
159
            node_datum.nodeid = self._to_migrated_nodeid(node_datum.nodeid)
160
            node_datum.browsename = ua.QualifiedName.from_string(node_datum.browsename)
161
            if node_datum.parent:
162
                node_datum.parent = self._to_migrated_nodeid(node_datum.parent)
163
            if node_datum.parentlink:
164
                node_datum.parentlink = self._to_migrated_nodeid(node_datum.parentlink)
165
            if node_datum.typedef:
166
                node_datum.typedef = self._to_migrated_nodeid(node_datum.typedef)
167
            if node_datum.datatype:
168
                node_datum.datatype = self._to_migrated_nodeid(node_datum.datatype)
169
            new_nodes.append(node_datum)
170
            for ref in node_datum.refs:
171
                ref.reftype = self._to_migrated_nodeid(ref.reftype)
172
                ref.target = self._to_migrated_nodeid(ref.target)
173
            for field in node_datum.definitions:
174
                if field.datatype:
175
                    field.datatype = self._to_migrated_nodeid(field.datatype)
176
        return new_nodes
177
178
    def _migrate_ns(self, nodeid: ua.NodeId) -> ua.NodeId:
179
        """
180
        Check if the index of nodeid or browsename  given in the xml model file
181
        must be converted to a already existing namespace id based on the files
182
        namespace uri
183
184
        :returns: NodeId (str)
185
        """
186
        if nodeid.NamespaceIndex in self.namespaces:
187
            nodeid = copy(nodeid)
188
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
189
        return nodeid
190
191
    def _get_add_node_item(self, obj):
192
        node = ua.AddNodesItem()
193
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
194
        node.BrowseName = self._migrate_ns(obj.browsename)
195
        _logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename,
196
                     obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
197
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
198
        if obj.parent and obj.parentlink:
199
            node.ParentNodeId = self._migrate_ns(obj.parent)
200
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
201
        if obj.typedef:
202
            node.TypeDefinition = self._migrate_ns(obj.typedef)
203
        return node
204
205
    def _to_migrated_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
206
        nodeid = self._to_nodeid(nodeid)
207
        return self._migrate_ns(nodeid)
208
209
    def _to_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
210
        if isinstance(nodeid, ua.NodeId):
211
            return nodeid
212
        elif not nodeid:
213
            return ua.NodeId(ua.ObjectIds.String)
214
        elif "=" in nodeid:
215
            return ua.NodeId.from_string(nodeid)
216
        elif hasattr(ua.ObjectIds, nodeid):
217
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
218
        else:
219
            if nodeid in self.aliases:
220
                return self.aliases[nodeid]
221
            else:
222
                return ua.NodeId(getattr(ua.ObjectIds, nodeid))
223
224
    async def add_object(self, obj):
225
        node = self._get_add_node_item(obj)
226
        attrs = ua.ObjectAttributes()
227
        if obj.desc:
228
            attrs.Description = ua.LocalizedText(obj.desc)
229
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
230
        attrs.EventNotifier = obj.eventnotifier
231
        node.NodeAttributes = attrs
232
        res = await self._get_server().add_nodes([node])
233
        await self._add_refs(obj)
234
        res[0].StatusCode.check()
235
        return res[0].AddedNodeId
236
237
    async def add_object_type(self, obj):
238
        node = self._get_add_node_item(obj)
239
        attrs = ua.ObjectTypeAttributes()
240
        if obj.desc:
241
            attrs.Description = ua.LocalizedText(obj.desc)
242
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
243
        attrs.IsAbstract = obj.abstract
244
        node.NodeAttributes = attrs
245
        res = await self._get_server().add_nodes([node])
246
        await self._add_refs(obj)
247
        res[0].StatusCode.check()
248
        return res[0].AddedNodeId
249
250
    async def add_variable(self, obj):
251
        node = self._get_add_node_item(obj)
252
        attrs = ua.VariableAttributes()
253
        if obj.desc:
254
            attrs.Description = ua.LocalizedText(obj.desc)
255
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
256
        attrs.DataType = obj.datatype
257
        if obj.value is not None:
258
            attrs.Value = self._add_variable_value(obj,)
259
        if obj.rank:
260
            attrs.ValueRank = obj.rank
261
        if obj.accesslevel:
262
            attrs.AccessLevel = obj.accesslevel
263
        if obj.useraccesslevel:
264
            attrs.UserAccessLevel = obj.useraccesslevel
265
        if obj.minsample:
266
            attrs.MinimumSamplingInterval = obj.minsample
267
        if obj.dimensions:
268
            attrs.ArrayDimensions = obj.dimensions
269
        node.NodeAttributes = attrs
270
        res = await self._get_server().add_nodes([node])
271
        await self._add_refs(obj)
272
        res[0].StatusCode.check()
273
        return res[0].AddedNodeId
274
275
    def _get_ext_class(self, name: str):
276
        if hasattr(ua, name):
277
            return getattr(ua, name)
278
        elif name in self.aliases.keys():
279
            nodeid = self.aliases[name]
280
            class_type = ua.uatypes.get_extensionobject_class_type(nodeid)
281
            if class_type:
282
                return class_type
283
            else:
284
                raise Exception("Error no extension class registered ", name, nodeid)
285
        else:
286
            raise Exception("Error no alias found for extension class", name)
287
288
    def _make_ext_obj(self, obj):
289
        ext = self._get_ext_class(obj.objname)()
290
        for name, val in obj.body:
291
            if not isinstance(val, list):
292
                raise Exception("Error val should be a list, this is a python-asyncua bug", name, type(val), val)
293
            else:
294
                for attname, v in val:
295
                    self._set_attr(ext, attname, v)
296
        return ext
297
298
    def _get_val_type(self, obj, attname: str):
299
        for name, uatype in obj.ua_types:
300
            if name == attname:
301
                return uatype
302
        raise UaError(f"Attribute '{attname}' defined in xml is not found in object '{obj}'")
303
304
    def _set_attr(self, obj, attname: str, val):
305
        # tow possible values:
306
        # either we get value directly
307
        # or a dict if it s an object or a list
308
        if isinstance(val, str):
309
            pval = ua_type_to_python(val, self._get_val_type(obj, attname))
310
            setattr(obj, attname, pval)
311
        else:
312
            # so we have either an object or a list...
313
            obj2 = getattr(obj, attname)
314
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
315
                for attname2, v2 in val:
316
                    if attname2 == "Identifier":
317
                        if hasattr(ua.ObjectIds, v2):
318
                            obj2 = ua.NodeId(getattr(ua.ObjectIds, v2))
319
                        else:
320
                            obj2 = ua.NodeId.from_string(v2)
321
                        setattr(obj, attname, self._migrate_ns(obj2))
322
                        break
323
            elif not hasattr(obj2, "ua_types"):
324
                # we probably have a list
325
                my_list = []
326
                for vtype, v2 in val:
327
                    my_list.append(ua_type_to_python(v2, vtype))
328
                setattr(obj, attname, my_list)
329
            else:
330
                for attname2, v2 in val:
331
                    self._set_attr(obj2, attname2, v2)
332
                setattr(obj, attname, obj2)
333
334
    def _add_variable_value(self, obj):
335
        """
336
        Returns the value for a Variable based on the objects value type.
337
        """
338
        _logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
339
        if obj.valuetype == 'ListOfExtensionObject':
340
            values = []
341
            for ext in obj.value:
342
                extobj = self._make_ext_obj(ext)
343
                values.append(extobj)
344
            return ua.Variant(values, ua.VariantType.ExtensionObject)
345
        elif obj.valuetype == 'ListOfGuid':
346
            return ua.Variant([
347
                uuid.UUID(guid) for guid in obj.value
348
            ], getattr(ua.VariantType, obj.valuetype[6:]))
349
        elif obj.valuetype.startswith("ListOf"):
350
            vtype = obj.valuetype[6:]
351
            if hasattr(ua.ua_binary.Primitives, vtype):
352
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
353
            elif vtype == "LocalizedText":
354
                return ua.Variant([getattr(ua, vtype)(text=item["Text"], locale=item["Locale"]) for item in obj.value])
355
            else:
356
                return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
357
        elif obj.valuetype == 'ExtensionObject':
358
            extobj = self._make_ext_obj(obj.value)
359
            return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
360
        elif obj.valuetype == 'Guid':
361
            return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
362
        elif obj.valuetype == 'LocalizedText':
363
            ltext = ua.LocalizedText()
364
            for name, val in obj.value:
365
                if name == "Text":
366
                    ltext.Text = val
367
                elif name == "Locale":
368
                    ltext.Locale = val
369
                else:
370
                    _logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
371
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
372
        elif obj.valuetype == 'NodeId':
373
            return ua.Variant(ua.NodeId.from_string(obj.value))
374
        else:
375
            return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
376
377
    async def add_variable_type(self, obj):
378
        node = self._get_add_node_item(obj)
379
        attrs = ua.VariableTypeAttributes()
380
        if obj.desc:
381
            attrs.Description = ua.LocalizedText(obj.desc)
382
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
383
        attrs.DataType = obj.datatype
384
        if obj.value and len(obj.value) == 1:
385
            attrs.Value = obj.value[0]
386
        if obj.rank:
387
            attrs.ValueRank = obj.rank
388
        if obj.abstract:
389
            attrs.IsAbstract = obj.abstract
390
        if obj.dimensions:
391
            attrs.ArrayDimensions = obj.dimensions
392
        node.NodeAttributes = attrs
393
        res = await self._get_server().add_nodes([node])
394
        await self._add_refs(obj)
395
        res[0].StatusCode.check()
396
        return res[0].AddedNodeId
397
398
    async def add_method(self, obj):
399
        node = self._get_add_node_item(obj)
400
        attrs = ua.MethodAttributes()
401
        if obj.desc:
402
            attrs.Description = ua.LocalizedText(obj.desc)
403
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
404
        if obj.accesslevel:
405
            attrs.AccessLevel = obj.accesslevel
406
        if obj.useraccesslevel:
407
            attrs.UserAccessLevel = obj.useraccesslevel
408
        if obj.minsample:
409
            attrs.MinimumSamplingInterval = obj.minsample
410
        if obj.dimensions:
411
            attrs.ArrayDimensions = obj.dimensions
412
        node.NodeAttributes = attrs
413
        res = await self._get_server().add_nodes([node])
414
        await self._add_refs(obj)
415
        res[0].StatusCode.check()
416
        return res[0].AddedNodeId
417
418
    async def add_reference_type(self, obj):
419
        node = self._get_add_node_item(obj)
420
        attrs = ua.ReferenceTypeAttributes()
421
        if obj.desc:
422
            attrs.Description = ua.LocalizedText(obj.desc)
423
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
424
        if obj. inversename:
425
            attrs.InverseName = ua.LocalizedText(obj.inversename)
426
        if obj.abstract:
427
            attrs.IsAbstract = obj.abstract
428
        if obj.symmetric:
429
            attrs.Symmetric = obj.symmetric
430
        node.NodeAttributes = attrs
431
        res = await self._get_server().add_nodes([node])
432
        await self._add_refs(obj)
433
        res[0].StatusCode.check()
434
        return res[0].AddedNodeId
435
436
    async def add_datatype(self, obj):
437
        node = self._get_add_node_item(obj)
438
        attrs = ua.DataTypeAttributes()
439
        if obj.desc:
440
            attrs.Description = ua.LocalizedText(obj.desc)
441
        attrs.DisplayName = ua.LocalizedText(obj.displayname)
442
        if obj.abstract:
443
            attrs.IsAbstract = obj.abstract
444
        if not obj.definitions:
445
            pass
446
        else:
447
            if obj.parent == self.server.nodes.enum_data_type.nodeid:
448
                attrs.DataTypeDefinition = self._get_edef(node, obj)
449
            elif obj.parent == self.server.nodes.base_structure_type.nodeid:
450
                attrs.DataTypeDefinition = self._get_sdef(node, obj)
451
            else:
452
                parent_node = self.server.get_node(obj.parent)
453
                path = await parent_node.get_path()
454
                if self.server.nodes.base_structure_type in path:
455
                    attrs.DataTypeDefinition = self._get_sdef(node, obj)
456
                else:
457
                    _logger.warning("%s has datatypedefinition and path %s but we could not find out if this is a struct", obj, path)
458
        node.NodeAttributes = attrs
459
        res = await self._get_server().add_nodes([node])
460
        res[0].StatusCode.check()
461
        await self._add_refs(obj)
462
        return res[0].AddedNodeId
463
464
    async def _add_refs(self, obj):
465
        if not obj.refs:
466
            return
467
        refs = []
468
        for data in obj.refs:
469
            ref = ua.AddReferencesItem()
470
            ref.IsForward = data.forward
471
            ref.ReferenceTypeId = data.reftype
472
            ref.SourceNodeId = obj.nodeid
473
            ref.TargetNodeId = data.target
474
            refs.append(ref)
475
        await self._add_references(refs)
476
477
    def _get_edef(self, node, obj):
478
        if not obj.definitions:
479
            return None
480
        edef = ua.EnumDefinition()
481
        if obj.parent:
482
            edef.BaseDataType = obj.parent
483
        for field in obj.definitions:
484
            f = ua.EnumField()
485
            f.Name = field.name
486
            if field.dname:
487
                f.DisplayName = ua.LocalizedText(text=field.dname)
488
            else:
489
                f.DisplayName = ua.LocalizedText(text=field.name)
490
            f.Value = field.value
491
            f.Description = ua.LocalizedText(text=field.desc)
492
            edef.Fields.append(f)
493
        return edef
494
495
    def _get_sdef(self, node, obj):
496
        if not obj.definitions:
497
            return None
498
        sdef = ua.StructureDefinition()
499
        if obj.parent:
500
            sdef.BaseDataType = obj.parent
501
        for data in obj.refs:
502
            if data.reftype == self.server.nodes.HasEncoding.nodeid:
503
                # looks likebinary encodingisthe firt one...can someone confirm?
504
                sdef.DefaultEncodingId = data.target
505
                break
506
        optional = False
507
        for field in obj.definitions:
508
            f = ua.StructureField()
509
            f.Name = field.name
510
            f.DataType = field.datatype
511
            f.ValueRank = field.valuerank
512
            f.IsOptional = field.optional
513
            if f.IsOptional:
514
                optional = True
515
            f.ArrayDimensions = field.arraydim
516
            f.Description = ua.LocalizedText(text=field.desc)
517
            sdef.Fields.append(f)
518
        if optional:
519
            sdef.StructureType = ua.StructureType.StructureWithOptionalFields
520
        else:
521
            sdef.StructureType = ua.StructureType.Structure
522
        return sdef
523
524
    def _sort_nodes_by_parentid(self, ndatas):
525
        """
526
        Sort the list of nodes according their parent node in order to respect
527
        the dependency between nodes.
528
529
        :param nodes: list of NodeDataObjects
530
        :returns: list of sorted nodes
531
        """
532
533
        sorted_ndatas = []
534
        sorted_nodes_ids = []
535
        all_node_ids = [data.nodeid for data in ndatas]
536
        while ndatas:
537
            for ndata in ndatas[:]:
538
                if ndata.nodeid.NamespaceIndex not in self.namespaces.values() or \
539
                        ndata.parent is None or \
540
                        ndata.parent not in all_node_ids:
541
                    sorted_ndatas.append(ndata)
542
                    sorted_nodes_ids.append(ndata.nodeid)
543
                    ndatas.remove(ndata)
544
                else:
545
                    # Check if the nodes parent is already in the list of
546
                    # inserted nodes
547
                    if ndata.parent in sorted_nodes_ids:
548
                        sorted_ndatas.append(ndata)
549
                        sorted_nodes_ids.append(ndata.nodeid)
550
                        ndatas.remove(ndata)
551
        return sorted_ndatas
552