asyncua.common.manage_nodes._create_object()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 24
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 22
nop 5
dl 0
loc 24
rs 9.352
c 0
b 0
f 0
1
"""
2
High level functions to create nodes
3
"""
4
import logging
5
from asyncua import ua
6
from .instantiate_util import instantiate
7
from .node_factory import make_node
8
9
_logger = logging.getLogger(__name__)
10
11
12
def _parse_nodeid_qname(*args):
13
    try:
14
        if isinstance(args[0], int):
15
            nodeid = ua.NodeId(0, int(args[0]))
16
            qname = ua.QualifiedName(args[1], int(args[0]))
17
            return nodeid, qname
18
        if isinstance(args[0], ua.NodeId):
19
            nodeid = args[0]
20
        elif isinstance(args[0], str):
21
            nodeid = ua.NodeId.from_string(args[0])
22
        else:
23
            raise RuntimeError()
24
        if isinstance(args[1], ua.QualifiedName):
25
            qname = args[1]
26
        elif isinstance(args[1], str):
27
            qname = ua.QualifiedName.from_string(args[1])
28
        else:
29
            raise RuntimeError()
30
        return nodeid, qname
31
    except ua.UaError:
32
        raise
33
    except Exception as ex:
34
        raise TypeError(
35
            f"This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname."
36
            f" Received arguments {args} and got exception {ex}"
37
        )
38
39
40
async def create_folder(parent, nodeid, bname):
41
    """
42
    create a child node folder
43
    arguments are nodeid, browsename
44
    or namespace index, name
45
    """
46
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
47
    return make_node(
48
        parent.server,
49
        await _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType)
50
    )
51
52
53
async def create_object(parent, nodeid, bname, objecttype=None, instantiate_optional=True):
54
    """
55
    create a child node object
56
    arguments are nodeid, browsename, [objecttype]
57
    or namespace index, name, [objecttype]
58
    if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes
59
    """
60
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
61
    if objecttype is not None:
62
        objecttype = make_node(parent.server, objecttype)
63
        dname = ua.LocalizedText(qname.Name)
64
        nodes = await instantiate(parent, objecttype, nodeid, bname=qname, dname=dname, instantiate_optional=instantiate_optional)
65
        return nodes[0]
66
    else:
67
        return make_node(
68
            parent.server,
69
            await _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType)
70
        )
71
72
73 View Code Duplication
async def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
74
    """
75
    create a child node property
76
    args are nodeid, browsename, value, [variant type]
77
    or idx, name, value, [variant type]
78
    """
79
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
80
    var = ua.Variant(val, varianttype)
81
    if datatype and isinstance(datatype, int):
82
        datatype = ua.NodeId(datatype, 0)
83
    if datatype and not isinstance(datatype, ua.NodeId):
84
        raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
85
    return make_node(
86
        parent.server,
87
        await _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True)
88
    )
89
90
91 View Code Duplication
async def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
92
    """
93
    create a child node variable
94
    args are nodeid, browsename, value, [variant type], [data type]
95
    or idx, name, value, [variant type], [data type]
96
    """
97
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
98
    var = ua.Variant(val, varianttype)
99
    if datatype and isinstance(datatype, int):
100
        datatype = ua.NodeId(datatype, 0)
101
    if datatype and not isinstance(datatype, ua.NodeId):
102
        raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
103
104
    return make_node(
105
        parent.server,
106
        await _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False)
107
    )
108
109
110
async def create_variable_type(parent, nodeid, bname, datatype):
111
    """
112
    Create a new variable type
113
    args are nodeid, browsename and datatype
114
    or idx, name and data type
115
    """
116
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
117
    if datatype and isinstance(datatype, int):
118
        datatype = ua.NodeId(datatype, 0)
119
    if datatype and not isinstance(datatype, ua.NodeId):
120
        raise RuntimeError(
121
            f"Data type argument must be a nodeid or an int refering to a nodeid, received: {datatype}")
