Completed
Pull Request — master (#244)
by Olivier
04:07
created

instantiate()   B

Complexity

Conditions 5

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 25
rs 8.0894
1
"""
2
instantiate a node from a node type.
3
Can also be used to duplicate a node tree
4
"""
5
6
7
from opcua import Node
8
from opcua import ua
9
10
11
def instantiate(parent, node_type, nodeid=None, bname=None, idx=0):
    """
    Create an instance of ``node_type`` as a child of ``parent``.

    The node class, browse name and display name of ``node_type`` are read
    and packed into a reference description, which is then handed to
    ``_instantiate_node`` together with the parent's server and node id.

    :param parent: node under which the new node is created; its ``server``
        and ``nodeid`` attributes are used
    :param node_type: node to instantiate from
    :param nodeid: node id requested for the new node; when None, a node id
        carrying only the namespace index ``idx`` is requested
    :param bname: browse name of the new node; a ``str`` is converted with
        ``ua.QualifiedName.from_string``, None means "reuse the browse name
        of ``node_type``"
    :param idx: namespace index used when ``nodeid`` is None
    :return: whatever ``_instantiate_node`` returns
    """
    wanted_attributes = [
        ua.AttributeIds.NodeClass,
        ua.AttributeIds.BrowseName,
        ua.AttributeIds.DisplayName,
    ]
    data_values = node_type.get_attributes(wanted_attributes)
    node_class, type_browse_name, type_display_name = [dv.Value.Value for dv in data_values]

    # Describe the type node the same way a browse result would describe it.
    type_desc = ua.ReferenceDescription()
    type_desc.NodeId = node_type.nodeid
    type_desc.BrowseName = type_browse_name
    type_desc.DisplayName = type_display_name
    type_desc.NodeClass = node_class
    type_desc.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HasComponent)
    type_desc.TypeDefinition = node_type.nodeid
    print("MYRDESC", type_desc)

    if nodeid is None:
        # will trigger automatic node generation in namespace idx
        nodeid = ua.NodeId(namespaceidx=idx)

    if isinstance(bname, str):
        bname = ua.QualifiedName.from_string(bname)
    elif bname is None:
        bname = type_desc.BrowseName

    return _instantiate_node(parent.server, parent.nodeid, type_desc, nodeid, bname)
36
37
38
def _instantiate_node(server, parentid, rdesc, nodeid, bname, recursive=True):
    """
    Instantiate the node described by ``rdesc`` under ``parentid``.

    An ``AddNodesItem`` is built from the arguments, its attributes are
    copied from the node ``rdesc.NodeId`` and it is submitted through
    ``server.add_nodes``. When ``recursive`` is true, every child description
    of that source node is instantiated the same way under the new node.

    :param server: object exposing ``add_nodes``; also used to build ``Node``
        objects
    :param parentid: node id of the parent of the new node
    :param rdesc: reference description of the source node; ``NodeId``,
        ``NodeClass``, ``ReferenceTypeId`` and ``TypeDefinition`` are read
    :param nodeid: node id requested for the new node
    :param bname: browse name of the new node
    :param recursive: also instantiate the children of the source node
    :return: ``Node`` wrapping the node id reported by the server for the
        added node
    """
    print("\n\nInstanciating: node %s in %s" % (nodeid, parentid))
    print(rdesc)
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = bname
    addnode.ParentNodeId = parentid
    addnode.ReferenceTypeId = rdesc.ReferenceTypeId
    addnode.TypeDefinition = rdesc.TypeDefinition

    node_type = Node(server, rdesc.NodeId)

    # Types are instantiated as their instance counterpart; instances are
    # duplicated with the same node class.
    if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
        addnode.NodeClass = ua.NodeClass.Object
        _read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)

    elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
        addnode.NodeClass = ua.NodeClass.Variable
        _read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)

    else:
        # NOTE(review): the item is still submitted below without a node class
        # or attributes being set here - confirm whether this should abort.
        print("Node class not supported: ", rdesc.NodeClass)

    # NOTE(review): res.StatusCode is not inspected - confirm whether a failed
    # add should raise instead of continuing.
    res = server.add_nodes([addnode])[0]

    if recursive:
        descs = node_type.get_children_descriptions(includesubtypes=False)
        for c_rdesc in descs:
            print("       Instanciating children", c_rdesc)
            # Children get an id requested in the namespace of the new parent.
            child_id = ua.NodeId(namespaceidx=res.AddedNodeId.NamespaceIndex)
            _instantiate_node(server, res.AddedNodeId, c_rdesc, nodeid=child_id, bname=c_rdesc.BrowseName)

    # Use the id reported by the server, not the requested one: the request
    # may only carry a namespace index, and the children above were attached
    # to res.AddedNodeId.
    return Node(server, res.AddedNodeId)
73
74
75
def _read_and_copy_attrs(node_type, struct, addnode):
    """
    Fill ``struct`` with attribute values read from ``node_type`` and attach
    it to ``addnode``.

    The attributes to read are the public instance attributes of ``struct``,
    minus a fixed list of fields that are skipped. Each name is resolved to
    an attribute id through ``ua.AttributeIds`` and all of them are read in
    one ``get_attributes`` call. Values whose status is not good are reported
    with ``print`` and left untouched in ``struct``.

    :param node_type: node the attribute values are read from
    :param struct: attributes structure to fill (modified in place)
    :param addnode: item receiving ``struct`` as its ``NodeAttributes``
    """
    skipped = ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract")
    names = [name for name in struct.__dict__ if not name.startswith("_") and name not in skipped]
    attrs = [getattr(ua.AttributeIds, name) for name in names]

    # One read covers every attribute; nothing is requested when there is
    # nothing to read.
    results = node_type.get_attributes(attrs) if attrs else []

    for idx, name in enumerate(names):
        result = results[idx]
        if result.StatusCode.is_good():
            if name == "Value":
                # "Value" receives result.Value itself, not its inner .Value
                setattr(struct, name, result.Value)
            else:
                setattr(struct, name, result.Value.Value)
        else:
            print("!!!!!!!!!!!!Error, for nodeid %s, attribute %s, statuscode is %s" % (node_type, name, result.StatusCode))
    addnode.NodeAttributes = struct
89
90