asyncua.common.instantiate_util.instantiate()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 19
nop 7
dl 0
loc 27
rs 9.45
c 0
b 0
f 0
1
"""
2
Instantiate a new node and its child nodes from a node type.
3
"""
4
5
import logging
6
7
from asyncua import ua
8
from .ua_utils import get_node_supertypes, is_child_present
9
from .copy_node_util import _rdesc_from_node, _read_and_copy_attrs
10
from .node_factory import make_node
11
12
# Module-level logger, named after this module, used by the instantiate helpers below.
logger = logging.getLogger(__name__)
13
14
15
async def instantiate(parent, node_type, nodeid=None, bname=None, dname=None, idx=0, instantiate_optional=True):
    """
    Create an instance of ``node_type`` underneath ``parent``.

    The node id and the browse name of the new node may be given explicitly.
    If ``nodeid`` is omitted, a NodeId holding only the namespace index ``idx``
    is requested (which triggers automatic node id generation in that
    namespace). If ``bname`` is omitted, the browse name found in the reference
    description of the type is reused; a ``str`` browse name is parsed with
    ``ua.QualifiedName.from_string``.

    ``dname`` and ``instantiate_optional`` are forwarded unchanged to
    ``_instantiate_node``, which also instantiates the children of the type
    (components, variables, properties, ...) when there are any.

    Returns a list of node objects, one per node id reported by
    ``_instantiate_node``.
    """
    type_desc = await _rdesc_from_node(parent, node_type)
    type_desc.TypeDefinition = node_type.nodeid

    if nodeid is None:
        # A NodeId that only carries the namespace index asks for an auto-generated id.
        nodeid = ua.NodeId(namespaceidx=idx)

    if isinstance(bname, str):
        bname = ua.QualifiedName.from_string(bname)
    elif bname is None:
        bname = type_desc.BrowseName

    server = parent.server
    created_ids = await _instantiate_node(
        server,
        node_type=make_node(server, type_desc.NodeId),
        parentid=parent.nodeid,
        rdesc=type_desc,
        nodeid=nodeid,
        bname=bname,
        dname=dname,
        instantiate_optional=instantiate_optional,
    )

    created_nodes = []
    for created_id in created_ids:
        created_nodes.append(make_node(server, created_id))
    return created_nodes
42
43
44
async def _instantiate_node(server,
                            node_type,
                            parentid,
                            rdesc,
                            nodeid,
                            bname,
                            dname=None,
                            recursive=True,
                            instantiate_optional=True):
    """
    Add one instance of ``node_type`` below ``parentid`` and, when
    ``recursive`` is true, instantiate the children declared on the type and
    on its supertypes.

    ``rdesc`` supplies the reference type, the type definition and the node
    class of the new node. ``nodeid`` and ``bname`` are the requested node id
    and browse name. ``dname``, when not None, replaces the display name copied
    from the type.

    Returns the list of added node ids: the new node first, followed by the
    ids produced for its (recursively instantiated) children.

    Raises RuntimeError when ``rdesc.NodeClass`` is not one of the supported
    node classes.
    """
    # Source node class -> (node class of the instance, attributes class to fill).
    # Type classes map to their instance counterpart.
    class_table = {
        ua.NodeClass.Object: (ua.NodeClass.Object, ua.ObjectAttributes),
        ua.NodeClass.ObjectType: (ua.NodeClass.Object, ua.ObjectAttributes),
        ua.NodeClass.Variable: (ua.NodeClass.Variable, ua.VariableAttributes),
        ua.NodeClass.VariableType: (ua.NodeClass.Variable, ua.VariableAttributes),
        ua.NodeClass.Method: (ua.NodeClass.Method, ua.MethodAttributes),
        ua.NodeClass.DataType: (ua.NodeClass.DataType, ua.DataTypeAttributes),
    }

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = bname
    item.ParentNodeId = parentid
    item.ReferenceTypeId = rdesc.ReferenceTypeId
    item.TypeDefinition = rdesc.TypeDefinition

    entry = class_table.get(rdesc.NodeClass)
    if entry is None:
        logger.error("Instantiate: Node class not supported: %s", rdesc.NodeClass)
        raise RuntimeError("Instantiate: Node class not supported")
    instance_class, attrs_factory = entry
    item.NodeClass = instance_class
    await _read_and_copy_attrs(node_type, attrs_factory(), item)

    if dname is not None:
        item.NodeAttributes.DisplayName = dname

    result = (await server.add_nodes([item]))[0]
    root_id = result.AddedNodeId
    created = [root_id]

    if not recursive:
        return created

    type_chain = await get_node_supertypes(node_type, includeitself=True)
    instance = make_node(server, root_id)
    optional_rules = (
        ua.NodeId(ua.ObjectIds.ModellingRule_Optional),
        ua.NodeId(ua.ObjectIds.ModellingRule_OptionalPlaceholder),
    )

    for type_node in type_chain:
        child_descs = await type_node.get_children_descriptions(includesubtypes=False)
        for child_desc in child_descs:
            # A child that already exists on the instance is skipped, so the
            # 'lowest' definition in the object hierarchy wins.
            if await is_child_present(instance, child_desc.BrowseName):
                continue

            child_type = make_node(server, child_desc.NodeId)
            rules = await child_type.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)
            if not rules:
                # spec says to ignore nodes without modelling rules
                logger.info("Instantiate: Skip node without modelling rule %s as part of %s",
                            child_desc.BrowseName, item.BrowseName)
                continue

            # Optional children are left out when the caller asked for that.
            is_optional = rules[0].nodeid in optional_rules
            if is_optional and not instantiate_optional:
                logger.info("Instantiate: Skip optional node %s as part of %s", child_desc.BrowseName,
                            item.BrowseName)
                continue

            if root_id.NodeIdType is ua.NodeIdType.String:
                # A String NodeId on the new node is extended with the child's
                # browse name to form the child's String NodeId.
                child_identifier = root_id.Identifier + "." + child_desc.BrowseName.Name
                child_nodeid = ua.NodeId(identifier=child_identifier, namespaceidx=root_id.NamespaceIndex)
            else:
                child_nodeid = ua.NodeId(namespaceidx=root_id.NamespaceIndex)

            child_ids = await _instantiate_node(
                server,
                child_type,
                root_id,
                child_desc,
                nodeid=child_nodeid,
                bname=child_desc.BrowseName,
                instantiate_optional=instantiate_optional,
            )
            created.extend(child_ids)

    return created
131