Completed
Pull Request — master (#133)
by Denis
02:49
created

Node.get_description()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963
Metric Value
cc 1
dl 0
loc 6
ccs 1
cts 3
cp 0.3333
crap 1.2963
rs 9.4285
1
"""
2
High level node object, to access node attribute
3
and browse address space
4
"""
5
6 1
from datetime import datetime
7
8 1
from opcua import ua
9
10
11 1
class Node(object):
12
13
    """
14
    High level node object, to access node attribute,
15
    browse and populate address space.
16
    Node objects are usefull as-is but they do not expose the entire
17
    OPC-UA protocol. Feel free to look at the code of this class and call
18
    directly UA services methods to optimize your code
19
    """
20
21 1
    def __init__(self, server, nodeid):
22 1
        self.server = server
23 1
        self.nodeid = None
24 1
        if isinstance(nodeid, ua.NodeId):
25 1
            self.nodeid = nodeid
26 1
        elif type(nodeid) in (str, bytes):
27 1
            self.nodeid = ua.NodeId.from_string(nodeid)
28
        elif isinstance(nodeid, int):
29
            self.nodeid = ua.NodeId(nodeid, 0)
30
        else:
31
            raise ua.UaError("argument to node must be a NodeId object or a string defining a nodeid found {} of type {}".format(nodeid, type(nodeid)))
32
33 1
    def __eq__(self, other):
34 1
        if isinstance(other, Node) and self.nodeid == other.nodeid:
35 1
            return True
36 1
        return False
37
38 1
    def __ne__(self, other):
39
        return not self.__eq__(other)
40
41 1
    def __str__(self):
42
        return "Node({})".format(self.nodeid)
43 1
    __repr__ = __str__
44
45 1
    def __hash__(self):
46 1
        return self.nodeid.__hash__()
47
48 1
    def get_browse_name(self):
49
        """
50
        Get browse name of a node. A browse name is a QualifiedName object
51
        composed of a string(name) and a namespace index.
52
        """
53 1
        result = self.get_attribute(ua.AttributeIds.BrowseName)
54 1
        return result.Value.Value
55
56 1
    def get_display_name(self):
57
        """
58
        get description attribute of node
59
        """
60 1
        result = self.get_attribute(ua.AttributeIds.DisplayName)
61 1
        return result.Value.Value
62
63 1
    def get_data_type(self):
64
        """
65
        get data type of node
66
        """
67 1
        result = self.get_attribute(ua.AttributeIds.DataType)
68 1
        return result.Value.Value
69
70 1
    def get_node_class(self):
71
        """
72
        get node class attribute of node
73
        """
74
        result = self.get_attribute(ua.AttributeIds.NodeClass)
75
        return result.Value.Value
76
77 1
    def get_description(self):
78
        """
79
        get description attribute class of node
80
        """
81
        result = self.get_attribute(ua.AttributeIds.Description)
82
        return result.Value.Value
83
84 1
    def get_value(self):
85
        """
86
        Get value of a node as a python type. Only variables ( and properties) have values.
87
        An exception will be generated for other node types.
88
        """
89 1
        result = self.get_data_value()
90 1
        return result.Value.Value
91
92 1
    def get_data_value(self):
93
        """
94
        Get value of a node as a DataValue object. Only variables (and properties) have values.
95
        An exception will be generated for other node types.
96
        DataValue contain a variable value as a variant as well as server and source timestamps
97
        """
98 1
        return self.get_attribute(ua.AttributeIds.Value)
99
100 1
    def set_array_dimensions(self, value):
101
        """
102
        Set attribute ArrayDimensions of node
103
        make sure it has the correct data type
104
        """
105 1
        v = ua.Variant(value, ua.VariantType.UInt32)
106 1
        self.set_attribute(ua.AttributeIds.ArrayDimensions, ua.DataValue(v))
107
108 1
    def get_array_dimensions(self):
109
        """
110
        Read and return ArrayDimensions attribute of node
111
        """
112 1
        res = self.get_attribute(ua.AttributeIds.ArrayDimensions)
113 1
        return res.Value.Value
114
115 1
    def set_value_rank(self, value):
116
        """
117
        Set attribute ArrayDimensions of node
118
        """
119 1
        v = ua.Variant(value, ua.VariantType.Int32)
120 1
        self.set_attribute(ua.AttributeIds.ValueRank, ua.DataValue(v))
121
122 1
    def get_value_rank(self):
123
        """
124
        Read and return ArrayDimensions attribute of node
125
        """
126 1
        res = self.get_attribute(ua.AttributeIds.ValueRank)
127 1
        return res.Value.Value
128
129 1
    def set_value(self, value, varianttype=None):
130
        """
131
        Set value of a node. Only variables(properties) have values.
132
        An exception will be generated for other node types.
133
        value argument is either:
134
        * a python built-in type, converted to opc-ua
135
        optionnaly using the variantype argument.
136
        * a ua.Variant, varianttype is then ignored
137
        * a ua.DataValue, you then have full control over data send to server
138
        """
139 1
        datavalue = None
140 1
        if isinstance(value, ua.DataValue):
141 1
            datavalue = value
142 1
        elif isinstance(value, ua.Variant):
143 1
            datavalue = ua.DataValue(value)
144
        else:
145 1
            datavalue = ua.DataValue(ua.Variant(value, varianttype))
146 1
        self.set_attribute(ua.AttributeIds.Value, datavalue)
147
148 1
    set_data_value = set_value
149
150 1
    def set_writable(self, writable=True):
151
        """
152
        Set node as writable by clients.
153
        A node is always writable on server side.
154
        """
155 1
        if writable:
156 1
            self.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
157 1
            self.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
158
        else:
159 1
            self.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
160 1
            self.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
161
162 1
    def set_attr_bit(self, attr, bit):
163 1
        val = self.get_attribute(attr)
164 1
        val.Value.Value = ua.set_bit(val.Value.Value, bit)
165 1
        self.set_attribute(attr, val)
166
167 1
    def unset_attr_bit(self, attr, bit):
168 1
        val = self.get_attribute(attr)
169 1
        val.Value.Value = ua.unset_bit(val.Value.Value, bit)
170 1
        self.set_attribute(attr, val)
171
172 1
    def set_read_only(self):
173
        """
174
        Set a node as read-only for clients.
175
        A node is always writable on server side.
176
        """
177
        return self.set_writable(False)
178
179 1
    def set_attribute(self, attributeid, datavalue):
180
        """
181
        Set an attribute of a node
182
        attributeid is a member of ua.AttributeIds
183
        datavalue is a ua.DataValue object
184
        """
185 1
        attr = ua.WriteValue()
186 1
        attr.NodeId = self.nodeid
187 1
        attr.AttributeId = attributeid
188 1
        attr.Value = datavalue
189 1
        params = ua.WriteParameters()
190 1
        params.NodesToWrite = [attr]
191 1
        result = self.server.write(params)
192 1
        result[0].check()
193
194 1
    def get_attribute(self, attr):
195
        """
196
        Read one attribute of a node
197
        result code from server is checked and an exception is raised in case of error
198
        """
199 1
        rv = ua.ReadValueId()
200 1
        rv.NodeId = self.nodeid
201 1
        rv.AttributeId = attr
202 1
        params = ua.ReadParameters()
203 1
        params.NodesToRead.append(rv)
204 1
        result = self.server.read(params)
205 1
        result[0].StatusCode.check()
206 1
        return result[0]
207
208 1
    def get_attributes(self, attrs):
209
        """
210
        Read several attributes of a node
211
        list of DataValue is returned
212
        """
213
        params = ua.ReadParameters()
214
        for attr in attrs:
215
            rv = ua.ReadValueId()
216
            rv.NodeId = self.nodeid
217
            rv.AttributeId = attr
218
            params.NodesToRead.append(rv)
219
220
        results = self.server.read(params)
221
        return results
222
223 1
    def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
224
        """
225
        Get all children of a node. By default hierarchical references and all node classes are returned.
226
        Other reference types may be given:
227
        References = 31
228
        NonHierarchicalReferences = 32
229
        HierarchicalReferences = 33
230
        HasChild = 34
231
        Organizes = 35
232
        HasEventSource = 36
233
        HasModellingRule = 37
234
        HasEncoding = 38
235
        HasDescription = 39
236
        HasTypeDefinition = 40
237
        GeneratesEvent = 41
238
        Aggregates = 44
239
        HasSubtype = 45
240
        HasProperty = 46
241
        HasComponent = 47
242
        HasNotifier = 48
243
        HasOrderedComponent = 49
244
        """
245 1
        references = self.get_children_descriptions(refs, nodeclassmask)
246 1
        nodes = []
247 1
        for desc in references:
248 1
            node = Node(self.server, desc.NodeId)
249 1
            nodes.append(node)
250 1
        return nodes
251
252 1
    def get_properties(self):
253
        """
254
        return properties of node.
255
        properties are child nodes with a reference of type HasProperty and a NodeClass of Variable
256
        """
257
        return self.get_children(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable)
258
259 1
    def get_children_descriptions(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
260
        """
261
        return all attributes of child nodes as UA BrowseResult structs
262
        """
263 1
        desc = ua.BrowseDescription()
264 1
        desc.BrowseDirection = ua.BrowseDirection.Forward
265 1
        desc.ReferenceTypeId = ua.TwoByteNodeId(refs)
266 1
        desc.IncludeSubtypes = includesubtypes
267 1
        desc.NodeClassMask = nodeclassmask
268 1
        desc.ResultMask = ua.BrowseResultMask.All
269
270 1
        desc.NodeId = self.nodeid
271 1
        params = ua.BrowseParameters()
272 1
        params.View.Timestamp = ua.win_epoch_to_datetime(0)
273 1
        params.NodesToBrowse.append(desc)
274 1
        results = self.server.browse(params)
275 1
        return results[0].References
276
277 1
    def get_child(self, path):
278
        """
279
        get a child specified by its path from this node.
280
        A path might be:
281
        * a string representing a qualified name.
282
        * a qualified name
283
        * a list of string
284
        * a list of qualified names
285
        """
286 1
        if type(path) not in (list, tuple):
287 1
            path = [path]
288 1
        rpath = ua.RelativePath()
289 1
        for item in path:
290 1
            el = ua.RelativePathElement()
291 1
            el.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HierarchicalReferences)
292 1
            el.IsInverse = False
293 1
            el.IncludeSubtypes = True
294 1
            if isinstance(item, ua.QualifiedName):
295
                el.TargetName = item
296
            else:
297 1
                el.TargetName = ua.QualifiedName.from_string(item)
298 1
            rpath.Elements.append(el)
299 1
        bpath = ua.BrowsePath()
300 1
        bpath.StartingNode = self.nodeid
301 1
        bpath.RelativePath = rpath
302 1
        result = self.server.translate_browsepaths_to_nodeids([bpath])
303 1
        result = result[0]
304 1
        result.StatusCode.check()
305
        # FIXME: seems this method may return several nodes
306 1
        return Node(self.server, result.Targets[0].TargetId)
307
308 1
    def read_raw_history(self, starttime=None, endtime=None, numvalues=0):
309
        """
310
        Read raw history of a node
311
        result code from server is checked and an exception is raised in case of error
312
        If numvalues is > 0 and number of events in period is > numvalues
313
        then result will be truncated
314
        """
315 1
        details = ua.ReadRawModifiedDetails()
316 1
        details.IsReadModified = False
317 1
        if starttime:
318 1
            details.StartTime = starttime
319
        else:
320 1
            details.StartTime = ua.DateTimeMinValue
321 1
        if endtime:
322 1
            details.EndTime = endtime
323
        else:
324 1
            details.EndTime = ua.DateTimeMinValue
325 1
        details.NumValuesPerNode = numvalues
326 1
        details.ReturnBounds = True
327 1
        result = self.history_read(details)
328 1
        return result.HistoryData.DataValues
329
330 1
    def history_read(self, details):
331
        """
332
        Read raw history of a node, low-level function
333
        result code from server is checked and an exception is raised in case of error
334
        """
335 1
        valueid = ua.HistoryReadValueId()
336 1
        valueid.NodeId = self.nodeid
337 1
        valueid.IndexRange = ''
338
339 1
        params = ua.HistoryReadParameters()
340 1
        params.HistoryReadDetails = details
341 1
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
342 1
        params.ReleaseContinuationPoints = False
343 1
        params.NodesToRead.append(valueid)
344 1
        result = self.server.history_read(params)[0]
345 1
        return result
346
347
    # Hack for convenience methods
348
    # local import is ugly but necessary for python2 support
349
    # feel fri to propose something better but I want to split all those
350
    # create methods from Node
351
352 1
    def add_folder(*args, **kwargs):
353 1
        from opcua.common import manage_nodes
354 1
        return manage_nodes.create_folder(*args, **kwargs)
355
356 1
    def add_object(*args, **kwargs):
357 1
        from opcua.common import manage_nodes
358 1
        return manage_nodes.create_object(*args, **kwargs)
359
360 1
    def add_variable(*args, **kwargs):
361 1
        from opcua.common import manage_nodes
362 1
        return manage_nodes.create_variable(*args, **kwargs)
363
364 1
    def add_property(*args, **kwargs):
365 1
        from opcua.common import manage_nodes
366 1
        return manage_nodes.create_property(*args, **kwargs)
367
368 1
    def add_method(*args, **kwargs):
369 1
        from opcua.common import manage_nodes
370 1
        return manage_nodes.create_method(*args, **kwargs)
371
372 1
    def call_method(*args, **kwargs):
373 1
        from opcua.common import methods
374
        return methods.call_method(*args, **kwargs)
375