Completed
Pull Request — master (#130)
by Denis
02:26
created

opcua.common.create_method()   A

Complexity

Conditions 3

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3.072
Metric Value
cc 3
dl 0
loc 20
ccs 8
cts 10
cp 0.8
crap 3.072
rs 9.4285
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6
7
8 1
def _parse_add_args(*args):
9 1
    if isinstance(args[0], ua.NodeId):
10 1
        return args[0], args[1]
11 1
    elif isinstance(args[0], str):
12 1
        return ua.NodeId.from_string(args[0]), ua.QualifiedName.from_string(args[1])
13 1
    elif isinstance(args[0], int):
14 1
        return ua.generate_nodeid(args[0]), ua.QualifiedName(args[1], args[0])
15
    else:
16
        raise TypeError("Add methods takes a nodeid and a qualifiedname as argument, received %s" % args)
17
18
19 1
def create_folder(parent, *args):
20
    """
21
    create a child node folder
22
    arguments are nodeid, browsename
23
    or namespace index, name
24
    """
25 1
    nodeid, qname = _parse_add_args(*args)
26 1
    return node.Node(parent.server, _create_folder(parent.server, parent.nodeid, nodeid, qname))
27
28
29 1
def create_object(parent, *args):
30
    """
31
    create a child node object
32
    arguments are nodeid, browsename
33
    or namespace index, name
34
    """
35 1
    nodeid, qname = _parse_add_args(*args)
36 1
    return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname))
37
38
39 1
def create_property(parent, *args):
40
    """
41
    create a child node property
42
    args are nodeid, browsename, value, [variant type]
43
    or idx, name, value, [variant type]
44
    """
45 1
    nodeid, qname = _parse_add_args(*args[:2])
46 1
    val = _to_variant(*args[2:])
47 1
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, isproperty=True))
48
49
50 1
def create_variable(parent, *args):
51
    """
52
    create a child node variable
53
    args are nodeid, browsename, value, [variant type]
54
    or idx, name, value, [variant type]
55
    """
56 1
    nodeid, qname = _parse_add_args(*args[:2])
57 1
    val = _to_variant(*args[2:])
58 1
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, isproperty=False))
59
60
61 1
def create_method(parent, *args):
62
    """
63
    create a child method object
64
    This is only possible on server side!!
65
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
66
    or idx, name, method_to_be_called, [input argument types], [output argument types]
67
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
68
    a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
69
    """
70 1
    nodeid, qname = _parse_add_args(*args[:2])
71 1
    callback = args[2]
72 1
    if len(args) > 3:
73 1
        inputs = args[3]
74
    else:
75
        inputs = []
76 1
    if len(args) > 4:
77 1
        outputs = args[4]
78
    else:
79
        outputs = []
80 1
    return _create_method(parent, nodeid, qname, callback, inputs, outputs)
81
82
83 1
def _create_folder(server, parentnodeid, nodeid, qname):
84 1
    addnode = ua.AddNodesItem()
85 1
    addnode.RequestedNewNodeId = nodeid
86 1
    addnode.BrowseName = qname
87 1
    addnode.NodeClass = ua.NodeClass.Object
88 1
    addnode.ParentNodeId = parentnodeid
89 1
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=35")
90 1
    addnode.TypeDefinition = ua.NodeId.from_string("i=61")
91 1
    attrs = ua.ObjectAttributes()
92 1
    attrs.Description = ua.LocalizedText(qname.Name)
93 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
94 1
    attrs.WriteMask = ua.OpenFileMode.Read
95 1
    attrs.UserWriteMask = ua.OpenFileMode.Read
96 1
    attrs.EventNotifier = 0
97 1
    addnode.NodeAttributes = attrs
98 1
    results = server.add_nodes([addnode])
99 1
    results[0].StatusCode.check()
100 1
    return nodeid
101
102
103 1
def _create_object(server, parentnodeid, nodeid, qname):
104 1
    addnode = ua.AddNodesItem()
105 1
    addnode.RequestedNewNodeId = nodeid
106 1
    addnode.BrowseName = qname
107 1
    addnode.NodeClass = ua.NodeClass.Object
108 1
    addnode.ParentNodeId = parentnodeid
109 1
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=35")
110 1
    addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
111 1
    attrs = ua.ObjectAttributes()
112 1
    attrs.Description = ua.LocalizedText(qname.Name)
113 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
114 1
    attrs.EventNotifier = 0
115 1
    attrs.WriteMask = ua.OpenFileMode.Read
116 1
    attrs.UserWriteMask = ua.OpenFileMode.Read
117 1
    addnode.NodeAttributes = attrs
118 1
    results = server.add_nodes([addnode])
119 1
    results[0].StatusCode.check()
