Test Failed
Pull Request — master (#306)
by
unknown
02:37
created

_check_browsename_exist()   A

Complexity

Conditions 4

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 5
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
"""
2
High level functions to create nodes
3
"""
4
import logging
5
from asyncua import ua
6
from .instantiate_util import instantiate
7
from .node_factory import make_node
8
9
_logger = logging.getLogger(__name__)
10
11
12
def _parse_nodeid_qname(*args):
13
    try:
14
        if isinstance(args[0], int):
15
            nodeid = ua.NodeId(0, int(args[0]))
16
            qname = ua.QualifiedName(args[1], int(args[0]))
17
            return nodeid, qname
18
        if isinstance(args[0], ua.NodeId):
19
            nodeid = args[0]
20
        elif isinstance(args[0], str):
21
            nodeid = ua.NodeId.from_string(args[0])
22
        else:
23
            raise RuntimeError()
24
        if isinstance(args[1], ua.QualifiedName):
25
            qname = args[1]
26
        elif isinstance(args[1], str):
27
            qname = ua.QualifiedName.from_string(args[1])
28
        else:
29
            raise RuntimeError()
30
        return nodeid, qname
31
    except ua.UaError:
32
        raise
33
    except Exception as ex:
34
        raise TypeError(
35
            f"This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname."
36
            f" Received arguments {args} and got exception {ex}"
37
        )
38
39
40
async def create_folder(parent, nodeid, bname):
41
    """
42
    create a child node folder
43
    arguments are nodeid, browsename
44
    or namespace index, name
45
    """
46
    status = _check_browsename_exist(parent, bname)
47
    if status is not None:
48
        return status
49
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
50
    return make_node(
51
        parent.server,
52
        await _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType)
53
    )
54
55
56
async def create_object(parent, nodeid, bname, objecttype=None):
57
    """
58
    create a child node object
59
    arguments are nodeid, browsename, [objecttype]
60
    or namespace index, name, [objecttype]
61
    if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes
62
    """
63
    status = _check_browsename_exist(parent, bname)
64
    if status is not None:
65
        return status
66
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
67
    if objecttype is not None:
68
        objecttype = make_node(parent.server, objecttype)
69
        dname = ua.LocalizedText(qname.Name)
70
        nodes = await instantiate(parent, objecttype, nodeid, bname=qname, dname=dname)
71
        return nodes[0]
72
    else:
73
        return make_node(
74
            parent.server,
75
            await _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType)
76
        )
77
78
79 View Code Duplication
async def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
80
    """
81
    create a child node property
82
    args are nodeid, browsename, value, [variant type]
83
    or idx, name, value, [variant type]
84
    """
85
    status = _check_browsename_exist(parent, bname)
86
    if status is not None:
87
        return status
88
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
89
    var = ua.Variant(val, varianttype)
90
    if datatype and isinstance(datatype, int):
91
        datatype = ua.NodeId(datatype, 0)
92
    if datatype and not isinstance(datatype, ua.NodeId):
93
        raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
94
    return make_node(
95
        parent.server,
96
        await _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True)
97
    )
98
99
100 View Code Duplication
async def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
101
    """
102
    create a child node variable
103
    args are nodeid, browsename, value, [variant type], [data type]
104
    or idx, name, value, [variant type], [data type]
105
    """
106
    status = _check_browsename_exist(parent, bname)
107
    if status is not None:
108
        return status
109
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
110
    var = ua.Variant(val, varianttype)
111
    if datatype and isinstance(datatype, int):
112
        datatype = ua.NodeId(datatype, 0)
113
    if datatype and not isinstance(datatype, ua.NodeId):
114
        raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid")
115
116
    return make_node(
117
        parent.server,
118
        await _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False)
119
    )
120
121
122
async def create_variable_type(parent, nodeid, bname, datatype):
123
    """
124
    Create a new variable type
125
    args are nodeid, browsename and datatype
126
    or idx, name and data type
127
    """
128
    status = _check_browsename_exist(parent, bname)
129
    if status is not None:
130
        return status
131
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
132
    if datatype and isinstance(datatype, int):
133
        datatype = ua.NodeId(datatype, 0)
134
    if datatype and not isinstance(datatype, ua.NodeId):
135
        raise RuntimeError(
136
            f"Data type argument must be a nodeid or an int refering to a nodeid, received: {datatype}")
