Completed
Push — master ( c75f71...ed2734 )
by Olivier
04:22
created

create_variable()   B

Complexity

Conditions 5

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 5

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 5
dl 0
loc 14
ccs 5
cts 5
cp 1
crap 5
rs 8.5454
c 2
b 0
f 1
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6
from opcua.common.instantiate import instantiate
7
8 1
9 1
def _parse_nodeid_qname(*args):
10 1
    try:
11 1
        if isinstance(args[0], int):
12 1
            nodeid = ua.NodeId(0, int(args[0]))
13 1
            qname = ua.QualifiedName(args[1], int(args[0]))
14 1
            return nodeid, qname
15 1
        if isinstance(args[0], ua.NodeId):
16 1
            nodeid = args[0]
17 1
        elif isinstance(args[0], str):
18
            nodeid = ua.NodeId.from_string(args[0])
19 1
        else:
20 1
            raise RuntimeError()
21 1
        if isinstance(args[1], ua.QualifiedName):
22 1
            qname = args[1]
23 1
        elif isinstance(args[1], str):
24
            qname = ua.QualifiedName.from_string(args[1])
25 1
        else:
26 1
            raise RuntimeError()
27 1
        return nodeid, qname
28 1
    except ua.UaError:
29 1
        raise
30 1
    except Exception as ex:
31
        raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {} and got exception {}".format(args, ex))
32
33 1
34
def create_folder(parent, nodeid, bname):
35
    """
36
    create a child node folder
37
    arguments are nodeid, browsename
38
    or namespace index, name
39 1
    """
40 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
41
    return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType))
42
43 1
44
def create_object(parent, nodeid, bname, objecttype=None):
45
    """
46
    create a child node object
47
    arguments are nodeid, browsename, [objecttype]
48
    or namespace index, name, [objecttype]
49 1
    if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes
50 1
    """
51
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
52
    if objecttype is not None:
53 1
        objecttype = node.Node(parent.server, objecttype)
54
        return instantiate(parent, objecttype, nodeid, bname)[0]
55
    else:
56
        return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType))
57
58
59 1
def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
60 1
    """
61 1
    create a child node property
62
    args are nodeid, browsename, value, [variant type]
63
    or idx, name, value, [variant type]
64 1
    """
65
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
66
    var = ua.Variant(val, varianttype)
67
    if datatype and isinstance(datatype, int):
68
        datatype = ua.NodeId(0, datatype)
69
    if datatype and not isinstance(datatype, ua.NodeId):
70 1
        raise RuntimeError("datatyoe argument must be a nodeid or an int refering to a nodeid")
71 1
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True))
72 1
73
74
def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
75 1
    """
76
    create a child node variable
77
    args are nodeid, browsename, value, [variant type], [data type]
78
    or idx, name, value, [variant type], [data type]
79
    """
80
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
81
    var = ua.Variant(val, varianttype)
82
    if datatype and isinstance(datatype, int):
83
        datatype = ua.NodeId(0, datatype)
84 1
    if datatype and not isinstance(datatype, ua.NodeId):
85 1
        raise RuntimeError("datatyoe argument must be a nodeid or an int refering to a nodeid")
86 1
87 1
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
88
89 1
90 1
def create_variable_type(parent, nodeid, bname, datatype):
91 1
    """
92
    Create a new variable type
93 1
    args are nodeid, browsename and datatype
94 1
    or idx, name and data type
95
    """
96
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
97 1
    if datatype and not isinstance(datatype, ua.NodeId):
98
        raise RuntimeError()
99
    addnode = ua.AddNodesItem()
100
    addnode.RequestedNewNodeId = nodeid
101
    addnode.BrowseName = qname
102
    addnode.NodeClass = ua.NodeClass.Variable
103 1
    addnode.ParentNodeId = parent.nodeid
104 1
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
105
    attrs = ua.VariableTypeAttributes()
