XmlImporter._add_variable_value()   F
last analyzed

Complexity

Conditions 14

Size

Total Lines 42
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 36
nop 2
dl 0
loc 42
rs 3.6
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex methods like asyncua.common.xmlimporter.XmlImporter._add_variable_value() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
add nodes defined in XML to address space
3
format is the one from opc-ua specification
4
"""
5
import logging
6
import uuid
7
from typing import Union, Dict
8
from copy import copy
9
10
from asyncua import ua
11
from .xmlparser import XMLParser, ua_type_to_python
12
from ..ua.uaerrors import UaError
13
14
15
_logger = logging.getLogger(__name__)
16
17
18
class XmlImporter:
19
20
    def __init__(self, server):
21
        self.parser = None
22
        self.server = server
23
        self.namespaces: Dict[int, int] = {}  #Dict[IndexInXml, IndexInServer]
24
        self.aliases: Dict[str, ua.NodeId] = {}
25
        self.refs = None
26
27
    async def _map_namespaces(self, namespaces_uris):
28
        """
29
        creates a mapping between the namespaces in the xml file and in the server.
30
        if not present the namespace is registered.
31
        """
32
        namespaces = {}
33
        for ns_index, ns_uri in enumerate(namespaces_uris):
34
            ns_server_index = await self.server.register_namespace(ns_uri)
35
            namespaces[ns_index + 1] = ns_server_index
36
        return namespaces
37
38
    def _map_aliases(self, aliases: dict):
        """
        maps the import aliases to the correct namespaces

        :param aliases: dict {alias name: node id as found in the xml}
        :returns: new dict with the same keys, every value converted with
                  _to_migrated_nodeid()
        """
        return {name: self._to_migrated_nodeid(raw_id) for name, raw_id in aliases.items()}
46
47
    async def import_xml(self, xmlpath=None, xmlstring=None):
        """
        import xml and return added nodes

        Steps: parse the document, map its namespaces and aliases to the
        server, convert the parsed node data, order them so that parents come
        first, add every node, then retry the references that could not be
        added while the nodes were created.

        :param xmlpath: passed as first argument to XMLParser.parse()
        :param xmlstring: passed as second argument to XMLParser.parse()
        :returns: list with the result of _add_node_data() for every node
        """
        _logger.info("Importing XML file %s", xmlpath)
        self.parser = XMLParser()
        await self.parser.parse(xmlpath, xmlstring)

        used_namespaces = self.parser.get_used_namespaces()
        self.namespaces = await self._map_namespaces(used_namespaces)
        _logger.info("namespace map: %s", self.namespaces)
        self.aliases = self._map_aliases(self.parser.get_aliases())
        self.refs = []

        node_datas = self.make_objects(self.parser.get_node_datas())
        self._add_missing_parents(node_datas)

        added = []
        for nodedata in self._sort_nodes_by_parentid(node_datas):
            try:
                added.append(await self._add_node_data(nodedata))
            except Exception:
                # log which node failed, the error itself is left to the caller
                _logger.warning("failure adding node %s", nodedata)
                raise

        # references which failed while adding nodes get a second chance now
        # that all nodes exist; those failing again end up in self.refs
        pending = self.refs
        self.refs = []
        await self._add_references(pending)
        if self.refs:
            _logger.warning("The following references could not be imported and are probably broken: %s", self.refs)
        return added
75
76
    def _add_missing_parents(self, dnodes):
77
        missing = []
78
        childs = {}
79
        for nd in dnodes:
80
            if not nd.parent or nd.parent == nd.nodeid:
81
                missing.append(nd)
82
            for ref in nd.refs:
83
                if ref.forward:
84
                    if ref.reftype in [self.server.nodes.HasComponent.nodeid, self.server.nodes.HasProperty.nodeid, self.server.nodes.Organizes.nodeid]:
85
                        # if a node has several links, the last one will win
86
                        if ref.target in childs:
87
                            _logger.warning("overwriting parent target, shouldbe fixed", ref.target, nd.nodeid, ref.reftype, childs[ref.target])
88
                        childs[ref.target] = (nd.nodeid, ref.reftype)
89
        for nd in missing:
90
            if nd.nodeid in childs:
91
                target, reftype = childs[nd.nodeid]
92
                nd.parent = target
93
                nd.parentlink = reftype
94
                from IPython import embed
95
                embed()
96
97
    async def _add_node_data(self, nodedata) -> ua.NodeId:
98
        if nodedata.nodetype == "UAObject":
99
            node = await self.add_object(nodedata)
100
        elif nodedata.nodetype == "UAObjectType":
101
            node = await self.add_object_type(nodedata)
102
        elif nodedata.nodetype == "UAVariable":
103
            node = await self.add_variable(nodedata)
104
        elif nodedata.nodetype == "UAVariableType":
105
            node = await self.add_variable_type(nodedata)
106
        elif nodedata.nodetype == "UAReferenceType":
107
            node = await self.add_reference_type(nodedata)
108
        elif nodedata.nodetype == "UADataType":
109
            node = await self.add_datatype(nodedata)
110
        elif nodedata.nodetype == "UAMethod":
111
            node = await self.add_method(nodedata)
112
        else:
113
            raise ValueError(f"Not implemented node type: {nodedata.nodetype} ")
114
        return node
115
116
    def _get_server(self):
117
        if hasattr(self.server, "iserver"):
118
            return self.server.iserver.isession
119
        else:
120
            return self.server.uaclient
121
122
    async def _add_references(self, refs):
123
        res = await self._get_server().add_references(refs)
124
125
        for sc, ref in zip(res, refs):
126
            if not sc.is_good():
127
                self.refs.append(ref)
128
129
    def make_objects(self, node_data):
        """
        Replace the identifiers of the parsed node data by ua objects.

        For every node data object, in place: nodeid, the non empty parent,
        parentlink, typedef and datatype, the type and target of every
        reference and the non empty datatype of every definition field are
        converted with _to_migrated_nodeid(); browsename is converted with
        ua.QualifiedName.from_string().

        :param node_data: iterable of node data objects
        :returns: list of the same objects, in the same order
        """
        converted = []
        for datum in node_data:
            datum.nodeid = self._to_migrated_nodeid(datum.nodeid)
            datum.browsename = ua.QualifiedName.from_string(datum.browsename)
            # optional identifiers: left untouched when empty
            for attr in ("parent", "parentlink", "typedef", "datatype"):
                raw = getattr(datum, attr)
                if raw:
                    setattr(datum, attr, self._to_migrated_nodeid(raw))
            converted.append(datum)
            for ref in datum.refs:
                ref.reftype = self._to_migrated_nodeid(ref.reftype)
                ref.target = self._to_migrated_nodeid(ref.target)
            for field in datum.definitions:
                if field.datatype:
                    field.datatype = self._to_migrated_nodeid(field.datatype)
        return converted
150
151
    def _migrate_ns(self, nodeid: ua.NodeId) -> ua.NodeId:
152
        """