137
    return make_node(
138
        parent.server,
139
        await _create_variable_type(parent.server, parent.nodeid, nodeid, qname, datatype)
140
    )
141
142
143
async def create_reference_type(parent, nodeid, bname, symmetric=True, inversename=None):
144
    """
145
    Create a new reference type
146
    args are nodeid and browsename
147
    or idx and name
148
    """
149
    status = _check_browsename_exist(parent, bname)
150
    if status is not None:
151
        return status
152
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
153
    return make_node(
154
        parent.server,
155
        await _create_reference_type(parent.server, parent.nodeid, nodeid, qname, symmetric, inversename)
156
    )
157
158
159
async def create_object_type(parent, nodeid, bname):
160
    """
161
    Create a new object type to be instanciated in address space.
162
    arguments are nodeid, browsename
163
    or namespace index, name
164
    """
165
    status = _check_browsename_exist(parent, bname)
166
    if status is not None:
167
        return status
168
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
169
    return make_node(parent.server, await _create_object_type(parent.server, parent.nodeid, nodeid, qname))
170
171
172
async def create_method(parent, *args):
173
    """
174
    create a child method object
175
    This is only possible on server side!!
176
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
177
    or idx, name, method_to_be_called, [input argument types], [output argument types]
178
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
179
    a callback is a method accepting the nodeid of the parent as first argument and variants after.
180
    returns a list of variants
181
    """
182
    _logger.info('create_method %r', parent)
183
    nodeid, qname = _parse_nodeid_qname(*args[:2])
184
    status = _check_browsename_exist(parent, qname)
185
    if status is not None:
186
        return status
187
    callback = args[2]
188
    if len(args) > 3:
189
        inputs = args[3]
190
    else:
191
        inputs = []
192
    if len(args) > 4:
193
        outputs = args[4]
194
    else:
195
        outputs = []
196
    return make_node(parent.server, await _create_method(parent, nodeid, qname, callback, inputs, outputs))
197
198
199
async def _create_object(server, parentnodeid, nodeid, qname, objecttype):
200
    addnode = ua.AddNodesItem()
201
    addnode.RequestedNewNodeId = nodeid
202
    addnode.BrowseName = qname
203
    addnode.ParentNodeId = parentnodeid
204
    if await make_node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
205
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
206
    else:
207
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
208
    addnode.NodeClass = ua.NodeClass.Object
209
    if isinstance(objecttype, int):
210
        addnode.TypeDefinition = ua.NodeId(objecttype)
211
    else:
212
        addnode.TypeDefinition = objecttype
213
    attrs = ua.ObjectAttributes()
214
    attrs.EventNotifier = 0
215
    attrs.Description = ua.LocalizedText(qname.Name)
216
    attrs.DisplayName = ua.LocalizedText(qname.Name)
217
    attrs.WriteMask = 0
218
    attrs.UserWriteMask = 0
219
    addnode.NodeAttributes = attrs
220
    results = await server.add_nodes([addnode])
221
    results[0].StatusCode.check()
222
    return results[0].AddedNodeId
223
224
225
async def _create_reference_type(server, parentnodeid, nodeid, qname, symmetric, inversename):
226
    addnode = ua.AddNodesItem()
227
    addnode.RequestedNewNodeId = nodeid
228
    addnode.BrowseName = qname
229
    addnode.NodeClass = ua.NodeClass.ReferenceType
230
    addnode.ParentNodeId = parentnodeid
231
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
232
    attrs = ua.ReferenceTypeAttributes()
233
    attrs.IsAbstract = False
234
    attrs.Description = ua.LocalizedText(qname.Name)
235
    attrs.DisplayName = ua.LocalizedText(qname.Name)
236
    attrs.Symmetric = symmetric
237
    attrs.InverseName = ua.LocalizedText(inversename)
238
    attrs.UserWriteMask = 0
239
    addnode.NodeAttributes = attrs
240
241
    results = await server.add_nodes([addnode])
242
    results[0].StatusCode.check()
243
    return results[0].AddedNodeId
244
245
246
async def _create_object_type(server, parentnodeid, nodeid, qname):
247
    addnode = ua.AddNodesItem()
248
    addnode.RequestedNewNodeId = nodeid
249
    addnode.BrowseName = qname
250
    addnode.ParentNodeId = parentnodeid
