Completed
Pull Request — master (#244)
by Olivier
04:00
created

create_object()   A

Complexity

Conditions 2

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 2
c 2
b 0
f 1
dl 0
loc 13
rs 9.4285
ccs 3
cts 3
cp 1
crap 2
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6
from opcua.common.instantiate import instantiate
7
8 1
9 1
def _parse_nodeid_qname(*args):
10 1
    try:
11 1
        if isinstance(args[0], int):
12 1
            nodeid = ua.NodeId(0, int(args[0]))
13 1
            qname = ua.QualifiedName(args[1], int(args[0]))
14 1
            return nodeid, qname
15 1
        if isinstance(args[0], ua.NodeId):
16 1
            nodeid = args[0]
17 1
        elif isinstance(args[0], str):
18
            nodeid = ua.NodeId.from_string(args[0])
19 1
        else:
20 1
            raise RuntimeError()
21 1
        if isinstance(args[1], ua.QualifiedName):
22 1
            qname = args[1]
23 1
        elif isinstance(args[1], str):
24
            qname = ua.QualifiedName.from_string(args[1])
25 1
        else:
26 1
            raise RuntimeError()
27 1
        return nodeid, qname
28 1
    except ua.UaError:
29 1
        raise
30 1
    except Exception as ex:
31
        raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {} and got exception {}".format(args, ex))
32
33 1
34
def create_folder(parent, nodeid, bname):
35
    """
36
    create a child node folder
37
    arguments are nodeid, browsename
38
    or namespace index, name
39 1
    """
40 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
41
    return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType))
42
43 1
44
def create_object(parent, nodeid, bname, objecttype=None):
45
    """
46
    create a child node object
47
    arguments are nodeid, browsename, [objecttype]
48
    or namespace index, name, [objecttype]
49 1
    if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes
50 1
    """
51
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
52
    if objecttype is not None:
53 1
        objecttype = node.Node(parent.server, objecttype)
54
        return instantiate(parent, objecttype, nodeid, bname)
55
    else:
56
        return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType))
57
58
59 1
def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
60 1
    """
61 1
    create a child node property
62
    args are nodeid, browsename, value, [variant type]
63
    or idx, name, value, [variant type]
64 1
    """
65
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
66
    var = ua.Variant(val, varianttype)
67
    if datatype and not isinstance(datatype, ua.NodeId):
68
        raise RuntimeError()
69
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True))
70 1
71 1
72 1
def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
73
    """
74
    create a child node variable
75 1
    args are nodeid, browsename, value, [variant type], [data type]
76
    or idx, name, value, [variant type], [data type]
77
    """
78
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
79
    var = ua.Variant(val, varianttype)
80
    if datatype and not isinstance(datatype, ua.NodeId):
81
        raise RuntimeError()
82
83
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
84 1
85 1
86 1
def create_variable_type(parent, nodeid, bname, datatype):
87 1
    """
88
    Create a new variable type
89 1
    args are nodeid, browsename and datatype
90 1
    or idx, name and data type
91 1
    """
92
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
93 1
    if datatype and not isinstance(datatype, ua.NodeId):
94 1
        raise RuntimeError()
95
    addnode = ua.AddNodesItem()
96
    addnode.RequestedNewNodeId = nodeid
97 1
    addnode.BrowseName = qname
98
    addnode.NodeClass = ua.NodeClass.Variable
99
    addnode.ParentNodeId = parent.nodeid
100
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
101
    attrs = ua.VariableTypeAttributes()
102
    attrs.Description = ua.LocalizedText(qname.Name)
103 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
104 1
    attrs.DataType = datatype
105
    attrs.IsAbstract = False
106
    attrs.WriteMask = 0
107 1
    attrs.UserWriteMask = 0
108 1
    attrs.Historizing = 0
109 1
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
110 1
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
111 1
    addnode.NodeAttributes = attrs
112
    results = parent.server.add_nodes([addnode])
113 1
    results[0].StatusCode.check()
114 1
    return results[0].AddedNodeId
115 1
116 1
117 1
def create_reference_type(parent, nodeid, bname):
118
    """
119 1
    Create a new reference type
120 1
    args are nodeid and browsename
121
    or idx and name
122 1
    """
123
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
124 1
    addnode = ua.AddNodesItem()
125 1
    addnode.RequestedNewNodeId = nodeid
126 1
    addnode.BrowseName = qname
127 1
    addnode.NodeClass = ua.NodeClass.Variable
128
    addnode.ParentNodeId = parent.nodeid
129 1
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
130 1
    attrs = ua.ReferenceTypeAttributes()
131 1
    attrs.IsAbstract = False
132 1
    attrs.Description = ua.LocalizedText(qname.Name)
133 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
134 1
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
135 1
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
136 1
    addnode.NodeAttributes = attrs
137
    results = parent.server.add_nodes([addnode])
138
    results[0].StatusCode.check()