153
        Check if the index of nodeid or browsename  given in the xml model file
154
        must be converted to a already existing namespace id based on the files
155
        namespace uri
156
157
        :returns: NodeId (str)
158
        """
159
        if nodeid.NamespaceIndex in self.namespaces:
160
            nodeid = copy(nodeid)
161
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
162
        return nodeid
163
164
    def _get_add_node_item(self, obj):
        """
        Create the ua.AddNodesItem shared by all node classes.

        Node id, browse name, node class and, when available, parent with its
        reference type and type definition are filled in; the node attributes
        are left to the caller.

        :param obj: node data object
        :returns: ua.AddNodesItem
        """
        # NOTE(review): node data coming from import_xml() already went through
        # make_objects(), which migrates nodeid, parent, parentlink and typedef.
        # Confirm that migrating them a second time here is intended.
        item = ua.AddNodesItem()
        item.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
        item.BrowseName = self._migrate_ns(obj.browsename)
        _logger.info(
            "Importing xml node (%s, %s) as (%s %s)",
            obj.browsename, obj.nodeid, item.BrowseName, item.RequestedNewNodeId,
        )
        # nodetype is the xml tag, e.g. "UAObject": drop the "UA" prefix
        class_name = obj.nodetype[2:]
        item.NodeClass = getattr(ua.NodeClass, class_name)
        if obj.parent and obj.parentlink:
            item.ParentNodeId = self._migrate_ns(obj.parent)
            item.ReferenceTypeId = self._migrate_ns(obj.parentlink)
        if obj.typedef:
            item.TypeDefinition = self._migrate_ns(obj.typedef)
        return item
177
178
    def _to_migrated_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
        """
        Convert the given value with _to_nodeid() and translate its namespace
        index with _migrate_ns().
        """
        return self._migrate_ns(self._to_nodeid(nodeid))
181
182
    def _to_nodeid(self, nodeid: Union[ua.NodeId, None, str]) -> ua.NodeId:
        """
        Convert the textual form of a node id found in the xml to a ua.NodeId.

        - a ua.NodeId is returned as is
        - an empty value gives the node id of ua.ObjectIds.String
        - a string containing "=" is parsed with ua.NodeId.from_string()
        - the name of an attribute of ua.ObjectIds gives the matching node id
        - any other string is looked up in self.aliases

        :raises AttributeError: if the string is neither a name of
                                ua.ObjectIds nor a known alias
        """
        if isinstance(nodeid, ua.NodeId):
            return nodeid
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if not hasattr(ua.ObjectIds, nodeid) and nodeid in self.aliases:
            return self.aliases[nodeid]
        # either a name of ua.ObjectIds, or an unknown name for which the
        # getattr below fails
        return ua.NodeId(getattr(ua.ObjectIds, nodeid))
196
197
    async def add_object(self, obj):
        """
        Add an object node and its references to the server.

        :param obj: node data object
        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises when the node was rejected
        """
        item = self._get_add_node_item(obj)
        attributes = ua.ObjectAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.EventNotifier = obj.eventnotifier
        item.NodeAttributes = attributes

        results = await self._get_server().add_nodes([item])
        # references are sent before the status of the node is checked
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
209
210
    async def add_object_type(self, obj):
        """
        Add an object type node and its references to the server.

        :param obj: node data object
        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises when the node was rejected
        """
        item = self._get_add_node_item(obj)
        attributes = ua.ObjectTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.IsAbstract = obj.abstract
        item.NodeAttributes = attributes

        results = await self._get_server().add_nodes([item])
        # references are sent before the status of the node is checked
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
222
223
    async def add_variable(self, obj):
        """
        Add a variable node and its references to the server.

        The value is converted with _add_variable_value() when there is one;
        rank, access levels, minimum sampling interval and array dimensions
        are only set when they are not empty.

        :param obj: node data object
        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises when the node was rejected
        """
        item = self._get_add_node_item(obj)
        attributes = ua.VariableAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = obj.datatype
        if obj.value is not None:
            attributes.Value = self._add_variable_value(obj)
        optional_attributes = (
            ("ValueRank", obj.rank),
            ("AccessLevel", obj.accesslevel),
            ("UserAccessLevel", obj.useraccesslevel),
            ("MinimumSamplingInterval", obj.minsample),
            ("ArrayDimensions", obj.dimensions),
        )
        for attribute_name, value in optional_attributes:
            if value:
                setattr(attributes, attribute_name, value)
        item.NodeAttributes = attributes

        results = await self._get_server().add_nodes([item])
        # references are sent before the status of the node is checked
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
247
248
    def _get_ext_class(self, name: str):
        """
        Find the class implementing the extension object with the given name.

        The name is first searched among the attributes of the ua module, then
        among the aliases, whose node id is resolved with
        ua.uatypes.get_extensionobject_class_type().

        :raises Exception: if the name is not an alias either, or if no class
                           is registered for the node id of the alias
        """
        if hasattr(ua, name):
            return getattr(ua, name)
        if name not in self.aliases:
            raise Exception("Error no alias found for extension class", name)
        nodeid = self.aliases[name]
        class_type = ua.uatypes.get_extensionobject_class_type(nodeid)
        if not class_type:
            raise Exception("Error no extension class registered ", name, nodeid)
        return class_type
260
261
    def _make_ext_obj(self, obj):
        """
        Instantiate the extension object described by a parsed xml value.

        The class is found with _get_ext_class(obj.objname); every
        (attribute name, value) pair found in the lists of obj.body is applied
        on the new instance with _set_attr().

        :raises Exception: if an entry of obj.body does not hold a list
        """
        instance = self._get_ext_class(obj.objname)()
        for section_name, members in obj.body:
            if not isinstance(members, list):
                raise Exception("Error val should be a list, this is a python-asyncua bug", section_name, type(members), members)
            for attname, value in members:
                self._set_attr(instance, attname, value)
        return instance
270
271
    def _get_val_type(self, obj, attname: str):
272
        for name, uatype in obj.ua_types:
273
            if name == attname:
274
                return uatype
275
        raise UaError(f"Attribute '{attname}' defined in xml is not found in object '{obj}'")
276
277
    def _set_attr(self, obj, attname: str, val):
        """
        Set one attribute of an object being built from xml.

        :param obj: object receiving the attribute
        :param attname: name of the attribute
        :param val: either a string, converted with ua_type_to_python() using
                    the type declared for attname in obj.ua_types, or an
                    iterable of pairs describing a NodeId, a list or a nested
                    object (see the comments below)
        """
        # two possible values:
        # either we get value directly
        # or an iterable of pairs if it is an object or a list
        if isinstance(val, str):
            pval = ua_type_to_python(val, self._get_val_type(obj, attname))
            setattr(obj, attname, pval)
        else:
            # so we have either an object or a list...
            # the current value of the attribute tells which one it is
            obj2 = getattr(obj, attname)
            if isinstance(obj2, ua.NodeId):  # NodeId representation does not follow common rules!!
                # only the first "Identifier" pair is used; nothing is set if there is none
                for attname2, v2 in val:
                    if attname2 == "Identifier":
                        if hasattr(ua.ObjectIds, v2):
                            obj2 = ua.NodeId(getattr(ua.ObjectIds, v2))
                        else:
                            obj2 = ua.NodeId.from_string(v2)
                        setattr(obj, attname, self._migrate_ns(obj2))
                        break
            elif not hasattr(obj2, "ua_types"):
                # we probably have a list
                # here the pairs are (type, value), each value is converted with its own type
                my_list = []
                for vtype, v2 in val:
                    my_list.append(ua_type_to_python(v2, vtype))
                setattr(obj, attname, my_list)
            else:
                # nested object: fill the existing attribute value recursively
                for attname2, v2 in val:
                    self._set_attr(obj2, attname2, v2)
                setattr(obj, attname, obj2)
306
307
    def _add_variable_value(self, obj):
        """
        Returns the value for a Variable based on the objects value type.

        obj.valuetype selects the conversion of obj.value: extension objects
        are built with _make_ext_obj(), guids with uuid.UUID, localized texts
        and node ids with their ua class, lists ("ListOf..." types) element by
        element; any other type is passed unchanged with the VariantType of
        the same name.

        :param obj: node data object providing valuetype and value
        :returns: ua.Variant
        """
        valuetype = obj.valuetype
        _logger.debug("Setting value with type %s and value %s", valuetype, obj.value)

        if valuetype == 'ListOfExtensionObject':
            extobjs = [self._make_ext_obj(ext) for ext in obj.value]
            return ua.Variant(extobjs, ua.VariantType.ExtensionObject)

        if valuetype == 'ListOfGuid':
            guids = [uuid.UUID(guid) for guid in obj.value]
            return ua.Variant(guids, ua.VariantType.Guid)

        if valuetype.startswith("ListOf"):
            item_type = valuetype[6:]  # what follows "ListOf"
            if hasattr(ua.ua_binary.Primitives, item_type):
                return ua.Variant(obj.value, getattr(ua.VariantType, item_type))
            if item_type == "LocalizedText":
                return ua.Variant([
                    getattr(ua, item_type)(text=item["Text"], locale=item["Locale"])
                    for item in obj.value
                ])
            return ua.Variant([getattr(ua, item_type)(item) for item in obj.value])

        if valuetype == 'ExtensionObject':
            return ua.Variant(self._make_ext_obj(obj.value), ua.VariantType.ExtensionObject)

        if valuetype == 'Guid':
            return ua.Variant(uuid.UUID(obj.value), ua.VariantType.Guid)

        if valuetype == 'LocalizedText':
            localized = ua.LocalizedText()
            for name, val in obj.value:
                if name == "Text":
                    localized.Text = val
                elif name == "Locale":
                    localized.Locale = val
                else:
                    _logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
            return ua.Variant(localized, ua.VariantType.LocalizedText)

        if valuetype == 'NodeId':
            return ua.Variant(ua.NodeId.from_string(obj.value))

        return ua.Variant(obj.value, getattr(ua.VariantType, valuetype))
349
350
    async def add_variable_type(self, obj):
        """
        Add a variable type node and its references to the server.

        The value is only set when obj.value holds exactly one element; rank,
        abstract flag and array dimensions are only set when not empty.

        :param obj: node data object
        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises when the node was rejected
        """
        item = self._get_add_node_item(obj)
        attributes = ua.VariableTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        attributes.DataType = obj.datatype
        if obj.value and len(obj.value) == 1:
            attributes.Value = obj.value[0]
        optional_attributes = (
            ("ValueRank", obj.rank),
            ("IsAbstract", obj.abstract),
            ("ArrayDimensions", obj.dimensions),
        )
        for attribute_name, value in optional_attributes:
            if value:
                setattr(attributes, attribute_name, value)
        item.NodeAttributes = attributes

        results = await self._get_server().add_nodes([item])
        # references are sent before the status of the node is checked
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
370
371
    async def add_method(self, obj):
        """
        Add a method node and its references to the server.

        Access levels, minimum sampling interval and array dimensions are
        only set on the attributes when they are not empty.

        :param obj: node data object
        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises when the node was rejected
        """
        item = self._get_add_node_item(obj)
        attributes = ua.MethodAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        optional_attributes = (
            ("AccessLevel", obj.accesslevel),
            ("UserAccessLevel", obj.useraccesslevel),
            ("MinimumSamplingInterval", obj.minsample),
            ("ArrayDimensions", obj.dimensions),
        )
        for attribute_name, value in optional_attributes:
            if value:
                setattr(attributes, attribute_name, value)
        item.NodeAttributes = attributes

        results = await self._get_server().add_nodes([item])
        # references are sent before the status of the node is checked
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
390
391
    async def add_reference_type(self, obj):
        """
        Add a reference type node and its references to the server.

        Inverse name, abstract flag and symmetric flag are only set when they
        are not empty.

        :param obj: node data object
        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises when the node was rejected
        """
        item = self._get_add_node_item(obj)
        attributes = ua.ReferenceTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.inversename:
            attributes.InverseName = ua.LocalizedText(obj.inversename)
        if obj.abstract:
            attributes.IsAbstract = obj.abstract
        if obj.symmetric:
            attributes.Symmetric = obj.symmetric
        item.NodeAttributes = attributes

        results = await self._get_server().add_nodes([item])
        # references are sent before the status of the node is checked
        await self._add_refs(obj)
        outcome = results[0]
        outcome.StatusCode.check()
        return outcome.AddedNodeId
408
409
    async def add_datatype(self, obj):
        """
        Add a data type node and its references to the server.

        When the node has definitions, a data type definition is attached: an
        enum definition if the parent is the enum data type node, a structure
        definition if the parent is the base structure type node or if that
        node is found in the path of the parent. Otherwise a warning is logged
        and no definition is attached.

        :param obj: node data object
        :returns: AddedNodeId of the add_nodes result
        :raises: whatever StatusCode.check() raises when the node was rejected
        """
        item = self._get_add_node_item(obj)
        attributes = ua.DataTypeAttributes()
        if obj.desc:
            attributes.Description = ua.LocalizedText(obj.desc)
        attributes.DisplayName = ua.LocalizedText(obj.displayname)
        if obj.abstract:
            attributes.IsAbstract = obj.abstract

        if obj.definitions:
            if obj.parent == self.server.nodes.enum_data_type.nodeid:
                attributes.DataTypeDefinition = self._get_edef(item, obj)
            elif obj.parent == self.server.nodes.base_structure_type.nodeid:
                attributes.DataTypeDefinition = self._get_sdef(item, obj)
            else:
                # not a direct child of a known base type: look at its ancestors
                path = await self.server.get_node(obj.parent).get_path()
                if self.server.nodes.base_structure_type in path:
                    attributes.DataTypeDefinition = self._get_sdef(item, obj)
                else:
                    _logger.warning("%s has datatypedefinition and path %s but we could not find out if this is a struct", obj, path)

        item.NodeAttributes = attributes
        results = await self._get_server().add_nodes([item])
        # here the status is checked before the references are sent
        outcome = results[0]
        outcome.StatusCode.check()
        await self._add_refs(obj)
        return results[0].AddedNodeId
436
437
    async def _add_refs(self, obj):
438
        if not obj.refs:
439
            return
440
        refs = []
441
        for data in obj.refs:
442
            ref = ua.AddReferencesItem()
443
            ref.IsForward = data.forward
444
            ref.ReferenceTypeId = data.reftype
445
            ref.SourceNodeId = obj.nodeid
446
            ref.TargetNodeId = data.target
447
            refs.append(ref)
448
        await self._add_references(refs)
449
450
    def _get_edef(self, node, obj):
451
        if not obj.definitions:
452
            return None
453
        edef = ua.EnumDefinition()
454
        if obj.parent:
455
            edef.BaseDataType = obj.parent
456
        for field in obj.definitions:
457
            f = ua.EnumField()
458
            f.Name = field.name
459
            if field.dname:
460
                f.DisplayName = ua.LocalizedText(text=field.dname)
461
            else:
462
                f.DisplayName = ua.LocalizedText(text=field.name)
463
            f.Value = field.value
464
            f.Description = ua.LocalizedText(text=field.desc)
465
            edef.Fields.append(f)
466
        return edef
467
468
    def _get_sdef(self, node, obj):
469
        if not obj.definitions:
470
            return None
471
        sdef = ua.StructureDefinition()
472
        if obj.parent:
473
            sdef.BaseDataType = obj.parent
474
        for data in obj.refs:
475
            if data.reftype == self.server.nodes.HasEncoding.nodeid:
476
                # looks likebinary encodingisthe firt one...can someone confirm?
477
                sdef.DefaultEncodingId = data.target
478
                break
479
        optional = False
480
        for field in obj.definitions:
481
            f = ua.StructureField()
482
            f.Name = field.name
483
            f.DataType = field.datatype
484
            f.ValueRank = field.valuerank
485
            f.IsOptional = field.optional
486
            if f.IsOptional:
487
                optional = True
488
            f.ArrayDimensions = field.arraydim
489
            f.Description = ua.LocalizedText(text=field.desc)
490
            sdef.Fields.append(f)
491
        if optional:
492
            sdef.StructureType = ua.StructureType.StructureWithOptionalFields
493
        else:
494
            sdef.StructureType = ua.StructureType.Structure
495
        return sdef
496
497
    def _sort_nodes_by_parentid(self, ndatas):
498
        """
499
        Sort the list of nodes according their parent node in order to respect
500
        the dependency between nodes.
501
502
        :param nodes: list of NodeDataObjects
503
        :returns: list of sorted nodes
504
        """
505
506
        sorted_ndatas = []
507
        sorted_nodes_ids = []
508
        all_node_ids = [data.nodeid for data in ndatas]
509
        while ndatas:
510
            for ndata in ndatas[:]:
511
                if ndata.nodeid.NamespaceIndex not in self.namespaces.values() or \
512
                        ndata.parent is None or \
513
                        ndata.parent not in all_node_ids:
514
                    sorted_ndatas.append(ndata)
515
                    sorted_nodes_ids.append(ndata.nodeid)
516
                    ndatas.remove(ndata)
517
                else:
518
                    # Check if the nodes parent is already in the list of
519
                    # inserted nodes
520
                    if ndata.parent in sorted_nodes_ids:
521
                        sorted_ndatas.append(ndata)
522
                        sorted_nodes_ids.append(ndata.nodeid)
523
                        ndatas.remove(ndata)
524
        return sorted_ndatas
525