Completed
Pull Request — master (#244)
by Olivier
02:48
created

instantiate()   B

Complexity

Conditions 5

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 25
rs 8.0894
1
"""
Instantiate a node from a node type.
Can also be used to duplicate a node tree.
"""
5
6
7
from opcua import Node
8
from opcua import ua
9
10
11
class _ReadAdder(object):
    """
    Internal helper: queue attribute reads for a single node id and send
    them to the server in one read request.
    """

    def __init__(self, server, nodeid):
        """
        Remember the server and node id and start with empty read parameters.
        """
        self._debug_attr = []  # attribute ids in request order, used in error output
        self.params = ua.ReadParameters()
        self.nodeid = nodeid
        self.server = server

    def add(self, attr):
        """
        Queue a read of attribute id ``attr`` on the node.
        """
        read_id = ua.ReadValueId()
        read_id.NodeId = self.nodeid
        read_id.AttributeId = attr
        self.params.NodesToRead.append(read_id)
        self._debug_attr.append(attr)

    def read(self):
        """
        Send the queued reads and return the list of inner values
        (``Value.Value`` of each result), in request order.

        A result whose status code is not good is reported on stdout; its
        value is still appended to the returned list.
        """
        collected = []
        for pos, data_value in enumerate(self.server.read(self.params)):
            status_ok = data_value.StatusCode.is_good()
            if not status_ok:
                print(data_value)
                print("Error attribute %s is not valid for node %s" % (self._debug_attr[pos], self.nodeid))
            collected.append(data_value.Value.Value)
        return collected
39
40
41
# def _read_attributed(server, nodeid, attrs_obj, *attrs):
42
    #ra = _ReadAdder(server, rdesc.NodeId)
43
    # for attr in attrs:
44
        # ra.add(atttr)
45
    #vals = ra.read()
46
47
def instantiate(parent, node_type, nodeid=None, bname=None, idx=0):
    """
    instantiate a node type under a parent node.
    nodeid and browse name of new node can be specified, or just namespace index

    :param parent: node under which the instance is created; its ``server``
        and ``nodeid`` attributes are used
    :param node_type: node to instantiate; its NodeClass, BrowseName and
        DisplayName attributes are read
    :param nodeid: node id requested for the new node; when None an empty
        node id in namespace ``idx`` is requested instead
    :param bname: browse name of the new node; None reuses the browse name of
        ``node_type``, a string is parsed with ``ua.QualifiedName.from_string``
    :param idx: namespace index, only used when ``nodeid`` is None
    :return: the value returned by ``_instantiate_node`` for the new node
    """
    results = node_type.get_attributes([
        ua.AttributeIds.NodeClass,
        ua.AttributeIds.BrowseName,
        ua.AttributeIds.DisplayName,
    ])
    nclass, qname, dname = [res.Value.Value for res in results]

    # Describe the type node the same way a browse result would, so the
    # top-level node and its children go through the same code path.
    rdesc = ua.ReferenceDescription()
    rdesc.NodeId = node_type.nodeid
    rdesc.BrowseName = qname
    rdesc.DisplayName = dname
    rdesc.NodeClass = nclass
    rdesc.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HasComponent)
    rdesc.TypeDefinition = node_type.nodeid

    if nodeid is None:
        nodeid = ua.NodeId(namespaceidx=idx)  # will trigger automatic node generation in namespace idx
    if bname is None:
        bname = rdesc.BrowseName
    elif isinstance(bname, str):
        bname = ua.QualifiedName.from_string(bname)

    return _instantiate_node(parent.server, parent.nodeid, rdesc, nodeid, bname)
