Completed
Pull Request — master (#244)
by Olivier
04:06
created

_instantiate_node()   B

Complexity

Conditions 5

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 32
rs 8.0894
1
"""
2
Instantiate a new node and its child nodes from a node type.
3
"""
4
5
6
from opcua import Node
7
from opcua import ua
8
9
10
def instantiate(parent, node_type, nodeid=None, bname=None, idx=0):
11
    """
12
    instantiate a node type under a parent node.
13
    nodeid and browse name of new node can be specified, or just namespace index
14
    If they exists children of the node type, such as components, variables and 
15
    properties are also instantiated
16
    """
17
18
    results = node_type.get_attributes([ua.AttributeIds.NodeClass, ua.AttributeIds.BrowseName, ua.AttributeIds.DisplayName])
19
    nclass, qname, dname = [res.Value.Value for res in results]
20
21
    rdesc = ua.ReferenceDescription()
22
    rdesc.NodeId = node_type.nodeid
23
    rdesc.BrowseName = qname
24
    rdesc.DisplayName = dname
25
    rdesc.NodeClass = nclass
26
    if parent.get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
27
        rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
28
    else:
29
        rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
30
    rdesc.TypeDefinition = node_type.nodeid
31
    if nodeid is None:
32
        nodeid = ua.NodeId(namespaceidx=idx)  # will trigger automatic node generation in namespace idx
33
    if bname is None:
34
        bname = rdesc.BrowseName
35
    elif isinstance(bname, str):
36
        bname = ua.QualifiedName.from_string(bname)
37
38
    return _instantiate_node(parent.server, parent.nodeid, rdesc, nodeid, bname)
39
40
41
def _instantiate_node(server, parentid, rdesc, nodeid, bname, recursive=True):
42
    """
43
    instantiate a node type under parent
44
    """
45
46
    addnode = ua.AddNodesItem()
47
    addnode.RequestedNewNodeId = nodeid 
48
    addnode.BrowseName = bname 
49
    addnode.ParentNodeId = parentid
50
    addnode.ReferenceTypeId = rdesc.ReferenceTypeId
51
    addnode.TypeDefinition = rdesc.TypeDefinition
52
53
    node_type = Node(server, rdesc.NodeId)
54
55
    if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
56
        addnode.NodeClass = ua.NodeClass.Object
57
        _read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)
58
59
    elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
60
        addnode.NodeClass = ua.NodeClass.Variable
61
        _read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
62
63
    else:
64
        print("Instantiate: Node class not supported: ", rdesc.NodeClass)
65
66
    res = server.add_nodes([addnode])[0]
67
68
    if recursive:
69
        descs = node_type.get_children_descriptions(includesubtypes=False)
70
        for c_rdesc in descs:
71
            _instantiate_node(server, res.AddedNodeId, c_rdesc, nodeid=ua.NodeId(namespaceidx=res.AddedNodeId.NamespaceIndex), bname=c_rdesc.BrowseName)
72
    return Node(server, res.AddedNodeId)
73
74
75
def _read_and_copy_attrs(node_type, struct, addnode):
76
    names = [name for name in struct.__dict__.keys() if not name.startswith("_") and name not in ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract", "EventNotifier")]
77
    attrs = [getattr(ua.AttributeIds, name) for name in names]
78
    for name in names:
79
        results = node_type.get_attributes(attrs)
80
    for idx, name in enumerate(names):
81
        if results[idx].StatusCode.is_good():
82
            if name == "Value":
83
                setattr(struct, name, results[idx].Value)
84
            else:
85
                setattr(struct, name, results[idx].Value.Value)
86
        else:
87
            print("Instantiate: while copying attributes from node type %s, attribute %s, statuscode is %s" % (node_type, name, results[idx].StatusCode))
88
    addnode.NodeAttributes = struct
89
90