Completed
Pull Request — master (#367)
by Olivier
03:50
created

instantiate()   B

Complexity

Conditions 5

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5.0187

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 5
c 2
b 0
f 0
dl 0
loc 19
ccs 10
cts 11
cp 0.9091
crap 5.0187
rs 8.5454
"""
Instantiate a new node and its child nodes from a node type.
"""
4
5
6 1
from opcua import Node
7 1
from opcua import ua
8 1
from opcua.common import ua_utils
9 1
from opcua.common.copy_node import _rdesc_from_node, _read_and_copy_attrs
10
11
12 1
def instantiate(parent, node_type, nodeid=None, bname=None, idx=0):
    """
    Create an instance of ``node_type`` below ``parent``.

    The node id and browse name of the new node may be given explicitly.
    When ``nodeid`` is omitted, a ``ua.NodeId`` carrying only the namespace
    index ``idx`` is requested, which triggers automatic node id generation
    in that namespace. When ``bname`` is omitted, the browse name found in
    the reference description of ``node_type`` is used; a ``str`` browse
    name is parsed with ``ua.QualifiedName.from_string``.

    If the node type has children, such as components, variables and
    properties, they are instantiated as well.

    Returns a list of ``Node`` objects, one per node id reported by
    ``_instantiate_node``.
    """
    type_desc = _rdesc_from_node(parent, node_type)
    type_desc.TypeDefinition = node_type.nodeid

    if nodeid is None:
        # a NodeId with only a namespace index lets the server pick the identifier
        nodeid = ua.NodeId(namespaceidx=idx)

    if isinstance(bname, str):
        bname = ua.QualifiedName.from_string(bname)
    elif bname is None:
        bname = type_desc.BrowseName

    server = parent.server
    created_ids = _instantiate_node(server, parent.nodeid, type_desc, nodeid, bname)
    return [Node(server, created_id) for created_id in created_ids]
31
32
33 1
def _instantiate_node(server, parentid, rdesc, nodeid, bname, recursive=True):
    """
    Instantiate the node described by ``rdesc`` under the node ``parentid``.

    server    -- object providing ``add_nodes``; also used to build ``Node`` wrappers
    parentid  -- node id of the parent the new node is attached to
    rdesc     -- reference description of the node (type) to instantiate
    nodeid    -- requested node id for the new node
    bname     -- browse name for the new node
    recursive -- when true, children of the node type and of its supertypes
                 are instantiated below the new node as well

    Returns the list of added node ids, starting with the node created by
    this call. The list is empty when the element is skipped, either because
    its only modelling rule is Optional or because its node class is not
    supported.
    """
    node_type = Node(server, rdesc.NodeId)
    refs = node_type.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)

    # skip optional elements
    if len(refs) == 1 and refs[0].nodeid == ua.NodeId(ua.ObjectIds.ModellingRule_Optional):
        return []

    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = bname
    addnode.ParentNodeId = parentid
    addnode.ReferenceTypeId = rdesc.ReferenceTypeId
    addnode.TypeDefinition = rdesc.TypeDefinition

    # A type node class is instantiated as the matching instance node class.
    if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
        addnode.NodeClass = ua.NodeClass.Object
        _read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)
    elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
        addnode.NodeClass = ua.NodeClass.Variable
        _read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
    elif rdesc.NodeClass in (ua.NodeClass.Method,):
        addnode.NodeClass = ua.NodeClass.Method
        _read_and_copy_attrs(node_type, ua.MethodAttributes(), addnode)
    else:
        print("Instantiate: Node class not supported: ", rdesc.NodeClass)
        # Return an empty list rather than None: callers extend() with or
        # iterate over the result, so None would raise TypeError.
        return []

    res = server.add_nodes([addnode])[0]
    new_id = res.AddedNodeId
    added_nodes = [new_id]

    if not recursive:
        return added_nodes

    parents = ua_utils.get_node_supertypes(node_type, includeitself=True)
    node = Node(server, new_id)
    # if the node just created has a String NodeId, its children get a
    # String NodeId too, built as "<parent identifier>.<child browse name>"
    use_string_ids = new_id.NodeIdType is ua.NodeIdType.String

    for parent in parents:
        for c_rdesc in parent.get_children_descriptions(includesubtypes=False):
            # skip items that already exist, prefer the 'lowest' one in object hierarchy
            if ua_utils.is_child_present(node, c_rdesc.BrowseName):
                continue

            if use_string_ids:
                child_id = ua.NodeId(
                    identifier=new_id.Identifier + "." + c_rdesc.BrowseName.Name,
                    namespaceidx=new_id.NamespaceIndex,
                )
            else:
                # namespace index only: node id is generated automatically
                child_id = ua.NodeId(namespaceidx=new_id.NamespaceIndex)

            added_nodes.extend(
                _instantiate_node(server, new_id, c_rdesc, nodeid=child_id, bname=c_rdesc.BrowseName)
            )

    return added_nodes
85
86
87