120 1
    return nodeid
121
122
123 1
def _to_variant(val, vtype=None):
124 1
    if isinstance(val, ua.Variant):
125 1
        return val
126
    else:
127 1
        return ua.Variant(val, vtype)
128
129
130 1
def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False):
131 1
    addnode = ua.AddNodesItem()
132 1
    addnode.RequestedNewNodeId = nodeid
133 1
    addnode.BrowseName = qname
134 1
    addnode.NodeClass = ua.NodeClass.Variable
135 1
    addnode.ParentNodeId = parentnodeid
136 1
    if isproperty:
137 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
138 1
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
139
    else:
140 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
141 1
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
142 1
    attrs = ua.VariableAttributes()
143 1
    attrs.Description = ua.LocalizedText(qname.Name)
144 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
145 1
    attrs.DataType = _guess_uatype(val)
146 1
    attrs.Value = val
147 1
    if isinstance(val, list) or isinstance(val, tuple):
148
        attrs.ValueRank = ua.ValueRank.OneDimension
149
    else:
150 1
        attrs.ValueRank = ua.ValueRank.Scalar
151
    #attrs.ArrayDimensions = None
152 1
    attrs.WriteMask = ua.OpenFileMode.Read
153 1
    attrs.UserWriteMask = ua.OpenFileMode.Read
154 1
    attrs.Historizing = 0
155 1
    addnode.NodeAttributes = attrs
156 1
    results = server.add_nodes([addnode])
157 1
    results[0].StatusCode.check()
158 1
    return nodeid
159
160
161 1
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
162 1
    addnode = ua.AddNodesItem()
163 1
    addnode.RequestedNewNodeId = nodeid
164 1
    addnode.BrowseName = qname
165 1
    addnode.NodeClass = ua.NodeClass.Method
166 1
    addnode.ParentNodeId = parent.nodeid
167 1
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=47")
168
    #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
169 1
    attrs = ua.MethodAttributes()
170 1
    attrs.Description = ua.LocalizedText(qname.Name)
171 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
172 1
    attrs.WriteMask = ua.OpenFileMode.Read
173 1
    attrs.UserWriteMask = ua.OpenFileMode.Read
174 1
    attrs.Executable = True
175 1
    attrs.UserExecutable = True
176 1
    addnode.NodeAttributes = attrs
177 1
    results = parent.server.add_nodes([addnode])
178 1
    results[0].StatusCode.check()
179 1
    method = node.Node(parent.server, nodeid)
180 1
    if inputs:
181 1
        create_property(method, ua.generate_nodeid(qname.NamespaceIndex), ua.QualifiedName("InputArguments", 0), [_vtype_to_argument(vtype) for vtype in inputs])
182 1
    if outputs:
183 1
        create_property(method, ua.generate_nodeid(qname.NamespaceIndex), ua.QualifiedName("OutputArguments", 0), [_vtype_to_argument(vtype) for vtype in outputs])
184 1
    parent.server.add_method_callback(method.nodeid, callback)
185 1
    return nodeid
186
187
188 1
def _vtype_to_argument(vtype):
189 1
    if isinstance(vtype, ua.Argument):
190
        return vtype
191
192 1
    arg = ua.Argument()
193 1
    v = ua.Variant(None, vtype)
194 1
    arg.DataType = _guess_uatype(v)
195 1
    return arg
196
197
198 1
def _guess_uatype(variant):
199 1
    if variant.VariantType == ua.VariantType.ExtensionObject:
200 1
        if variant.Value is None:
201
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
202 1
        if type(variant.Value) in (list, tuple):
203 1
            if len(variant.Value) == 0:
204
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
205 1
            extobj = variant.Value[0]
206
        else:
207
            extobj = variant.Value
208 1
        classname = extobj.__class__.__name__
209 1
        return ua.NodeId(getattr(ua.ObjectIds, classname))
210
    else:
211 1
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
212
213
214 1
def delete_nodes(server, nodes, recursive=False):
215
    """
216
    Delete specified nodes. Optionally delete recursively all nodes with a
217
    downward hierachic references to the node
218
    """
219 1
    nodestodelete = []
220 1
    if recursive:
221 1
        nodes += _add_childs(nodes)
222 1
    for mynode in nodes:
223 1
        it = ua.DeleteNodesItem()
224 1
        it.NodeId = mynode.nodeid
225 1
        it.DeleteTargetReferences = True
226 1
        nodestodelete.append(it)
227 1
    return server.delete_nodes(nodestodelete)
228
229
230 1
def _add_childs(nodes):
231 1
    results = []
232 1
    for mynode in nodes[:]:
233 1
        results += mynode.get_children()
234 1
    return results
235
236
237