Test Failed
Push — dev ( 9f8731...6735ce )
by Olivier
01:54
created

opcua.common.delete_nodes()   A

Complexity

Conditions 3

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 14
rs 9.4285
1
"""
2
High level functions to create nodes
3
"""
4
from opcua import ua
5
from opcua.common import node
6
7
8
def _parse_add_args(*args):
9
    if isinstance(args[0], ua.NodeId):
10
        return args[0], args[1]
11
    elif isinstance(args[0], str):
12
        return ua.NodeId.from_string(args[0]), ua.QualifiedName.from_string(args[1])
13
    elif isinstance(args[0], int):
14
        return ua.generate_nodeid(args[0]), ua.QualifiedName(args[1], args[0])
15
    else:
16
        raise TypeError("Add methods takes a nodeid and a qualifiedname as argument, received %s" % args)
17
18
19
def create_folder(parent, *args):
20
    """
21
    create a child node folder
22
    arguments are nodeid, browsename
23
    or namespace index, name
24
    """
25
    nodeid, qname = _parse_add_args(*args)
26
    return node.Node(parent.server, _create_folder(parent.server, parent.nodeid, nodeid, qname))
27
28
29
def create_object(parent, *args):
30
    """
31
    create a child node object
32
    arguments are nodeid, browsename
33
    or namespace index, name
34
    """
35
    nodeid, qname = _parse_add_args(*args)
36
    return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname))
37
38
39
def create_property(parent, *args):
40
    """
41
    create a child node property
42
    args are nodeid, browsename, value, [variant type]
43
    or idx, name, value, [variant type]
44
    """
45
    nodeid, qname = _parse_add_args(*args[:2])
46
    val = _to_variant(*args[2:])
47
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, isproperty=True))
48
49
50
def create_variable(parent, *args):
51
    """
52
    create a child node variable
53
    args are nodeid, browsename, value, [variant type]
54
    or idx, name, value, [variant type]
55
    """
56
    nodeid, qname = _parse_add_args(*args[:2])
57
    val = _to_variant(*args[2:])
58
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, isproperty=False))
59
60
61
def create_method(parent, *args):
62
    """
63
    create a child method object
64
    This is only possible on server side!!
65
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
66
    or idx, name, method_to_be_called, [input argument types], [output argument types]
67
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
68
    a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
69
    """
70
    nodeid, qname = _parse_add_args(*args[:2])
71
    callback = args[2]
72
    if len(args) > 3:
73
        inputs = args[3]
74
    if len(args) > 4:
75
        outputs = args[4]
76
    return _create_method(parent, nodeid, qname, callback, inputs, outputs)
77
78
79
def _create_folder(server, parentnodeid, nodeid, qname):
80
    addnode = ua.AddNodesItem()
81
    addnode.RequestedNewNodeId = nodeid
82
    addnode.BrowseName = qname
83
    addnode.NodeClass = ua.NodeClass.Object
84
    addnode.ParentNodeId = parentnodeid
85
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=35")
86
    addnode.TypeDefinition = ua.NodeId.from_string("i=61")
87
    attrs = ua.ObjectAttributes()
88
    attrs.Description = ua.LocalizedText(qname.Name)
89
    attrs.DisplayName = ua.LocalizedText(qname.Name)
90
    attrs.WriteMask = ua.OpenFileMode.Read
91
    attrs.UserWriteMask = ua.OpenFileMode.Read
92
    attrs.EventNotifier = 0
93
    addnode.NodeAttributes = attrs
94
    results = server.add_nodes([addnode])
95
    results[0].StatusCode.check()
96
    return nodeid
97
98
99
def _create_object(server, parentnodeid, nodeid, qname):
100
    addnode = ua.AddNodesItem()
101
    addnode.RequestedNewNodeId = nodeid
102
    addnode.BrowseName = qname
103
    addnode.NodeClass = ua.NodeClass.Object
104
    addnode.ParentNodeId = parentnodeid
105
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=35")
106
    addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
107
    attrs = ua.ObjectAttributes()
108
    attrs.Description = ua.LocalizedText(qname.Name)
109
    attrs.DisplayName = ua.LocalizedText(qname.Name)
110
    attrs.EventNotifier = 0
111
    attrs.WriteMask = ua.OpenFileMode.Read
112
    attrs.UserWriteMask = ua.OpenFileMode.Read
113
    addnode.NodeAttributes = attrs
114
    results = server.add_nodes([addnode])
