Completed
Pull Request — master (#306)
by
unknown
04:46
created

_check_browsename_exist()   A

Complexity

Conditions 4

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 4
nop 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
"""
2
High level functions to create nodes
3
"""
4
import logging
5
from asyncua import ua
6
from .instantiate_util import instantiate
7
from .node_factory import make_node
8
9
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
10
11
12
def _parse_nodeid_qname(*args):
    """
    Normalize the (nodeid, browsename) argument pair used by the create_* helpers.

    Accepted forms, as implemented below:
    - (int, name): the int is used as namespace index; returns
      ua.NodeId(0, idx) and ua.QualifiedName(name, idx)
    - (ua.NodeId or str, ua.QualifiedName or str): strings are parsed with
      ua.NodeId.from_string / ua.QualifiedName.from_string

    Returns a (nodeid, qname) tuple.
    Raises TypeError for any other argument combination (the underlying error
    is attached as __cause__). ua.UaError is propagated unchanged.
    """
    try:
        first = args[0]
        if isinstance(first, int):
            idx = int(first)
            return ua.NodeId(0, idx), ua.QualifiedName(args[1], idx)

        if isinstance(first, ua.NodeId):
            nodeid = first
        elif isinstance(first, str):
            nodeid = ua.NodeId.from_string(first)
        else:
            # Converted to the descriptive TypeError by the handler below.
            raise RuntimeError()

        second = args[1]
        if isinstance(second, ua.QualifiedName):
            qname = second
        elif isinstance(second, str):
            qname = ua.QualifiedName.from_string(second)
        else:
            raise RuntimeError()
        return nodeid, qname
    except ua.UaError:
        raise
    except Exception as ex:
        raise TypeError(
            "This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname."
            f" Received arguments {args} and got exception {ex}"
        ) from ex
38
39
40
async def create_folder(parent, nodeid, bname):
    """
    create a child node folder
    arguments are nodeid, browsename
    or namespace index, name

    Returns the new node, or None (after logging the error) when the
    duplicate browse name check raises BadBrowseNameDuplicated.
    """
    # Parse first so the duplicate check always gets the plain name string,
    # whether bname was passed as a str or as a ua.QualifiedName.
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    try:
        _check_browsename_exist(parent, qname.Name)
    except ua.uaerrors.BadBrowseNameDuplicated as bnameerr:
        _logger.error(bnameerr)
        return None
    return make_node(
        parent.server,
        await _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType)
    )
56
57
58
async def create_object(parent, nodeid, bname, objecttype=None):
    """
    create a child node object
    arguments are nodeid, browsename, [objecttype]
    or namespace index, name, [objecttype]
    if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes

    Returns the new node, or None (after logging the error) when the
    duplicate browse name check raises BadBrowseNameDuplicated.
    """
    # Parse first so the duplicate check always gets the plain name string,
    # whether bname was passed as a str or as a ua.QualifiedName.
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    try:
        _check_browsename_exist(parent, qname.Name)
    except ua.uaerrors.BadBrowseNameDuplicated as bnameerr:
        _logger.error(bnameerr)
        return None
    if objecttype is not None:
        objecttype = make_node(parent.server, objecttype)
        dname = ua.LocalizedText(qname.Name)
        nodes = await instantiate(parent, objecttype, nodeid, bname=qname, dname=dname)
        # The first element of the instantiate() result is returned as the new object.
        return nodes[0]
    return make_node(
        parent.server,
        await _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType)
    )
81
82
83 View Code Duplication
async def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
    """
    create a child node property
    args are nodeid, browsename, value, [variant type]
    or idx, name, value, [variant type]

    datatype may be a ua.NodeId or an int (wrapped into a ua.NodeId);
    any other truthy value raises RuntimeError.
    Returns the new node, or None (after logging the error) when the
    duplicate browse name check raises BadBrowseNameDuplicated.
    """
    # Parse first so the duplicate check always gets the plain name string,
    # whether bname was passed as a str or as a ua.QualifiedName.
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    try:
        _check_browsename_exist(parent, qname.Name)
    except ua.uaerrors.BadBrowseNameDuplicated as bnameerr:
        _logger.error(bnameerr)
        return None
    var = ua.Variant(val, varianttype)
    if datatype and isinstance(datatype, int):
        datatype = ua.NodeId(datatype, 0)
    if datatype and not isinstance(datatype, ua.NodeId):
        raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
    return make_node(
        parent.server,
        await _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True)
    )
