Completed
Pull Request — master (#149)
by Denis
02:39
created

opcua.common._parse_add_args()   B

Complexity

Conditions 6

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 6.0106
Metric Value
cc 6
dl 0
loc 19
ccs 14
cts 15
cp 0.9333
crap 6.0106
rs 8
1
"""
2
High level functions to create nodes
3
"""
4 1
from opcua import ua
5 1
from opcua.common import node
6
7
8 1
def _parse_add_args(*args):
9 1
    nodeid = ua.NodeId()
10 1
    qname = ua.QualifiedName()
11 1
    if isinstance(args[0], ua.NodeId):
12 1
        nodeid = args[0]
13 1
    elif isinstance(args[0], str):
14 1
        nodeid = ua.NodeId.from_string(args[0])
15
16 1
    if isinstance(args[1], ua.QualifiedName):
17 1
        qname = args[1]
18 1
    elif isinstance(args[1], str):
19 1
        if isinstance(args[0], int):
20 1
            qname = ua.QualifiedName(args[1], args[0])
21
        else:
22 1
            qname = ua.QualifiedName.from_string(args[1])
23
    else:
24
        raise TypeError("Add methods takes a nodeid and a qualifiedname as argument, for qualified name received %s" % args[1])
25
26 1
    return nodeid, qname
27
28
29 1
def create_folder(parent, *args):
30
    """
31
    create a child node folder
32
    arguments are nodeid, browsename
33
    or namespace index, name
34
    """
35 1
    nodeid, qname = _parse_add_args(*args)
36 1
    return node.Node(parent.server, _create_folder(parent.server, parent.nodeid, nodeid, qname))
37
38
39 1
def create_object(parent, *args):
40
    """
41
    create a child node object
42
    arguments are nodeid, browsename
43
    or namespace index, name
44
    """
45 1
    nodeid, qname = _parse_add_args(*args)
46 1
    return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname))
47
48
49 1
def create_property(parent, *args):
50
    """
51
    create a child node property
52
    args are nodeid, browsename, value, [variant type]
53
    or idx, name, value, [variant type]
54
    """
55 1
    nodeid, qname = _parse_add_args(*args[:2])
56 1
    val = _to_variant(*args[2:])
57 1
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, isproperty=True))
58
59
60 1
def create_variable(parent, *args):
61
    """
62
    create a child node variable
63
    args are nodeid, browsename, value, [variant type]
64
    or idx, name, value, [variant type]
65
    """
66 1
    nodeid, qname = _parse_add_args(*args[:2])
67 1
    val = _to_variant(*args[2:])
68 1
    return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, isproperty=False))
69
70
71 1
def create_method(parent, *args):
72
    """
73
    create a child method object
74
    This is only possible on server side!!
75
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
76
    or idx, name, method_to_be_called, [input argument types], [output argument types]
77
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
78
    a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
79
    """
80 1
    nodeid, qname = _parse_add_args(*args[:2])
81 1
    callback = args[2]
82 1
    if len(args) > 3:
83 1
        inputs = args[3]
84
    else:
85
        inputs = []
86 1
    if len(args) > 4:
87 1
        outputs = args[4]
88
    else:
89
        outputs = []
90 1
    return _create_method(parent, nodeid, qname, callback, inputs, outputs)
91
92
93 1
def _create_folder(server, parentnodeid, nodeid, qname):
94 1
    addnode = ua.AddNodesItem()
95 1
    addnode.RequestedNewNodeId = nodeid
96 1
    addnode.BrowseName = qname
97 1
    addnode.NodeClass = ua.NodeClass.Object
98 1
    addnode.ParentNodeId = parentnodeid
99 1
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=35")
100 1
    addnode.TypeDefinition = ua.NodeId.from_string("i=61")
101 1
    attrs = ua.ObjectAttributes()
102 1
    attrs.Description = ua.LocalizedText(qname.Name)
103 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
104 1
    attrs.WriteMask = ua.OpenFileMode.Read
105 1
    attrs.UserWriteMask = ua.OpenFileMode.Read
106 1
    attrs.EventNotifier = 0
107 1
    addnode.NodeAttributes = attrs
108 1
    results = server.add_nodes([addnode])
109 1
    results[0].StatusCode.check()
110 1
    return results[0].AddedNodeId
111
112
113 1
def _create_object(server, parentnodeid, nodeid, qname):
114 1
    addnode = ua.AddNodesItem()
115 1
    addnode.RequestedNewNodeId = nodeid
116 1
    addnode.BrowseName = qname
117 1
    addnode.NodeClass = ua.NodeClass.Object
118 1
    addnode.ParentNodeId = parentnodeid
119 1
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=35")
120 1
    addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
121 1
    attrs = ua.ObjectAttributes()
122 1
    attrs.Description = ua.LocalizedText(qname.Name)
123 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
124 1
    attrs.EventNotifier = 0
125 1
    attrs.WriteMask = ua.OpenFileMode.Read
126 1
    attrs.UserWriteMask = ua.OpenFileMode.Read
