Completed
Pull Request — master (#244)
by Olivier
04:11
created

_parse_nodeid_qname()   D

Complexity

Conditions 8

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 8

Importance

Changes 0
Metric Value
cc 8
dl 0
loc 23
ccs 21
cts 21
cp 1
crap 8
rs 4.7619
c 0
b 0
f 0
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6
7
8 1
def _parse_nodeid_qname(*args):
    """
    Normalise the two leading arguments into a (NodeId, QualifiedName) pair.

    Two calling conventions are accepted:
      - (namespace index as int, name): a NodeId and a QualifiedName are
        built from the index and the name
      - (ua.NodeId or its string form, ua.QualifiedName or its string form)

    ua.UaError is propagated untouched; any other failure (wrong types,
    missing arguments, ...) is reported as a TypeError describing the
    expected arguments.
    """
    try:
        first = args[0]
        if isinstance(first, int):
            idx = int(first)
            # NOTE(review): identifier 0 presumably lets the server choose the id - confirm
            return ua.NodeId(0, idx), ua.QualifiedName(args[1], idx)

        if isinstance(first, ua.NodeId):
            nodeid = first
        elif isinstance(first, str):
            nodeid = ua.NodeId.from_string(first)
        else:
            raise RuntimeError()

        second = args[1]
        if isinstance(second, ua.QualifiedName):
            qname = second
        elif isinstance(second, str):
            qname = ua.QualifiedName.from_string(second)
        else:
            raise RuntimeError()

        return nodeid, qname
    except ua.UaError:
        # protocol level errors are meaningful to the caller, keep them as they are
        raise
    except Exception as ex:
        message = (
            "This method takes either a namespace index and a string as argument "
            "or a nodeid and a qualifiedname. "
            "Received arguments {} and got exception {}"
        )
        raise TypeError(message.format(args, ex))
31
32
33 1
def create_folder(parent, nodeid, bname):
    """
    Create a folder object below parent and return it as a Node.

    nodeid and bname are either a nodeid and a browsename,
    or a namespace index and a name.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    folder_id = _create_object(
        parent.server,
        parent.nodeid,
        nodeid,
        qname,
        ua.ObjectIds.FolderType,
    )
    return node.Node(parent.server, folder_id)
41
42
43 1
def create_object(parent, nodeid, bname, objecttype=ua.ObjectIds.BaseObjectType):
    """
    Create an object below parent and return it as a Node.

    nodeid and bname are either a nodeid and a browsename,
    or a namespace index and a name.
    objecttype may be an int, a ua.NodeId or the string form of a nodeid;
    anything else raises TypeError.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)

    if isinstance(objecttype, int):
        typedef = ua.NodeId(objecttype)
    elif isinstance(objecttype, ua.NodeId):
        typedef = objecttype
    elif isinstance(objecttype, str):
        typedef = ua.NodeId.from_string(objecttype)
    else:
        raise TypeError("Could not recognise format of objecttype")

    object_id = _create_object(parent.server, parent.nodeid, nodeid, qname, typedef)
    return node.Node(parent.server, object_id)