104
105
106 View Code Duplication
async def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
    """
    create a child node variable
    args are nodeid, browsename, value, [variant type], [data type]
    or idx, name, value, [variant type], [data type]

    datatype may be a ua.NodeId or an int (wrapped into a ua.NodeId);
    any other truthy value raises RuntimeError.
    Returns the new node, or None (after logging the error) when the
    duplicate browse name check raises BadBrowseNameDuplicated.
    """
    # Parse first so the duplicate check always gets the plain name string,
    # whether bname was passed as a str or as a ua.QualifiedName.
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    try:
        _check_browsename_exist(parent, qname.Name)
    except ua.uaerrors.BadBrowseNameDuplicated as bnameerr:
        _logger.error(bnameerr)
        return None
    var = ua.Variant(val, varianttype)
    if datatype and isinstance(datatype, int):
        datatype = ua.NodeId(datatype, 0)
    if datatype and not isinstance(datatype, ua.NodeId):
        raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")

    return make_node(
        parent.server,
        await _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False)
    )
128
129
130 View Code Duplication
async def create_variable_type(parent, nodeid, bname, datatype):
    """
    Create a new variable type
    args are nodeid, browsename and datatype
    or idx, name and data type

    datatype may be a ua.NodeId or an int (wrapped into a ua.NodeId);
    any other truthy value raises RuntimeError.
    Returns the new node, or None (after logging the error) when the
    duplicate browse name check raises BadBrowseNameDuplicated.
    """
    # Parse first so the duplicate check always gets the plain name string,
    # whether bname was passed as a str or as a ua.QualifiedName.
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    try:
        _check_browsename_exist(parent, qname.Name)
    except ua.uaerrors.BadBrowseNameDuplicated as bnameerr:
        _logger.error(bnameerr)
        return None
    if datatype and isinstance(datatype, int):
        datatype = ua.NodeId(datatype, 0)
    if datatype and not isinstance(datatype, ua.NodeId):
        raise RuntimeError(
            f"Data type argument must be a nodeid or an int refering to a nodeid, received: {datatype}")
    return make_node(
        parent.server,
        await _create_variable_type(parent.server, parent.nodeid, nodeid, qname, datatype)
    )
151
152
153
async def create_reference_type(parent, nodeid, bname, symmetric=True, inversename=None):
    """
    Create a new reference type
    args are nodeid and browsename
    or idx and name

    symmetric and inversename are forwarded to the reference type attributes.
    Returns the new node, or None (after logging the error) when the
    duplicate browse name check raises BadBrowseNameDuplicated.
    """
    # Parse first so the duplicate check always gets the plain name string,
    # whether bname was passed as a str or as a ua.QualifiedName.
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    try:
        _check_browsename_exist(parent, qname.Name)
    except ua.uaerrors.BadBrowseNameDuplicated as bnameerr:
        _logger.error(bnameerr)
        return None
    return make_node(
        parent.server,
        await _create_reference_type(parent.server, parent.nodeid, nodeid, qname, symmetric, inversename)
    )
169
170
171
async def create_object_type(parent, nodeid, bname):
    """
    Create a new object type to be instantiated in address space.
    arguments are nodeid, browsename
    or namespace index, name

    Returns the new node, or None (after logging the error) when the
    duplicate browse name check raises BadBrowseNameDuplicated.
    """
    # Parse first so the duplicate check always gets the plain name string,
    # whether bname was passed as a str or as a ua.QualifiedName.
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    try:
        _check_browsename_exist(parent, qname.Name)
    except ua.uaerrors.BadBrowseNameDuplicated as bnameerr:
        _logger.error(bnameerr)
        return None
    return make_node(parent.server, await _create_object_type(parent.server, parent.nodeid, nodeid, qname))