139 1
    return results[0].AddedNodeId
140 1
141 1
142
def create_object_type(parent, nodeid, bname):
143 1
    """
144
    Create a new object type to be instanciated in address space.
145
    arguments are nodeid, browsename
146 1
    or namespace index, name
147 1
    """
148 1
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
149 1
    return node.Node(parent.server, _create_object_type(parent.server, parent.nodeid, nodeid, qname))
150 1
151 1
152 1
def create_method(parent, *args):
153 1
    """
154 1
    create a child method object
155
    This is only possible on server side!!
156 1
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
157 1
    or idx, name, method_to_be_called, [input argument types], [output argument types]
158 1
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
159 1
    a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
160 1
    """
161 1
    nodeid, qname = _parse_nodeid_qname(*args[:2])
162 1
    callback = args[2]
163 1
    if len(args) > 3:
164
        inputs = args[3]
165
    else:
166 1
        inputs = []
167
    if len(args) > 4:
168 1
        outputs = args[4]
169 1
    else:
170 1
        outputs = []
171 1
    return node.Node(parent.server, _create_method(parent, nodeid, qname, callback, inputs, outputs))
172 1
173 1
174 1
def _create_object(server, parentnodeid, nodeid, qname, objecttype):
175 1
    addnode = ua.AddNodesItem()
176 1
    addnode.RequestedNewNodeId = nodeid
177
    addnode.BrowseName = qname
178
    addnode.ParentNodeId = parentnodeid
179 1
    if node.Node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
180 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
181 1
    else:
182 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
183 1
    addnode.NodeClass = ua.NodeClass.Object
184 1
    if isinstance(objecttype, int):
185 1
        addnode.TypeDefinition = ua.NodeId(objecttype)
186
    elif isinstance(objecttype, ua.NodeId):
187 1
        addnode.TypeDefinition = objecttype
188 1
    attrs = ua.ObjectAttributes()
189 1
    attrs.EventNotifier = 0
190 1
191 1
    attrs.Description = ua.LocalizedText(qname.Name)
192 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
193 1
    attrs.WriteMask = 0
194 1
    attrs.UserWriteMask = 0
195 1
    addnode.NodeAttributes = attrs
196 1
    results = server.add_nodes([addnode])
197 1
    results[0].StatusCode.check()
198 1
    return results[0].AddedNodeId
199 1
200 1
201 1
def _create_object_type(server, parentnodeid, nodeid, qname):
202 1
    addnode = ua.AddNodesItem()
203 1
    addnode.RequestedNewNodeId = nodeid
204
    addnode.BrowseName = qname
205
    addnode.ParentNodeId = parentnodeid
206 1
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
207 1
    addnode.NodeClass = ua.NodeClass.ObjectType
208
    attrs = ua.ObjectTypeAttributes()
209
    attrs.IsAbstract = False
210 1
    attrs.Description = ua.LocalizedText(qname.Name)
211 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
212 1
    attrs.WriteMask = 0
213 1
    attrs.UserWriteMask = 0
214
    addnode.NodeAttributes = attrs
215
    results = server.add_nodes([addnode])
216 1
    results[0].StatusCode.check()
217 1
    return results[0].AddedNodeId
218 1
219
220 1
def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
221 1
    addnode = ua.AddNodesItem()
222
    addnode.RequestedNewNodeId = nodeid
223 1
    addnode.BrowseName = qname
224
    addnode.NodeClass = ua.NodeClass.Variable
225
    addnode.ParentNodeId = parentnodeid
226 1
    if isproperty:
227 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
228
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
229 1
    else:
230
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
231
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
232 1
    attrs = ua.VariableAttributes()
233
    attrs.Description = ua.LocalizedText(qname.Name)
234
    attrs.DisplayName = ua.LocalizedText(qname.Name)
235
    if datatype:
236
        attrs.DataType = datatype
237 1
    else:
238 1
        attrs.DataType = _guess_datatype(var)
239 1
240 1
    attrs.Value = var
241 1
    if isinstance(var, list) or isinstance(var, tuple):
242 1
        attrs.ValueRank = ua.ValueRank.OneDimension
243 1
    else:
244 1
        attrs.ValueRank = ua.ValueRank.Scalar
245 1
    #attrs.ArrayDimensions = None
246
    attrs.WriteMask = 0
247
    attrs.UserWriteMask = 0
248 1
    attrs.Historizing = 0
249 1
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
250 1
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
251 1
    addnode.NodeAttributes = attrs
252 1
    results = server.add_nodes([addnode])
253
    results[0].StatusCode.check()
254
    return results[0].AddedNodeId
255
256
257
def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
258
    addnode = ua.AddNodesItem()
259
    addnode.RequestedNewNodeId = nodeid
260
    addnode.BrowseName = qname
261
    addnode.NodeClass = ua.NodeClass.VariableType
262
    addnode.ParentNodeId = parentnodeid
263
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
264
    #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