106
    attrs.Description = ua.LocalizedText(qname.Name)
107 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
108 1
    attrs.DataType = datatype
109 1
    attrs.IsAbstract = False
110 1
    attrs.WriteMask = 0
111 1
    attrs.UserWriteMask = 0
112
    attrs.Historizing = 0
113 1
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
114 1
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
115 1
    addnode.NodeAttributes = attrs
116 1
    results = parent.server.add_nodes([addnode])
117 1
    results[0].StatusCode.check()
118
    return results[0].AddedNodeId
119 1
120 1
121
def create_reference_type(parent, nodeid, bname):
122 1
    """
123
    Create a new reference type
124 1
    args are nodeid and browsename
125 1
    or idx and name
126 1
    """
127 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
128
    addnode = ua.AddNodesItem()
129 1
    addnode.RequestedNewNodeId = nodeid
130 1
    addnode.BrowseName = qname
131 1
    addnode.NodeClass = ua.NodeClass.Variable
132 1
    addnode.ParentNodeId = parent.nodeid
133 1
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
134 1
    attrs = ua.ReferenceTypeAttributes()
135 1
    attrs.IsAbstract = False
136 1
    attrs.Description = ua.LocalizedText(qname.Name)
137
    attrs.DisplayName = ua.LocalizedText(qname.Name)
138
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
139 1
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
140 1
    addnode.NodeAttributes = attrs
141 1
    results = parent.server.add_nodes([addnode])
142
    results[0].StatusCode.check()
143 1
    return results[0].AddedNodeId
144
145
146 1
def create_object_type(parent, nodeid, bname):
147 1
    """
148 1
    Create a new object type to be instanciated in address space.
149 1
    arguments are nodeid, browsename
150 1
    or namespace index, name
151 1
    """
152 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
153 1
    return node.Node(parent.server, _create_object_type(parent.server, parent.nodeid, nodeid, qname))
154 1
155
156 1
def create_method(parent, *args):
157 1
    """
158 1
    create a child method object
159 1
    This is only possible on server side!!
160 1
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
161 1
    or idx, name, method_to_be_called, [input argument types], [output argument types]
162 1
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
163 1
    a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
164
    """
165
    nodeid, qname = _parse_nodeid_qname(*args[:2])
166 1
    callback = args[2]
167
    if len(args) > 3:
168 1
        inputs = args[3]
169 1
    else:
170 1
        inputs = []
171 1
    if len(args) > 4:
172 1
        outputs = args[4]
173 1
    else:
174 1
        outputs = []
175 1
    return node.Node(parent.server, _create_method(parent, nodeid, qname, callback, inputs, outputs))
176 1
177
178
def _create_object(server, parentnodeid, nodeid, qname, objecttype):
179 1
    addnode = ua.AddNodesItem()
180 1
    addnode.RequestedNewNodeId = nodeid
181 1
    addnode.BrowseName = qname
182 1
    addnode.ParentNodeId = parentnodeid
183 1
    if node.Node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
184 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
185 1
    else:
186
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
187 1
    addnode.NodeClass = ua.NodeClass.Object
188 1
    if isinstance(objecttype, int):
189 1
        addnode.TypeDefinition = ua.NodeId(objecttype)
190 1
    elif isinstance(objecttype, ua.NodeId):
191 1
        addnode.TypeDefinition = objecttype
192 1
    attrs = ua.ObjectAttributes()
193 1
    attrs.EventNotifier = 0
194 1
195 1
    attrs.Description = ua.LocalizedText(qname.Name)
196 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
197 1
    attrs.WriteMask = 0
198 1
    attrs.UserWriteMask = 0
199 1
    addnode.NodeAttributes = attrs
200 1
    results = server.add_nodes([addnode])
201 1
    results[0].StatusCode.check()
202 1
    return results[0].AddedNodeId
203 1
204
205
def _create_object_type(server, parentnodeid, nodeid, qname):
206 1
    addnode = ua.AddNodesItem()
