Completed
Push — master ( 41c04b...6083f5 )
by Olivier
03:26
created

create_method()   A

Complexity

Conditions 3

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 3
c 1
b 0
f 1
dl 0
loc 20
ccs 10
cts 10
cp 1
crap 3
rs 9.4285
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6 1
from opcua.common.instantiate import instantiate
7
8
9 1
def _parse_nodeid_qname(*args):
10 1
    try:
11 1
        if isinstance(args[0], int):
12 1
            nodeid = ua.NodeId(0, int(args[0]))
13 1
            qname = ua.QualifiedName(args[1], int(args[0]))
14 1
            return nodeid, qname
15 1
        if isinstance(args[0], ua.NodeId):
16 1
            nodeid = args[0]
17 1
        elif isinstance(args[0], str):
18 1
            nodeid = ua.NodeId.from_string(args[0])
19
        else:
20 1
            raise RuntimeError()
21 1
        if isinstance(args[1], ua.QualifiedName):
22 1
            qname = args[1]
23 1
        elif isinstance(args[1], str):
24 1
            qname = ua.QualifiedName.from_string(args[1])
25
        else:
26 1
            raise RuntimeError()
27 1
        return nodeid, qname
28 1
    except ua.UaError:
29 1
        raise
30 1
    except Exception as ex:
31 1
        raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {} and got exception {}".format(args, ex))
32
33
34 1
def create_folder(parent, nodeid, bname):
35
    """
36
    create a child node folder
37
    arguments are nodeid, browsename
38
    or namespace index, name
39
    """
40 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
41 1
    return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType))
42
43
44 1
def create_object(parent, nodeid, bname, objecttype=None):
45
    """
46
    create a child node object
47
    arguments are nodeid, browsename, [objecttype]
48
    or namespace index, name, [objecttype]
49
    if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes
50
    """
51 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
52 1
    if objecttype is not None:
53 1
        objecttype = node.Node(parent.server, objecttype)
54 1
        nodes = instantiate(parent, objecttype, nodeid, bname)[0]
55 1
        return nodes
56
    else:
57 1
        return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType))
58
59
60 1
def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
61
    """
62
    create a child node property
63
    args are nodeid, browsename, value, [variant type]
64
    or idx, name, value, [variant type]
65
    """
66 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
67 1
    var = ua.Variant(val, varianttype)
68 1
    if datatype and isinstance(datatype, int):
69 1
        datatype = ua.NodeId(datatype, 0)
70 1
    if datatype and not isinstance(datatype, ua.NodeId):
71
        raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
72 1
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True))
73
74
75 1
def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
76
    """
77
    create a child node variable
78
    args are nodeid, browsename, value, [variant type], [data type]
79
    or idx, name, value, [variant type], [data type]
80
    """
81 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
82 1
    var = ua.Variant(val, varianttype)
83 1
    if datatype and isinstance(datatype, int):
84 1
        datatype = ua.NodeId(datatype, 0)
85 1
    if datatype and not isinstance(datatype, ua.NodeId):
86
        raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
87
88 1
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
89
90
91 1 View Code Duplication
def create_variable_type(parent, nodeid, bname, datatype):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
92
    """
93
    Create a new variable type
94
    args are nodeid, browsename and datatype
95
    or idx, name and data type
96
    """
97
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
98
    if datatype and not isinstance(datatype, ua.NodeId):
99
        raise RuntimeError("Data type should be nodeid, got {}".format(datatype))
100
    addnode = ua.AddNodesItem()
101
    addnode.RequestedNewNodeId = nodeid
102
    addnode.BrowseName = qname
103
    addnode.NodeClass = ua.NodeClass.Variable
104
    addnode.ParentNodeId = parent.nodeid
105
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
106
    attrs = ua.VariableTypeAttributes()
107
    attrs.Description = ua.LocalizedText(qname.Name)
108
    attrs.DisplayName = ua.LocalizedText(qname.Name)
