Completed
Push — master ( 6a68a0...bac760 )
by Olivier
02:36
created

_add_variable_attrs()   F

Complexity

Conditions 12

Size

Total Lines 42

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 140.0001
Metric Value
cc 12
dl 0
loc 42
rs 2.7855
ccs 1
cts 26
cp 0.0385
crap 140.0001

How to fix   Complexity   

Complexity

Complex classes like _add_variable_attrs() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Instantiate a node from a node type.
3
Can also be used to duplicate a node tree
4
"""
5
6
7 1
from opcua import Node
8 1
from opcua import ua
9
10
11 1
class _ReadAdder(object):
    """
    Internal helper that queues attribute reads for a single node and
    issues them to the server in one read request.
    """

    def __init__(self, server, nodeid):
        """
        Remember the server and node id and start with an empty read request.
        """
        self.server = server
        self.nodeid = nodeid
        self.params = ua.ReadParameters()
        # Attribute ids in the order they were queued; only used for error output.
        self._debug_attr = []

    def add(self, attr):
        """
        Queue a read of attribute 'attr' on the node given at construction.
        """
        read_value = ua.ReadValueId()
        read_value.NodeId = self.nodeid
        read_value.AttributeId = attr
        self.params.NodesToRead.append(read_value)
        self._debug_attr.append(attr)

    def read(self):
        """
        Send the queued reads and return the list of raw values.

        A result whose status is not good is reported on stdout; its value
        is still appended to the returned list.
        """
        data_values = self.server.read(self.params)
        collected = []
        for position, data_value in enumerate(data_values):
            status_ok = data_value.StatusCode.is_good()
            if not status_ok:
                print(data_value)
                print("Error attribute %s is not valid for node %s" % (self._debug_attr[position], self.nodeid))
            collected.append(data_value.Value.Value)
        return collected
39
40
41
# def _read_attributed(server, nodeid, attrs_obj, *attrs):
42
    #ra = _ReadAdder(server, rdesc.NodeId)
43
    # for attr in attrs:
44
        # ra.add(atttr)
45
    #vals = ra.read()
46
47 1
def instanciate_node(parent, node_type, idx):
    """
    Create a new node below 'parent', modelled on the node 'node_type'.

    The node class, browse name and display name are read from 'node_type'
    and packed into a reference description, which is then handed to
    _instanciate_node together with the parent's server and node id.
    'idx' is passed through unchanged.
    """
    wanted = [
        ua.AttributeIds.NodeClass,
        ua.AttributeIds.BrowseName,
        ua.AttributeIds.DisplayName,
    ]
    values = [data_value.Value.Value for data_value in node_type.get_attributes(wanted)]
    node_class, browse_name, display_name = values

    # The type definition is fixed to BaseObjectType; it is not looked up
    # through the HasTypeDefinition reference of 'node_type'.
    type_definition = ua.FourByteNodeId(ua.ObjectIds.BaseObjectType)

    description = ua.ReferenceDescription()
    description.NodeId = node_type.nodeid
    description.NodeClass = node_class
    description.BrowseName = browse_name
    description.DisplayName = display_name
    description.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HasComponent)
    description.TypeDefinition = type_definition
    print("MYRDESC", description)

    return _instanciate_node(parent.server, parent.nodeid, description, idx)
71
72
73 1
def _instanciate_node(server, parentid, rdesc, idx):
    """
    Recursively create a copy of the node described by 'rdesc' under 'parentid'.

    An AddNodesItem is filled from 'rdesc', the attributes of the source node
    are copied into it (objects and variables only) and it is submitted with
    server.add_nodes(). The children of the source node are then instantiated
    the same way below the newly created node.

    server: object providing add_nodes(); also used to build Node objects
    parentid: node id under which the new node is added
    rdesc: reference description of the source node; its NodeId, BrowseName,
        NodeClass and TypeDefinition are read
    idx: passed unchanged to the recursive calls, not used otherwise

    Returns a Node for the node id reported by the server.
    Raises whatever StatusCode.check() raises when the server rejects the node.
    """
    print("Instanciating: node %s in %s" % (rdesc, parentid))
    addnode = ua.AddNodesItem()
    # No specific id is requested; the id actually used is read back from
    # the add_nodes() result below.
    addnode.RequestedNewNodeId = ua.NodeId()
    addnode.BrowseName = rdesc.BrowseName
    addnode.NodeClass = rdesc.NodeClass
    addnode.ParentNodeId = parentid
    addnode.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HasComponent)
    addnode.TypeDefinition = rdesc.TypeDefinition
    print("ADDNODE", addnode)

    node_type = Node(server, rdesc.NodeId)

    if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
        print(node_type, " is object")
        _read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)
    elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
        print(node_type, " is variable")
        _read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
    else:
        print("Node class not supported: ", rdesc.NodeClass)

    print("ADDNODE FINAL ", addnode)
    result = server.add_nodes([addnode])[0]
    # Do not continue with a node that was not created: the children would
    # otherwise be attached to an invalid parent.
    result.StatusCode.check()
    # The requested id is null, so the id of the new node is only known from
    # the result. The reference from the parent is created by add_nodes()
    # itself (ParentNodeId / ReferenceTypeId), so no separate add_references()
    # call is made.
    new_nodeid = result.AddedNodeId

    descs = node_type.get_children_descriptions(includesubtypes=False)
    print("node is", rdesc.NodeId, node_type, node_type.get_children())
    print("Children are: ", descs)
    for child_desc in descs:
        _instanciate_node(server, new_nodeid, child_desc, idx)
    return Node(server, new_nodeid)
123
124
125 1
def _add_object_attrs(addnode, node_type):
    """
    Copy the object attributes of 'node_type' into addnode.NodeAttributes.

    DisplayName, Description, WriteMask, UserWriteMask and EventNotifier are
    read from 'node_type' in one request. Each attribute whose read status is
    good is stored under the same name in a fresh ua.ObjectAttributes; the
    others keep the defaults of that structure.
    """
    names = ("DisplayName", "Description", "WriteMask", "UserWriteMask", "EventNotifier")
    results = node_type.get_attributes([getattr(ua.AttributeIds, name) for name in names])

    attrs = ua.ObjectAttributes()
    # Results come back in request order, so pairing them with 'names'
    # keeps every value next to the attribute it belongs to.
    for name, result in zip(names, results):
        if result.StatusCode.is_good():
            setattr(attrs, name, result.Value.Value)

    addnode.NodeAttributes = attrs
148
149
150 1
def _read_and_copy_attrs(node_type, struct, addnode):
    """
    Fill 'struct' with the attributes of 'node_type' and attach it to 'addnode'.

    The attribute names are the public fields of 'struct', minus the
    encoding bookkeeping fields. Each name is looked up in ua.AttributeIds
    and all attributes are read from 'node_type' in a single request.
    Attributes with a good status are stored in 'struct'; failures are
    reported on stdout. Finally 'struct' is assigned to addnode.NodeAttributes.
    """
    skipped = ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding")
    names = [name for name in vars(struct) if not name.startswith("_") and name not in skipped]
    attrs = [getattr(ua.AttributeIds, name) for name in names]
    print("Names are ", names)
    # One read covers every attribute; nothing is requested when there are
    # no names to copy.
    results = node_type.get_attributes(attrs) if attrs else []
    for name, result in zip(names, results):
        if result.StatusCode.is_good():
            if name == "Value":
                # Value is stored as the wrapper object, not unwrapped one
                # level further like the other attributes.
                setattr(struct, name, result.Value)
            else:
                setattr(struct, name, result.Value.Value)
        else:
            print("!!!!!!!!!!!!Error, for nodeid %s, attribute %s, statuscode is %s" % (node_type, name, result.StatusCode))
    print("struct is ", struct)
    addnode.NodeAttributes = struct
166
167
168 1
def _add_variable_attrs(addnode, rdesc, node_type):
    """
    Copy the variable attributes of 'node_type' into addnode.NodeAttributes.

    The attributes listed below are read from 'node_type' in one request.
    Each attribute whose read status is good is stored under the same name
    in a fresh ua.VariableAttributes; the others keep the defaults of that
    structure. Value is not copied. 'rdesc' is accepted for signature
    compatibility and is not used.
    """
    names = (
        "DisplayName",
        "Description",
        "WriteMask",
        "UserWriteMask",
        "DataType",
        "ValueRank",
        "ArrayDimensions",
        "AccessLevel",
        "UserAccessLevel",
        "MinimumSamplingInterval",
        "Historizing",
    )
    results = node_type.get_attributes([getattr(ua.AttributeIds, name) for name in names])

    attrs = ua.VariableAttributes()
    # Results come back in request order, so pairing them with 'names'
    # keeps every value next to the attribute it belongs to.
    for name, result in zip(names, results):
        if result.StatusCode.is_good():
            setattr(attrs, name, result.Value.Value)

    addnode.NodeAttributes = attrs
210
211
212 1
def _set_attr(container, result, idx):
    """
    Return the value of result[idx] if its status is good, else 'container'.

    container: current value, returned unchanged when the read failed
    result: sequence of read results, each with StatusCode and Value
    idx: position in 'result' of the read result to use

    Assigning to the 'container' parameter cannot change the caller's
    attribute, so the selected value is returned and the caller has to
    store it.
    """
    data_value = result[idx]
    if data_value.StatusCode.is_good():
        return data_value.Value.Value
    return container
215