Completed
Pull Request — master (#283)
by Olivier
04:14
created

_copy_node()   B

Complexity

Conditions 3

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 27
rs 8.8571
1
from opcua import ua
2
from opcua import Node
3
4
5
def copy_node(parent, node, nodeid=None, idx=0, recursive=True):
6
    """
7
    Copy a node or node tree as child of parent node
8
    """
9
    print("Copying", node, "into ", parent, "in idx", idx)
10
    rdesc = _rdesc_from_node(parent, node)
11
12
    if nodeid is None:
13
        nodeid = ua.NodeId(namespaceidx=idx)  # will trigger automatic node generation in namespace idx
14
    added_nodeids = _copy_node(parent.server, parent.nodeid, rdesc, nodeid, recursive)
15
    return [Node(parent.server, nid) for nid in added_nodeids]
16
17
18
def _copy_node(server, parent_nodeid, rdesc, nodeid, recursive):
19
    addnode = ua.AddNodesItem()
20
    addnode.RequestedNewNodeId = nodeid
21
    addnode.BrowseName = rdesc.BrowseName
22
    addnode.ParentNodeId = parent_nodeid
23
    addnode.ReferenceTypeId = rdesc.ReferenceTypeId
24
    addnode.TypeDefinition = rdesc.TypeDefinition
25
    addnode.NodeClass = rdesc.NodeClass
26
27
    node_type = Node(server, rdesc.NodeId)
28
    
29
    attrObj = getattr(ua, rdesc.NodeClass.name + "Attributes")
30
    print("attrObj" is attrObj)
31
    _read_and_copy_attrs(node_type, attrObj(), addnode)
32
    
33
    res = server.add_nodes([addnode])[0]
34
    print("Added result", res)
35
36
    added_nodes = [res.AddedNodeId]
37
        
38
    if recursive:
39
        descs = node_type.get_children_descriptions()
40
        for desc in descs:
41
            nodes = _copy_node(server, res.AddedNodeId, desc, nodeid=ua.NodeId(namespaceidx=res.AddedNodeId.NamespaceIndex), recursive=recursive)
42
            added_nodes.extend(nodes)
43
44
    return added_nodes
45
46
47
def _rdesc_from_node(parent, node):
48
    results = node.get_attributes([ua.AttributeIds.NodeClass, ua.AttributeIds.BrowseName, ua.AttributeIds.DisplayName])
49
    nclass, qname, dname = [res.Value.Value for res in results]
50
51
    rdesc = ua.ReferenceDescription()
52
    rdesc.NodeId = node.nodeid
53
    rdesc.BrowseName = qname
54
    rdesc.DisplayName = dname
55
    rdesc.NodeClass = nclass
56
    if parent.get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
57
        rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
58
    else:
59
        rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
60
    rdesc.TypeDefinition = node.nodeid
61
    return rdesc
62
63
64
65
def _read_and_copy_attrs(node_type, struct, addnode):
66
    names = [name for name in struct.__dict__.keys() if not name.startswith("_") and name not in ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract", "EventNotifier")]
67
    attrs = [getattr(ua.AttributeIds, name) for name in names]            
68
    for name in names:
69
        results = node_type.get_attributes(attrs)
70
    for idx, name in enumerate(names):
71
        if results[idx].StatusCode.is_good():
72
            if name == "Value":
73
                setattr(struct, name, results[idx].Value)
74
            else:
75
                setattr(struct, name, results[idx].Value.Value)
76
        else:
77
            print("Instantiate: while copying attributes from node type %s, attribute %s, statuscode is %s" % (node_type, name, results[idx].StatusCode))            
78
    addnode.NodeAttributes = struct
79