251
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
252
    addnode.NodeClass = ua.NodeClass.ObjectType
253
    attrs = ua.ObjectTypeAttributes()
254
    attrs.IsAbstract = False
255
    attrs.Description = ua.LocalizedText(qname.Name)
256
    attrs.DisplayName = ua.LocalizedText(qname.Name)
257
    attrs.WriteMask = 0
258
    attrs.UserWriteMask = 0
259
    addnode.NodeAttributes = attrs
260
    results = await server.add_nodes([addnode])
261
    results[0].StatusCode.check()
262
    return results[0].AddedNodeId
263
264
265
async def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
266
    addnode = ua.AddNodesItem()
267
    addnode.RequestedNewNodeId = nodeid
268
    addnode.BrowseName = qname
269
    addnode.NodeClass = ua.NodeClass.Variable
270
    addnode.ParentNodeId = parentnodeid
271
    if isproperty:
272
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
273
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
274
    else:
275
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
276
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
277
    attrs = ua.VariableAttributes()
278
    attrs.Description = ua.LocalizedText(qname.Name)
279
    attrs.DisplayName = ua.LocalizedText(qname.Name)
280
    if datatype:
281
        attrs.DataType = datatype
282
    else:
283
        attrs.DataType = _guess_datatype(var)
284
285
    attrs.Value = var
286
    if not isinstance(var.Value, (list, tuple)):
287
        attrs.ValueRank = ua.ValueRank.Scalar
288
    else:
289
        if var.Dimensions:
290
            attrs.ValueRank = len(var.Dimensions)
291
            attrs.ArrayDimensions = var.Dimensions
292
    attrs.WriteMask = 0
293
    attrs.UserWriteMask = 0
294
    attrs.Historizing = False
295
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
296
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
297
    addnode.NodeAttributes = attrs
298
    results = await server.add_nodes([addnode])
299
    results[0].StatusCode.check()
300
    return results[0].AddedNodeId
301
302
303
async def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
304
    addnode = ua.AddNodesItem()
305
    addnode.RequestedNewNodeId = nodeid
306
    addnode.BrowseName = qname
307
    addnode.NodeClass = ua.NodeClass.VariableType
308
    addnode.ParentNodeId = parentnodeid
309
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
310
    # addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
311
    attrs = ua.VariableTypeAttributes()
312
    attrs.Description = ua.LocalizedText(qname.Name)
313
    attrs.DisplayName = ua.LocalizedText(qname.Name)
314
    attrs.DataType = datatype
315
    attrs.IsAbstract = False
316
    if value:
317
        attrs.Value = value
318
        if isinstance(value, (list, tuple)):
319
            attrs.ValueRank = ua.ValueRank.OneDimension
320
        else:
321
            attrs.ValueRank = ua.ValueRank.Scalar
322
    # attrs.ArrayDimensions = None
323
    attrs.WriteMask = 0
324
    attrs.UserWriteMask = 0
325
    addnode.NodeAttributes = attrs
326
    results = await server.add_nodes([addnode])
327
    results[0].StatusCode.check()
328
    return results[0].AddedNodeId
329
330
331
async def create_data_type(parent, nodeid, bname, description=None):
332
    """
333
    Create a new data type to be used in new variables, etc ..
334
    arguments are nodeid, browsename
335
    or namespace index, name
336
    """
337
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
338
    addnode = ua.AddNodesItem()
339
    addnode.RequestedNewNodeId = nodeid
340
    addnode.BrowseName = qname
341
    addnode.NodeClass = ua.NodeClass.DataType
342
    addnode.ParentNodeId = parent.nodeid
343
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
344
    # addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
345
    attrs = ua.DataTypeAttributes()
346
    if description is None:
347
        attrs.Description = ua.LocalizedText(qname.Name)
348
    else:
349
        attrs.Description = ua.LocalizedText(description)
350
    attrs.DisplayName = ua.LocalizedText(qname.Name)
351
    attrs.WriteMask = 0
352
    attrs.UserWriteMask = 0
353
    attrs.IsAbstract = False  # True mean they cannot be instanciated
354
    addnode.NodeAttributes = attrs
355
    results = await parent.server.add_nodes([addnode])
356
    results[0].StatusCode.check()
357
    return make_node(parent.server, results[0].AddedNodeId)
358
359
360
async def _create_method(parent, nodeid, qname, callback, inputs, outputs):
361
    addnode = ua.AddNodesItem()