265
    attrs = ua.VariableTypeAttributes()
266
    attrs.Description = ua.LocalizedText(qname.Name)
267
    attrs.DisplayName = ua.LocalizedText(qname.Name)
268
    attrs.DataType = datatype
269
    if value:
270
        attrs.Value = value
271
        if isinstance(value, (list, tuple)):
272
            attrs.ValueRank = ua.ValueRank.OneDimension
273
        else:
274
            attrs.ValueRank = ua.ValueRank.Scalar
275
    #attrs.ArrayDimensions = None
276
    attrs.WriteMask = 0
277
    attrs.UserWriteMask = 0
278
    attrs.Historizing = 0
279
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
280
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
281
    addnode.NodeAttributes = attrs
282
    results = server.add_nodes([addnode])
283
    results[0].StatusCode.check()
284
    return results[0].AddedNodeId
285
286
287
def create_data_type(parent, nodeid, bname, description=None):
288
    """
289
    Create a new data type to be used in new variables, etc ..
290
    arguments are nodeid, browsename
291
    or namespace index, name
292
    """
293
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
294
295
    addnode = ua.AddNodesItem()
296
    addnode.RequestedNewNodeId = nodeid
297
    addnode.BrowseName = qname
298
    addnode.NodeClass = ua.NodeClass.DataType
299
    addnode.ParentNodeId = parent.nodeid
300
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
301
    #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
302
    attrs = ua.DataTypeAttributes()
303
    if description is None:
304
        attrs.Description = ua.LocalizedText(qname.Name)
305
    else:
306
        attrs.Description = ua.LocalizedText(description)
307
    attrs.DisplayName = ua.LocalizedText(qname.Name)
308
    attrs.WriteMask = 0
309
    attrs.UserWriteMask = 0
310
    attrs.IsAbstract = False  # True mean they cannot be instanciated
311
    addnode.NodeAttributes = attrs
312
    results = parent.server.add_nodes([addnode])
313
    results[0].StatusCode.check()
314
    return node.Node(parent.server, results[0].AddedNodeId)
315
316
317
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
318
    addnode = ua.AddNodesItem()
319
    addnode.RequestedNewNodeId = nodeid
320
    addnode.BrowseName = qname
321
    addnode.NodeClass = ua.NodeClass.Method
322
    addnode.ParentNodeId = parent.nodeid
323
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
324
    #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
325
    attrs = ua.MethodAttributes()
326
    attrs.Description = ua.LocalizedText(qname.Name)
327
    attrs.DisplayName = ua.LocalizedText(qname.Name)
328
    attrs.WriteMask = 0
329
    attrs.UserWriteMask = 0
330
    attrs.Executable = True
331
    attrs.UserExecutable = True
332
    addnode.NodeAttributes = attrs
333
    results = parent.server.add_nodes([addnode])
334
    results[0].StatusCode.check()
335
    method = node.Node(parent.server, results[0].AddedNodeId)
336
    if inputs:
337
        create_property(method, ua.NodeId(), ua.QualifiedName("InputArguments", 0), [_vtype_to_argument(vtype) for vtype in inputs])
338
    if outputs:
339
        create_property(method, ua.NodeId(), ua.QualifiedName("OutputArguments", 0), [_vtype_to_argument(vtype) for vtype in outputs])
340
    parent.server.add_method_callback(method.nodeid, callback)
341
    return results[0].AddedNodeId
342
343
344
def _vtype_to_argument(vtype):
345
    if isinstance(vtype, ua.Argument):
346
        return vtype
347
348
    arg = ua.Argument()
349
    v = ua.Variant(None, vtype)
350
    arg.DataType = _guess_datatype(v)
351
    return arg
352
353
354
def _guess_datatype(variant):
355
    if variant.VariantType == ua.VariantType.ExtensionObject:
356
        if variant.Value is None:
357
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
358
        if type(variant.Value) in (list, tuple):
359
            if len(variant.Value) == 0:
360
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
361
            extobj = variant.Value[0]
362
        else:
363
            extobj = variant.Value
364
        classname = extobj.__class__.__name__
365
        return ua.NodeId(getattr(ua.ObjectIds, classname))
366
    else:
367
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
368
369
370
def delete_nodes(server, nodes, recursive=False):
371
    """
372
    Delete specified nodes. Optionally delete recursively all nodes with a
373
    downward hierachic references to the node
374
    """
375
    nodestodelete = []
376
    if recursive:
377
        nodes += _add_childs(nodes)
378
    for mynode in nodes:
379
        it = ua.DeleteNodesItem()
380
        it.NodeId = mynode.nodeid
381
        it.DeleteTargetReferences = True
382
        nodestodelete.append(it)
383
    return server.delete_nodes(nodestodelete)
384
385
386
def _add_childs(nodes):
387
    results = []
388
    for mynode in nodes[:]:
389
        results += mynode.get_children()
390
    return results
391
392
393