115
    results[0].StatusCode.check()
116
    return nodeid
117
118
119
def _to_variant(val, vtype=None):
120
    if isinstance(val, ua.Variant):
121
        return val
122
    else:
123
        return ua.Variant(val, vtype)
124
125
126
def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False):
127
    addnode = ua.AddNodesItem()
128
    addnode.RequestedNewNodeId = nodeid
129
    addnode.BrowseName = qname
130
    addnode.NodeClass = ua.NodeClass.Variable
131
    addnode.ParentNodeId = parentnodeid
132
    if isproperty:
133
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
134
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
135
    else:
136
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
137
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
138
    attrs = ua.VariableAttributes()
139
    attrs.Description = ua.LocalizedText(qname.Name)
140
    attrs.DisplayName = ua.LocalizedText(qname.Name)
141
    attrs.DataType = _guess_uatype(val)
142
    attrs.Value = val
143
    if isinstance(val, list) or isinstance(val, tuple):
144
        attrs.ValueRank = ua.ValueRank.OneDimension
145
    else:
146
        attrs.ValueRank = ua.ValueRank.Scalar
147
    #attrs.ArrayDimensions = None
148
    attrs.WriteMask = ua.OpenFileMode.Read
149
    attrs.UserWriteMask = ua.OpenFileMode.Read
150
    attrs.Historizing = 0
151
    addnode.NodeAttributes = attrs
152
    results = server.add_nodes([addnode])
153
    results[0].StatusCode.check()
154
    return nodeid
155
156
157
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
158
    addnode = ua.AddNodesItem()
159
    addnode.RequestedNewNodeId = nodeid
160
    addnode.BrowseName = qname
161
    addnode.NodeClass = ua.NodeClass.Method
162
    addnode.ParentNodeId = parent.nodeid
163
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=47")
164
    #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
165
    attrs = ua.MethodAttributes()
166
    attrs.Description = ua.LocalizedText(qname.Name)
167
    attrs.DisplayName = ua.LocalizedText(qname.Name)
168
    attrs.WriteMask = ua.OpenFileMode.Read
169
    attrs.UserWriteMask = ua.OpenFileMode.Read
170
    attrs.Executable = True
171
    attrs.UserExecutable = True
172
    addnode.NodeAttributes = attrs
173
    results = parent.server.add_nodes([addnode])
174
    results[0].StatusCode.check()
175
    method = node.Node(parent.server, nodeid)
176
    if inputs:
177
        create_property(method, ua.generate_nodeid(qname.NamespaceIndex), ua.QualifiedName("InputArguments", 0), [_vtype_to_argument(vtype) for vtype in inputs])
178
    if outputs:
179
        create_property(method, ua.generate_nodeid(qname.NamespaceIndex), ua.QualifiedName("OutputArguments", 0), [_vtype_to_argument(vtype) for vtype in outputs])
180
    parent.server.add_method_callback(method.nodeid, callback)
181
    return nodeid
182
183
184
def _vtype_to_argument(vtype):
185
    if isinstance(vtype, ua.Argument):
186
        return vtype
187
188
    arg = ua.Argument()
189
    v = ua.Variant(None, vtype)
190
    arg.DataType = _guess_uatype(v)
191
    return arg
192
193
194
def _guess_uatype(variant):
195
    if variant.VariantType == ua.VariantType.ExtensionObject:
196
        if variant.Value is None:
197
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
198
        if type(variant.Value) in (list, tuple):
199
            if len(variant.Value) == 0:
200
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
201
            extobj = variant.Value[0]
202
        else:
203
            extobj = variant.Value
204
        classname = extobj.__class__.__name__
205
        return ua.NodeId(getattr(ua.ObjectIds, classname))
206
    else:
207
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
208
209
210
def delete_nodes(server, nodes, recursive=False):
211
    """
212
    Delete specified nodes. Optionally delete recursively all nodes with a 
213
    downward hierachic references to the node
214
    """
215
    nodestodelete = []
216
    if recursive:
217
        nodes += _add_childs(nodes)
218
    for mynode in nodes:
219
        it = ua.DeleteNodesItem()
220
        it.NodeId = mynode.nodeid
221
        it.DeleteTargetReferences = True
222
        nodestodelete.append(it)
223
    return server.delete_nodes(nodestodelete)
224
225
226
def _add_childs(nodes):
227
    results = []
228
    for mynode in nodes[:]:
229
        results += mynode.get_children()
230
    return results
231
232
233