Completed
Pull Request — master (#133)
by Denis
02:23
created

opcua.common._add_variable_attrs()   F

Complexity

Conditions 12

Size

Total Lines 42

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 140.0001
Metric Value
cc 12
dl 0
loc 42
ccs 1
cts 26
cp 0.0385
crap 140.0001
rs 2.7855

How to fix   Complexity   

Complexity

Complex classes like opcua.common._add_variable_attrs() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Instanciate a node from a node type.
3
Can also be used to duplicate a node tree
4
"""
5
6
7 1
from opcua import Node
8 1
from opcua import ua
9
10
11 1
class _ReadAdder(object):
12
    """
13
    Internal
14
    """
15 1
    def __init__(self, server, nodeid):
16
        self.server = server
17
        self.nodeid = nodeid
18
        self.params = ua.ReadParameters()
19
        self._debug_attr = []
20
21 1
    def add(self, attr):
22
        rv = ua.ReadValueId()
23
        rv.NodeId = self.nodeid
24
        rv.AttributeId = attr
25
        self.params.NodesToRead.append(rv)
26
        self._debug_attr.append(attr)
27
28 1
    def read(self):
29
        vals = self.server.read(self.params)
30
        new_vals = []
31
        for idx, val in enumerate(vals):
32
            if not val.StatusCode.is_good():
33
                print(val)
34
                print("Error attribute %s is not valid for node %s" % ( self._debug_attr[idx], self.nodeid))
35
            #val.StatusCode.check()
36
            new_vals.append(val.Value.Value)
37
        return new_vals
38
39
40
#def _read_attributed(server, nodeid, attrs_obj, *attrs):
41
    #ra = _ReadAdder(server, rdesc.NodeId)
42
    #for attr in attrs:
43
        #ra.add(atttr)
44
    #vals = ra.read()
45
46 1
def instanciate_node(parent, node_type, idx):
47
    """
48
    Instanciate a new node under 'parent' using a type
49
    """
50
    
51
    results = node_type.get_attributes([ua.AttributeIds.NodeClass, ua.AttributeIds.BrowseName, ua.AttributeIds.DisplayName])
52
    nclass, bname, dname = [res.Value.Value for res in results]
53
54
    #descs = node_type.get_children_descriptions(refs=ua.ObjectIds.HasTypeDefinition)
55
    typedef = ua.FourByteNodeId(ua.ObjectIds.BaseObjectType)
56
    #if len(descs) > 1:
57
        #print("DESCS", descs)
58
        #typedef = descs[0].TypeDefinition
59
60
    rdesc = ua.ReferenceDescription()
61
    rdesc.NodeId = node_type.nodeid
62
    rdesc.BrowseName = bname
63
    rdesc.DisplayName = dname
64
    rdesc.NodeClass = nclass
65
    rdesc.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HasComponent)
66
    rdesc.TypeDefinition = typedef
67
    print("MYRDESC", rdesc)
68
69
    return _instanciate_node(parent.server, parent.nodeid, rdesc, idx)
70
71
72 1
def _instanciate_node(server, parentid, rdesc, idx):
73
    """
74
    Instanciate a new node under 'parent' using a type
75
    """
76
77
    print("Instanciating: node %s in %s" % (rdesc, parentid))
78
    addnode = ua.AddNodesItem()
79
    addnode.RequestedNewNodeId = ua.generate_nodeid(idx)
80
    addnode.BrowseName = rdesc.BrowseName
81
    addnode.NodeClass = rdesc.NodeClass
82
    addnode.ParentNodeId = parentid
83
    addnode.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HasComponent)
84
    addnode.TypeDefinition = rdesc.TypeDefinition
85
    print("ADDNODE", addnode)
86
87
    node_type = Node(server, rdesc.NodeId)
88
89
    if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
90
        print(node_type, " is object")
91
        _read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)
92
        #_add_object_attrs(addnode, rdesc, node_type)
93
94
    elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
95
        print(node_type, " is variable")
96
        _read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
97
        #_add_variable_attrs(addnode, rdesc, node_type)
98
99
    else:
100
        print("Node class not supported: ", rdesc.NodeClass)
101
102
    print("ADDNODE FINAL ", addnode)
103
    server.add_nodes([addnode])
104
105
    refs = []
106
    ref = ua.AddReferencesItem()
107
    ref.IsForward = True
108
    ref.ReferenceTypeId = addnode.ReferenceTypeId
109
    ref.SourceNodeId = parentid
110
    ref.TargetNodeClass = addnode.NodeClass
111
    ref.TargetNodeId = addnode.RequestedNewNodeId
112
113
    refs.append(ref)
114
    server.add_references(refs)
115
116
    descs = node_type.get_children_descriptions(includesubtypes=False) #FIXME: should be false
117
    print("node is", rdesc.NodeId, node_type, node_type.get_children())
118
    print("Children are: ", descs)
119
    for rdesc in descs:
120
        _instanciate_node(server, addnode.RequestedNewNodeId, rdesc, idx)
121
    return Node(server, addnode.RequestedNewNodeId)
122
123
124 1
def _add_object_attrs(addnode, node_type):
125
    results = node_type.get_attributes([
126
        ua.AttributeIds.BrowseName,
127
        ua.AttributeIds.Description,
128
        ua.AttributeIds.WriteMask,
129
        ua.AttributeIds.UserWriteMask,
130
        ua.AttributeIds.EventNotifier])
131
132
    attrs = ua.ObjectAttributes()
133
    _set_attr(attrs.DisplayName, results, 0)
134
    _set_attr(attrs.Description, results, 0)
135
    attrs.DisplayName = rdesc.BrowseName
136
    attrs.Description = rdesc.Description
137
    if results[0].StatusCode.is_good():
138
        attrs.Description = results[0].Value.Value
139
    if results[1].StatusCode.is_good():
140
        attrs.WriteMask = results[1].Value.Value
141
    if results[2].StatusCode.is_good():
142
        attrs.UserWriteMask = results[2].Value.Value        
143
    if results[3].StatusCode.is_good():
144
        attrs.UserWriteMask = results[3].Value.Value        
145
146
    addnode.NodeAttributes = attrs
147
148
149 1
def _read_and_copy_attrs(node_type, struct, addnode):
150
    names = [name for name in struct.__dict__.keys() if not name.startswith("_") and name not in ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding")]
151
    attrs = [getattr(ua.AttributeIds, name) for name in names]
152
    print("Names are ", names)
153
    for name in names:
154
        results = node_type.get_attributes(attrs)
155
    for idx, name in enumerate(names):
156
        if results[idx].StatusCode.is_good():
157
            if name == "Value":
158
                setattr(struct, name, results[idx].Value)
159
            else:
160
                setattr(struct, name, results[idx].Value.Value)
161
        else:
162
            print("!!!!!!!!!!!!Error, for nodeid %s, attribute %s, statuscode is %s" % (node_type, name, results[idx].StatusCode))
163
    print("struct is ", struct)
164
    addnode.NodeAttributes = struct
165
166
167 1
def _add_variable_attrs(addnode, rdesc, node_type):
168
    results = node_type.get_attributes([
169
                ua.AttributeIds.EventNotifier,
170
                ua.AttributeIds.Description,
171
                ua.AttributeIds.WriteMask,
172
                ua.AttributeIds.UserWriteMask,
173
                ua.AttributeIds.Value,
174
                ua.AttributeIds.DataType,
175
                ua.AttributeIds.ValueRank,
176
                ua.AttributeIds.ArrayDimentions,
177
                ua.AttributeIds.AccessLevel,
178
                ua.AttributeIds.UserAccessLevel,
179
                ua.AttributeIds.MinimumSamplingInterval,
180
                ua.AttributeIds.Historizing])
181
182
    attrs = ua.ObjectAttributes()
183
    if results[0].is_good():
184
        attrs.DisplayName = results[0].Value.Value
185
    if results[1].is_good():
186
        attrs.Description = results[1].Value.Value
187
    if results[2].is_good():
188
        attrs.WriteMask = results[2].Value.Value
189
    if results[3].is_good():
190
        attrs.UserWriteMask = results[3].Value.Value
191
    #if results[4].is_good():
192
        #attrs.Value = results[4].Value.Value
193
    if results[5].is_good():
194
        attrs.DataType = results[5].Value.Value
195
    if results[6].is_good():
196
        attrs.ValueRank = results[6].Value.Value
197
    if results[7].is_good():
198
        attrs.ArrayDimensions = results[7].Value.Value
199
    if results[8].is_good():
200
        attrs.AccessLevel = results[8].Value.Value
201
    if results[9].is_good():
202
        attrs.UserAccessLevel = results[9].Value.Value
203
    if results[10].is_good():
204
        attrs.MinimumSamplingInterval = results[10].Value.Value
205
    if results[11].is_good():
206
        attrs.Historizing = results[11].Value.Value
207
208
    addnode.NodeAttributes = attrs
209
210 1
def _set_attr(container, result, idx):
211
    if result.is_good():
212
        container = result.Value.Value
213
214
215