207 1
    addnode.RequestedNewNodeId = nodeid
208
    addnode.BrowseName = qname
209
    addnode.ParentNodeId = parentnodeid
210 1
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
211 1
    addnode.NodeClass = ua.NodeClass.ObjectType
212 1
    attrs = ua.ObjectTypeAttributes()
213 1
    attrs.IsAbstract = False
214
    attrs.Description = ua.LocalizedText(qname.Name)
215
    attrs.DisplayName = ua.LocalizedText(qname.Name)
216 1
    attrs.WriteMask = 0
217 1
    attrs.UserWriteMask = 0
218 1
    addnode.NodeAttributes = attrs
219
    results = server.add_nodes([addnode])
220 1
    results[0].StatusCode.check()
221 1
    return results[0].AddedNodeId
222
223 1
224
def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
225
    addnode = ua.AddNodesItem()
226 1
    addnode.RequestedNewNodeId = nodeid
227 1
    addnode.BrowseName = qname
228
    addnode.NodeClass = ua.NodeClass.Variable
229 1
    addnode.ParentNodeId = parentnodeid
230
    if isproperty:
231
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
232 1
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
233
    else:
234
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
235
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
236
    attrs = ua.VariableAttributes()
237 1
    attrs.Description = ua.LocalizedText(qname.Name)
238 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
239 1
    if datatype:
240 1
        attrs.DataType = datatype
241 1
    else:
242 1
        attrs.DataType = _guess_datatype(var)
243 1
244 1
    attrs.Value = var
245 1
    if isinstance(var, list) or isinstance(var, tuple):
246
        attrs.ValueRank = ua.ValueRank.OneDimension
247
    else:
248 1
        attrs.ValueRank = ua.ValueRank.Scalar
249 1
    #attrs.ArrayDimensions = None
250 1
    attrs.WriteMask = 0
251 1
    attrs.UserWriteMask = 0
252 1
    attrs.Historizing = 0
253
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
254
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
255
    addnode.NodeAttributes = attrs
256
    results = server.add_nodes([addnode])
257
    results[0].StatusCode.check()
258
    return results[0].AddedNodeId
259
260
261
def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
262
    addnode = ua.AddNodesItem()
263
    addnode.RequestedNewNodeId = nodeid
264
    addnode.BrowseName = qname
265
    addnode.NodeClass = ua.NodeClass.VariableType
266
    addnode.ParentNodeId = parentnodeid
267
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
268
    #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
269
    attrs = ua.VariableTypeAttributes()
270
    attrs.Description = ua.LocalizedText(qname.Name)
271
    attrs.DisplayName = ua.LocalizedText(qname.Name)
272
    attrs.DataType = datatype
273
    if value:
274
        attrs.Value = value
275
        if isinstance(value, (list, tuple)):
276
            attrs.ValueRank = ua.ValueRank.OneDimension
277
        else:
278
            attrs.ValueRank = ua.ValueRank.Scalar
279
    #attrs.ArrayDimensions = None
280
    attrs.WriteMask = 0
281
    attrs.UserWriteMask = 0
282
    attrs.Historizing = 0
283
    attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
284
    attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
285
    addnode.NodeAttributes = attrs
286
    results = server.add_nodes([addnode])
287
    results[0].StatusCode.check()
288
    return results[0].AddedNodeId
289
290
291
def create_data_type(parent, nodeid, bname, description=None):
292
    """
293
    Create a new data type to be used in new variables, etc ..
294
    arguments are nodeid, browsename
295
    or namespace index, name
296
    """
297
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
298
299
    addnode = ua.AddNodesItem()
300
    addnode.RequestedNewNodeId = nodeid
301
    addnode.BrowseName = qname
302
    addnode.NodeClass = ua.NodeClass.DataType
303
    addnode.ParentNodeId = parent.nodeid
304
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
305
    #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
