Completed
Push — dev ( 7c3457...b4ea62 )
by Olivier
04:07
created

opcua.Node.get_attribute()   A

Complexity

Conditions 1

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1
Metric Value
dl 0
loc 13
ccs 9
cts 9
cp 1
rs 9.4286
cc 1
crap 1
1
"""
2
High level node object, to access node attribute
3
and browse address space
4
"""
5
6 1
from opcua import ua
7
8
9 1
def create_folder(parent, *args):
    """
    Add a folder node below *parent* and return it wrapped in a Node.

    Positional arguments are either (nodeid, browsename)
    or (namespace index, name).
    """
    new_id, browse_name = _parse_add_args(*args)
    server = parent.server
    created_id = _create_folder(server, parent.nodeid, new_id, browse_name)
    return Node(server, created_id)
17
18
19 1
def create_object(parent, *args):
    """
    Add an object node below *parent* and return it wrapped in a Node.

    Positional arguments are either (nodeid, browsename)
    or (namespace index, name).
    """
    new_id, browse_name = _parse_add_args(*args)
    server = parent.server
    created_id = _create_object(server, parent.nodeid, new_id, browse_name)
    return Node(server, created_id)
27
28
29 1
def create_property(parent, *args):
    """
    Add a property node below *parent* and return it wrapped in a Node.

    Positional arguments are (nodeid, browsename, value, [variant type])
    or (idx, name, value, [variant type]).
    """
    # The first two arguments identify the node, the rest describe its value
    id_args, value_args = args[:2], args[2:]
    new_id, browse_name = _parse_add_args(*id_args)
    variant = _to_variant(*value_args)
    server = parent.server
    created_id = _create_variable(server, parent.nodeid, new_id, browse_name, variant, isproperty=True)
    return Node(server, created_id)
38
39
40 1
def create_variable(parent, *args):
    """
    Add a variable node below *parent* and return it wrapped in a Node.

    Positional arguments are (nodeid, browsename, value, [variant type])
    or (idx, name, value, [variant type]).
    """
    # The first two arguments identify the node, the rest describe its value
    id_args, value_args = args[:2], args[2:]
    new_id, browse_name = _parse_add_args(*id_args)
    variant = _to_variant(*value_args)
    server = parent.server
    created_id = _create_variable(server, parent.nodeid, new_id, browse_name, variant, isproperty=False)
    return Node(server, created_id)
49
50
51 1
def create_method(parent, *args):
    """
    create a child method object
    This is only possible on server side!!
    args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
    or idx, name, method_to_be_called, [input argument types], [output argument types]
    if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
    a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants

    The value returned is whatever _create_method returns for the new method.
    """
    nodeid, qname = _parse_add_args(*args[:2])
    callback = args[2]
    # The argument type lists are optional. Default to empty lists so that
    # calling with only (nodeid, browsename, callback) no longer fails on
    # unbound local names.
    inputs = args[3] if len(args) > 3 else []
    outputs = args[4] if len(args) > 4 else []
    return _create_method(parent, nodeid, qname, callback, inputs, outputs)
67
68
69 1
def call_method(parent, methodid, *args):
    """
    Call an OPC-UA method on *parent*.

    methodid may be the browse name of a child method (a string, resolved
    through parent.get_child), a Node, or anything else, which is passed
    through unchanged as the method id.
    Remaining arguments are variants or python objects; the latter are
    wrapped in ua.Variant.

    Returns None when the call has no output argument, the single value
    when there is exactly one, and a list of values otherwise.
    """
    if isinstance(methodid, str):
        methodid = parent.get_child(methodid).nodeid
    elif isinstance(methodid, Node):
        methodid = methodid.nodeid

    arguments = [arg if isinstance(arg, ua.Variant) else ua.Variant(arg) for arg in args]

    result = _call_method(parent.server, parent.nodeid, methodid, arguments)

    outputs = result.OutputArguments
    count = len(outputs)
    if count == 0:
        return None
    if count == 1:
        return outputs[0].Value
    return [var.Value for var in outputs]