59 1
60 1
61 1
def create_property(parent, nodeid, bname, val, datatype=None):
    """
    Create a property below parent and return it as a Node.

    args are nodeid, browsename, value, [variant type]
    or idx, name, value, [variant type]

    The optional fourth argument is handed to _to_variant_with_datatype as
    the variant type; it is only kept as data type when val already is a
    ua.Variant, in which case it must be a ua.NodeId (RuntimeError otherwise).
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    variant, dtype = _to_variant_with_datatype(val, datatype)
    if dtype:
        if not isinstance(dtype, ua.NodeId):
            raise RuntimeError()
    property_id = _create_variable(
        parent.server,
        parent.nodeid,
        nodeid,
        qname,
        variant,
        dtype,
        isproperty=True,
    )
    return node.Node(parent.server, property_id)
72 1
73
74
def create_variable(parent, *args):
    """
    Create a variable below parent and return it as a Node.

    args are nodeid, browsename, value, [variant type], [data type]
    or idx, name, value, [variant type], [data type]

    A data type, when one is retained, must be a ua.NodeId
    (RuntimeError otherwise).
    """
    identity_args = args[:2]
    value_args = args[2:]
    nodeid, qname = _parse_nodeid_qname(*identity_args)
    variant, dtype = _to_variant_with_datatype(*value_args)
    if dtype:
        if not isinstance(dtype, ua.NodeId):
            raise RuntimeError()

    variable_id = _create_variable(
        parent.server,
        parent.nodeid,
        nodeid,
        qname,
        variant,
        dtype,
        isproperty=False,
    )
    return node.Node(parent.server, variable_id)
86 1
87 1
88
def create_variable_type(parent, nodeid, bname, datatype):
    """
    Create a new variable type below parent and return it as a Node.

    args are nodeid, browsename, datatype
    or idx, name, datatype

    datatype is the data type of the new variable type, given as a
    ua.NodeId or as an int which is converted with ua.NodeId(datatype).
    Any other non-empty value raises RuntimeError.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    # datatype is a type reference, not a value: it must not be wrapped in a Variant
    if datatype and isinstance(datatype, int):
        datatype = ua.NodeId(datatype)
    if datatype and not isinstance(datatype, ua.NodeId):
        raise RuntimeError("datatype argument must be a nodeid or an int referring to a nodeid, received: {}".format(datatype))
    # a VariableType node is requested here, so the variable type helper is used
    # instead of the plain variable one
    return node.Node(parent.server, _create_variable_type(parent.server, parent.nodeid, nodeid, qname, datatype))
100
101
102
def create_data_type(parent, nodeid, bname):
    """
    Create a new data type below parent, to be used in new variables, etc.
    The created node is returned as a Node.

    nodeid and bname are either a nodeid and a browsename,
    or a namespace index and a name.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    datatype_id = _create_data_type(parent.server, parent.nodeid, nodeid, qname)
    return node.Node(parent.server, datatype_id)
110 1
111 1
112
def create_object_type(parent, nodeid, bname):
    """
    Create a new object type below parent, to be instantiated in the
    address space. The created node is returned as a Node.

    nodeid and bname are either a nodeid and a browsename,
    or a namespace index and a name.
    """
    nodeid, qname = _parse_nodeid_qname(nodeid, bname)
    objecttype_id = _create_object_type(parent.server, parent.nodeid, nodeid, qname)
    return node.Node(parent.server, objecttype_id)
120 1
121
122 1
def create_method(parent, *args):
    """
    Create a method below parent and return it as a Node.
    This is only possible on server side!!

    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
    or idx, name, method_to_be_called, [input argument types], [output argument types]

    When argument types are given, child nodes advertising what the method
    takes and returns are created as well.
    The callback is a method accepting the nodeid of the parent as first
    argument and variants after; it returns a list of variants.
    """
    nodeid, qname = _parse_nodeid_qname(*args[:2])
    callback = args[2]
    # the argument type lists are optional trailing positional arguments
    inputs = args[3] if len(args) > 3 else []
    outputs = args[4] if len(args) > 4 else []
    method_id = _create_method(parent, nodeid, qname, callback, inputs, outputs)
    return node.Node(parent.server, method_id)
142
143 1
144
def _create_object(server, parentnodeid, nodeid, qname, objecttype):
    """
    Ask server to add an Object node below parentnodeid.

    The new node is referenced with Organizes when the type definition of
    the parent is FolderType, with HasComponent otherwise.
    objecttype may be an int or a ua.NodeId; other values leave the type
    definition of the request untouched.
    The status code of the result is checked and the id of the added node
    is returned.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid
    item.NodeClass = ua.NodeClass.Object

    parent_typedef = node.Node(server, parentnodeid).get_type_definition()
    if parent_typedef == ua.ObjectIds.FolderType:
        reference = ua.ObjectIds.Organizes
    else:
        reference = ua.ObjectIds.HasComponent
    item.ReferenceTypeId = ua.NodeId(reference)

    if isinstance(objecttype, int):
        item.TypeDefinition = ua.NodeId(objecttype)
    elif isinstance(objecttype, ua.NodeId):
        item.TypeDefinition = objecttype

    attributes = ua.ObjectAttributes()
    attributes.EventNotifier = 0
    # the browse name doubles as description and display name
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    item.NodeAttributes = attributes

    result = server.add_nodes([item])[0]
    result.StatusCode.check()
    return result.AddedNodeId