122
    return make_node(
123
        parent.server,
124
        await _create_variable_type(parent.server, parent.nodeid, nodeid, qname, datatype)
125
    )
126
127
128
async def create_reference_type(parent, nodeid, bname, symmetric=True, inversename=None):
129
    """
130
    Create a new reference type
131
    args are nodeid and browsename
132
    or idx and name
133
    """
134
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
135
    return make_node(
136
        parent.server,
137
        await _create_reference_type(parent.server, parent.nodeid, nodeid, qname, symmetric, inversename)
138
    )
139
140
141
async def create_object_type(parent, nodeid, bname):
142
    """
143
    Create a new object type to be instanciated in address space.
144
    arguments are nodeid, browsename
145
    or namespace index, name
146
    """
147
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
148
    return make_node(parent.server, await _create_object_type(parent.server, parent.nodeid, nodeid, qname))
149
150
151
async def create_method(parent, *args):
152
    """
153
    create a child method object
154
    This is only possible on server side!!
155
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
156
    or idx, name, method_to_be_called, [input argument types], [output argument types]
157
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
158
    a callback is a method accepting the nodeid of the parent as first argument and variants after.
159
    returns a list of variants
160
    """
161
    _logger.info('create_method %r', parent)
162
    nodeid, qname = _parse_nodeid_qname(*args[:2])
163
    callback = args[2]
164
    if len(args) > 3:
165
        inputs = args[3]
166
    else:
167
        inputs = []
168
    if len(args) > 4:
169
        outputs = args[4]
170
    else:
171
        outputs = []
172
    return make_node(parent.server, await _create_method(parent, nodeid, qname, callback, inputs, outputs))
173
174
175
async def _create_object(server, parentnodeid, nodeid, qname, objecttype):
176
    addnode = ua.AddNodesItem()
177
    addnode.RequestedNewNodeId = nodeid
178
    addnode.BrowseName = qname
179
    addnode.ParentNodeId = parentnodeid
180
    if await make_node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
181
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
182
    else:
183
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
184
    addnode.NodeClass = ua.NodeClass.Object
185
    if isinstance(objecttype, int):
186
        addnode.TypeDefinition = ua.NodeId(objecttype)
187
    else:
188
        addnode.TypeDefinition = objecttype
189
    attrs = ua.ObjectAttributes()
190
    attrs.EventNotifier = 0
191
    attrs.Description = ua.LocalizedText(qname.Name)
192
    attrs.DisplayName = ua.LocalizedText(qname.Name)
193
    attrs.WriteMask = 0
194
    attrs.UserWriteMask = 0
195
    addnode.NodeAttributes = attrs
196
    results = await server.add_nodes([addnode])
197
    results[0].StatusCode.check()
198
    return results[0].AddedNodeId
199
200
201
async def _create_reference_type(server, parentnodeid, nodeid, qname, symmetric, inversename):
202
    addnode = ua.AddNodesItem()
203
    addnode.RequestedNewNodeId = nodeid
204
    addnode.BrowseName = qname
205
    addnode.NodeClass = ua.NodeClass.ReferenceType
206
    addnode.ParentNodeId = parentnodeid
207
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
208
    attrs = ua.ReferenceTypeAttributes()
209
    attrs.IsAbstract = False
210
    attrs.Description = ua.LocalizedText(qname.Name)
211
    attrs.DisplayName = ua.LocalizedText(qname.Name)
212
    attrs.Symmetric = symmetric
213
    attrs.InverseName = ua.LocalizedText(inversename)
214
    attrs.UserWriteMask = 0
215
    addnode.NodeAttributes = attrs
216
217
    results = await server.add_nodes([addnode])
218
    results[0].StatusCode.check()
219
    return results[0].AddedNodeId
220
221
222
async def _create_object_type(server, parentnodeid, nodeid, qname):
223
    addnode = ua.AddNodesItem()
224
    addnode.RequestedNewNodeId = nodeid
225
    addnode.BrowseName = qname
226
    addnode.ParentNodeId = parentnodeid