96
97
98 1
class Node(object):

    """
    High level node object, to access node attribute,
    browse and populate address space.
    Node objects are useful as-is but they do not expose the entire
    possibilities of the OPC-UA protocol. Feel free to look at the Node code
    and use the underlying server object directly when more control is needed.
    """

    def __init__(self, server, nodeid):
        """
        server is the object used for every service call made by this node
        (read, write, browse, ...).
        nodeid is a ua.NodeId, a string or bytes accepted by
        ua.NodeId.from_string, or an int which is wrapped as ua.NodeId(nodeid, 0).
        Any other type raises an Exception.
        """
        self.server = server
        self.nodeid = None
        if isinstance(nodeid, ua.NodeId):
            self.nodeid = nodeid
        elif isinstance(nodeid, (str, bytes)):
            self.nodeid = ua.NodeId.from_string(nodeid)
        elif isinstance(nodeid, int):
            self.nodeid = ua.NodeId(nodeid, 0)
        else:
            raise Exception("argument to node must be a NodeId object or a string defining a nodeid found {} of type {}".format(nodeid, type(nodeid)))

    def __eq__(self, other):
        """
        Two nodes are equal when both are Node objects with equal node ids.
        """
        return bool(isinstance(other, Node) and self.nodeid == other.nodeid)

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so define it explicitly
        return not self.__eq__(other)

    def __hash__(self):
        # Keep hashing consistent with __eq__, which compares node ids.
        # Without this, defining __eq__ makes the class unhashable on Python 3.
        return hash(self.nodeid)

    def __str__(self):
        return "Node({})".format(self.nodeid)
    __repr__ = __str__

    def get_browse_name(self):
        """
        Get browse name of a node. A browse name is a QualifiedName object
        composed of a string(name) and a namespace index.
        """
        result = self.get_attribute(ua.AttributeIds.BrowseName)
        return result.Value.Value

    def get_display_name(self):
        """
        get display name attribute of node
        """
        result = self.get_attribute(ua.AttributeIds.DisplayName)
        return result.Value.Value

    def get_data_type(self):
        """
        get data type of node
        """
        result = self.get_attribute(ua.AttributeIds.DataType)
        return result.Value.Value

    def get_node_class(self):
        """
        get node class attribute of node
        """
        result = self.get_attribute(ua.AttributeIds.NodeClass)
        return result.Value.Value

    def get_description(self):
        """
        get description attribute of node
        """
        result = self.get_attribute(ua.AttributeIds.Description)
        return result.Value.Value

    def get_value(self):
        """
        Get value of a node as a python type. Only variables (and properties) have values.
        An exception will be generated for other node types.
        """
        result = self.get_data_value()
        return result.Value.Value

    def get_data_value(self):
        """
        Get value of a node as a DataValue object. Only variables (and properties) have values.
        An exception will be generated for other node types.
        DataValue contain a variable value as a variant as well as server and source timestamps
        """
        return self.get_attribute(ua.AttributeIds.Value)

    def set_value(self, value, varianttype=None):
        """
        Set value of a node. Only variables (and properties) have values.
        An exception will be generated for other node types.
        value argument is either:
        * a python built-in type, converted to opc-ua
        optionally using the varianttype argument.
        * a ua.Variant, varianttype is then ignored
        * a ua.DataValue, you then have full control over data sent to server
        """
        if isinstance(value, ua.DataValue):
            datavalue = value
        elif isinstance(value, ua.Variant):
            datavalue = ua.DataValue(value)
        else:
            datavalue = ua.DataValue(ua.Variant(value, varianttype))
        self.set_attribute(ua.AttributeIds.Value, datavalue)

    set_data_value = set_value

    def set_writable(self, writable=True):
        """
        Set node as writable by clients.
        A node is always writable on server side.

        Both the AccessLevel and the UserAccessLevel attributes are written,
        with CurrentWrite when writable is true and CurrentRead otherwise.
        """
        if writable:
            mask = ua.AccessLevelMask.CurrentWrite
        else:
            mask = ua.AccessLevelMask.CurrentRead
        # Both attributes must be updated in both directions; previously the
        # read-only branch wrote AccessLevel twice and left UserAccessLevel
        # untouched.
        self.set_attribute(ua.AttributeIds.AccessLevel, ua.DataValue(ua.Variant(mask, ua.VariantType.Byte)))
        self.set_attribute(ua.AttributeIds.UserAccessLevel, ua.DataValue(ua.Variant(mask, ua.VariantType.Byte)))

    def set_read_only(self):
        """
        Set a node as read-only for clients.
        A node is always writable on server side.
        """
        return self.set_writable(False)

    def set_attribute(self, attributeid, datavalue):
        """
        Set an attribute of a node
        The result returned by the server for the write is checked.
        """
        attr = ua.WriteValue()
        attr.NodeId = self.nodeid
        attr.AttributeId = attributeid
        attr.Value = datavalue
        params = ua.WriteParameters()
        params.NodesToWrite = [attr]
        result = self.server.write(params)
        result[0].check()

    def get_attribute(self, attr):
        """
        Read one attribute of a node
        result code from server is checked and an exception is raised in case of error
        """
        rv = ua.ReadValueId()
        rv.NodeId = self.nodeid
        rv.AttributeId = attr
        params = ua.ReadParameters()
        params.NodesToRead.append(rv)
        result = self.server.read(params)
        result[0].StatusCode.check()
        return result[0]

    def get_attributes(self, attrs):
        """
        Read several attributes of a node
        list of DataValue is returned; status codes are not checked here
        """
        params = ua.ReadParameters()
        for attr in attrs:
            rv = ua.ReadValueId()
            rv.NodeId = self.nodeid
            rv.AttributeId = attr
            params.NodesToRead.append(rv)

        results = self.server.read(params)
        return results

    def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
        """
        Get all children of a node. By default hierarchical references and all node classes are returned.
        Other reference types may be given:
        References = 31
        NonHierarchicalReferences = 32
        HierarchicalReferences = 33
        HasChild = 34
        Organizes = 35
        HasEventSource = 36
        HasModellingRule = 37
        HasEncoding = 38
        HasDescription = 39
        HasTypeDefinition = 40
        GeneratesEvent = 41
        Aggregates = 44
        HasSubtype = 45
        HasProperty = 46
        HasComponent = 47
        HasNotifier = 48
        HasOrderedComponent = 49
        """
        references = self.get_children_descriptions(refs, nodeclassmask)
        return [Node(self.server, desc.NodeId) for desc in references]

    def get_properties(self):
        """
        return properties of node.
        properties are child nodes with a reference of type HasProperty and a NodeClass of Variable
        """
        return self.get_children(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable)

    def get_children_descriptions(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
        """
        Browse forward references of this node and return the References
        of the first browse result.
        """
        desc = ua.BrowseDescription()
        desc.BrowseDirection = ua.BrowseDirection.Forward
        desc.ReferenceTypeId = ua.TwoByteNodeId(refs)
        desc.IncludeSubtypes = includesubtypes
        desc.NodeClassMask = nodeclassmask
        desc.ResultMask = ua.BrowseResultMask.All

        desc.NodeId = self.nodeid
        params = ua.BrowseParameters()
        params.View.Timestamp = ua.win_epoch_to_datetime(0)
        params.NodesToBrowse.append(desc)
        results = self.server.browse(params)
        return results[0].References

    def get_child(self, path):
        """
        get a child specified by its path from this node.
        A path might be:
        * a string representing a qualified name.
        * a qualified name
        * a list of string
        * a list of qualified names
        """
        if not isinstance(path, (list, tuple)):
            path = [path]
        rpath = ua.RelativePath()
        for item in path:
            el = ua.RelativePathElement()
            el.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HierarchicalReferences)
            el.IsInverse = False
            el.IncludeSubtypes = True
            if isinstance(item, ua.QualifiedName):
                el.TargetName = item
            else:
                el.TargetName = ua.QualifiedName.from_string(item)
            rpath.Elements.append(el)
        bpath = ua.BrowsePath()
        bpath.StartingNode = self.nodeid
        bpath.RelativePath = rpath
        result = self.server.translate_browsepaths_to_nodeids([bpath])
        result = result[0]
        result.StatusCode.check()
        # FIXME: seems this method may return several nodes
        return Node(self.server, result.Targets[0].TargetId)

    def read_raw_history(self, starttime=None, endtime=None, numvalues=0, returnbounds=True):
        """
        Read raw history of a node
        StartTime and EndTime are only set on the request when given.
        The request is sent through history_read, whose result is returned.
        """
        details = ua.ReadRawModifiedDetails()
        details.IsReadModified = False
        if starttime:
            details.StartTime = starttime
        if endtime:
            details.EndTime = endtime
        details.NumValuesPerNode = numvalues
        details.ReturnBounds = returnbounds
        return self.history_read(details)

    def history_read(self, details):
        """
        Read raw history of a node, low-level function
        Returns the HistoryData of the first result.
        NOTE: the status code of the result is not checked here.
        """
        valueid = ua.HistoryReadValueId()
        valueid.NodeId = self.nodeid
        valueid.IndexRange = ''

        params = ua.HistoryReadParameters()
        params.HistoryReadDetails = ua.ExtensionObject.from_object(details)
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
        params.ReleaseContinuationPoints = False
        params.NodesToRead.append(valueid)
        result = self.server.history_read(params)[0]
        return result.HistoryData

    # Convenience legacy methods
    add_folder = create_folder
    add_property = create_property
    add_object = create_object
    add_variable = create_variable
    add_method = create_method
    call_method = call_method