362
    addnode.RequestedNewNodeId = nodeid
363
    addnode.BrowseName = qname
364
    addnode.NodeClass = ua.NodeClass.Method
365
    addnode.ParentNodeId = parent.nodeid
366
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
367
    # node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
368
    attrs = ua.MethodAttributes()
369
    attrs.Description = ua.LocalizedText(qname.Name)
370
    attrs.DisplayName = ua.LocalizedText(qname.Name)
371
    attrs.WriteMask = 0
372
    attrs.UserWriteMask = 0
373
    attrs.Executable = True
374
    attrs.UserExecutable = True
375
    addnode.NodeAttributes = attrs
376
    results = await parent.server.add_nodes([addnode])
377
    results[0].StatusCode.check()
378
    method = make_node(parent.server, results[0].AddedNodeId)
379
    if inputs:
380
        await create_property(
381
            method,
382
            ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
383
            ua.QualifiedName("InputArguments", 0),
384
            [_vtype_to_argument(vtype) for vtype in inputs],
385
            varianttype=ua.VariantType.ExtensionObject,
386
            datatype=ua.ObjectIds.Argument
387
        )
388
    if outputs:
389
        await create_property(
390
            method,
391
            ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex),
392
            ua.QualifiedName("OutputArguments", 0),
393
            [_vtype_to_argument(vtype) for vtype in outputs],
394
            varianttype=ua.VariantType.ExtensionObject,
395
            datatype=ua.ObjectIds.Argument
396
        )
397
    if hasattr(parent.server, "add_method_callback"):
398
        parent.server.add_method_callback(method.nodeid, callback)
399
    return results[0].AddedNodeId
400
401
402
def _vtype_to_argument(vtype):
403
    if isinstance(vtype, ua.Argument):
404
        return vtype
405
    arg = ua.Argument()
406
    if isinstance(vtype, ua.VariantType):
407
        arg.DataType = ua.NodeId(vtype.value)
408
    else:
409
        arg.DataType = ua.NodeId(vtype)
410
    return arg
411
412
413
def _guess_datatype(variant):
414
    if variant.VariantType == ua.VariantType.ExtensionObject:
415
        if variant.Value is None:
416
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
417
        if type(variant.Value) in (list, tuple):
418
            if len(variant.Value) == 0:
419
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
420
            extobj = variant.Value[0]
421
        else:
422
            extobj = variant.Value
423
        classname = extobj.__class__.__name__
424
        if not hasattr(ua.ObjectIds, classname):
425
            raise ua.UaError(f"Cannot guess DataType of {variant} of python type {type(variant)}")
426
        return ua.NodeId(getattr(ua.ObjectIds, classname))
427
    else:
428
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
429
430
431
async def delete_nodes(server, nodes, recursive=False, delete_target_references=True):
432
    """
433
    Delete specified nodes. Optionally delete recursively all nodes with a
434
    downward hierachic references to the node
435
    return the list of deleted node and the result
436
    """
437
    nodestodelete = []
438
    if recursive:
439
        nodes = await _add_childs(nodes)
440
    for mynode in nodes:
441
        it = ua.DeleteNodesItem()
442
        it.NodeId = mynode.nodeid
443
        it.DeleteTargetReferences = delete_target_references
444
        nodestodelete.append(it)
445
    params = ua.DeleteNodesParameters()
446
    params.NodesToDelete = nodestodelete
447
    return nodes, await server.delete_nodes(params)
448
449
450
async def _add_childs(nodes):
451
    results = []
452
    for mynode in nodes:
453
        results += await _add_childs(await mynode.get_children())
454
        results += [mynode]
455
    return results
456
457
458
def _check_browsename_exist(parent, bname):
459
    for ref in parent.server.aspace._nodes[parent.nodeid].references:
460
        if bname == ref.BrowseName.Name and (ref.ReferenceTypeId.Identifier in [ua.ObjectIds.HasChild, ua.ObjectIds.HasComponent, ua.ObjectIds.HasProperty, ua.ObjectIds.HasSubtype, ua.ObjectIds.HasOrderedComponent]):
461
            _logger.error(ua.status_codes.code_to_name_doc[ua.StatusCodes.BadBrowseNameDuplicated])
462
            return ua.StatusCode(ua.StatusCodes.BadBrowseNameDuplicated)
463