Completed
Push — master ( 15ed61...18100b )
by Olivier
03:06 queued 35s
created

opcua.common._create_variable()   B

Complexity

Conditions 4

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 27
CRAP Score 4.0007
Metric Value
cc 4
dl 0
loc 31
ccs 27
cts 28
cp 0.9643
crap 4.0007
rs 8.5806
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6
7
8 1
def _parse_add_args(*args):
    """
    Turn the leading positional arguments of the create_* helpers into a
    (nodeid, qualified name) pair.

    Two call shapes are understood:
      - (int, name): the int is used as namespace index for both a
        ua.NodeId(0, idx) and a ua.QualifiedName(name, idx)
      - (ua.NodeId or str, ua.QualifiedName or str): strings are parsed with
        the matching from_string() constructor

    ua.UaError propagates untouched; any other failure (wrong type, missing
    argument, ...) is reported as TypeError.
    """
    try:
        first = args[0]
        if isinstance(first, int):
            idx = int(first)
            return ua.NodeId(0, idx), ua.QualifiedName(args[1], idx)

        # first argument describes the node id
        if isinstance(first, ua.NodeId):
            nodeid = first
        elif isinstance(first, str):
            nodeid = ua.NodeId.from_string(first)
        else:
            raise RuntimeError()

        # second argument describes the browse name; only looked at once the
        # node id has been resolved
        second = args[1]
        if isinstance(second, ua.QualifiedName):
            qname = second
        elif isinstance(second, str):
            qname = ua.QualifiedName.from_string(second)
        else:
            raise RuntimeError()

        return nodeid, qname
    except ua.UaError:
        # library errors are already meaningful, let them through
        raise
    except Exception as ex:
        raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {} and got exception {}".format(args, ex))
31
32
33 1
def create_folder(parent, *args):
    """
    Create a folder node as child of parent and return it as a node.Node

    arguments are nodeid, browsename
    or namespace index, name
    """
    nodeid, qname = _parse_add_args(*args)
    server = parent.server
    added_id = _create_folder(server, parent.nodeid, nodeid, qname)
    return node.Node(server, added_id)
41
42
43 1
def create_object(parent, *args):
    """
    Create an object node as child of parent and return it as a node.Node

    arguments are nodeid, browsename
    or namespace index, name
    """
    nodeid, qname = _parse_add_args(*args)
    server = parent.server
    added_id = _create_object(server, parent.nodeid, nodeid, qname)
    return node.Node(server, added_id)
51
52
53 1
def create_property(parent, *args):
    """
    Create a property node as child of parent and return it as a node.Node

    args are nodeid, browsename, value, [variant type]
    or idx, name, value, [variant type]
    """
    identity, payload = args[:2], args[2:]
    nodeid, qname = _parse_add_args(*identity)
    variant = _to_variant(*payload)
    server = parent.server
    added_id = _create_variable(server, parent.nodeid, nodeid, qname, variant, isproperty=True)
    return node.Node(server, added_id)
62
63
64 1
def create_variable(parent, *args):
    """
    Create a variable node as child of parent and return it as a node.Node

    args are nodeid, browsename, value, [variant type]
    or idx, name, value, [variant type]
    """
    identity, payload = args[:2], args[2:]
    nodeid, qname = _parse_add_args(*identity)
    variant = _to_variant(*payload)
    server = parent.server
    added_id = _create_variable(server, parent.nodeid, nodeid, qname, variant, isproperty=False)
    return node.Node(server, added_id)
73
74
75 1
def create_method(parent, *args):
    """
    create a child method object
    This is only possible on server side!!

    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
    or idx, name, method_to_be_called, [input argument types], [output argument types]
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
    a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants

    Note: the value handed back is the result of _create_method (the
    AddedNodeId reported by the server), not a node.Node wrapper like the
    other create_* helpers return.
    """
    nodeid, qname = _parse_add_args(*args[:2])
    callback = args[2]
    # the argument type lists are optional trailing positionals
    inputs = args[3] if len(args) > 3 else []
    outputs = args[4] if len(args) > 4 else []
    return _create_method(parent, nodeid, qname, callback, inputs, outputs)
95
96
97 1
def _create_folder(server, parentnodeid, nodeid, qname):
    """
    Send an AddNodes request for an Object node with type definition "i=61",
    attached to parentnodeid through reference type "i=35".

    Description and DisplayName are both derived from qname.Name.
    StatusCode.check() is called on the first result (presumably raising on
    a bad status) and the AddedNodeId of that result is returned.
    """
    label = qname.Name
    attributes = ua.ObjectAttributes()
    attributes.Description = ua.LocalizedText(label)
    attributes.DisplayName = ua.LocalizedText(label)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    attributes.EventNotifier = 0

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Object
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId.from_string("i=35")
    item.TypeDefinition = ua.NodeId.from_string("i=61")
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
115
116
117 1
def _create_object(server, parentnodeid, nodeid, qname):
    """
    Send an AddNodes request for an Object node typed as
    ua.ObjectIds.BaseObjectType, attached to parentnodeid through reference
    type "i=35".

    Description and DisplayName are both derived from qname.Name.
    StatusCode.check() is called on the first result (presumably raising on
    a bad status) and the AddedNodeId of that result is returned.
    """
    label = qname.Name
    attributes = ua.ObjectAttributes()
    attributes.Description = ua.LocalizedText(label)
    attributes.DisplayName = ua.LocalizedText(label)
    attributes.EventNotifier = 0
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Object
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId.from_string("i=35")
    item.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return outcome.AddedNodeId