385
386
387 1
def _create_folder(server, parentnodeid, nodeid, qname):
    """
    Ask *server* to add an object node typed "i=61" below parentnodeid,
    linked with reference type "i=35". Returns nodeid once the status
    of the add has been checked.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Object
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId.from_string("i=35")
    item.TypeDefinition = ua.NodeId.from_string("i=61")

    # Description and display name both default to the browse name
    attributes = ua.ObjectAttributes()
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = ua.OpenFileMode.Read
    attributes.UserWriteMask = ua.OpenFileMode.Read
    attributes.EventNotifier = 0
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return nodeid
405
406
407 1
def _create_object(server, parentnodeid, nodeid, qname):
    """
    Ask *server* to add an object node of type BaseObjectType below
    parentnodeid, linked with reference type "i=35". Returns nodeid once
    the status of the add has been checked.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Object
    item.ParentNodeId = parentnodeid
    item.ReferenceTypeId = ua.NodeId.from_string("i=35")
    item.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)

    # Description and display name both default to the browse name
    attributes = ua.ObjectAttributes()
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.EventNotifier = 0
    attributes.WriteMask = ua.OpenFileMode.Read
    attributes.UserWriteMask = ua.OpenFileMode.Read
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return nodeid
425
426
427 1
def _to_variant(val, vtype=None):
    """
    Return val unchanged when it already is a ua.Variant, otherwise wrap it
    in a new ua.Variant built with the optional vtype.
    """
    already_variant = isinstance(val, ua.Variant)
    return val if already_variant else ua.Variant(val, vtype)