306
    attrs = ua.DataTypeAttributes()
307
    if description is None:
308
        attrs.Description = ua.LocalizedText(qname.Name)
309
    else:
310
        attrs.Description = ua.LocalizedText(description)
311
    attrs.DisplayName = ua.LocalizedText(qname.Name)
312
    attrs.WriteMask = 0
313
    attrs.UserWriteMask = 0
314
    attrs.IsAbstract = False  # True mean they cannot be instanciated
315
    addnode.NodeAttributes = attrs
316
    results = parent.server.add_nodes([addnode])
317
    results[0].StatusCode.check()
318
    return node.Node(parent.server, results[0].AddedNodeId)
319
320
321
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
322
    addnode = ua.AddNodesItem()
323
    addnode.RequestedNewNodeId = nodeid
324
    addnode.BrowseName = qname
325
    addnode.NodeClass = ua.NodeClass.Method
326
    addnode.ParentNodeId = parent.nodeid
327
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
328
    #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
329
    attrs = ua.MethodAttributes()
330
    attrs.Description = ua.LocalizedText(qname.Name)
331
    attrs.DisplayName = ua.LocalizedText(qname.Name)
332
    attrs.WriteMask = 0
333
    attrs.UserWriteMask = 0
334
    attrs.Executable = True
335
    attrs.UserExecutable = True
336
    addnode.NodeAttributes = attrs
337
    results = parent.server.add_nodes([addnode])
338
    results[0].StatusCode.check()
339
    method = node.Node(parent.server, results[0].AddedNodeId)
340
    create_property(method,
341
                    ua.NodeId(),
342
                    ua.QualifiedName("InputArguments", 0),
343
                    [_vtype_to_argument(vtype) for vtype in inputs],
344
                    varianttype=ua.VariantType.ExtensionObject,
345
                    datatype=ua.ObjectIds.Argument)
346
    create_property(method,
347
                    ua.NodeId(),
348
                    ua.QualifiedName("OutputArguments", 0),
349
                    [_vtype_to_argument(vtype) for vtype in outputs],
350
                    varianttype=ua.VariantType.ExtensionObject,
351
                    datatype=ua.ObjectIds.Argument)
352
    parent.server.add_method_callback(method.nodeid, callback)
353
    return results[0].AddedNodeId
354
355
356
def _vtype_to_argument(vtype):
357
    if isinstance(vtype, ua.Argument):
358
        return vtype
359
360
    arg = ua.Argument()
361
    v = ua.Variant(None, vtype)
362
    arg.DataType = _guess_datatype(v)
363
    return arg
364
365
366
def _guess_datatype(variant):
367
    if variant.VariantType == ua.VariantType.ExtensionObject:
368
        if variant.Value is None:
369
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
370
        if type(variant.Value) in (list, tuple):
371
            if len(variant.Value) == 0:
372
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
373
            extobj = variant.Value[0]
374
        else:
375
            extobj = variant.Value
376
        classname = extobj.__class__.__name__
377
        return ua.NodeId(getattr(ua.ObjectIds, classname))
378
    else:
379
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
380
381
382
def delete_nodes(server, nodes, recursive=False):
383
    """
384
    Delete specified nodes. Optionally delete recursively all nodes with a
385
    downward hierachic references to the node
386
    """
387
    nodestodelete = []
388
    if recursive:
389
        nodes += _add_childs(nodes)
390
    for mynode in nodes:
391
        it = ua.DeleteNodesItem()
392
        it.NodeId = mynode.nodeid
393
        it.DeleteTargetReferences = True
394
        nodestodelete.append(it)
395
    params = ua.DeleteNodesParameters()
396
    params.NodesToDelete = nodestodelete
397
    return server.delete_nodes(params)
398
399
400
def _add_childs(nodes):
401
    results = []
402
    for mynode in nodes[:]:
403
        results += mynode.get_children()
404
    return results
405
406
407