Completed
Pull Request — master (#509)
by
unknown
04:56
created

create_method()   A

Complexity

Conditions 3

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3.072

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 3
c 1
b 1
f 0
dl 0
loc 20
ccs 8
cts 10
cp 0.8
crap 3.072
rs 9.4285
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6 1
from opcua.common.instantiate import instantiate
7
8
9 1
def _parse_nodeid_qname(*args):
    """
    Normalise the two leading arguments shared by the create_* helpers.

    Accepted forms are (namespace index, name) or (nodeid, browsename),
    where nodeid is a ua.NodeId or a string and browsename is a
    ua.QualifiedName or a string.
    Returns a (nodeid, qualified name) tuple.
    ua.UaError is propagated untouched; every other failure (missing or
    wrongly typed arguments included) is re-raised as TypeError.
    """
    try:
        first = args[0]
        if isinstance(first, int):
            # integer first argument: treat it as a namespace index
            idx = int(first)
            return ua.NodeId(0, idx), ua.QualifiedName(args[1], idx)

        if isinstance(first, ua.NodeId):
            parsed_id = first
        elif isinstance(first, str):
            parsed_id = ua.NodeId.from_string(first)
        else:
            raise RuntimeError()

        second = args[1]
        if isinstance(second, ua.QualifiedName):
            parsed_name = second
        elif isinstance(second, str):
            parsed_name = ua.QualifiedName.from_string(second)
        else:
            raise RuntimeError()

        return parsed_id, parsed_name
    except ua.UaError:
        raise
    except Exception as ex:
        raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {0} and got exception {1}".format(args, ex))
32
33
34 1
def create_folder(parent, nodeid, bname):
    """
    Add a folder object below parent and return it wrapped in a Node.

    Call with (nodeid, browsename) or with (namespace index, name).
    """
    server = parent.server
    folder_id, folder_name = _parse_nodeid_qname(nodeid, bname)
    added_id = _create_object(server, parent.nodeid, folder_id, folder_name, ua.ObjectIds.FolderType)
    return node.Node(server, added_id)
42
43
44 1
def create_object(parent, nodeid, bname, objecttype=None):
    """
    create a child node object
    arguments are nodeid, browsename, [objecttype]
    or namespace index, name, [objecttype]
    if objecttype is given (a NodeId) then the type node is instantiated inclusive its child nodes

    Without objecttype a BaseObjectType object is added and returned as a Node.
    With objecttype the first element of the result of instantiate() is returned.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    if objecttype is None:
        return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType))

    objecttype = node.Node(parent.server, objecttype)
    # Build the display name from the parsed qualified name, like the other
    # create_* helpers do. The raw bname may be a QualifiedName instance or a
    # string still carrying a namespace prefix, neither of which is a display text.
    dname = ua.LocalizedText(qname.Name)
    return instantiate(parent, objecttype, nodeid, bname=qname, dname=dname)[0]
59
60
61 1
def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
    """
    Add a property node below parent and return it wrapped in a Node.

    Call with (nodeid, browsename, value, [variant type], [data type])
    or with (idx, name, value, [variant type], [data type]).
    datatype may be a ua.NodeId or an int; any other truthy value raises RuntimeError.
    """
    server = parent.server
    prop_id, prop_name = _parse_nodeid_qname(nodeid, bname)
    variant = ua.Variant(val, varianttype)
    if datatype:
        if isinstance(datatype, int):
            datatype = ua.NodeId(datatype, 0)
        if not isinstance(datatype, ua.NodeId):
            raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
    added_id = _create_variable(server, parent.nodeid, prop_id, prop_name, variant, datatype=datatype, isproperty=True)
    return node.Node(server, added_id)
74
75
76 1
def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
    """
    Add a variable node below parent and return it wrapped in a Node.

    Call with (nodeid, browsename, value, [variant type], [data type])
    or with (idx, name, value, [variant type], [data type]).
    datatype may be a ua.NodeId or an int; any other truthy value raises RuntimeError.
    """
    server = parent.server
    var_id, var_name = _parse_nodeid_qname(nodeid, bname)
    variant = ua.Variant(val, varianttype)
    if datatype:
        if isinstance(datatype, int):
            datatype = ua.NodeId(datatype, 0)
        if not isinstance(datatype, ua.NodeId):
            raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
    added_id = _create_variable(server, parent.nodeid, var_id, var_name, variant, datatype=datatype, isproperty=False)
    return node.Node(server, added_id)
90
91
92 1
def create_variable_type(parent, nodeid, bname, datatype):
    """
    Add a new variable type below parent and return it wrapped in a Node.

    Call with (nodeid, browsename, datatype) or with (idx, name, datatype).
    datatype may be a ua.NodeId or an int; any other truthy value raises RuntimeError.
    """
    server = parent.server
    type_id, type_name = _parse_nodeid_qname(nodeid, bname)
    if datatype:
        if isinstance(datatype, int):
            datatype = ua.NodeId(datatype, 0)
        if not isinstance(datatype, ua.NodeId):
            raise RuntimeError("Data type argument must be a nodeid or an int refering to a nodeid, received: {}".format(datatype))
    added_id = _create_variable_type(server, parent.nodeid, type_id, type_name, datatype)
    return node.Node(server, added_id)
104
105
106 1
def create_reference_type(parent, nodeid, bname):
    """
    Create a new reference type
    args are nodeid and browsename
    or idx and name

    The node is added below parent with a HasSubtype reference and returned
    wrapped in a Node. StatusCode.check() is called on the add_nodes result.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    # The node class must match the ReferenceTypeAttributes sent below
    # (it was previously declared as a Variable).
    addnode.NodeClass = ua.NodeClass.ReferenceType
    addnode.ParentNodeId = parent.nodeid
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    attrs = ua.ReferenceTypeAttributes()
    attrs.IsAbstract = False
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    # Access levels are Variable attributes and are not set on a reference
    # type; write masks are set as for the other type nodes in this module.
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    addnode.NodeAttributes = attrs
    results = parent.server.add_nodes([addnode])
    results[0].StatusCode.check()
    return node.Node(parent.server, results[0].AddedNodeId)
129
130
131 1
def create_object_type(parent, nodeid, bname):
    """
    Add a new object type below parent and return it wrapped in a Node.

    Call with (nodeid, browsename) or with (namespace index, name).
    """
    server = parent.server
    type_id, type_name = _parse_nodeid_qname(nodeid, bname)
    added_id = _create_object_type(server, parent.nodeid, type_id, type_name)
    return node.Node(server, added_id)
139
140
141 1
def create_method(parent, *args):
    """
    Add a method node below parent and return it wrapped in a Node.
    This is only possible on server side!!

    Positional arguments after parent are
    nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
    or idx, name, method_to_be_called, [input argument types], [output argument types]
    When argument types are given, child nodes advertising what the method
    takes and returns are created.
    The callback accepts the nodeid of the parent as first argument and
    variants after, and returns a list of variants.
    """
    server = parent.server
    method_id, method_name = _parse_nodeid_qname(*args[:2])
    callback = args[2]
    # argument type lists are optional and default to empty
    inputs = args[3] if len(args) > 3 else []
    outputs = args[4] if len(args) > 4 else []
    added_id = _create_method(parent, method_id, method_name, callback, inputs, outputs)
    return node.Node(server, added_id)
161
162 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
163 1
def _create_object(server, parentnodeid, nodeid, qname, objecttype):
    """
    Ask server to add an Object node below parentnodeid; return the added node id.

    The reference from the parent is Organizes when the parent's type
    definition equals FolderType and HasComponent otherwise.
    An int objecttype is wrapped in a ua.NodeId, anything else is used as is.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid

    parent_type = node.Node(server, parentnodeid).get_type_definition()
    if parent_type == ua.NodeId(ua.ObjectIds.FolderType):
        reference = ua.ObjectIds.Organizes
    else:
        reference = ua.ObjectIds.HasComponent
    item.ReferenceTypeId = ua.NodeId(reference)

    item.NodeClass = ua.NodeClass.Object
    item.TypeDefinition = ua.NodeId(objecttype) if isinstance(objecttype, int) else objecttype

    attributes = ua.ObjectAttributes()
    attributes.EventNotifier = 0
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
188
189
190 1
def _create_object_type(server, parentnodeid, nodeid, qname):
    """
    Ask server to add a non-abstract ObjectType node below parentnodeid
    (HasSubtype reference); return the added node id.
    """
    attributes = ua.ObjectTypeAttributes()
    attributes.IsAbstract = False
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    item.NodeClass = ua.NodeClass.ObjectType
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
207
208
209 1
def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
    """
    Ask server to add a Variable node below parentnodeid; return the added node id.

    isproperty selects HasProperty/PropertyType instead of
    HasComponent/BaseDataVariableType.
    When datatype is falsy the data type is derived from var with _guess_datatype.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Variable
    item.ParentNodeId = parentnodeid
    if isproperty:
        reference, typedef = ua.ObjectIds.HasProperty, ua.ObjectIds.PropertyType
    else:
        reference, typedef = ua.ObjectIds.HasComponent, ua.ObjectIds.BaseDataVariableType
    item.ReferenceTypeId = ua.NodeId(reference)
    item.TypeDefinition = ua.NodeId(typedef)

    attributes = ua.VariableAttributes()
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.DataType = datatype if datatype else _guess_datatype(var)
    attributes.Value = var
    if isinstance(var.Value, (list, tuple)):
        # array value: rank and dimensions are only set when the variant carries dimensions
        if var.Dimensions:
            attributes.ValueRank = len(var.Dimensions)
            attributes.ArrayDimensions = var.Dimensions
    else:
        attributes.ValueRank = ua.ValueRank.Scalar
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    attributes.Historizing = 0
    attributes.AccessLevel = ua.AccessLevel.CurrentRead.mask
    attributes.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
245
246
247 1
def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
    """
    Ask server to add a non-abstract VariableType node below parentnodeid
    (HasSubtype reference); return the added node id.

    A truthy value is stored as the type's value, with rank OneDimension for
    a list or tuple and Scalar otherwise.
    """
    attributes = ua.VariableTypeAttributes()
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.DataType = datatype
    attributes.IsAbstract = False
    if value:
        attributes.Value = value
        is_sequence = isinstance(value, (list, tuple))
        attributes.ValueRank = ua.ValueRank.OneDimension if is_sequence else ua.ValueRank.Scalar
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.VariableType
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
273
274 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
275 1
def create_data_type(parent, nodeid, bname, description=None):
    """
    Add a new data type below parent (HasSubtype reference) and return it
    wrapped in a Node.

    Call with (nodeid, browsename) or with (namespace index, name).
    description defaults to the browse name when left as None.
    """
    type_id, type_name = _parse_nodeid_qname(nodeid, bname)

    attributes = ua.DataTypeAttributes()
    description_text = type_name.Name if description is None else description
    attributes.Description = ua.LocalizedText(description_text)
    attributes.DisplayName = ua.LocalizedText(type_name.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    attributes.IsAbstract = False  # True would mean the type cannot be instantiated

    # no TypeDefinition is set: type nodes do not have one
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = type_id
    item.BrowseName = type_name
    item.NodeClass = ua.NodeClass.DataType
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    item.NodeAttributes = attributes

    outcome = parent.server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return node.Node(parent.server, outcome.AddedNodeId)
303
304
305 1
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
    """
    Add a Method node below parent (HasComponent reference); return the added node id.

    Non-empty inputs / outputs produce "InputArguments" / "OutputArguments"
    properties on the method node. The callback is registered only when
    parent.server offers add_method_callback.
    """
    attributes = ua.MethodAttributes()
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    attributes.Executable = True
    attributes.UserExecutable = True

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Method
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
    item.NodeAttributes = attributes

    outcome = parent.server.add_nodes([item])[0]
    outcome.StatusCode.check()
    method = node.Node(parent.server, outcome.AddedNodeId)

    # inputs first, then outputs; an empty list creates no property
    for prop_name, vtypes in (("InputArguments", inputs), ("OutputArguments", outputs)):
        if vtypes:
            create_property(method,
                            ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
                            ua.QualifiedName(prop_name, 0),
                            [_vtype_to_argument(vtype) for vtype in vtypes],
                            varianttype=ua.VariantType.ExtensionObject,
                            datatype=ua.ObjectIds.Argument)

    if hasattr(parent.server, "add_method_callback"):
        parent.server.add_method_callback(method.nodeid, callback)
    return outcome.AddedNodeId
341
342
343 1
def _vtype_to_argument(vtype):
    """
    Turn an argument type description into a ua.Argument.

    A ua.Argument is returned unchanged. A ua.VariantType contributes its
    value as data type id; anything else is handed to ua.NodeId directly.
    """
    if isinstance(vtype, ua.Argument):
        return vtype
    type_id = vtype.value if isinstance(vtype, ua.VariantType) else vtype
    argument = ua.Argument()
    argument.DataType = ua.NodeId(type_id)
    return argument
352
353
354 1
def _guess_datatype(variant):
    """
    Derive a data type node id from a variant.

    For anything but an ExtensionObject, the id is looked up in ua.ObjectIds
    by the variant type's name. For an ExtensionObject, the class name of
    the value (or of its first element for a list or tuple) is looked up instead.
    Raises ua.UaError when the ExtensionObject value is None or an empty sequence.
    """
    if variant.VariantType != ua.VariantType.ExtensionObject:
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))

    sample = variant.Value
    if sample is None:
        raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
    if type(sample) in (list, tuple):
        if len(sample) == 0:
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
        sample = sample[0]
    return ua.NodeId(getattr(ua.ObjectIds, sample.__class__.__name__))