227
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
228
    addnode.NodeClass = ua.NodeClass.ObjectType
229
    attrs = ua.ObjectTypeAttributes()
230
    attrs.IsAbstract = False
231
    attrs.Description = ua.LocalizedText(qname.Name)
232
    attrs.DisplayName = ua.LocalizedText(qname.Name)
233
    attrs.WriteMask = 0
234
    attrs.UserWriteMask = 0
235
    addnode.NodeAttributes = attrs
236
    results = await server.add_nodes([addnode])
237
    results[0].StatusCode.check()
238
    return results[0].AddedNodeId
239
240
241
async def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
242
    addnode = ua.AddNodesItem()
243
    addnode.RequestedNewNodeId = nodeid
244
    addnode.BrowseName = qname
245
    addnode.NodeClass = ua.NodeClass.Variable
246
    addnode.ParentNodeId = parentnodeid
247
    if isproperty:
248
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
249
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
250
    else:
251
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
252
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
253
    attrs = ua.VariableAttributes()
254
    attrs.Description = ua.LocalizedText(qname.Name)
255
    attrs.DisplayName = ua.LocalizedText(qname.Name)
256
    if datatype:
257
        attrs.DataType = datatype
258
    else:
259
        attrs.DataType = _guess_datatype(var)
260
261
    attrs.Value = var
262
    if not isinstance(var.Value, (list, tuple)):
263
        attrs.ValueRank = ua.ValueRank.Scalar
264
    else:
265
        if var.Dimensions:
266
            attrs.ValueRank = len(var.Dimensions)
267
            attrs.ArrayDimensions = var.Dimensions
268
    attrs.WriteMask = 0
269
    attrs.UserWriteMask = 0
270
    attrs.Historizing = False
271
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
272
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
273
    addnode.NodeAttributes = attrs
274
    results = await server.add_nodes([addnode])
275
    results[0].StatusCode.check()
276
    return results[0].AddedNodeId
277
278
279
async def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
280
    addnode = ua.AddNodesItem()
281
    addnode.RequestedNewNodeId = nodeid
282
    addnode.BrowseName = qname
283
    addnode.NodeClass = ua.NodeClass.VariableType
284
    addnode.ParentNodeId = parentnodeid
285
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
286
    # addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
287
    attrs = ua.VariableTypeAttributes()
288
    attrs.Description = ua.LocalizedText(qname.Name)
289
    attrs.DisplayName = ua.LocalizedText(qname.Name)
290
    attrs.DataType = datatype
291
    attrs.IsAbstract = False
292
    if value:
293
        attrs.Value = value
294
        if isinstance(value, (list, tuple)):
295
            attrs.ValueRank = ua.ValueRank.OneDimension
296
        else:
297
            attrs.ValueRank = ua.ValueRank.Scalar
298
    # attrs.ArrayDimensions = None
299
    attrs.WriteMask = 0
300
    attrs.UserWriteMask = 0
301
    addnode.NodeAttributes = attrs
302
    results = await server.add_nodes([addnode])
303
    results[0].StatusCode.check()
304
    return results[0].AddedNodeId
305
306
307
async def create_data_type(parent, nodeid, bname, description=None):
308
    """
309
    Create a new data type to be used in new variables, etc ..
310
    arguments are nodeid, browsename
311
    or namespace index, name
312
    """
313
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
314
    addnode = ua.AddNodesItem()
315
    addnode.RequestedNewNodeId = nodeid
316
    addnode.BrowseName = qname
317
    addnode.NodeClass = ua.NodeClass.DataType
318
    addnode.ParentNodeId = parent.nodeid
319
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
320
    # addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
321
    attrs = ua.DataTypeAttributes()
322
    if description is None:
323
        attrs.Description = ua.LocalizedText(qname.Name)
324
    else:
325
        attrs.Description = ua.LocalizedText(description)
326
    attrs.DisplayName = ua.LocalizedText(qname.Name)
327
    attrs.WriteMask = 0
328
    attrs.UserWriteMask = 0
329
    attrs.IsAbstract = False  # True mean they cannot be instanciated
