Completed
Push — master ( 5b1468...e66350 )
by Olivier
02:24
created

create_object()   C

Complexity

Conditions 7

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 7

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 7
c 2
b 0
f 0
dl 0
loc 25
ccs 8
cts 8
cp 1
crap 7
rs 5.5
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6
7
8 1
def _parse_add_args(*args):
    """
    Convert the two leading arguments of a create_* call into a
    (nodeid, qualified name) tuple.

    Accepted forms:
      - namespace index (int), name
      - nodeid (ua.NodeId or string), browsename (ua.QualifiedName or string)

    ua.UaError is propagated unchanged; every other failure is reported
    as a TypeError describing the accepted forms.
    """
    try:
        first = args[0]
        if isinstance(first, int):
            # integer form: the index is handed to both constructors
            idx = int(first)
            return ua.NodeId(0, idx), ua.QualifiedName(args[1], idx)

        # the node id is resolved completely before the name is looked at
        if isinstance(first, ua.NodeId):
            nodeid = first
        elif isinstance(first, str):
            nodeid = ua.NodeId.from_string(first)
        else:
            raise RuntimeError()

        second = args[1]
        if isinstance(second, ua.QualifiedName):
            qname = second
        elif isinstance(second, str):
            qname = ua.QualifiedName.from_string(second)
        else:
            raise RuntimeError()

        return nodeid, qname
    except ua.UaError:
        # protocol level errors keep their own type
        raise
    except Exception as ex:
        raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {} and got exception {}".format(args, ex))
31
32
33 1
def create_folder(parent, *args):
    """
    Add a folder node below parent and return it wrapped in a Node.

    args are either (nodeid, browsename) or (namespace index, name)
    """
    nodeid, qname = _parse_add_args(*args)
    server = parent.server
    folder_id = _create_object(server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType)
    return node.Node(server, folder_id)
41
42
43 1
def create_object(parent, *args):
    """
    create a child node object
    arguments are nodeid, browsename, [objecttype]
    or namespace index, name, [objecttype]

    objecttype is optional and may be an int, a ua.NodeId or a string
    parsable by ua.NodeId.from_string. When it is not given,
    ua.ObjectIds.BaseObjectType is used.

    Returns the created node wrapped in a Node.
    Raises TypeError if the arguments or objecttype have an unsupported
    type; ua.UaError is propagated unchanged.
    """
    nodeid, qname = _parse_add_args(*args)
    objecttype = ua.ObjectIds.BaseObjectType
    if len(args) == 3:
        objecttype = args[2]
        try:
            if isinstance(objecttype, int):
                objecttype = ua.NodeId(objecttype)
            elif isinstance(objecttype, str):
                objecttype = ua.NodeId.from_string(objecttype)
            elif not isinstance(objecttype, ua.NodeId):
                # anything else is rejected; a NodeId is used as is
                raise RuntimeError()
            # no early return here: the node still has to be created below
            # with the resolved objecttype
        except ua.UaError:
            raise
        except Exception as ex:
            raise TypeError("This provided objecttype takes either a index, nodeid or string. Received arguments {} and got exception {}".format(args, ex))
    return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, objecttype))
68
69
70 1
def create_property(parent, *args):
    """
    Add a property node below parent and return it wrapped in a Node.

    args are nodeid, browsename, value, [variant type]
    or idx, name, value, [variant type]
    """
    id_args, value_args = args[:2], args[2:]
    nodeid, qname = _parse_add_args(*id_args)
    variant = _to_variant(*value_args)
    new_id = _create_variable(parent.server, parent.nodeid, nodeid, qname, variant, isproperty=True)
    return node.Node(parent.server, new_id)
79
80
81
def create_variable(parent, *args):
    """
    Add a variable node below parent and return it wrapped in a Node.

    args are nodeid, browsename, value, [variant type]
    or idx, name, value, [variant type]
    """
    id_args, value_args = args[:2], args[2:]
    nodeid, qname = _parse_add_args(*id_args)
    variant = _to_variant(*value_args)
    new_id = _create_variable(parent.server, parent.nodeid, nodeid, qname, variant, isproperty=False)
    return node.Node(parent.server, new_id)
90 1
91 1
92
def create_method(parent, *args):
    """
    Add a method node below parent and return it wrapped in a Node.
    This is only possible on server side!!

    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
    or idx, name, method_to_be_called, [input argument types], [output argument types]

    When argument types are given, child nodes advertising the arguments
    the method takes and returns are created as well.
    The callback receives the nodeid of the parent as first argument,
    followed by variants, and returns a list of variants.
    """
    nodeid, qname = _parse_add_args(*args[:2])
    callback = args[2]
    # both type lists are optional and default to "nothing advertised"
    inputs = args[3] if len(args) > 3 else []
    outputs = args[4] if len(args) > 4 else []
    method_id = _create_method(parent, nodeid, qname, callback, inputs, outputs)
    return node.Node(parent.server, method_id)
112
113 1
114 1
def create_subtype(parent, *args):
    """
    Add a subtype node below parent and return it wrapped in a Node.

    args are either (nodeid, browsename) or (namespace index, name)
    """
    nodeid, qname = _parse_add_args(*args)
    # objecttype None selects the ObjectType / HasSubtype branch of _create_object
    subtype_id = _create_object(parent.server, parent.nodeid, nodeid, qname, None)
    return node.Node(parent.server, subtype_id)