109
    attrs.DataType = datatype
110
    attrs.IsAbstract = False
111
    attrs.WriteMask = 0
112
    attrs.UserWriteMask = 0
113
    addnode.NodeAttributes = attrs
114
    results = parent.server.add_nodes([addnode])
115
    results[0].StatusCode.check()
116
    return results[0].AddedNodeId
117
118
119 1
def create_reference_type(parent, nodeid, bname):
120
    """
121
    Create a new reference type
122
    args are nodeid and browsename
123
    or idx and name
124
    """
125
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
126
    addnode = ua.AddNodesItem()
127
    addnode.RequestedNewNodeId = nodeid
128
    addnode.BrowseName = qname
129
    addnode.NodeClass = ua.NodeClass.Variable
130
    addnode.ParentNodeId = parent.nodeid
131
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
132
    attrs = ua.ReferenceTypeAttributes()
133
    attrs.IsAbstract = False
134
    attrs.Description = ua.LocalizedText(qname.Name)
135
    attrs.DisplayName = ua.LocalizedText(qname.Name)
136
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
137
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
138
    addnode.NodeAttributes = attrs
139
    results = parent.server.add_nodes([addnode])
140
    results[0].StatusCode.check()
141
    return results[0].AddedNodeId
142
143
144 1
def create_object_type(parent, nodeid, bname):
145
    """
146
    Create a new object type to be instanciated in address space.
147
    arguments are nodeid, browsename
148
    or namespace index, name
149
    """
150 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
151 1
    return node.Node(parent.server, _create_object_type(parent.server, parent.nodeid, nodeid, qname))
152
153
154 1
def create_method(parent, *args):
155
    """
156
    create a child method object
157
    This is only possible on server side!!
158
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
159
    or idx, name, method_to_be_called, [input argument types], [output argument types]
160
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
161
    a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
162
    """
163 1
    nodeid, qname = _parse_nodeid_qname(*args[:2])
164 1
    callback = args[2]
165 1
    if len(args) > 3:
166 1
        inputs = args[3]
167
    else:
168 1
        inputs = []
169 1
    if len(args) > 4:
170 1
        outputs = args[4]
171
    else:
172 1
        outputs = []
173 1
    return node.Node(parent.server, _create_method(parent, nodeid, qname, callback, inputs, outputs))
174
175
176 1
def _create_object(server, parentnodeid, nodeid, qname, objecttype):
177 1
    addnode = ua.AddNodesItem()
178 1
    addnode.RequestedNewNodeId = nodeid
179 1
    addnode.BrowseName = qname
180 1
    addnode.ParentNodeId = parentnodeid
181 1
    if node.Node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
182 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
183
    else:
184 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
185 1
    addnode.NodeClass = ua.NodeClass.Object
186 1
    if isinstance(objecttype, int):
187 1
        addnode.TypeDefinition = ua.NodeId(objecttype)
188
    elif isinstance(objecttype, ua.NodeId):
189
        addnode.TypeDefinition = objecttype
190 1
    attrs = ua.ObjectAttributes()
191 1
    attrs.EventNotifier = 0
192
193 1
    attrs.Description = ua.LocalizedText(qname.Name)
194 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
195 1
    attrs.WriteMask = 0
196 1
    attrs.UserWriteMask = 0
197 1
    addnode.NodeAttributes = attrs
198 1
    results = server.add_nodes([addnode])
199 1
    results[0].StatusCode.check()
200 1
    return results[0].AddedNodeId
201
202
203 1
def _create_object_type(server, parentnodeid, nodeid, qname):
204 1
    addnode = ua.AddNodesItem()
205 1
    addnode.RequestedNewNodeId = nodeid
206 1
    addnode.BrowseName = qname
207 1
    addnode.ParentNodeId = parentnodeid
208 1
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
209 1
    addnode.NodeClass = ua.NodeClass.ObjectType
