Completed
Pull Request — master (#388)
by Olivier
05:04
created

Node.get_encoding_refs()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 2
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
c 0
b 0
f 0
1
"""
2
High level node object, to access node attribute
3
and browse address space
4
"""
5
6 1
from opcua import ua
7 1
from opcua.common import events
8 1
import opcua.common
9
10 1
class Node(object):

    """
    High level node object, to access node attribute,
    browse and populate address space.
    Node objects are useful as-is but they do not expose the entire
    OPC-UA protocol. Feel free to look at the code of this class and call
    directly UA services methods to optimize your code
    """

    def __init__(self, server, nodeid):
        """
        Create a node bound to a server (or client session) object.

        :param server: object used to issue the UA service calls
            (read, write, browse, ...) made by the methods of this class
        :param nodeid: a Node, a ua.NodeId, a str/bytes parsed with
            ua.NodeId.from_string, or an int used as a numeric id in namespace 0
        :raises ua.UaError: if nodeid is of any other type
        """
        self.server = server
        self.nodeid = None
        if isinstance(nodeid, Node):
            self.nodeid = nodeid.nodeid
        elif isinstance(nodeid, ua.NodeId):
            self.nodeid = nodeid
        elif isinstance(nodeid, (str, bytes)):
            self.nodeid = ua.NodeId.from_string(nodeid)
        elif isinstance(nodeid, int):
            self.nodeid = ua.NodeId(nodeid, 0)
        else:
            raise ua.UaError("argument to node must be a NodeId object or a string defining a nodeid found {0} of type {1}".format(nodeid, type(nodeid)))

    def __eq__(self, other):
        """Two nodes are equal when both are Node objects with equal nodeid."""
        if isinstance(other, Node) and self.nodeid == other.nodeid:
            return True
        return False

    def __ne__(self, other):
        # Needed explicitly on Python 2, where __ne__ is not derived from __eq__
        return not self.__eq__(other)

    def __str__(self):
        return "Node({0})".format(self.nodeid)
    __repr__ = __str__

    def __hash__(self):
        # Hash on the nodeid so that hashing agrees with __eq__
        return hash(self.nodeid)

    def get_browse_name(self):
        """
        Get browse name of a node. A browse name is a QualifiedName object
        composed of a string(name) and a namespace index.
        """
        result = self.get_attribute(ua.AttributeIds.BrowseName)
        return result.Value.Value

    def get_display_name(self):
        """
        get display name attribute of node
        """
        result = self.get_attribute(ua.AttributeIds.DisplayName)
        return result.Value.Value

    def get_data_type(self):
        """
        get data type of node as NodeId
        """
        result = self.get_attribute(ua.AttributeIds.DataType)
        return result.Value.Value

    def get_data_type_as_variant_type(self):
        """
        get data type of node as VariantType
        This only works if node is a variable, otherwise type
        may not be convertible to VariantType
        """
        result = self.get_attribute(ua.AttributeIds.DataType)
        datatype_node = Node(self.server, result.Value.Value)
        return opcua.common.ua_utils.data_type_to_variant_type(datatype_node)

    def get_access_level(self):
        """
        Get the access level attribute of the node as a set of AccessLevel enum values.
        """
        result = self.get_attribute(ua.AttributeIds.AccessLevel)
        return ua.AccessLevel.parse_bitfield(result.Value.Value)

    def get_user_access_level(self):
        """
        Get the user access level attribute of the node as a set of AccessLevel enum values.
        """
        result = self.get_attribute(ua.AttributeIds.UserAccessLevel)
        return ua.AccessLevel.parse_bitfield(result.Value.Value)

    def get_event_notifier(self):
        """
        Get the event notifier attribute of the node as a set of EventNotifier enum values.
        """
        result = self.get_attribute(ua.AttributeIds.EventNotifier)
        return ua.EventNotifier.parse_bitfield(result.Value.Value)

    def set_event_notifier(self, values):
        """
        Set the event notifier attribute.

        :param values: an iterable of EventNotifier enum values.
        """
        event_notifier_bitfield = ua.EventNotifier.to_bitfield(values)
        variant = ua.Variant(event_notifier_bitfield, ua.VariantType.Byte)
        self.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(variant))

    def get_node_class(self):
        """
        get node class attribute of node
        """
        result = self.get_attribute(ua.AttributeIds.NodeClass)
        return result.Value.Value

    def get_description(self):
        """
        get description attribute of node
        """
        result = self.get_attribute(ua.AttributeIds.Description)
        return result.Value.Value

    def get_value(self):
        """
        Get value of a node as a python type. Only variables ( and properties) have values.
        An exception will be generated for other node types.
        """
        result = self.get_data_value()
        return result.Value.Value

    def get_data_value(self):
        """
        Get value of a node as a DataValue object. Only variables (and properties) have values.
        An exception will be generated for other node types.
        DataValue contain a variable value as a variant as well as server and source timestamps
        """
        return self.get_attribute(ua.AttributeIds.Value)

    def set_array_dimensions(self, value):
        """
        Set attribute ArrayDimensions of node
        make sure it has the correct data type
        """
        v = ua.Variant(value, ua.VariantType.UInt32)
        self.set_attribute(ua.AttributeIds.ArrayDimensions, ua.DataValue(v))

    def get_array_dimensions(self):
        """
        Read and return ArrayDimensions attribute of node
        """
        res = self.get_attribute(ua.AttributeIds.ArrayDimensions)
        return res.Value.Value

    def set_value_rank(self, value):
        """
        Set attribute ValueRank of node (written as an Int32)
        """
        v = ua.Variant(value, ua.VariantType.Int32)
        self.set_attribute(ua.AttributeIds.ValueRank, ua.DataValue(v))

    def get_value_rank(self):
        """
        Read and return ValueRank attribute of node
        """
        res = self.get_attribute(ua.AttributeIds.ValueRank)
        return res.Value.Value

    def set_value(self, value, varianttype=None):
        """
        Set value of a node. Only variables(properties) have values.
        An exception will be generated for other node types.
        value argument is either:
        * a python built-in type, converted to opc-ua
        optionally using the varianttype argument.
        * a ua.Variant, varianttype is then ignored
        * a ua.DataValue, you then have full control over data send to server
        """
        if isinstance(value, ua.DataValue):
            datavalue = value
        elif isinstance(value, ua.Variant):
            datavalue = ua.DataValue(value)
        else:
            datavalue = ua.DataValue(ua.Variant(value, varianttype))
        self.set_attribute(ua.AttributeIds.Value, datavalue)

    set_data_value = set_value

    def set_writable(self, writable=True):
        """
        Set node as writable by clients.
        A node is always writable on server side.

        Sets (or clears, when writable is false) the CurrentWrite bit in both
        the AccessLevel and the UserAccessLevel attributes.
        """
        if writable:
            self.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
            self.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
        else:
            self.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
            self.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)

    def set_attr_bit(self, attr, bit):
        """
        Read attribute attr, set the given bit in its value and write it back.
        This is a read-modify-write sequence made of two separate service calls.
        """
        val = self.get_attribute(attr)
        val.Value.Value = ua.ua_binary.set_bit(val.Value.Value, bit)
        self.set_attribute(attr, val)

    def unset_attr_bit(self, attr, bit):
        """
        Read attribute attr, clear the given bit in its value and write it back.
        This is a read-modify-write sequence made of two separate service calls.
        """
        val = self.get_attribute(attr)
        val.Value.Value = ua.ua_binary.unset_bit(val.Value.Value, bit)
        self.set_attribute(attr, val)

    def set_read_only(self):
        """
        Set a node as read-only for clients.
        A node is always writable on server side.
        """
        return self.set_writable(False)

    def set_attribute(self, attributeid, datavalue):
        """
        Set an attribute of a node
        attributeid is a member of ua.AttributeIds
        datavalue is a ua.DataValue object
        result code from server is checked and an exception is raised in case of error
        """
        attr = ua.WriteValue()
        attr.NodeId = self.nodeid
        attr.AttributeId = attributeid
        attr.Value = datavalue
        params = ua.WriteParameters()
        params.NodesToWrite = [attr]
        result = self.server.write(params)
        result[0].check()

    def get_attribute(self, attr):
        """
        Read one attribute of a node
        result code from server is checked and an exception is raised in case of error
        """
        rv = ua.ReadValueId()
        rv.NodeId = self.nodeid
        rv.AttributeId = attr
        params = ua.ReadParameters()
        params.NodesToRead.append(rv)
        result = self.server.read(params)
        result[0].StatusCode.check()
        return result[0]

    def get_attributes(self, attrs):
        """
        Read several attributes of a node
        list of DataValue is returned
        Unlike get_attribute, the status codes of the results are not checked
        here; callers must inspect them themselves.
        """
        params = ua.ReadParameters()
        for attr in attrs:
            rv = ua.ReadValueId()
            rv.NodeId = self.nodeid
            rv.AttributeId = attr
            params.NodesToRead.append(rv)

        results = self.server.read(params)
        return results

    def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
        """
        Get all children of a node. By default hierarchical references and all node classes are returned.
        Other reference types may be given:
        References = 31
        NonHierarchicalReferences = 32
        HierarchicalReferences = 33
        HasChild = 34
        Organizes = 35
        HasEventSource = 36
        HasModellingRule = 37
        HasEncoding = 38
        HasDescription = 39
        HasTypeDefinition = 40
        GeneratesEvent = 41
        Aggregates = 44
        HasSubtype = 45
        HasProperty = 46
        HasComponent = 47
        HasNotifier = 48
        HasOrderedComponent = 49
        """
        return self.get_referenced_nodes(refs, ua.BrowseDirection.Forward, nodeclassmask)

    def get_properties(self):
        """
        return properties of node.
        properties are child nodes with a reference of type HasProperty and a NodeClass of Variable
        """
        return self.get_children(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable)

    def get_variables(self):
        """
        return variables of node.
        variables are child nodes with a reference of type HasComponent and a NodeClass of Variable
        """
        return self.get_children(refs=ua.ObjectIds.HasComponent, nodeclassmask=ua.NodeClass.Variable)

    def get_methods(self):
        """
        return methods of node.
        methods are child nodes with a reference of type HasComponent and a NodeClass of Method
        """
        return self.get_children(refs=ua.ObjectIds.HasComponent, nodeclassmask=ua.NodeClass.Method)

    def get_children_descriptions(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
        """
        Same filter as get_children, but return the reference descriptions
        produced by get_references instead of Node objects.
        """
        return self.get_references(refs, ua.BrowseDirection.Forward, nodeclassmask, includesubtypes)

    def get_encoding_refs(self):
        """
        return nodes referenced from this node through forward HasEncoding references
        """
        return self.get_referenced_nodes(ua.ObjectIds.HasEncoding, ua.BrowseDirection.Forward)

    def get_description_refs(self):
        """
        return nodes referenced from this node through forward HasDescription references
        """
        return self.get_referenced_nodes(ua.ObjectIds.HasDescription, ua.BrowseDirection.Forward)

    def get_references(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
        """
        returns references of the node based on specific filter defined with:

        refs = ObjectId of the Reference
        direction = Browse direction for references
        nodeclassmask = filter nodes based on specific class
        includesubtypes = If true subtypes of the reference (ref) are also included
        """
        desc = ua.BrowseDescription()
        desc.BrowseDirection = direction
        # NOTE(review): refs is wrapped in a TwoByteNodeId, which presumably
        # limits it to small numeric ids in namespace 0 - confirm before
        # passing custom reference types.
        desc.ReferenceTypeId = ua.TwoByteNodeId(refs)
        desc.IncludeSubtypes = includesubtypes
        desc.NodeClassMask = nodeclassmask
        desc.ResultMask = ua.BrowseResultMask.All

        desc.NodeId = self.nodeid
        params = ua.BrowseParameters()
        params.View.Timestamp = ua.get_win_epoch()
        params.NodesToBrowse.append(desc)
        params.RequestedMaxReferencesPerNode = 0
        results = self.server.browse(params)

        references = self._browse_next(results)
        return references

    def _browse_next(self, results):
        """
        Collect the references of the first browse result, issuing browse_next
        calls for as long as the server hands back a continuation point.
        """
        references = results[0].References
        while results[0].ContinuationPoint:
            params = ua.BrowseNextParameters()
            params.ContinuationPoints = [results[0].ContinuationPoint]
            params.ReleaseContinuationPoints = False
            results = self.server.browse_next(params)
            references.extend(results[0].References)
        return references

    def get_referenced_nodes(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
        """
        returns referenced nodes based on specific filter
        Parameters are the same as for get_references

        """
        references = self.get_references(refs, direction, nodeclassmask, includesubtypes)
        return [Node(self.server, desc.NodeId) for desc in references]

    def get_type_definition(self):
        """
        returns type definition of the node, as the NodeId found in the first
        forward HasTypeDefinition reference, or None if there is no such reference.
        """
        references = self.get_references(refs=ua.ObjectIds.HasTypeDefinition, direction=ua.BrowseDirection.Forward)
        if not references:
            return None
        return references[0].NodeId

    def get_path_as_string(self, max_length=20):
        """
        Attempt to find path of node from root node and return it as a list of strings.
        There might several possible paths to a node, this function will return one
        Some nodes may be missing references, so this method may
        return a list containing only the browse name of this node
        Since address space may have circular references, a max length is specified

        """
        path = self._get_path(max_length)
        path = [ref.BrowseName.to_string() for ref in path]
        path.append(self.get_browse_name().to_string())
        return path

    def get_path(self, max_length=20):
        """
        Attempt to find path of node from root node and return it as a list of Nodes.
        There might several possible paths to a node, this function will return one
        Some nodes may be missing references, so this method may
        return a list containing only this node
        Since address space may have circular references, a max length is specified

        """
        path = self._get_path(max_length)
        path = [Node(self.server, ref.NodeId) for ref in path]
        path.append(self)
        return path

    def _get_path(self, max_length=20):
        """
        Walk inverse hierarchical references upwards from this node and return
        the reference descriptions of its ancestors, top-most first. The node
        itself is not included; callers append it.
        There might several possible paths to a node, only the first inverse
        reference found at each level is followed.
        Some nodes may be missing references, so this method may
        return an empty list
        Since address space may have circular references, at most
        max_length - 1 ancestors are returned, which leaves room for the node
        itself. With max_length <= 1 no browsing is done at all.

        """
        path = []
        node = self
        # The bound is checked before browsing so that small max_length
        # values are honoured as well.
        while len(path) < max_length - 1:
            refs = node.get_references(refs=ua.ObjectIds.HierarchicalReferences, direction=ua.BrowseDirection.Inverse)
            if not refs:
                break
            path.insert(0, refs[0])
            node = Node(self.server, refs[0].NodeId)
        return path

    def get_parent(self):
        """
        returns parent of the node.
        A Node may have several parents, the first found is returned.
        This method uses reverse references, a node might be missing such a link,
        thus we will not find its parent. None is returned in that case.
        """
        refs = self.get_references(refs=ua.ObjectIds.HierarchicalReferences, direction=ua.BrowseDirection.Inverse)
        if refs:
            return Node(self.server, refs[0].NodeId)
        return None

    def get_child(self, path):
        """
        get a child specified by its path from this node.
        A path might be:
        * a string representing a qualified name.
        * a qualified name
        * a list of string
        * a list of qualified names
        status code from server is checked and an exception is raised in case of error
        """
        if not isinstance(path, (list, tuple)):
            path = [path]
        rpath = self._make_relative_path(path)
        bpath = ua.BrowsePath()
        bpath.StartingNode = self.nodeid
        bpath.RelativePath = rpath
        result = self.server.translate_browsepaths_to_nodeids([bpath])
        result = result[0]
        result.StatusCode.check()
        # FIXME: seems this method may return several nodes
        return Node(self.server, result.Targets[0].TargetId)

    def _make_relative_path(self, path):
        """
        Build a ua.RelativePath following forward hierarchical references
        (subtypes included) through each item of path. Items that are not
        ua.QualifiedName objects are parsed with ua.QualifiedName.from_string.
        """
        rpath = ua.RelativePath()
        for item in path:
            el = ua.RelativePathElement()
            el.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HierarchicalReferences)
            el.IsInverse = False
            el.IncludeSubtypes = True
            if isinstance(item, ua.QualifiedName):
                el.TargetName = item
            else:
                el.TargetName = ua.QualifiedName.from_string(item)
            rpath.Elements.append(el)
        return rpath

    def read_raw_history(self, starttime=None, endtime=None, numvalues=0):
        """
        Read raw history of a node
        A missing starttime or endtime is replaced by ua.get_win_epoch()
        If numvalues is > 0 and number of values in period is > numvalues
        then result will be truncated
        The status code of the result is not checked in this method.
        """
        details = ua.ReadRawModifiedDetails()
        details.IsReadModified = False
        if starttime:
            details.StartTime = starttime
        else:
            details.StartTime = ua.get_win_epoch()
        if endtime:
            details.EndTime = endtime
        else:
            details.EndTime = ua.get_win_epoch()
        details.NumValuesPerNode = numvalues
        details.ReturnBounds = True
        result = self.history_read(details)
        return result.HistoryData.DataValues

    def history_read(self, details):
        """
        Read history of a node, low-level function
        details is the history read details object sent to the server
        The first result returned by the server is handed back as-is; its
        status code is not checked in this method.
        """
        valueid = ua.HistoryReadValueId()
        valueid.NodeId = self.nodeid
        valueid.IndexRange = ''

        params = ua.HistoryReadParameters()
        params.HistoryReadDetails = details
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
        params.ReleaseContinuationPoints = False
        params.NodesToRead.append(valueid)
        result = self.server.history_read(params)[0]
        return result

    def read_event_history(self, starttime=None, endtime=None, numvalues=0, evtypes=ua.ObjectIds.BaseEventType):
        """
        Read event history of a source node
        A missing starttime or endtime is replaced by ua.get_win_epoch()
        evtypes is one event type, or a list/tuple of event types, given in any
        form accepted by the Node constructor
        If numvalues is > 0 and number of events in period is > numvalues
        then result will be truncated
        The status code of the result is not checked in this method.
        """

        details = ua.ReadEventDetails()
        if starttime:
            details.StartTime = starttime
        else:
            details.StartTime = ua.get_win_epoch()
        if endtime:
            details.EndTime = endtime
        else:
            details.EndTime = ua.get_win_epoch()
        details.NumValuesPerNode = numvalues

        if not isinstance(evtypes, (list, tuple)):
            evtypes = [evtypes]

        evtypes = [Node(self.server, evtype) for evtype in evtypes]

        evfilter = events.get_filter_from_event_type(evtypes)
        details.Filter = evfilter

        result = self.history_read_events(details)
        return [events.Event.from_event_fields(evfilter.SelectClauses, res.EventFields)
                for res in result.HistoryData.Events]

    def history_read_events(self, details):
        """
        Read event history of a node, low-level function
        The request is built exactly like in history_read, so this simply
        delegates to it; the status code of the result is not checked.
        """
        return self.history_read(details)

    def delete(self, delete_references=True):
        """
        Delete node from address space
        delete_references is sent as DeleteTargetReferences of the request
        result code from server is checked and an exception is raised in case of error
        """
        ditem = ua.DeleteNodesItem()
        ditem.NodeId = self.nodeid
        ditem.DeleteTargetReferences = delete_references
        params = ua.DeleteNodesParameters()
        params.NodesToDelete = [ditem]
        result = self.server.delete_nodes(params)
        result[0].check()

    # The add_* helpers below delegate to opcua.common.manage_nodes with this
    # node as parent (except add_reference_type, see its note).

    def add_folder(self, nodeid, bname):
        return opcua.common.manage_nodes.create_folder(self, nodeid, bname)

    def add_object(self, nodeid, bname, objecttype=None):
        return opcua.common.manage_nodes.create_object(self, nodeid, bname, objecttype)

    def add_variable(self, nodeid, bname, val, varianttype=None, datatype=None):
        return opcua.common.manage_nodes.create_variable(self, nodeid, bname, val, varianttype, datatype)

    def add_object_type(self, nodeid, bname):
        return opcua.common.manage_nodes.create_object_type(self, nodeid, bname)

    def add_variable_type(self, nodeid, bname, datatype):
        return opcua.common.manage_nodes.create_variable_type(self, nodeid, bname, datatype)

    def add_data_type(self, nodeid, bname, description=None):
        # Forward the caller's description; it used to be replaced by None.
        return opcua.common.manage_nodes.create_data_type(self, nodeid, bname, description=description)

    def add_property(self, nodeid, bname, val, varianttype=None, datatype=None):
        return opcua.common.manage_nodes.create_property(self, nodeid, bname, val, varianttype, datatype)

    def add_method(self, *args):
        return opcua.common.manage_nodes.create_method(self, *args)

    def add_reference_type(self, parent, nodeid, bname):
        # NOTE(review): unlike the other add_* helpers, self is not used here;
        # the reference type is created under the explicit parent argument.
        return opcua.common.manage_nodes.create_reference_type(parent, nodeid, bname)

    def call_method(self, methodid, *args):
        """
        Call the method methodid on this node with the given arguments;
        delegates to opcua.common.methods.call_method.
        """
        return opcua.common.methods.call_method(self, methodid, *args)
602