Completed
Pull Request — master (#244)
by Olivier
04:07
created

create_object()   A

Complexity

Conditions 4

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 4

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 4
dl 0
loc 16
ccs 4
cts 4
cp 1
crap 4
rs 9.2
c 2
b 0
f 1
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6
7
8 1
def _parse_nodeid_qname(*args):
    """
    Turn the first two positional arguments into a (nodeid, qualified name) pair.

    Accepted forms, as implemented below:
      * (int, name): the pair is built as ua.NodeId(0, index) and
        ua.QualifiedName(name, index)
      * (ua.NodeId or str, ua.QualifiedName or str): strings are parsed with
        the from_string() helper of the respective class

    ua.UaError is propagated unchanged. Any other failure (missing argument,
    unsupported argument type, ...) is re-raised as TypeError.
    """
    try:
        head = args[0]
        if isinstance(head, int):
            # namespace index + plain name
            index = int(head)
            new_id = ua.NodeId(0, index)
            return new_id, ua.QualifiedName(args[1], index)

        if isinstance(head, ua.NodeId):
            new_id = head
        elif isinstance(head, str):
            new_id = ua.NodeId.from_string(head)
        else:
            raise RuntimeError()

        # the second argument is only looked at once the nodeid is resolved
        tail = args[1]
        if isinstance(tail, ua.QualifiedName):
            return new_id, tail
        if isinstance(tail, str):
            return new_id, ua.QualifiedName.from_string(tail)
        raise RuntimeError()
    except ua.UaError:
        raise
    except Exception as ex:
        raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {} and got exception {}".format(args, ex))
31
32
33 1
def create_folder(parent, nodeid, bname):
    """
    add a folder object below parent and return it as a Node

    nodeid/bname are either a nodeid and a browsename,
    or a namespace index and a name (see _parse_nodeid_qname)
    """
    folder_id, folder_name = _parse_nodeid_qname(nodeid, bname)
    added_id = _create_object(
        parent.server,
        parent.nodeid,
        folder_id,
        folder_name,
        ua.ObjectIds.FolderType,
    )
    return node.Node(parent.server, added_id)
41
42
43 1
def create_object(parent, nodeid, bname, objecttype=ua.ObjectIds.BaseObjectType):
    """
    add an object below parent and return it as a Node

    nodeid/bname are either a nodeid and a browsename,
    or a namespace index and a name.
    objecttype may be an int, a ua.NodeId or a string parsed with
    ua.NodeId.from_string; anything else raises TypeError.
    """
    object_id, object_name = _parse_nodeid_qname(nodeid, bname)

    # normalise the type definition to a NodeId
    if isinstance(objecttype, int):
        type_id = ua.NodeId(objecttype)
    elif isinstance(objecttype, ua.NodeId):
        type_id = objecttype
    elif isinstance(objecttype, str):
        type_id = ua.NodeId.from_string(objecttype)
    else:
        raise TypeError("Could not recognise format of objecttype")

    added_id = _create_object(parent.server, parent.nodeid, object_id, object_name, type_id)
    return node.Node(parent.server, added_id)
59 1
60 1
61 1
def create_property(parent, nodeid, bname, val, datatype=None):
    """
    add a property below parent and return it as a Node

    args are nodeid, browsename, value, [variant type]
    or idx, name, value, [variant type]

    Note: datatype is handed to _to_variant_with_datatype as its second
    positional argument, i.e. in the position that function names vtype.
    """
    prop_id, prop_name = _parse_nodeid_qname(nodeid, bname)
    variant, data_type = _to_variant_with_datatype(val, datatype)
    if data_type and not isinstance(data_type, ua.NodeId):
        raise RuntimeError()
    added_id = _create_variable(
        parent.server,
        parent.nodeid,
        prop_id,
        prop_name,
        variant,
        data_type,
        isproperty=True,
    )
    return node.Node(parent.server, added_id)
72 1
73
74
def create_variable(parent, *args):
    """
    add a variable below parent and return it as a Node

    args are nodeid, browsename, value, [variant type], [data type]
    or idx, name, value, [variant type], [data type]
    """
    identity = args[:2]
    payload = args[2:]
    var_id, var_name = _parse_nodeid_qname(*identity)
    variant, data_type = _to_variant_with_datatype(*payload)
    if data_type and not isinstance(data_type, ua.NodeId):
        raise RuntimeError()

    added_id = _create_variable(
        parent.server,
        parent.nodeid,
        var_id,
        var_name,
        variant,
        data_type,
        isproperty=False,
    )
    return node.Node(parent.server, added_id)
86 1
87 1
88
def create_variable_type(parent, nodeid, bname, datatype):
    """
    Create a new variable type below parent and return it as a Node

    args are nodeid, browsename, datatype
    or idx, name, datatype

    datatype identifies the data type of the new variable type. It may be a
    ua.NodeId, an int or a string parsed with ua.NodeId.from_string (the same
    forms create_object accepts for its object type). A falsy datatype is
    passed through untouched; any other kind of value raises RuntimeError.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)

    # normalise the data type to a NodeId instead of wrapping it in a Variant
    # as if it were a value
    if isinstance(datatype, int):
        datatype = ua.NodeId(datatype)
    elif isinstance(datatype, str):
        datatype = ua.NodeId.from_string(datatype)
    if datatype and not isinstance(datatype, ua.NodeId):
        raise RuntimeError("Data type argument must be a NodeId, an int or a string identifying a NodeId, received: {}".format(datatype))

    # dispatch to the VariableType helper so that the node is created with the
    # VariableType node class rather than as an ordinary variable
    return node.Node(parent.server, _create_variable_type(parent.server, parent.nodeid, nodeid, qname, datatype))
100
101
102
def create_data_type(parent, nodeid, bname):
    """
    add a new data type below parent and return it as a Node

    nodeid/bname are either a nodeid and a browsename,
    or a namespace index and a name
    """
    type_id, type_name = _parse_nodeid_qname(nodeid, bname)
    added_id = _create_data_type(parent.server, parent.nodeid, type_id, type_name)
    return node.Node(parent.server, added_id)
110 1
111 1
112
def create_object_type(parent, nodeid, bname):
    """
    add a new object type below parent and return it as a Node

    nodeid/bname are either a nodeid and a browsename,
    or a namespace index and a name
    """
    type_id, type_name = _parse_nodeid_qname(nodeid, bname)
    added_id = _create_object_type(parent.server, parent.nodeid, type_id, type_name)
    return node.Node(parent.server, added_id)
120 1
121
122 1
def create_method(parent, *args):
    """
    add a method below parent and return it as a Node
    This is only possible on server side!!

    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
    or idx, name, method_to_be_called, [input argument types], [output argument types]

    When argument types are given, child nodes advertising the arguments the
    method takes and returns are created.
    The callback is a method accepting the nodeid of the parent as first
    argument and variants after; it returns a list of variants.
    """
    method_id, method_name = _parse_nodeid_qname(*args[:2])
    callback = args[2]
    # the argument type lists are optional trailing positionals
    inputs = args[3] if len(args) > 3 else []
    outputs = args[4] if len(args) > 4 else []
    added_id = _create_method(parent, method_id, method_name, callback, inputs, outputs)
    return node.Node(parent.server, added_id)
142
143 1
144
# FIXME: remove, it implicitly means create_object_type
145
def create_subtype(parent, *args):
    """
    add a subtype below parent and return it as a Node

    The first two args are either a nodeid and a browsename,
    or a namespace index and a name; further args are ignored.
    The node is created through the same helper as create_object_type.
    """
    identity = args[:2]
    sub_id, sub_name = _parse_nodeid_qname(*identity)
    added_id = _create_object_type(parent.server, parent.nodeid, sub_id, sub_name)
    return node.Node(parent.server, added_id)
153 1
154 1
155
def _create_object(server, parentnodeid, nodeid, qname, objecttype):
    """
    Build an AddNodesItem for an Object node, submit it through
    server.add_nodes and return the AddedNodeId of the result.

    The reference from the parent is Organizes when the parent's type
    definition compares equal to FolderType, HasComponent otherwise.
    objecttype is used as type definition when it is an int or a ua.NodeId;
    for any other type no TypeDefinition is assigned.
    The status code of the result is checked before returning.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid

    # pick the reference type from the kind of parent
    parent_type = node.Node(server, parentnodeid).get_type_definition()
    if parent_type == ua.ObjectIds.FolderType:
        reference = ua.ObjectIds.Organizes
    else:
        reference = ua.ObjectIds.HasComponent
    item.ReferenceTypeId = ua.NodeId(reference)
    item.NodeClass = ua.NodeClass.Object

    if isinstance(objecttype, int):
        item.TypeDefinition = ua.NodeId(objecttype)
    elif isinstance(objecttype, ua.NodeId):
        item.TypeDefinition = objecttype

    attributes = ua.ObjectAttributes()
    attributes.EventNotifier = 0
    # description and display name both default to the browse name
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
180 1
181 1
182 1 View Code Duplication
def _create_object_type(server, parentnodeid, nodeid, qname):
    """
    Build an AddNodesItem for an ObjectType node linked to its parent with a
    HasSubtype reference, submit it through server.add_nodes and return the
    AddedNodeId of the result.

    The status code of the result is checked before returning.
    """
    attributes = ua.ObjectTypeAttributes()
    attributes.IsAbstract = False
    # description and display name both default to the browse name
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    item.NodeClass = ua.NodeClass.ObjectType
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
199 1
200 1
201 1
def _to_variant(val, vtype=None):
    """
    Return only the variant part of _to_variant_with_datatype(val, vtype).
    """
    converted = _to_variant_with_datatype(val, vtype, datatype=None)
    return converted[0]
203 1
204
205
def _to_variant_with_datatype(val, vtype=None, datatype=None):
    """
    Return a (variant, datatype) pair for val.

    A val that is not already a ua.Variant is wrapped as ua.Variant(val, vtype)
    and datatype is returned as given. A val that is a ua.Variant is returned
    untouched; in that case a truthy vtype takes the place of datatype.
    """
    if not isinstance(val, ua.Variant):
        return ua.Variant(val, vtype), datatype
    # val is already a Variant: the second argument then carries the data type
    if vtype:
        return val, vtype
    return val, datatype
212 1
213 1
214
def _create_variable(server, parentnodeid, nodeid, qname, val, datatype=None, isproperty=False):
    """
    Build an AddNodesItem for a Variable node, submit it through
    server.add_nodes and return the AddedNodeId of the result.

    isproperty selects HasProperty/PropertyType instead of
    HasComponent/BaseDataVariableType for the reference type and type
    definition.
    datatype is used as the DataType attribute when truthy, otherwise it is
    derived from val with _guess_uatype.
    The value rank is OneDimension when the value is a list or tuple, Scalar
    otherwise. The status code of the result is checked before returning.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.Variable
    addnode.ParentNodeId = parentnodeid
    if isproperty:
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
    else:
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
    attrs = ua.VariableAttributes()
    # description and display name both default to the browse name
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    if datatype:
        attrs.DataType = datatype
    else:
        attrs.DataType = _guess_uatype(val)

    attrs.Value = val
    # The callers in this module hand over a ua.Variant, so the array check
    # has to look at the wrapped value: testing the Variant itself never
    # matches list/tuple and would mark every array as Scalar.
    raw_value = val.Value if isinstance(val, ua.Variant) else val
    if isinstance(raw_value, (list, tuple)):
        attrs.ValueRank = ua.ValueRank.OneDimension
    else:
        attrs.ValueRank = ua.ValueRank.Scalar
    #attrs.ArrayDimensions = None
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    attrs.Historizing = 0
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
    addnode.NodeAttributes = attrs
    results = server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
249 1
250 1
251 1
def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
    """
    Build an AddNodesItem for a VariableType node linked to its parent with a
    HasSubtype reference, submit it through server.add_nodes and return the
    AddedNodeId of the result.

    datatype is stored as the DataType attribute. A truthy value is stored as
    the Value attribute, with the value rank set to OneDimension for a list or
    tuple and Scalar otherwise; a falsy value leaves both attributes untouched.
    The status code of the result is checked before returning.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.VariableType
    addnode.ParentNodeId = parentnodeid
    # same identifier spelling as used by _create_object_type (HasSubtype)
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
    attrs = ua.VariableTypeAttributes()
    # description and display name both default to the browse name
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.DataType = datatype
    if value:
        attrs.Value = value
        if isinstance(value, (list, tuple)):
            attrs.ValueRank = ua.ValueRank.OneDimension
        else:
            attrs.ValueRank = ua.ValueRank.Scalar
    #attrs.ArrayDimensions = None
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    # NOTE(review): Historizing / AccessLevel / UserAccessLevel look like
    # Variable rather than VariableType attributes - confirm they are needed.
    attrs.Historizing = 0
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
    addnode.NodeAttributes = attrs
    results = server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
279
280
281 View Code Duplication
def _create_data_type(server, parentnodeid, nodeid, qname):
    """
    Build an AddNodesItem for a DataType node linked to its parent with a
    HasSubtype reference, submit it through server.add_nodes and return the
    AddedNodeId of the result.

    The status code of the result is checked before returning.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.DataType
    addnode.ParentNodeId = parentnodeid
    # same identifier spelling as used by _create_object_type (HasSubtype)
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
    attrs = ua.DataTypeAttributes()
    # description and display name both default to the browse name
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    # NOTE(review): Historizing looks like a Variable attribute rather than a
    # DataType attribute - confirm it is needed here.
    attrs.Historizing = 0
    attrs.IsAbstract = False  # True means the type cannot be instantiated
    addnode.NodeAttributes = attrs
    results = server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
300
301
302
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
    """
    Build an AddNodesItem for a Method node below parent (HasComponent
    reference), submit it through parent.server.add_nodes and return the
    AddedNodeId of the result.

    When inputs / outputs are non-empty, an "InputArguments" /
    "OutputArguments" property is created on the new method node, each entry
    being converted with _vtype_to_argument. Finally callback is registered
    for the method through parent.server.add_method_callback.
    """
    attributes = ua.MethodAttributes()
    # description and display name both default to the browse name
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    attributes.Executable = True
    attributes.UserExecutable = True

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Method
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
    item.NodeAttributes = attributes

    outcome = parent.server.add_nodes([item])[0]
    outcome.StatusCode.check()
    method = node.Node(parent.server, outcome.AddedNodeId)

    # advertise the argument lists as properties of the method node
    if inputs:
        in_args = [_vtype_to_argument(vtype) for vtype in inputs]
        create_property(method, ua.NodeId(), ua.QualifiedName("InputArguments", 0), in_args)
    if outputs:
        out_args = [_vtype_to_argument(vtype) for vtype in outputs]
        create_property(method, ua.NodeId(), ua.QualifiedName("OutputArguments", 0), out_args)

    parent.server.add_method_callback(method.nodeid, callback)
    return outcome.AddedNodeId
327
328
329
def _vtype_to_argument(vtype):
    """
    Return vtype itself when it already is a ua.Argument; otherwise return a
    new ua.Argument whose DataType is guessed from an empty variant of that
    variant type.
    """
    if not isinstance(vtype, ua.Argument):
        argument = ua.Argument()
        probe = ua.Variant(None, vtype)
        argument.DataType = _guess_uatype(probe)
        return argument
    return vtype
337
338
339
def _guess_uatype(variant):
    """
    Return the ua.NodeId of the data type matching variant.

    For anything but an ExtensionObject variant, the id is looked up in
    ua.ObjectIds under the name of the variant type. For an ExtensionObject
    the lookup uses the class name of the value (of its first element when
    the value is a list or tuple).
    Raises ua.UaError when the ExtensionObject value is None or an empty
    list/tuple.
    """
    if variant.VariantType != ua.VariantType.ExtensionObject:
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))

    payload = variant.Value
    if payload is None:
        raise ua.UaError("Cannot guess DataType from Null ExtensionObject")

    sample = payload
    if type(payload) in (list, tuple):
        if len(payload) == 0:
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
        # the first element stands for the whole array
        sample = payload[0]

    type_name = sample.__class__.__name__
    return ua.NodeId(getattr(ua.ObjectIds, type_name))
353
354
355
def delete_nodes(server, nodes, recursive=False):
    """
    Delete specified nodes. Optionally delete recursively all nodes with a
    downward hierarchic reference to the node.

    nodes is an iterable of objects exposing a nodeid attribute; it is not
    modified. With recursive set, the nodes returned by _add_childs are
    appended to the deletion request.
    Returns whatever server.delete_nodes returns for the request.
    """
    # work on a copy: extending the argument in place would silently grow
    # the list owned by the caller
    targets = list(nodes)
    if recursive:
        targets += _add_childs(targets)

    nodestodelete = []
    for mynode in targets:
        it = ua.DeleteNodesItem()
        it.NodeId = mynode.nodeid
        it.DeleteTargetReferences = True
        nodestodelete.append(it)
    return server.delete_nodes(nodestodelete)
369
370
371
def _add_childs(nodes):
372
    results = []
373
    for mynode in nodes[:]:
374
        results += mynode.get_children()
375
    return results
376
377
378