210 1
    attrs = ua.ObjectTypeAttributes()
211 1
    attrs.IsAbstract = False
212 1
    attrs.Description = ua.LocalizedText(qname.Name)
213 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
214 1
    attrs.WriteMask = 0
215 1
    attrs.UserWriteMask = 0
216 1
    addnode.NodeAttributes = attrs
217 1
    results = server.add_nodes([addnode])
218 1
    results[0].StatusCode.check()
219 1
    return results[0].AddedNodeId
220
221
222 1
def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
223 1
    addnode = ua.AddNodesItem()
224 1
    addnode.RequestedNewNodeId = nodeid
225 1
    addnode.BrowseName = qname
226 1
    addnode.NodeClass = ua.NodeClass.Variable
227 1
    addnode.ParentNodeId = parentnodeid
228 1
    if isproperty:
229 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
230 1
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
231
    else:
232 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
233 1
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
234 1
    attrs = ua.VariableAttributes()
235 1
    attrs.Description = ua.LocalizedText(qname.Name)
236 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
237 1
    if datatype:
238 1
        attrs.DataType = datatype
239
    else:
240 1
        attrs.DataType = _guess_datatype(var)
241
242 1
    attrs.Value = var
243 1
    if not isinstance(var.Value, (list, tuple)):
244 1
        attrs.ValueRank = ua.ValueRank.Scalar
245
    else:
246 1
        if var.Dimensions:
247 1
            attrs.ValueRank = len(var.Dimensions)
248 1
            attrs.ArrayDimensions = var.Dimensions
249 1
    attrs.WriteMask = 0
250 1
    attrs.UserWriteMask = 0
251 1
    attrs.Historizing = 0
252 1
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
253 1
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
254 1
    addnode.NodeAttributes = attrs
255 1
    results = server.add_nodes([addnode])
256 1
    results[0].StatusCode.check()
257 1
    return results[0].AddedNodeId
258
259
260 1
def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
261
    addnode = ua.AddNodesItem()
262
    addnode.RequestedNewNodeId = nodeid
263
    addnode.BrowseName = qname
264
    addnode.NodeClass = ua.NodeClass.VariableType
265
    addnode.ParentNodeId = parentnodeid
266
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
267
    #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
268
    attrs = ua.VariableTypeAttributes()
269
    attrs.Description = ua.LocalizedText(qname.Name)
270
    attrs.DisplayName = ua.LocalizedText(qname.Name)
271
    attrs.DataType = datatype
272
    if value:
273
        attrs.Value = value
274
        if isinstance(value, (list, tuple)):
275
            attrs.ValueRank = ua.ValueRank.OneDimension
276
        else:
277
            attrs.ValueRank = ua.ValueRank.Scalar
278
    #attrs.ArrayDimensions = None
279
    attrs.WriteMask = 0
280
    attrs.UserWriteMask = 0
281
    attrs.Historizing = 0
282
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
283
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
284
    addnode.NodeAttributes = attrs
285
    results = server.add_nodes([addnode])
286
    results[0].StatusCode.check()
287
    return results[0].AddedNodeId