127 1
    addnode.NodeAttributes = attrs
128 1
    results = server.add_nodes([addnode])
129 1
    results[0].StatusCode.check()
130 1
    return results[0].AddedNodeId
131
132
133 1
def _to_variant(val, vtype=None):
134 1
    if isinstance(val, ua.Variant):
135 1
        return val
136
    else:
137 1
        return ua.Variant(val, vtype)
138
139
140 1
def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False):
141 1
    addnode = ua.AddNodesItem()
142 1
    addnode.RequestedNewNodeId = nodeid
143 1
    addnode.BrowseName = qname
144 1
    addnode.NodeClass = ua.NodeClass.Variable
145 1
    addnode.ParentNodeId = parentnodeid
146 1
    if isproperty:
147 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
148 1
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
149
    else:
150 1
        addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
151 1
        addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
152 1
    attrs = ua.VariableAttributes()
153 1
    attrs.Description = ua.LocalizedText(qname.Name)
154 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
155 1
    attrs.DataType = _guess_uatype(val)
156 1
    attrs.Value = val
157 1
    if isinstance(val, list) or isinstance(val, tuple):
158
        attrs.ValueRank = ua.ValueRank.OneDimension
159
    else:
160 1
        attrs.ValueRank = ua.ValueRank.Scalar
161
    #attrs.ArrayDimensions = None
162 1
    attrs.WriteMask = ua.OpenFileMode.Read
163 1
    attrs.UserWriteMask = ua.OpenFileMode.Read
164 1
    attrs.Historizing = 0
165 1
    addnode.NodeAttributes = attrs
166 1
    results = server.add_nodes([addnode])
167 1
    results[0].StatusCode.check()
168 1
    return results[0].AddedNodeId
169
170
171 1
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
172 1
    addnode = ua.AddNodesItem()
173 1
    addnode.RequestedNewNodeId = nodeid
174 1
    addnode.BrowseName = qname
175 1
    addnode.NodeClass = ua.NodeClass.Method
176 1
    addnode.ParentNodeId = parent.nodeid
177 1
    addnode.ReferenceTypeId = ua.NodeId.from_string("i=47")
178
    #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
179 1
    attrs = ua.MethodAttributes()
180 1
    attrs.Description = ua.LocalizedText(qname.Name)
181 1
    attrs.DisplayName = ua.LocalizedText(qname.Name)
182 1
    attrs.WriteMask = ua.OpenFileMode.Read
183 1
    attrs.UserWriteMask = ua.OpenFileMode.Read
184 1
    attrs.Executable = True
185 1
    attrs.UserExecutable = True
186 1
    addnode.NodeAttributes = attrs
187 1
    results = parent.server.add_nodes([addnode])
188 1
    results[0].StatusCode.check()
189 1
    method = node.Node(parent.server, results[0].AddedNodeId)
190 1
    if inputs:
191 1
        create_property(method, ua.NodeId(), ua.QualifiedName("InputArguments", 0), [_vtype_to_argument(vtype) for vtype in inputs])
192 1
    if outputs:
193 1
        create_property(method, ua.NodeId(), ua.QualifiedName("OutputArguments", 0), [_vtype_to_argument(vtype) for vtype in outputs])
194 1
    parent.server.add_method_callback(method.nodeid, callback)
195 1
    return results[0].AddedNodeId
196
197
198 1
def _vtype_to_argument(vtype):
199 1
    if isinstance(vtype, ua.Argument):
200
        return vtype
201
202 1
    arg = ua.Argument()
203 1
    v = ua.Variant(None, vtype)
204 1
    arg.DataType = _guess_uatype(v)
205 1
    return arg
206
207
208 1
def _guess_uatype(variant):
209 1
    if variant.VariantType == ua.VariantType.ExtensionObject:
210 1
        if variant.Value is None:
211
            raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
212 1
        if type(variant.Value) in (list, tuple):
213 1
            if len(variant.Value) == 0:
214
                raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
215 1
            extobj = variant.Value[0]
216
        else:
217
            extobj = variant.Value
218 1
        classname = extobj.__class__.__name__
219 1
        return ua.NodeId(getattr(ua.ObjectIds, classname))
220
    else:
221 1
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
222
223
224 1
def delete_nodes(server, nodes, recursive=False):
225
    """
226
    Delete specified nodes. Optionally delete recursively all nodes with a
227
    downward hierachic references to the node
228
    """
229 1
    nodestodelete = []
230 1
    if recursive:
231 1
        nodes += _add_childs(nodes)
232 1
    for mynode in nodes:
233 1
        it = ua.DeleteNodesItem()
234 1
        it.NodeId = mynode.nodeid
235 1
        it.DeleteTargetReferences = True
236 1
        nodestodelete.append(it)
237 1
    return server.delete_nodes(nodestodelete)
238
239
240 1
def _add_childs(nodes):
241 1
    results = []
242 1
    for mynode in nodes[:]:
243 1
        results += mynode.get_children()
244 1
    return results
245
246
247