122 1
123
124 1
def _create_object(server, parentnodeid, nodeid, qname, objecttype):
    """
    Build an AddNodesItem for an object (or, when objecttype is falsy, an
    abstract object type), send it to server and return the added node id.

    The status code of the result is checked before returning.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.ParentNodeId = parentnodeid
    #TODO: maybe move to address_space.py and implement for all node types?
    if objecttype:
        # children of a folder are organized, anything else is a component
        parent_type = node.Node(server, parentnodeid).get_type_definition()
        if parent_type == ua.ObjectIds.FolderType:
            reftype = ua.ObjectIds.Organizes
        else:
            reftype = ua.ObjectIds.HasComponent
        item.ReferenceTypeId = ua.NodeId(reftype)

        item.NodeClass = ua.NodeClass.Object
        # only int and NodeId values end up as type definition
        if isinstance(objecttype, int):
            item.TypeDefinition = ua.NodeId(objecttype)
        elif isinstance(objecttype, ua.NodeId):
            item.TypeDefinition = objecttype
        attrs = ua.ObjectAttributes()
        attrs.EventNotifier = 0
    else:
        item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype)
        item.NodeClass = ua.NodeClass.ObjectType
        attrs = ua.ObjectTypeAttributes()
        attrs.IsAbstract = True

    # attributes common to both node classes
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    item.NodeAttributes = attrs

    result = server.add_nodes([item])[0]
    result.StatusCode.check()
    return result.AddedNodeId
157 1
158 1
159 1
def _to_variant(val, vtype=None):
    """
    Return val unchanged when it already is a ua.Variant, otherwise wrap
    it in a new ua.Variant built from val and vtype.
    """
    if not isinstance(val, ua.Variant):
        return ua.Variant(val, vtype)
    return val
164
165
166 1
def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False):
    """
    Build an AddNodesItem for a variable or property holding val, send it
    to server and return the added node id.

    val is expected to be a ua.Variant: its VariantType is read (through
    _guess_uatype) to fill in the DataType attribute.
    isproperty selects HasProperty/PropertyType instead of
    HasComponent/BaseDataVariableType.
    The status code of the result is checked before returning.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.Variable
    addnode.ParentNodeId = parentnodeid
    if isproperty:
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
    else:
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
    attrs = ua.VariableAttributes()
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.DataType = _guess_uatype(val)
    attrs.Value = val
    # val is the Variant wrapper, never a list or tuple itself, so the
    # sequence test has to look at the wrapped value
    if isinstance(val.Value, (list, tuple)):
        attrs.ValueRank = ua.ValueRank.OneDimension
    else:
        attrs.ValueRank = ua.ValueRank.Scalar
    #attrs.ArrayDimensions = None
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    attrs.Historizing = 0
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
    addnode.NodeAttributes = attrs
    results = server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
197 1
198 1
199 1
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
    """
    Add a method node below parent, optionally describe its arguments with
    InputArguments / OutputArguments properties, register callback for it
    on the server and return the added node id.
    """
    server = parent.server

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Method
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)

    attrs = ua.MethodAttributes()
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    attrs.Executable = True
    attrs.UserExecutable = True
    item.NodeAttributes = attrs

    result = server.add_nodes([item])[0]
    result.StatusCode.check()
    method = node.Node(server, result.AddedNodeId)

    # an argument property is only created for a non-empty type list
    for propname, vtypes in (("InputArguments", inputs), ("OutputArguments", outputs)):
        if vtypes:
            arguments = [_vtype_to_argument(vtype) for vtype in vtypes]
            create_property(method, ua.NodeId(), ua.QualifiedName(propname, 0), arguments)

    server.add_method_callback(method.nodeid, callback)
    return result.AddedNodeId
224
225
226 1
def _vtype_to_argument(vtype):
    """
    Return vtype itself when it already is a ua.Argument, otherwise a new
    ua.Argument whose DataType is guessed from an empty Variant of that type.
    """
    if not isinstance(vtype, ua.Argument):
        argument = ua.Argument()
        argument.DataType = _guess_uatype(ua.Variant(None, vtype))
        return argument
    return vtype
234
235
236
def _guess_uatype(variant):
    """
    Return the data type NodeId matching variant.

    For anything but an ExtensionObject the id is looked up in ua.ObjectIds
    by the name of the variant type. For an ExtensionObject it is looked up
    by the class name of the value (of the first element when the value is
    a list or tuple); ua.UaError is raised when there is no value to
    inspect.
    """
    is_extobj = variant.VariantType == ua.VariantType.ExtensionObject
    if not is_extobj:
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))

    value = variant.Value
    if value is None:
        raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
    if type(value) in (list, tuple):
        if len(value) == 0:
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
        # the first element stands for the whole sequence
        value = value[0]
    return ua.NodeId(getattr(ua.ObjectIds, value.__class__.__name__))
250 1
251 1
252 1
def delete_nodes(server, nodes, recursive=False):
    """
    Delete specified nodes. Optionally also delete the nodes that
    _add_childs reports for them, i.e. nodes reached through downward
    hierarchical references.

    nodes may be any iterable of node objects; it is never modified.
    Returns whatever server.delete_nodes returns for the request.
    """
    # work on a private copy: the caller's list must not grow as a side
    # effect, and tuples or other iterables are accepted as well
    nodes = list(nodes)
    if recursive:
        nodes = nodes + _add_childs(nodes)
    nodestodelete = []
    for mynode in nodes:
        it = ua.DeleteNodesItem()
        it.NodeId = mynode.nodeid
        it.DeleteTargetReferences = True
        nodestodelete.append(it)
    return server.delete_nodes(nodestodelete)
266
267
268
def _add_childs(nodes):
269
    results = []
270
    for mynode in nodes[:]:
271
        results += mynode.get_children()
272
    return results
273
274
275