184
185
186
async def create_method(parent, *args):
    """
    create a child method object
    This is only possible on server side!!
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
    or idx, name, method_to_be_called, [input argument types], [output argument types]
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
    a callback is a method accepting the nodeid of the parent as first argument and variants after.
    returns a list of variants

    Returns the new method node, or None (after logging the error) when the
    duplicate browse name check raises BadBrowseNameDuplicated.
    """
    _logger.info('create_method %r', parent)
    nodeid, qname = _parse_nodeid_qname(*args[:2])
    try:
        # The check compares against the name string of existing references,
        # so pass the plain name rather than the QualifiedName object.
        _check_browsename_exist(parent, qname.Name)
    except ua.uaerrors.BadBrowseNameDuplicated as bnameerr:
        _logger.error(bnameerr)
        return None
    callback = args[2]
    # Input/output argument type lists are optional trailing positionals.
    inputs = args[3] if len(args) > 3 else []
    outputs = args[4] if len(args) > 4 else []
    return make_node(parent.server, await _create_method(parent, nodeid, qname, callback, inputs, outputs))
213
214
215
async def _create_object(server, parentnodeid, nodeid, qname, objecttype):
    """
    Build an AddNodesItem for an Object node and submit it through server.add_nodes.

    The reference from the parent is Organizes when the parent's type
    definition is FolderType, HasComponent otherwise. objecttype may be an
    int (wrapped into a ua.NodeId) or is used as given.
    Returns the AddedNodeId of the first result, after calling check() on its
    status code.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid

    parent_type = await make_node(server, parentnodeid).get_type_definition()
    if parent_type == ua.NodeId(ua.ObjectIds.FolderType):
        ref_type = ua.ObjectIds.Organizes
    else:
        ref_type = ua.ObjectIds.HasComponent
    item.ReferenceTypeId = ua.NodeId(ref_type)

    item.NodeClass = ua.NodeClass.Object
    item.TypeDefinition = ua.NodeId(objecttype) if isinstance(objecttype, int) else objecttype

    obj_attrs = ua.ObjectAttributes()
    obj_attrs.EventNotifier = 0
    # Description and display name both default to the browse name text.
    obj_attrs.Description = ua.LocalizedText(qname.Name)
    obj_attrs.DisplayName = ua.LocalizedText(qname.Name)
    obj_attrs.WriteMask = 0
    obj_attrs.UserWriteMask = 0
    item.NodeAttributes = obj_attrs

    outcome = (await server.add_nodes([item]))[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
239
240
241
async def _create_reference_type(server, parentnodeid, nodeid, qname, symmetric, inversename):
    """
    Build an AddNodesItem for a ReferenceType node (HasSubtype of the parent)
    and submit it through server.add_nodes.

    Returns the AddedNodeId of the first result, after calling check() on its
    status code.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.ReferenceType
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)

    ref_attrs = ua.ReferenceTypeAttributes()
    ref_attrs.IsAbstract = False
    # Description and display name both default to the browse name text.
    ref_attrs.Description = ua.LocalizedText(qname.Name)
    ref_attrs.DisplayName = ua.LocalizedText(qname.Name)
    ref_attrs.Symmetric = symmetric
    ref_attrs.InverseName = ua.LocalizedText(inversename)
    ref_attrs.UserWriteMask = 0
    item.NodeAttributes = ref_attrs

    outcome = (await server.add_nodes([item]))[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
260
261
262
async def _create_object_type(server, parentnodeid, nodeid, qname):
    """
    Build an AddNodesItem for an ObjectType node (HasSubtype of the parent)
    and submit it through server.add_nodes.

    Returns the AddedNodeId of the first result, after calling check() on its
    status code.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    item.NodeClass = ua.NodeClass.ObjectType

    type_attrs = ua.ObjectTypeAttributes()
    type_attrs.IsAbstract = False
    # Description and display name both default to the browse name text.
    type_attrs.Description = ua.LocalizedText(qname.Name)
    type_attrs.DisplayName = ua.LocalizedText(qname.Name)
    type_attrs.WriteMask = 0
    type_attrs.UserWriteMask = 0
    item.NodeAttributes = type_attrs

    outcome = (await server.add_nodes([item]))[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
279
280
281
async def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
    """
    Build an AddNodesItem for a Variable node and submit it through server.add_nodes.

    isproperty selects HasProperty/PropertyType instead of
    HasComponent/BaseDataVariableType. When datatype is falsy the data type
    is derived from var with _guess_datatype.
    Returns the AddedNodeId of the first result, after calling check() on its
    status code.
    """
    if isproperty:
        ref_type, type_def = ua.ObjectIds.HasProperty, ua.ObjectIds.PropertyType
    else:
        ref_type, type_def = ua.ObjectIds.HasComponent, ua.ObjectIds.BaseDataVariableType

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Variable
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId(ref_type)
    item.TypeDefinition = ua.NodeId(type_def)

    var_attrs = ua.VariableAttributes()
    # Description and display name both default to the browse name text.
    var_attrs.Description = ua.LocalizedText(qname.Name)
    var_attrs.DisplayName = ua.LocalizedText(qname.Name)
    var_attrs.DataType = datatype if datatype else _guess_datatype(var)

    var_attrs.Value = var
    if isinstance(var.Value, (list, tuple)):
        # ValueRank/ArrayDimensions are only set for arrays that carry
        # explicit dimensions; otherwise they keep their existing values.
        if var.Dimensions:
            var_attrs.ValueRank = len(var.Dimensions)
            var_attrs.ArrayDimensions = var.Dimensions
    else:
        var_attrs.ValueRank = ua.ValueRank.Scalar
    var_attrs.WriteMask = 0
    var_attrs.UserWriteMask = 0
    var_attrs.Historizing = False
    var_attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
    var_attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
    item.NodeAttributes = var_attrs

    outcome = (await server.add_nodes([item]))[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
317
318
319
async def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
    """
    Build an AddNodesItem for a VariableType node (HasSubtype of the parent)
    and submit it through server.add_nodes.

    value is optional; when it is not None it is stored as the type's value
    and ValueRank is set to OneDimension for a list/tuple, Scalar otherwise.
    Returns the AddedNodeId of the first result, after calling check() on its
    status code.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.VariableType
    addnode.ParentNodeId = parentnodeid
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    attrs = ua.VariableTypeAttributes()
    # Description and display name both default to the browse name text.
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.DataType = datatype
    attrs.IsAbstract = False
    # Compare against None explicitly so falsy values (0, False, "")
    # are not silently discarded.
    if value is not None:
        attrs.Value = value
        if isinstance(value, (list, tuple)):
            attrs.ValueRank = ua.ValueRank.OneDimension
        else:
            attrs.ValueRank = ua.ValueRank.Scalar
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    addnode.NodeAttributes = attrs
    results = await server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
345
346
347
async def create_data_type(parent, nodeid, bname, description=None):
    """
    Create a new data type to be used in new variables, etc ..
    arguments are nodeid, browsename
    or namespace index, name

    description, when given, is used as the Description text instead of the
    browse name. Returns the node created from the AddedNodeId of the first
    add_nodes result, after calling check() on its status code.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.DataType
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    # No TypeDefinition is set: type nodes do not carry one.

    dt_attrs = ua.DataTypeAttributes()
    description_text = qname.Name if description is None else description
    dt_attrs.Description = ua.LocalizedText(description_text)
    dt_attrs.DisplayName = ua.LocalizedText(qname.Name)
    dt_attrs.WriteMask = 0
    dt_attrs.UserWriteMask = 0
    dt_attrs.IsAbstract = False  # True would mean the type cannot be instantiated
    item.NodeAttributes = dt_attrs

    outcome = (await parent.server.add_nodes([item]))[0]
    outcome.StatusCode.check()
    return make_node(parent.server, outcome.AddedNodeId)
374
375
376
async def _create_method(parent, nodeid, qname, callback, inputs, outputs):
    """
    Build an AddNodesItem for a Method node (HasComponent of parent) and
    submit it through parent.server.add_nodes.

    For non-empty inputs/outputs an "InputArguments"/"OutputArguments"
    property is created on the method node. The callback is registered when
    parent.server exposes add_method_callback.
    Returns the AddedNodeId of the first result, after calling check() on its
    status code.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Method
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)

    m_attrs = ua.MethodAttributes()
    # Description and display name both default to the browse name text.
    m_attrs.Description = ua.LocalizedText(qname.Name)
    m_attrs.DisplayName = ua.LocalizedText(qname.Name)
    m_attrs.WriteMask = 0
    m_attrs.UserWriteMask = 0
    m_attrs.Executable = True
    m_attrs.UserExecutable = True
    item.NodeAttributes = m_attrs

    results = await parent.server.add_nodes([item])
    results[0].StatusCode.check()
    method = make_node(parent.server, results[0].AddedNodeId)

    # Inputs are handled before outputs; an empty list creates no property.
    for prop_name, vtypes in (("InputArguments", inputs), ("OutputArguments", outputs)):
        if not vtypes:
            continue
        await create_property(
            method,
            ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
            ua.QualifiedName(prop_name, 0),
            [_vtype_to_argument(vtype) for vtype in vtypes],
            varianttype=ua.VariantType.ExtensionObject,
            datatype=ua.ObjectIds.Argument
        )

    if hasattr(parent.server, "add_method_callback"):
        parent.server.add_method_callback(method.nodeid, callback)
    return results[0].AddedNodeId