368
369
370 1
def delete_nodes(server, nodes, recursive=False):
    """
    Delete specified nodes. Optionally delete recursively all nodes with a
    downward hierachic references to the node

    nodes is any iterable of objects exposing a nodeid attribute; it is not
    modified. Returns whatever server.delete_nodes returns.
    """
    # Work on a copy: extending the argument in place would leak the child
    # nodes into the caller's list and fail for tuples.
    targets = list(nodes)
    if recursive:
        targets += _add_childs(targets)
    nodestodelete = []
    for mynode in targets:
        it = ua.DeleteNodesItem()
        it.NodeId = mynode.nodeid
        it.DeleteTargetReferences = True
        nodestodelete.append(it)
    params = ua.DeleteNodesParameters()
    params.NodesToDelete = nodestodelete
    return server.delete_nodes(params)
386
387
388 1
def _add_childs(nodes):
    """
    Collect every descendant of the given nodes.

    Each node must offer get_children(). The result lists, for every input
    node in order, its children followed by their own descendants; the input
    nodes themselves are not included.
    NOTE(review): a hierarchy containing a cycle would recurse without end.
    """
    results = []
    for mynode in nodes:
        children = mynode.get_children()
        results += children
        # descend so that grandchildren and deeper levels are collected too
        results += _add_childs(children)
    return results
393
394
395