432
433
434 1
def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False):
    """
    Ask *server* to add a variable node holding val below parentnodeid.

    With isproperty set the node is linked with HasProperty and typed
    PropertyType, otherwise HasComponent and BaseDataVariableType are used.
    Returns nodeid once the status of the add has been checked.
    """
    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Variable
    item.ParentNodeId = parentnodeid

    if isproperty:
        reference_id, type_id = ua.ObjectIds.HasProperty, ua.ObjectIds.PropertyType
    else:
        reference_id, type_id = ua.ObjectIds.HasComponent, ua.ObjectIds.BaseDataVariableType
    item.ReferenceTypeId = ua.NodeId(reference_id)
    item.TypeDefinition = ua.NodeId(type_id)

    # Description and display name both default to the browse name
    attributes = ua.VariableAttributes()
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.DataType = _guess_uatype(val)
    attributes.Value = val
    attributes.ValueRank = 0
    attributes.WriteMask = ua.OpenFileMode.Read
    attributes.UserWriteMask = ua.OpenFileMode.Read
    attributes.Historizing = 0
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()
    return nodeid
459
460
461 1
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
    """
    Add a method node below *parent* and register callback for it.

    The node is linked with reference type "i=47" and no TypeDefinition is
    set. When inputs / outputs are non-empty, "InputArguments" /
    "OutputArguments" properties describing them are created on the method
    node. Returns nodeid.
    """
    server = parent.server

    item = ua.AddNodesItem()
    item.RequestedNewNodeId = nodeid
    item.BrowseName = qname
    item.NodeClass = ua.NodeClass.Method
    item.ParentNodeId = parent.nodeid
    item.ReferenceTypeId = ua.NodeId.from_string("i=47")

    # Description and display name both default to the browse name
    attributes = ua.MethodAttributes()
    attributes.Description = ua.LocalizedText(qname.Name)
    attributes.DisplayName = ua.LocalizedText(qname.Name)
    attributes.WriteMask = ua.OpenFileMode.Read
    attributes.UserWriteMask = ua.OpenFileMode.Read
    attributes.Executable = True
    attributes.UserExecutable = True
    item.NodeAttributes = attributes

    outcome = server.add_nodes([item])[0]
    outcome.StatusCode.check()

    method = Node(parent.server, nodeid)
    # Advertise argument types, inputs first, only when some were given
    for prop_name, vtypes in (("InputArguments", inputs), ("OutputArguments", outputs)):
        if vtypes:
            create_property(method, ua.generate_nodeid(qname.NamespaceIndex), ua.QualifiedName(prop_name, 0), [_vtype_to_argument(vtype) for vtype in vtypes])

    parent.server.add_method_callback(method.nodeid, callback)
    return nodeid
