Completed
Pull Request — master (#271)
by
unknown
04:20
created

_instantiate_node()   D

Complexity

Conditions 10

Size

Total Lines 51

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 10
c 3
b 0
f 0
dl 0
loc 51
rs 4.2352

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like _instantiate_node() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Instantiate a new node and its child nodes from a node type.
3
"""
4
5
6
from opcua import Node
7
from opcua import ua
8
from opcua.common.uaerrors import UaError
9
10
# TODO: Should this be moved to a more general location than instantiate.py?
11
def get_sub_types(server, node, includeitself=False, skipbase=True):
    """
    Collect the chain of supertypes of a node by following HasSubtype
    references in the inverse direction.

    :param server: used to build a Node when node is given as a ua.NodeId
    :param node: a Node, or a ua.NodeId that is wrapped into a Node
    :param includeitself: when True, node itself is the first list entry
    :param skipbase: when True and the list holds more than one entry,
        the last (most distant) entry is dropped
    :returns: list of Node ordered from the nearest type to the most
        distant supertype
    """
    start = Node(server, node) if isinstance(node, ua.NodeId) else node

    chain = [start] if includeitself else []
    chain += _get_sub_types(start)

    # the base type is only removed when something else remains in the list
    if skipbase and len(chain) > 1:
        chain.pop()

    return chain
30
31
def _get_sub_types(node):
    """
    Recursive worker behind get_sub_types.

    Returns the supertypes of node as a list, nearest supertype first.
    An empty list is returned when node has no inverse HasSubtype reference.
    """
    supertypes = node.get_referenced_nodes(
        refs=ua.ObjectIds.HasSubtype,
        direction=ua.BrowseDirection.Inverse,
        includesubtypes=True,
    )
    if not supertypes:
        return []

    # TODO: Can a node have several supertypes? Only the first one is followed.
    nearest = supertypes[0]
    return [nearest] + _get_sub_types(nearest)
43
    
44
def instantiate(parent, node_type, nodeid=None, bname=None, idx=0):
    """
    Create an instance of node_type under parent.

    :param parent: Node under which the new node is added
    :param node_type: Node of the type to instantiate
    :param nodeid: node id requested for the new node; when None, a
        ua.NodeId built from the namespace index idx is used
    :param bname: browse name of the new node, as a qualified name or a
        string parsed with ua.QualifiedName.from_string; when None the
        browse name of node_type is reused
    :param idx: namespace index used only when nodeid is None
    :returns: whatever _instantiate_node returns for the new node

    Children of the node type are instantiated as well, since
    _instantiate_node is called with its default recursive behaviour.
    """
    wanted = [
        ua.AttributeIds.NodeClass,
        ua.AttributeIds.BrowseName,
        ua.AttributeIds.DisplayName,
    ]
    values = [data_value.Value.Value for data_value in node_type.get_attributes(wanted)]
    node_class, type_bname, type_dname = values

    type_desc = ua.ReferenceDescription()
    type_desc.NodeId = node_type.nodeid
    type_desc.TypeDefinition = node_type.nodeid
    type_desc.NodeClass = node_class
    type_desc.BrowseName = type_bname
    type_desc.DisplayName = type_dname

    # folders organize their content, any other parent gets a component
    parent_is_folder = parent.get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType)
    type_desc.ReferenceTypeId = ua.NodeId(
        ua.ObjectIds.Organizes if parent_is_folder else ua.ObjectIds.HasComponent
    )

    if nodeid is None:
        # will trigger automatic node generation in namespace idx
        nodeid = ua.NodeId(namespaceidx=idx)

    if bname is None:
        bname = type_desc.BrowseName
    elif isinstance(bname, str):
        bname = ua.QualifiedName.from_string(bname)

    return _instantiate_node(parent.server, parent.nodeid, type_desc, nodeid, bname)
73
74
75
def _instantiate_node(server, parentid, rdesc, nodeid, bname, recursive=True):
    """
    Instantiate the node described by rdesc under the node parentid.

    :param server: server object handed to Node() and used for add_nodes()
    :param parentid: node id of the parent of the new node
    :param rdesc: ua.ReferenceDescription of the node to instantiate
    :param nodeid: node id requested for the new node
    :param bname: browse name of the new node
    :param recursive: when True, children declared on the node and on its
        supertypes are instantiated below the new node as well
    :returns: Node wrapping the added node, or None when nothing was added
        (node carries the Optional modelling rule, or its node class is
        not supported)
    """
    node_type = Node(server, rdesc.NodeId)

    # skip optional elements
    rules = node_type.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)
    if len(rules) == 1 and rules[0].nodeid == ua.NodeId(ua.ObjectIds.ModellingRule_Optional):
        return None

    instance_spec = _instance_class_and_attrs(rdesc.NodeClass)
    if instance_spec is None:
        print("Instantiate: Node class not supported: ", rdesc.NodeClass)
        return None

    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = bname
    addnode.ParentNodeId = parentid
    addnode.ReferenceTypeId = rdesc.ReferenceTypeId
    addnode.TypeDefinition = rdesc.TypeDefinition
    addnode.NodeClass, attributes = instance_spec
    _read_and_copy_attrs(node_type, attributes, addnode)

    res = server.add_nodes([addnode])[0]

    if recursive:
        _instantiate_children(server, node_type, res.AddedNodeId)

    return Node(server, res.AddedNodeId)


def _instance_class_and_attrs(nclass):
    """
    Map the node class of a type (or instance) to the node class of the
    instance to create and a fresh attributes structure for it.

    Returns None when nclass is not an object, variable or method class.
    """
    if nclass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
        return ua.NodeClass.Object, ua.ObjectAttributes()
    if nclass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
        return ua.NodeClass.Variable, ua.VariableAttributes()
    if nclass == ua.NodeClass.Method:
        return ua.NodeClass.Method, ua.MethodAttributes()
    return None


def _instantiate_children(server, node_type, new_nodeid):
    """
    Instantiate under new_nodeid the children of node_type and of each of
    its supertypes, as returned by get_sub_types(includeitself=True).
    """
    # the wrapper around the new node does not change between iterations
    new_node = Node(server, new_nodeid)
    for type_node in get_sub_types(server, node_type, includeitself=True):
        for c_rdesc in type_node.get_children_descriptions(includesubtypes=False):
            # TODO: smells, is there a better way to test if a browse name is already present ?
            # skip items that already exist, prefer the 'lowest' one in object hierarchy
            try:
                new_node.get_child(c_rdesc.BrowseName)
            except UaError:
                # no child with that browse name yet: create it in the
                # namespace of the new node
                _instantiate_node(
                    server,
                    new_nodeid,
                    c_rdesc,
                    nodeid=ua.NodeId(namespaceidx=new_nodeid.NamespaceIndex),
                    bname=c_rdesc.BrowseName,
                )
126
127
128
def _read_and_copy_attrs(node_type, struct, addnode):
    """
    Read attribute values from node_type, store them in struct and attach
    struct to addnode as its NodeAttributes.

    :param node_type: node whose attributes are read with get_attributes()
    :param struct: attributes structure to fill; its public instance
        fields (minus a fixed exclusion list) name the attributes to read
    :param addnode: item that receives struct in its NodeAttributes field

    Attributes whose read status is not good are reported with print() and
    left untouched in struct.
    """
    excluded = ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract", "EventNotifier")
    names = [name for name in struct.__dict__ if not name.startswith("_") and name not in excluded]
    attrs = [getattr(ua.AttributeIds, name) for name in names]

    # One read returns a result per requested attribute, so it is issued a
    # single time (and not at all when there is nothing to read).
    results = node_type.get_attributes(attrs) if attrs else []

    for idx, name in enumerate(names):
        result = results[idx]
        if result.StatusCode.is_good():
            # "Value" is stored as returned by the read; every other
            # attribute is unwrapped one level further
            if name == "Value":
                setattr(struct, name, result.Value)
            else:
                setattr(struct, name, result.Value.Value)
        else:
            print("Instantiate: while copying attributes from node type %s, attribute %s, statuscode is %s" % (node_type, name, result.StatusCode))
    addnode.NodeAttributes = struct
142