Completed
Pull Request — master (#165)
by Denis
05:07 queued 02:20
created

Node.get_parent()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1
Metric Value
cc 1
dl 0
loc 7
ccs 3
cts 3
cp 1
crap 1
rs 9.4285
1
"""
2
High level node object, to access node attribute
3
and browse address space
4
"""
5
6 1
from datetime import datetime
7
8 1
from opcua import ua
9
10
11 1
class Node(object):
12
13
    """
14
    High level node object, to access node attribute,
15
    browse and populate address space.
16
    Node objects are usefull as-is but they do not expose the entire
17
    OPC-UA protocol. Feel free to look at the code of this class and call
18
    directly UA services methods to optimize your code
19
    """
20
21 1
    def __init__(self, server, nodeid):
22 1
        self.server = server
23 1
        self.nodeid = None
24 1
        if isinstance(nodeid, ua.NodeId):
25 1
            self.nodeid = nodeid
26 1
        elif type(nodeid) in (str, bytes):
27 1
            self.nodeid = ua.NodeId.from_string(nodeid)
28
        elif isinstance(nodeid, int):
29
            self.nodeid = ua.NodeId(nodeid, 0)
30
        else:
31
            raise ua.UaError("argument to node must be a NodeId object or a string defining a nodeid found {} of type {}".format(nodeid, type(nodeid)))
32
33 1
    def __eq__(self, other):
34 1
        if isinstance(other, Node) and self.nodeid == other.nodeid:
35 1
            return True
36 1
        return False
37
38 1
    def __ne__(self, other):
39
        return not self.__eq__(other)
40
41 1
    def __str__(self):
42
        return "Node({})".format(self.nodeid)
43 1
    __repr__ = __str__
44
45 1
    def __hash__(self):
46 1
        return self.nodeid.__hash__()
47
48 1
    def get_browse_name(self):
49
        """
50
        Get browse name of a node. A browse name is a QualifiedName object
51
        composed of a string(name) and a namespace index.
52
        """
53 1
        result = self.get_attribute(ua.AttributeIds.BrowseName)
54 1
        return result.Value.Value
55
56 1
    def get_display_name(self):
57
        """
58
        get description attribute of node
59
        """
60 1
        result = self.get_attribute(ua.AttributeIds.DisplayName)
61 1
        return result.Value.Value
62
63 1
    def get_data_type(self):
64
        """
65
        get data type of node
66
        """
67 1
        result = self.get_attribute(ua.AttributeIds.DataType)
68 1
        return result.Value.Value
69
70 1
    def get_node_class(self):
71
        """
72
        get node class attribute of node
73
        """
74 1
        result = self.get_attribute(ua.AttributeIds.NodeClass)
75 1
        return result.Value.Value
76
77 1
    def get_description(self):
78
        """
79
        get description attribute class of node
80
        """
81 1
        result = self.get_attribute(ua.AttributeIds.Description)
82 1
        return result.Value.Value
83
84 1
    def get_value(self):
85
        """
86
        Get value of a node as a python type. Only variables ( and properties) have values.
87
        An exception will be generated for other node types.
88
        """
89 1
        result = self.get_data_value()
90 1
        return result.Value.Value
91
92 1
    def get_data_value(self):
93
        """
94
        Get value of a node as a DataValue object. Only variables (and properties) have values.
95
        An exception will be generated for other node types.
96
        DataValue contain a variable value as a variant as well as server and source timestamps
97
        """
98 1
        return self.get_attribute(ua.AttributeIds.Value)
99
100 1
    def set_array_dimensions(self, value):
101
        """
102
        Set attribute ArrayDimensions of node
103
        make sure it has the correct data type
104
        """
105 1
        v = ua.Variant(value, ua.VariantType.UInt32)
106 1
        self.set_attribute(ua.AttributeIds.ArrayDimensions, ua.DataValue(v))
107
108 1
    def get_array_dimensions(self):
109
        """
110
        Read and return ArrayDimensions attribute of node
111
        """
112 1
        res = self.get_attribute(ua.AttributeIds.ArrayDimensions)
113 1
        return res.Value.Value
114
115 1
    def set_value_rank(self, value):
116
        """
117
        Set attribute ArrayDimensions of node
118
        """
119 1
        v = ua.Variant(value, ua.VariantType.Int32)
120 1
        self.set_attribute(ua.AttributeIds.ValueRank, ua.DataValue(v))
121
122 1
    def get_value_rank(self):
123
        """
124
        Read and return ArrayDimensions attribute of node
125
        """
126 1
        res = self.get_attribute(ua.AttributeIds.ValueRank)
127 1
        return res.Value.Value
128
129 1
    def set_value(self, value, varianttype=None):
130
        """
131
        Set value of a node. Only variables(properties) have values.
132
        An exception will be generated for other node types.
133
        value argument is either:
134
        * a python built-in type, converted to opc-ua
135
        optionnaly using the variantype argument.
136
        * a ua.Variant, varianttype is then ignored
137
        * a ua.DataValue, you then have full control over data send to server
138
        """
139 1
        datavalue = None
140 1
        if isinstance(value, ua.DataValue):
141 1
            datavalue = value
142 1
        elif isinstance(value, ua.Variant):
143 1
            datavalue = ua.DataValue(value)
144
        else:
145 1
            datavalue = ua.DataValue(ua.Variant(value, varianttype))
146 1
        self.set_attribute(ua.AttributeIds.Value, datavalue)
147
148 1
    set_data_value = set_value
149
150 1
    def set_writable(self, writable=True):
151
        """
152
        Set node as writable by clients.
153
        A node is always writable on server side.
154
        """
155 1
        if writable:
156 1
            self.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
157 1
            self.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
158
        else:
159 1
            self.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
160 1
            self.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
161
162 1
    def set_attr_bit(self, attr, bit):
163 1
        val = self.get_attribute(attr)
164 1
        val.Value.Value = ua.set_bit(val.Value.Value, bit)
165 1
        self.set_attribute(attr, val)
166
167 1
    def unset_attr_bit(self, attr, bit):
168 1
        val = self.get_attribute(attr)
169 1
        val.Value.Value = ua.unset_bit(val.Value.Value, bit)
170 1
        self.set_attribute(attr, val)
171
172 1
    def set_read_only(self):
173
        """
174
        Set a node as read-only for clients.
175
        A node is always writable on server side.
176
        """
177
        return self.set_writable(False)
178
179 1
    def set_attribute(self, attributeid, datavalue):
180
        """
181
        Set an attribute of a node
182
        attributeid is a member of ua.AttributeIds
183
        datavalue is a ua.DataValue object
184
        """
185 1
        attr = ua.WriteValue()
186 1
        attr.NodeId = self.nodeid
187 1
        attr.AttributeId = attributeid
188 1
        attr.Value = datavalue
189 1
        params = ua.WriteParameters()
190 1
        params.NodesToWrite = [attr]
191 1
        result = self.server.write(params)
192 1
        result[0].check()
193
194 1
    def get_attribute(self, attr):
195
        """
196
        Read one attribute of a node
197
        result code from server is checked and an exception is raised in case of error
198
        """
199 1
        rv = ua.ReadValueId()
200 1
        rv.NodeId = self.nodeid
201 1
        rv.AttributeId = attr
202 1
        params = ua.ReadParameters()
203 1
        params.NodesToRead.append(rv)
204 1
        result = self.server.read(params)
205 1
        result[0].StatusCode.check()
206 1
        return result[0]
207
208 1
    def get_attributes(self, attrs):
209
        """
210
        Read several attributes of a node
211
        list of DataValue is returned
212
        """
213
        params = ua.ReadParameters()
214
        for attr in attrs:
215
            rv = ua.ReadValueId()
216
            rv.NodeId = self.nodeid
217
            rv.AttributeId = attr
218
            params.NodesToRead.append(rv)
219
220
        results = self.server.read(params)
221
        return results
222
223 1
    def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
224
        """
225
        Get all children of a node. By default hierarchical references and all node classes are returned.
226
        Other reference types may be given:
227
        References = 31
228
        NonHierarchicalReferences = 32
229
        HierarchicalReferences = 33
230
        HasChild = 34
231
        Organizes = 35
232
        HasEventSource = 36
233
        HasModellingRule = 37
234
        HasEncoding = 38
235
        HasDescription = 39
236
        HasTypeDefinition = 40
237
        GeneratesEvent = 41
238
        Aggregates = 44
239
        HasSubtype = 45
240
        HasProperty = 46
241
        HasComponent = 47
242
        HasNotifier = 48
243
        HasOrderedComponent = 49
244
        """
245 1
        return self.get_referenced_nodes(refs, ua.BrowseDirection.Forward, nodeclassmask)
246
247 1
    def get_properties(self):
248
        """
249
        return properties of node.
250
        properties are child nodes with a reference of type HasProperty and a NodeClass of Variable
251
        """
252
        return self.get_children(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable)
253
254 1
    def get_children_descriptions(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
255 1
        return self.get_references(refs, ua.BrowseDirection.Forward, nodeclassmask, includesubtypes)
256
257 1
    def get_references(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
258
        """
259
        returns references of the node based on specific filter defined with:
260
261
        refs = ObjectId of the Reference
262
        direction = Browse direction for references
263
        nodeclassmask = filter nodes based on specific class
264
        includesubtypes = If true subtypes of the reference (ref) are also included
265
        """
266 1
        desc = ua.BrowseDescription()
267 1
        desc.BrowseDirection = direction
268 1
        desc.ReferenceTypeId = ua.TwoByteNodeId(refs)
269 1
        desc.IncludeSubtypes = includesubtypes
270 1
        desc.NodeClassMask = nodeclassmask
271 1
        desc.ResultMask = ua.BrowseResultMask.All
272
273 1
        desc.NodeId = self.nodeid
274 1
        params = ua.BrowseParameters()
275 1
        params.View.Timestamp = ua.win_epoch_to_datetime(0)
276 1
        params.NodesToBrowse.append(desc)
277 1
        results = self.server.browse(params)
278 1
        return results[0].References
279
280 1
    def get_referenced_nodes(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
281
        """
282
        returns referenced nodes based on specific filter
283
        Paramters are the same as for get_references
284
285
        """
286 1
        references = self.get_references(refs, direction, nodeclassmask, includesubtypes)
287 1
        nodes = []
288 1
        for desc in references:
289 1
            node = Node(self.server, desc.NodeId)
290 1
            nodes.append(node)
291 1
        return nodes
292
293 1
    def get_type_definition(self):
294
        """
295
        returns type definition of the node.
296
        """
297 1
        references = self.get_references(refs=ua.ObjectIds.HasTypeDefinition, direction=ua.BrowseDirection.Forward)
298 1
        if len(references) == 0:
299 1
            return ua.ObjectIds.BaseObjectType
300 1
        return references[0].NodeId.Identifier
301
302 1
    def get_parent(self):
303
        """
304
        returns parent of the node.
305
        """
306 1
        refs = self.get_references(refs=ua.ObjectIds.HierarchicalReferences, direction=ua.BrowseDirection.Inverse)
307
308 1
        return Node(self.server, refs[0].NodeId)
309
310 1
    def get_child(self, path):
311
        """
312
        get a child specified by its path from this node.
313
        A path might be:
314
        * a string representing a qualified name.
315
        * a qualified name
316
        * a list of string
317
        * a list of qualified names
318
        """
319 1
        if type(path) not in (list, tuple):
320 1
            path = [path]
321 1
        rpath = ua.RelativePath()
322 1
        for item in path:
323 1
            el = ua.RelativePathElement()
324 1
            el.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HierarchicalReferences)
325 1
            el.IsInverse = False
326 1
            el.IncludeSubtypes = True
327 1
            if isinstance(item, ua.QualifiedName):
328
                el.TargetName = item
329
            else:
330 1
                el.TargetName = ua.QualifiedName.from_string(item)
331 1
            rpath.Elements.append(el)
332 1
        bpath = ua.BrowsePath()
333 1
        bpath.StartingNode = self.nodeid
334 1
        bpath.RelativePath = rpath
335 1
        result = self.server.translate_browsepaths_to_nodeids([bpath])
336 1
        result = result[0]
337 1
        result.StatusCode.check()
338
        # FIXME: seems this method may return several nodes
339 1
        return Node(self.server, result.Targets[0].TargetId)
340
341 1
    def read_raw_history(self, starttime=None, endtime=None, numvalues=0):
342
        """
343
        Read raw history of a node
344
        result code from server is checked and an exception is raised in case of error
345
        If numvalues is > 0 and number of events in period is > numvalues
346
        then result will be truncated
347
        """
348 1
        details = ua.ReadRawModifiedDetails()
349 1
        details.IsReadModified = False
350 1
        if starttime:
351 1
            details.StartTime = starttime
352
        else:
353 1
            details.StartTime = ua.DateTimeMinValue
354 1
        if endtime:
355 1
            details.EndTime = endtime
356
        else:
357 1
            details.EndTime = ua.DateTimeMinValue
358 1
        details.NumValuesPerNode = numvalues
359 1
        details.ReturnBounds = True
360 1
        result = self.history_read(details)
361 1
        return result.HistoryData.DataValues
362
363 1
    def history_read(self, details):
364
        """
365
        Read raw history of a node, low-level function
366
        result code from server is checked and an exception is raised in case of error
367
        """
368 1
        valueid = ua.HistoryReadValueId()
369 1
        valueid.NodeId = self.nodeid
370 1
        valueid.IndexRange = ''
371
372 1
        params = ua.HistoryReadParameters()
373 1
        params.HistoryReadDetails = details
374 1
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
375 1
        params.ReleaseContinuationPoints = False
376 1
        params.NodesToRead.append(valueid)
377 1
        result = self.server.history_read(params)[0]
378 1
        return result
379
380
    # Hack for convenience methods
381
    # local import is ugly but necessary for python2 support
382
    # feel fri to propose something better but I want to split all those
383
    # create methods from Node
384
385 1
    def add_folder(*args, **kwargs):
386 1
        from opcua.common import manage_nodes
387 1
        return manage_nodes.create_folder(*args, **kwargs)
388
389 1
    def add_object(*args, **kwargs):
390 1
        from opcua.common import manage_nodes
391 1
        return manage_nodes.create_object(*args, **kwargs)
392
393 1
    def add_variable(*args, **kwargs):
394 1
        from opcua.common import manage_nodes
395 1
        return manage_nodes.create_variable(*args, **kwargs)
396
397 1
    def add_property(*args, **kwargs):
398 1
        from opcua.common import manage_nodes
399 1
        return manage_nodes.create_property(*args, **kwargs)
400
401 1
    def add_method(*args, **kwargs):
402 1
        from opcua.common import manage_nodes
403 1
        return manage_nodes.create_method(*args, **kwargs)
404
405 1
    def call_method(*args, **kwargs):
406 1
        from opcua.common import methods
407
        return methods.call_method(*args, **kwargs)
408