169 1
170 1
171 1
def _create_object_type(server, parentnodeid, nodeid, qname):
    """
    Ask server to add a non abstract ObjectType node as subtype of
    parentnodeid.

    The status code of the result is checked and the id of the added node
    is returned.
    """
    attributes = ua.ObjectTypeAttributes()
    attributes.IsAbstract = False
    # the browse name doubles as description and display name
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    item.NodeClass = ua.NodeClass.ObjectType
    item.NodeAttributes = attributes

    result = server.add_nodes([item])[0]
    result.StatusCode.check()
    return result.AddedNodeId
188 1
189 1
190 1
def _to_variant(val, vtype=None):
    """
    Return val as a variant, dropping the data type part computed by
    _to_variant_with_datatype.
    """
    variant, _ = _to_variant_with_datatype(val, vtype, datatype=None)
    return variant
192 1
193 1
194 1
def _to_variant_with_datatype(val, vtype=None, datatype=None):
    """
    Return a (variant, datatype) pair for val.

    A value which is not yet a ua.Variant is wrapped using vtype and
    datatype is passed through unchanged.
    A value which already is a ua.Variant is returned as is; a non-empty
    second argument is then taken as the data type, since the variant
    already carries its own type.
    """
    if not isinstance(val, ua.Variant):
        return ua.Variant(val, vtype), datatype
    return val, (vtype if vtype else datatype)
201 1
202 1
203 1
def _create_variable(server, parentnodeid, nodeid, qname, val, datatype=None, isproperty=False):
    """
    Ask server to add a Variable node below parentnodeid.

    With isproperty the node is referenced with HasProperty and typed as
    PropertyType, otherwise HasComponent and BaseDataVariableType are used.
    val is the value of the variable, normally a ua.Variant; when datatype
    is empty the data type is guessed from val, which then must be a
    variant.
    The value rank is OneDimension when the value is a list or a tuple and
    Scalar otherwise.
    The status code of the result is checked and the id of the added node
    is returned.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.Variable
    addnode.ParentNodeId = parentnodeid
    if isproperty:
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
    else:
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
    attrs = ua.VariableAttributes()
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    if datatype:
        attrs.DataType = datatype
    else:
        attrs.DataType = _guess_uatype(val)

    attrs.Value = val
    # The public helpers always hand over a ua.Variant, so the array test has
    # to look at the wrapped value; testing the wrapper itself never matches
    # and would mark every array as scalar.
    raw_value = val.Value if isinstance(val, ua.Variant) else val
    if isinstance(raw_value, (list, tuple)):
        attrs.ValueRank = ua.ValueRank.OneDimension
    else:
        attrs.ValueRank = ua.ValueRank.Scalar
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    attrs.Historizing = 0
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
    addnode.NodeAttributes = attrs
    results = server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
238 1
239 1
240 1
def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None):
    """
    Ask server to add a VariableType node as subtype of parentnodeid.

    datatype is stored as the data type of the new type. value is an
    optional default value; when given, the value rank is OneDimension for
    a list or a tuple and Scalar otherwise.
    The status code of the result is checked and the id of the added node
    is returned.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.VariableType
    addnode.ParentNodeId = parentnodeid
    # the identifier is spelled HasSubtype, as used for object types in this module
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    attrs = ua.VariableTypeAttributes()
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.DataType = datatype
    # compare with None so that falsy defaults such as 0 or "" are not dropped
    if value is not None:
        attrs.Value = value
        # look inside a variant, the wrapper itself is never a list
        raw_value = value.Value if isinstance(value, ua.Variant) else value
        if isinstance(raw_value, (list, tuple)):
            attrs.ValueRank = ua.ValueRank.OneDimension
        else:
            attrs.ValueRank = ua.ValueRank.Scalar
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    # Historizing, AccessLevel and UserAccessLevel are attributes of Variable
    # nodes, not of VariableType nodes, so they are not set here.
    addnode.NodeAttributes = attrs
    results = server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