416
417
418
def _vtype_to_argument(vtype):
    """
    Convert a method argument type description into a ua.Argument.

    A ua.Argument is returned unchanged. Otherwise a new ua.Argument is
    created whose DataType is a NodeId built from the VariantType's value, or
    from vtype itself when it is not a ua.VariantType.
    """
    if isinstance(vtype, ua.Argument):
        return vtype
    argument = ua.Argument()
    type_id = vtype.value if isinstance(vtype, ua.VariantType) else vtype
    argument.DataType = ua.NodeId(type_id)
    return argument
427
428
429
def _guess_datatype(variant):
    """
    Derive a data type NodeId from a variant.

    For non-ExtensionObject variants the id is looked up in ua.ObjectIds by
    the VariantType name. For ExtensionObjects the class name of the value
    (or of the first element of a list/tuple) is looked up instead.
    Raises ua.UaError when the value is None, an empty sequence, or its
    class name is not present in ua.ObjectIds.
    """
    if variant.VariantType != ua.VariantType.ExtensionObject:
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))

    value = variant.Value
    if value is None:
        raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
    # isinstance (not an exact type match) so list/tuple subclasses are also
    # treated as arrays, as _create_variable does for the same value.
    if isinstance(value, (list, tuple)):
        if len(value) == 0:
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
        extobj = value[0]
    else:
        extobj = value
    classname = extobj.__class__.__name__
    if not hasattr(ua.ObjectIds, classname):
        # Report the type of the extension object itself; the variant wrapper's
        # own type carries no useful information.
        raise ua.UaError(f"Cannot guess DataType of {variant} of python type {type(extobj)}")
    return ua.NodeId(getattr(ua.ObjectIds, classname))
