Completed
Pull Request — master (#509)
by
unknown
04:56
created

_copy_node()   B

Complexity

Conditions 3

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 3.0105

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 25
ccs 17
cts 19
cp 0.8947
crap 3.0105
rs 8.8571
1 1
import logging
2
3 1
from opcua import ua
4 1
from opcua.common.node import Node
5
6
7 1
logger = logging.getLogger(__name__)
8
9
10 1
def copy_node(parent, node, nodeid=None, recursive=True):
11
    """
12
    Copy a node or node tree as child of parent node
13
    """
14 1
    rdesc = _rdesc_from_node(parent, node)
15
16 1
    if nodeid is None:
17 1
        nodeid = ua.NodeId(namespaceidx=node.nodeid.NamespaceIndex)
18 1
    added_nodeids = _copy_node(parent.server, parent.nodeid, rdesc, nodeid, recursive)
19 1
    return [Node(parent.server, nid) for nid in added_nodeids]
20
21
22 1
def _copy_node(server, parent_nodeid, rdesc, nodeid, recursive):
23 1
    addnode = ua.AddNodesItem()
24 1
    addnode.RequestedNewNodeId = nodeid
25 1
    addnode.BrowseName = rdesc.BrowseName
26 1
    addnode.ParentNodeId = parent_nodeid
27 1
    addnode.ReferenceTypeId = rdesc.ReferenceTypeId
28 1
    addnode.TypeDefinition = rdesc.TypeDefinition
29 1
    addnode.NodeClass = rdesc.NodeClass
30
31 1
    node_to_copy = Node(server, rdesc.NodeId)
32
    
33 1
    attrObj = getattr(ua, rdesc.NodeClass.name + "Attributes")
34 1
    _read_and_copy_attrs(node_to_copy, attrObj(), addnode)
35
    
36 1
    res = server.add_nodes([addnode])[0]
37
38 1
    added_nodes = [res.AddedNodeId]
39
        
40 1
    if recursive:
41 1
        descs = node_to_copy.get_children_descriptions()
42 1
        for desc in descs:
43
            nodes = _copy_node(server, res.AddedNodeId, desc, nodeid=ua.NodeId(namespaceidx=desc.NodeId.NamespaceIndex), recursive=True)
44
            added_nodes.extend(nodes)
45
46 1
    return added_nodes
47
48
49 1
def _rdesc_from_node(parent, node):
50 1
    results = node.get_attributes([ua.AttributeIds.NodeClass, ua.AttributeIds.BrowseName, ua.AttributeIds.DisplayName])
51 1
    nclass, qname, dname = [res.Value.Value for res in results]
52
53 1
    rdesc = ua.ReferenceDescription()
54 1
    rdesc.NodeId = node.nodeid
55 1
    rdesc.BrowseName = qname
56 1
    rdesc.DisplayName = dname
57 1
    rdesc.NodeClass = nclass
58 1
    if parent.get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
59 1
        rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
60
    else:
61
        rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
62 1
    typedef = node.get_type_definition()
63 1
    if typedef:
64
        rdesc.TypeDefinition = typedef
65 1
    return rdesc
66
67
68 1
def _read_and_copy_attrs(node_type, struct, addnode):
69 1
    names = [name for name in struct.__dict__.keys() if not name.startswith("_") and name not in ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract", "EventNotifier")]
70 1
    attrs = [getattr(ua.AttributeIds, name) for name in names]            
71 1
    for name in names:
72 1
        results = node_type.get_attributes(attrs)
73 1
    for idx, name in enumerate(names):
74 1
        if results[idx].StatusCode.is_good():
75 1
            if name == "Value":
76
                setattr(struct, name, results[idx].Value)
77
            else:
78 1
                setattr(struct, name, results[idx].Value.Value)
79
        else:
80
            logger.warning("Instantiate: while copying attributes from node type {0!s}, attribute {1!s}, statuscode is {2!s}".format(node_type, name, results[idx].StatusCode))            
81
    addnode.NodeAttributes = struct
82