288
289
290 1 View Code Duplication
def create_data_type(parent, nodeid, bname, description=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
291
    """
292
    Create a new data type to be used in new variables, etc ..
293
    arguments are nodeid, browsename
294
    or namespace index, name
295
    """
296 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
297
298 1
    addnode = ua.AddNodesItem()
299 1
    addnode.RequestedNewNodeId = nodeid
300 1
    addnode.BrowseName = qname
301 1
    addnode.NodeClass = ua.NodeClass.DataType
302 1
    addnode.ParentNodeId = parent.nodeid
303 1
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
304
    #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
305 1
    attrs = ua.DataTypeAttributes()
306 1
    if description is None:
307 1
        attrs.Description = ua.LocalizedText(qname.Name)
308
    else:
309
        attrs.Description = ua.LocalizedText(description)
310 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
311 1
    attrs.WriteMask = 0
312 1
    attrs.UserWriteMask = 0
313 1
    attrs.IsAbstract = False  # True mean they cannot be instanciated
314 1
    addnode.NodeAttributes = attrs
315 1
    results = parent.server.add_nodes([addnode])
316 1
    results[0].StatusCode.check()
317 1
    return node.Node(parent.server, results[0].AddedNodeId)
318
319
320 1
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
321 1
    addnode = ua.AddNodesItem()
322 1
    addnode.RequestedNewNodeId = nodeid
323 1
    addnode.BrowseName = qname
324 1
    addnode.NodeClass = ua.NodeClass.Method
325 1
    addnode.ParentNodeId = parent.nodeid
326 1
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
327
    #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
328 1
    attrs = ua.MethodAttributes()
329 1
    attrs.Description = ua.LocalizedText(qname.Name)
330 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
331 1
    attrs.WriteMask = 0
332 1
    attrs.UserWriteMask = 0
333 1
    attrs.Executable = True
334 1
    attrs.UserExecutable = True
335 1
    addnode.NodeAttributes = attrs
336 1
    results = parent.server.add_nodes([addnode])
337 1
    results[0].StatusCode.check()
338 1
    method = node.Node(parent.server, results[0].AddedNodeId)
339 1
    if inputs:
340 1
        create_property(method,
341
                        ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
342
                        ua.QualifiedName("InputArguments", 0),
343
                        [_vtype_to_argument(vtype) for vtype in inputs],
344
                        varianttype=ua.VariantType.ExtensionObject,
345
                        datatype=ua.ObjectIds.Argument)
346 1
    if outputs:
347 1
        create_property(method,
348
                        ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
349
                        ua.QualifiedName("OutputArguments", 0),
350
                        [_vtype_to_argument(vtype) for vtype in outputs],
351
                        varianttype=ua.VariantType.ExtensionObject,
352
                        datatype=ua.ObjectIds.Argument)
353 1
    parent.server.add_method_callback(method.nodeid, callback)
354 1
    return results[0].AddedNodeId
355
356
357 1
def _vtype_to_argument(vtype):
358 1
    if isinstance(vtype, ua.Argument):
359
        return vtype
360 1
    arg = ua.Argument()
361 1
    if isinstance(vtype, ua.VariantType):
362 1
        arg.DataType = ua.NodeId(vtype.value)
363
    else:
364
        arg.DataType = ua.NodeId(vtype)
365 1
    return arg
366
367
368 1
def _guess_datatype(variant):
369 1
    if variant.VariantType == ua.VariantType.ExtensionObject:
370 1
        if variant.Value is None:
371
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
372 1
        if type(variant.Value) in (list, tuple):
373
            if len(variant.Value) == 0:
374
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
375
            extobj = variant.Value[0]
376
        else:
377 1
            extobj = variant.Value
378 1
        classname = extobj.__class__.__name__
379 1
        return ua.NodeId(getattr(ua.ObjectIds, classname))
380
    else:
381 1
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
382
383
384 1
def delete_nodes(server, nodes, recursive=False):
385
    """
386
    Delete specified nodes. Optionally delete recursively all nodes with a
387
    downward hierachic references to the node
388
    """
389 1
    nodestodelete = []
390 1
    if recursive:
391 1
        nodes += _add_childs(nodes)
392 1
    for mynode in nodes:
393 1
        it = ua.DeleteNodesItem()
394 1
        it.NodeId = mynode.nodeid
395 1
        it.DeleteTargetReferences = True
396 1
        nodestodelete.append(it)
397 1
    params = ua.DeleteNodesParameters()
398 1
    params.NodesToDelete = nodestodelete
399 1
    return server.delete_nodes(params)
400
401
402 1
def _add_childs(nodes):
403 1
    results = []
404 1
    for mynode in nodes[:]:
405 1
        results += mynode.get_children()
406 1
    return results
407
408
409