72
73
74
def _instantiate_node(server, parentid, rdesc, nodeid, bname, recursive=True):
    """
    instantiate a node type under parent

    :param server: object providing ``add_nodes``; also used to build Node objects
    :param parentid: node id of the parent of the new node
    :param rdesc: reference description of the node to copy; its NodeId,
        NodeClass, ReferenceTypeId and TypeDefinition are used
    :param nodeid: node id requested for the new node
    :param bname: browse name of the new node
    :param recursive: when true, every child description of the source node
        (``get_children_descriptions(includesubtypes=False)``) is instantiated
        under the new node as well
    :return: Node built from the node id reported by the server for the new node
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = bname
    addnode.ParentNodeId = parentid
    addnode.ReferenceTypeId = rdesc.ReferenceTypeId
    addnode.TypeDefinition = rdesc.TypeDefinition

    node_type = Node(server, rdesc.NodeId)

    # Types are instantiated as their instance class: ObjectType -> Object,
    # VariableType -> Variable.
    if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
        addnode.NodeClass = ua.NodeClass.Object
        _read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)
    elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
        addnode.NodeClass = ua.NodeClass.Variable
        _read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
    else:
        # NOTE(review): other node classes are only reported; the add request
        # below is still sent without NodeClass/NodeAttributes being set here.
        print("Node class not supported: ", rdesc.NodeClass)

    # NOTE(review): the status code of the result is not checked.
    res = server.add_nodes([addnode])[0]

    if recursive:
        for c_rdesc in node_type.get_children_descriptions(includesubtypes=False):
            # Children get an empty node id in the namespace of the new node.
            child_id = ua.NodeId(namespaceidx=res.AddedNodeId.NamespaceIndex)
            _instantiate_node(server, res.AddedNodeId, c_rdesc, nodeid=child_id, bname=c_rdesc.BrowseName)

    # Use the id reported by the server: the requested id is only an empty
    # placeholder when the caller asked for an automatically generated one.
    return Node(server, res.AddedNodeId)
125
126
127
def _add_object_attrs(addnode, rdesc, node_type):
    """
    Fill ``addnode.NodeAttributes`` with object attributes read from node_type.

    :param addnode: add-nodes item receiving the new ``ua.ObjectAttributes``
    :param rdesc: reference description of the source node; its DisplayName
        is used when the DisplayName attribute cannot be read
    :param node_type: node the attributes are read from
    """
    names = ("DisplayName", "Description", "WriteMask", "UserWriteMask", "EventNotifier")
    results = node_type.get_attributes([getattr(ua.AttributeIds, name) for name in names])

    attrs = ua.ObjectAttributes()
    attrs.DisplayName = rdesc.DisplayName
    # Results come back in request order, so pairing them with the names
    # keeps each value on the attribute it was read for.
    for name, result in zip(names, results):
        if result.StatusCode.is_good():
            setattr(attrs, name, result.Value.Value)

    addnode.NodeAttributes = attrs
150
151
152
def _read_and_copy_attrs(node_type, struct, addnode):
    """
    Read from node_type every attribute named by a public field of struct,
    store the values in struct and attach struct to addnode.

    :param node_type: node the attributes are read from
    :param struct: attributes structure to fill (callers in this file pass
        ``ua.ObjectAttributes()`` or ``ua.VariableAttributes()``)
    :param addnode: add-nodes item whose ``NodeAttributes`` is set to struct
    """
    # Fields of struct that are not copied from the source node.
    skipped = ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract")
    names = [name for name in struct.__dict__ if not name.startswith("_") and name not in skipped]
    attrs = [getattr(ua.AttributeIds, name) for name in names]

    # One read for all attributes; nothing is requested when there are no names.
    results = node_type.get_attributes(attrs) if attrs else []

    for name, result in zip(names, results):
        if result.StatusCode.is_good():
            if name == "Value":
                # Value receives the result's Value object itself, not its inner value.
                setattr(struct, name, result.Value)
            else:
                setattr(struct, name, result.Value.Value)
        else:
            print("!!!!!!!!!!!!Error, for nodeid %s, attribute %s, statuscode is %s" % (node_type, name, result.StatusCode))

    addnode.NodeAttributes = struct
168
169
170
def _add_variable_attrs(addnode, rdesc, node_type):
    """
    Fill ``addnode.NodeAttributes`` with variable attributes read from node_type.

    :param addnode: add-nodes item receiving the new ``ua.VariableAttributes``
    :param rdesc: reference description of the source node (not used)
    :param node_type: node the attributes are read from
    """
    # Value is not read or copied: the new structure keeps whatever
    # ua.VariableAttributes() initialises it to.
    names = (
        "DisplayName",
        "Description",
        "WriteMask",
        "UserWriteMask",
        "DataType",
        "ValueRank",
        "ArrayDimensions",
        "AccessLevel",
        "UserAccessLevel",
        "MinimumSamplingInterval",
        "Historizing",
    )
    results = node_type.get_attributes([getattr(ua.AttributeIds, name) for name in names])

    attrs = ua.VariableAttributes()
    # Results come back in request order, so pairing them with the names
    # keeps each value on the attribute it was read for.
    for name, result in zip(names, results):
        if result.StatusCode.is_good():
            setattr(attrs, name, result.Value.Value)

    addnode.NodeAttributes = attrs
212
213
214
def _set_attr(container, result, idx):
    """
    Return the value read at position idx of result, or container when that
    read did not succeed.

    Rebinding the ``container`` parameter cannot change the caller's
    attribute, so the resolved value is returned and the caller has to
    assign it, e.g. ``attrs.Description = _set_attr(attrs.Description, results, 1)``.

    :param container: current value, returned unchanged on a bad status code
    :param result: sequence of read results
    :param idx: position in result of the read to use
    """
    data_value = result[idx]
    if data_value.StatusCode.is_good():
        return data_value.Value.Value
    return container
217