486
487
488 1
def _call_method(server, parentnodeid, methodid, arguments):
    """
    Send a single method call to *server* and return its result
    after checking the result's status code.
    """
    call = ua.CallMethodRequest()
    call.ObjectId = parentnodeid
    call.MethodId = methodid
    call.InputArguments = arguments

    outcome = server.call([call])[0]
    outcome.StatusCode.check()
    return outcome
498
499
500 1
def _vtype_to_argument(vtype):
    """
    Return an ExtensionObject wrapping a ua.Argument.

    A ua.Argument is wrapped as-is; anything else is treated as a variant
    type from which the DataType of a fresh ua.Argument is guessed.
    """
    if not isinstance(vtype, ua.Argument):
        argument = ua.Argument()
        # An empty variant of the requested type is enough to guess the data type
        argument.DataType = _guess_uatype(ua.Variant(None, vtype))
    else:
        argument = vtype
    return ua.ExtensionObject.from_object(argument)
508
509
510 1
def _guess_uatype(variant):
    """
    Return the ua.NodeId of the data type matching *variant*.

    For anything but extension objects the ObjectIds entry named after the
    variant type is used. For extension objects the name is looked up from
    the TypeId of the object (or of the first object of a list/tuple),
    keeping the part before the first underscore.
    Raises an Exception for a null or empty extension object value.
    """
    if variant.VariantType != ua.VariantType.ExtensionObject:
        return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))

    value = variant.Value
    if value is None:
        raise Exception("Cannot guess DataType from Null ExtensionObject")
    if type(value) in (list, tuple):
        if len(value) == 0:
            raise Exception("Cannot guess DataType from Null ExtensionObject")
        # For sequences the first element decides the type
        extobj = value[0]
    else:
        extobj = value

    objectidname = ua.ObjectIdsInv[extobj.TypeId.Identifier]
    classname, _, _ = objectidname.partition("_")
    return ua.NodeId(getattr(ua.ObjectIds, classname))
525
526
527 1
def _parse_add_args(*args):
    """
    Turn the leading arguments of the create_* functions into a
    (nodeid, qualified name) pair.

    Accepted forms, chosen from the type of the first argument:
    * ua.NodeId, browsename: both returned unchanged
    * str, str: parsed with ua.NodeId.from_string and ua.QualifiedName.from_string
    * int (namespace index), name: a node id is generated in that namespace
      and the name is qualified with the same index

    Raises TypeError when the first argument has any other type.
    """
    if isinstance(args[0], ua.NodeId):
        return args[0], args[1]
    elif isinstance(args[0], str):
        return ua.NodeId.from_string(args[0]), ua.QualifiedName.from_string(args[1])
    elif isinstance(args[0], int):
        return ua.generate_nodeid(args[0]), ua.QualifiedName(args[1], args[0])
    else:
        # args is a tuple: wrap it so that % formats it as one value instead
        # of failing with "not all arguments converted"
        raise TypeError("Add methods takes a nodeid and a qualifiedname as argument, received %s" % (args,))
536