330
    addnode.NodeAttributes = attrs
331
    results = await parent.server.add_nodes([addnode])
332
    results[0].StatusCode.check()
333
    return make_node(parent.server, results[0].AddedNodeId)
334
335
336
async def _create_method(parent, nodeid, qname, callback, inputs, outputs):
337
    addnode = ua.AddNodesItem()
338
    addnode.RequestedNewNodeId = nodeid
339
    addnode.BrowseName = qname
340
    addnode.NodeClass = ua.NodeClass.Method
341
    addnode.ParentNodeId = parent.nodeid
342
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
343
    # node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
344
    attrs = ua.MethodAttributes()
345
    attrs.Description = ua.LocalizedText(qname.Name)
346
    attrs.DisplayName = ua.LocalizedText(qname.Name)
347
    attrs.WriteMask = 0
348
    attrs.UserWriteMask = 0
349
    attrs.Executable = True
350
    attrs.UserExecutable = True
351
    addnode.NodeAttributes = attrs
352
    results = await parent.server.add_nodes([addnode])
353
    results[0].StatusCode.check()
354
    method = make_node(parent.server, results[0].AddedNodeId)
355
    if inputs:
356
        await create_property(
357
            method,
358
            ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
359
            ua.QualifiedName("InputArguments", 0),
360
            [_vtype_to_argument(vtype) for vtype in inputs],
361
            varianttype=ua.VariantType.ExtensionObject,
362
            datatype=ua.ObjectIds.Argument
363
        )
364
    if outputs:
365
        await create_property(
366
            method,
367
            ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
368
            ua.QualifiedName("OutputArguments", 0),
369
            [_vtype_to_argument(vtype) for vtype in outputs],
370
            varianttype=ua.VariantType.ExtensionObject,
371
            datatype=ua.ObjectIds.Argument
372
        )
373
    if hasattr(parent.server, "add_method_callback"):
374
        parent.server.add_method_callback(method.nodeid, callback)
375
    return results[0].AddedNodeId
376
377
378
def _vtype_to_argument(vtype):
379
    if isinstance(vtype, ua.Argument):
380
        return vtype
381
    arg = ua.Argument()
382
    if isinstance(vtype, ua.VariantType):
383
        arg.DataType = ua.NodeId(vtype.value)
384
    else:
385
        arg.DataType = ua.NodeId(vtype)
386
    return arg
387
388
389
def _guess_datatype(variant):
390
    if variant.VariantType == ua.VariantType.ExtensionObject:
391
        if variant.Value is None:
392
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
393
        if type(variant.Value) in (list, tuple):
394
            if len(variant.Value) == 0:
395
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
396
            extobj = variant.Value[0]
397
        else:
398
            extobj = variant.Value
399
        classname = extobj.__class__.__name__
400
        if not hasattr(ua.ObjectIds, classname):
401
            raise ua.UaError(f"Cannot guess DataType of {variant} of python type {type(variant)}")
402
        return ua.NodeId(getattr(ua.ObjectIds, classname))
403
    else:
404
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
405
406
407
async def delete_nodes(server, nodes, recursive=False, delete_target_references=True):
408
    """
409
    Delete specified nodes. Optionally delete recursively all nodes with a
410
    downward hierachic references to the node
411
    return the list of deleted node and the result
412
    """
413
    nodestodelete = []
414
    if recursive:
415
        nodes = await _add_childs(nodes)
416
    for mynode in nodes:
417
        it = ua.DeleteNodesItem()
418
        it.NodeId = mynode.nodeid
419
        it.DeleteTargetReferences = delete_target_references
420
        nodestodelete.append(it)
421
    params = ua.DeleteNodesParameters()
422
    params.NodesToDelete = nodestodelete
423
    return nodes, await server.delete_nodes(params)
424
425
426
async def _add_childs(nodes):
427
    results = []
428
    for mynode in nodes:
429
        results += await _add_childs(await mynode.get_children())
430
        results += [mynode]
431
    return results
432