445
446
447
async def delete_nodes(server, nodes, recursive=False, delete_target_references=True):
    """
    Delete the given nodes through server.delete_nodes.

    With recursive=True the node list is first expanded with _add_childs, so
    each node's descendants are listed before the node itself.
    Returns a tuple of (nodes submitted for deletion, result of
    server.delete_nodes).
    """
    targets = await _add_childs(nodes) if recursive else nodes

    items = []
    for target in targets:
        item = ua.DeleteNodesItem()
        item.NodeId = target.nodeid
        item.DeleteTargetReferences = delete_target_references
        items.append(item)

    request = ua.DeleteNodesParameters()
    request.NodesToDelete = items
    return targets, await server.delete_nodes(request)
464
465
466
async def _add_childs(nodes):
    """
    Return a flat list containing every node in nodes preceded by all of its
    descendants (as reported by get_children), depth first.
    """
    collected = []
    for node in nodes:
        children = await node.get_children()
        # Descendants go first so that a node always follows its children.
        collected.extend(await _add_childs(children))
        collected.append(node)
    return collected
472
473
474
def _check_browsename_exist(parent, bname):
    """
    Raise ua.uaerrors.BadBrowseNameDuplicated when parent already has a
    child-type reference whose browse name equals bname.

    bname may be a plain name string or an object exposing a Name attribute
    (e.g. a ua.QualifiedName); only the name text is compared.
    When parent.server has no aspace attribute there is nothing to inspect
    and the check is skipped.
    """
    aspace = getattr(parent.server, "aspace", None)
    if aspace is None:
        # NOTE(review): presumably a non-server (client) connection with no
        # local address space; duplicate detection is skipped here.
        return
    # Accept both a QualifiedName-like object and a bare string.
    name = getattr(bname, "Name", bname)
    child_ref_types = (
        ua.ObjectIds.HasChild,
        ua.ObjectIds.HasComponent,
        ua.ObjectIds.HasProperty,
        ua.ObjectIds.HasSubtype,
        ua.ObjectIds.HasOrderedComponent,
    )
    for ref in aspace._nodes[parent.nodeid].references:
        if ref.BrowseName.Name == name and ref.ReferenceTypeId.Identifier in child_ref_types:
            raise ua.uaerrors.BadBrowseNameDuplicated
478