135
136
137 1
def _to_variant(val, vtype=None):
    """
    Return val unchanged when it already is a ua.Variant, otherwise wrap it
    in a new ua.Variant built from val and the optional vtype.
    """
    return val if isinstance(val, ua.Variant) else ua.Variant(val, vtype)
142
143
144 1
def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False):
    """
    Send an AddNodes request for a Variable node below parentnodeid.

    server       -- object whose add_nodes() receives the request
    parentnodeid -- node id of the parent
    nodeid       -- requested node id for the new node
    qname        -- browse name; its Name is reused for Description/DisplayName
    val          -- variant holding the initial value; it must expose
                    VariantType and Value (create_variable/create_property
                    pass the result of _to_variant)
    isproperty   -- True: HasProperty reference + PropertyType definition,
                    False: HasComponent reference + BaseDataVariableType

    StatusCode.check() is called on the first result (presumably raising on
    a bad status) and the AddedNodeId of that result is returned.
    """
    addnode = ua.AddNodesItem()
    addnode.RequestedNewNodeId = nodeid
    addnode.BrowseName = qname
    addnode.NodeClass = ua.NodeClass.Variable
    addnode.ParentNodeId = parentnodeid
    if isproperty:
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
    else:
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
    attrs = ua.VariableAttributes()
    attrs.Description = ua.LocalizedText(qname.Name)
    attrs.DisplayName = ua.LocalizedText(qname.Name)
    attrs.DataType = _guess_uatype(val)
    attrs.Value = val
    # val is the variant wrapper, so the array test has to look at the value
    # it carries; testing val itself could never match and every array ended
    # up advertised as Scalar
    if isinstance(val.Value, (list, tuple)):
        attrs.ValueRank = ua.ValueRank.OneDimension
    else:
        attrs.ValueRank = ua.ValueRank.Scalar
    attrs.WriteMask = 0
    attrs.UserWriteMask = 0
    attrs.Historizing = 0
    attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
    attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
    addnode.NodeAttributes = attrs
    results = server.add_nodes([addnode])
    results[0].StatusCode.check()
    return results[0].AddedNodeId
175
176
177 1
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
    """
    Add a Method node below parent (reference type "i=47"), optionally
    describe its arguments, and register callback for it on parent.server.

    When inputs / outputs are non-empty, an "InputArguments" /
    "OutputArguments" property is created on the new method node, each entry
    being converted with _vtype_to_argument.

    StatusCode.check() is called on the AddNodes result (presumably raising
    on a bad status); the AddedNodeId of that result is returned.
    """
    server = parent.server
    label = qname.Name

    attributes = ua.MethodAttributes()
    attributes.Description = ua.LocalizedText(label)
    attributes.DisplayName = ua.LocalizedText(label)
    attributes.WriteMask = 0
    attributes.UserWriteMask = 0
    attributes.Executable = True
    attributes.UserExecutable = True

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Method
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId.from_string("i=47")
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    method = node.Node(server, outcome.AddedNodeId)

    # argument descriptions are only published when types were supplied
    if inputs:
        create_property(method, ua.NodeId(), ua.QualifiedName("InputArguments", 0), [_vtype_to_argument(vtype) for vtype in inputs])
    if outputs:
        create_property(method, ua.NodeId(), ua.QualifiedName("OutputArguments", 0), [_vtype_to_argument(vtype) for vtype in outputs])

    server.add_method_callback(method.nodeid, callback)
    return outcome.AddedNodeId
202
203
204 1
def _vtype_to_argument(vtype):
    """
    Return vtype itself when it already is a ua.Argument; otherwise build a
    fresh ua.Argument whose DataType is guessed from an empty variant of the
    given variant type.
    """
    if not isinstance(vtype, ua.Argument):
        argument = ua.Argument()
        probe = ua.Variant(None, vtype)
        argument.DataType = _guess_uatype(probe)
        return argument
    return vtype
212
213
214 1
def _guess_uatype(variant):
    """
    Derive a data type node id for variant by looking up a name in
    ua.ObjectIds.

    For ExtensionObject variants the name is the class name of the wrapped
    object (of its first element when the value is a list or tuple); a None
    value or an empty sequence raises ua.UaError. For every other variant
    type the name of the VariantType member is used.
    """
    if variant.VariantType == ua.VariantType.ExtensionObject:
        value = variant.Value
        if value is None:
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
        if type(value) in (list, tuple):
            if len(value) == 0:
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
            # arrays are typed after their first element
            value = value[0]
        lookup_name = value.__class__.__name__
    else:
        lookup_name = variant.VariantType.name
    return ua.NodeId(getattr(ua.ObjectIds, lookup_name))
228
229
230 1
def delete_nodes(server, nodes, recursive=False):
    """
    Delete specified nodes. Optionally also delete the nodes reachable
    through downward hierarchical references, as collected by _add_childs
    (which gathers get_children() of each given node).

    server    -- object whose delete_nodes() receives the request
    nodes     -- iterable of node objects exposing a nodeid attribute
    recursive -- when true, the children of the given nodes are deleted too

    Returns whatever server.delete_nodes() returns. The nodes argument is
    never modified.
    """
    # work on a private copy: extending the caller's list in place leaked the
    # children into it and failed outright for tuples
    targets = list(nodes)
    if recursive:
        targets += _add_childs(targets)
    nodestodelete = []
    for mynode in targets:
        it = ua.DeleteNodesItem()
        it.NodeId = mynode.nodeid
        it.DeleteTargetReferences = True
        nodestodelete.append(it)
    return server.delete_nodes(nodestodelete)
244
245
246 1
def _add_childs(nodes):
247 1
    results = []
248 1
    for mynode in nodes[:]:
249 1
        results += mynode.get_children()
250 1
    return results
251
252
253