268
269
270
def _create_data_type(server, parentnodeid, nodeid, qname):
    """
    Ask server to add a non abstract DataType node as subtype of
    parentnodeid.

    The status code of the result is checked and the id of the added node
    is returned.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.DataType
    addnode.ParentNodeId = parentnodeid
    # the identifier is spelled HasSubtype, as used for object types in this module
    addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
    # no type definition is set: type nodes do not have one
    attrs = ua.DataTypeAttributes()
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    # Historizing is an attribute of Variable nodes, not of DataType nodes,
    # so it is not set here.
    attrs.IsAbstract = False  # True means the type cannot be instantiated
    addnode.NodeAttributes = attrs
    results = server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
289
290
291
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
    """
    Ask the server of parent to add a Method node as component of parent.

    Non-empty inputs / outputs are published as InputArguments /
    OutputArguments properties of the new method, then callback is
    registered on the server for the method.
    The status code of the result is checked and the id of the added node
    is returned.
    """
    attributes = ua.MethodAttributes()
    # the browse name doubles as description and display name
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    attributes.Executable = True
    attributes.UserExecutable = True

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Method
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
    item.NodeAttributes = attributes

    result = parent.server.add_nodes([item])[0]
    result.StatusCode.check()
    method = node.Node(parent.server, result.AddedNodeId)

    # inputs first, then outputs: one property per non-empty argument list
    for propname, vtypes in (("InputArguments", inputs), ("OutputArguments", outputs)):
        if vtypes:
            create_property(
                method,
                ua.NodeId(),
                ua.QualifiedName(propname, 0),
                [_vtype_to_argument(vtype) for vtype in vtypes],
            )

    parent.server.add_method_callback(method.nodeid, callback)
    return result.AddedNodeId
316
317
318
def _vtype_to_argument(vtype):
    """
    Return vtype as a ua.Argument.

    A ua.Argument is returned unchanged. Anything else is taken as a
    variant type: a new argument is created whose data type is guessed
    from an empty variant of that type.
    """
    if not isinstance(vtype, ua.Argument):
        argument = ua.Argument()
        argument.DataType = _guess_uatype(ua.Variant(None, vtype))
        return argument
    return vtype
326
327
328
def _guess_uatype(variant):
    """
    Guess the data type nodeid matching the content of variant.

    For every variant type but ExtensionObject, the entry of ua.ObjectIds
    named like the variant type is used.
    For an ExtensionObject the entry named like the class of the value is
    used; for a list or a tuple the class of the first element is used.

    Raises ua.UaError when the value of an ExtensionObject variant is None
    or an empty sequence, since there is no object to look at.
    """
    if variant.VariantType == ua.VariantType.ExtensionObject:
        value = variant.Value
        if value is None:
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
        # isinstance so that subclasses of list and tuple are seen as sequences
        # too, instead of being mistaken for a single extension object
        if isinstance(value, (list, tuple)):
            if len(value) == 0:
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
            extobj = value[0]
        else:
            extobj = value
        classname = extobj.__class__.__name__
        return ua.NodeId(getattr(ua.ObjectIds, classname))
    else:
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
342
343
344
def delete_nodes(server, nodes, recursive=False):
    """
    Delete specified nodes. Optionally also delete the nodes found below
    them through _add_childs.

    nodes is any iterable of node objects; it is never modified.
    Every node is deleted together with the references targeting it.
    Returns whatever server.delete_nodes returns.
    """
    # work on a copy: the caller's list must not grow as a side effect, and
    # tuples or other iterables are accepted as well
    targets = list(nodes)
    if recursive:
        targets += _add_childs(targets)
    nodestodelete = []
    for mynode in targets:
        it = ua.DeleteNodesItem()
        it.NodeId = mynode.nodeid
        it.DeleteTargetReferences = True
        nodestodelete.append(it)
    return server.delete_nodes(nodestodelete)
358
359
360
def _add_childs(nodes):
    """
    Return every node found below nodes, at any depth.

    The result does not contain the given nodes themselves. It is ordered
    level by level: first the direct children, then their children, etc.
    A node is listed only once, and nodes already seen are not visited
    again so that a loop in the references cannot hang the traversal.
    """
    # Membership is tested with ==
    # NOTE(review): this relies on node objects comparing equal when they
    # designate the same node - confirm for the Node class
    seen = list(nodes)
    results = []
    level = list(nodes)
    while level:
        next_level = []
        for mynode in level:
            for child in mynode.get_children():
                if child not in seen:
                    seen.append(child)
                    next_level.append(child)
        results += next_level
        level = next_level
    return results
365
366
367