Completed
Push — master ( e5fff1...bc270d )
by Olivier
03:39
created

get_nodes_of_namespace()   F

Complexity

Conditions 9

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 9
c 1
b 0
f 0
dl 0
loc 26
rs 3
1
"""
2
Useful methods and classes not belonging anywhere else and depending on the opcua library
3
"""
4
5
from dateutil import parser
6
from datetime import datetime
7
from enum import Enum, IntEnum
8
import uuid
9
10
from opcua import ua
11
from opcua.ua.uaerrors import UaError
12
13
14
def val_to_string(val):
    """
    Convert a python object or python-opcua object to a string
    which should be easy to understand for a human,
    easy to modify, and not too hard to parse back (...not easy).
    Meant for UI or command lines.

    Lists and tuples are converted element by element and rendered
    as "[a, b, c]". Objects exposing a ``to_string`` method are converted
    through it; status codes and enums are rendered by their ``name``;
    a DataValue is rendered through its contained variant; datetimes use
    ISO format; everything else goes through ``str()``.
    """
    if isinstance(val, (list, tuple)):
        return "[" + ", ".join(val_to_string(item) for item in val) + "]"

    if hasattr(val, "to_string"):
        return val.to_string()
    if isinstance(val, ua.StatusCode):
        return val.name
    if isinstance(val, Enum):
        # IntEnum derives from Enum, so a single check covers both
        return val.name
    if isinstance(val, ua.DataValue):
        return variant_to_string(val.Value)
    if isinstance(val, str):
        return val
    if isinstance(val, datetime):
        return val.isoformat()
    # bytes, int, float and every type not handled above
    # FIXME: Some types are probably missing!
    return str(val)
48
49
50
def variant_to_string(var):
    """
    Convert a variant to a string which should be easy to understand
    for a human, easy to modify, and not too hard to parse back
    (...not easy). Meant for UI or command lines.

    Only the ``Value`` attribute of the variant is rendered, using
    val_to_string().
    """
    return val_to_string(var.Value)
57
58
59
def string_to_val(string, vtype):
    """
    Convert back a string to a python or python-opcua object.

    A string starting with "[" is treated as a list: its first and last
    characters are dropped, the remainder is split on "," and every item
    is converted with the same vtype. "[]" gives an empty list. Items
    containing a comma themselves are therefore not supported.

    Note: no error checking is done here, supplying null strings could
    raise exceptions (datetime and guid).

    :param string: text to convert
    :param vtype: ua.VariantType describing the wanted result
    :returns: the converted value, or a list of converted values
    :raises NotImplementedError: if vtype is not handled
    """
    string = string.strip()
    if string.startswith("["):
        inner = string[1:-1].strip()
        if not inner:
            # "[]" is an empty list, not a list holding one empty string
            return []
        return [string_to_val(item, vtype) for item in inner.split(",")]

    if vtype == ua.VariantType.Null:
        val = None
    elif vtype == ua.VariantType.Boolean:
        val = string in ("True", "true", "on", "On", "1")
    elif vtype in (ua.VariantType.SByte, ua.VariantType.Int16, ua.VariantType.Int32, ua.VariantType.Int64):
        # SByte is the signed 8 bit integer type, not a byte string
        val = int(string)
    elif vtype in (ua.VariantType.Byte, ua.VariantType.UInt16, ua.VariantType.UInt32, ua.VariantType.UInt64):
        val = int(string)
    elif vtype in (ua.VariantType.Float, ua.VariantType.Double):
        val = float(string)
    elif vtype in (ua.VariantType.String, ua.VariantType.XmlElement):
        val = string
    elif vtype == ua.VariantType.ByteString:
        # bytes(str) without an encoding raises TypeError on Python 3
        val = string.encode("utf-8")
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        val = ua.NodeId.from_string(string)
    elif vtype == ua.VariantType.QualifiedName:
        val = ua.QualifiedName.from_string(string)
    elif vtype == ua.VariantType.DateTime:
        val = parser.parse(string)
    elif vtype == ua.VariantType.LocalizedText:
        val = ua.LocalizedText(string)
    elif vtype == ua.VariantType.StatusCode:
        val = ua.StatusCode(string)
    elif vtype == ua.VariantType.Guid:
        val = uuid.UUID(string)
    else:
        # FIXME: Some types are probably missing!
        raise NotImplementedError
    return val
107
108
109
def string_to_variant(string, vtype):
    """
    Convert back a string to an ua.Variant.

    The string is converted with string_to_val() and the result is
    wrapped in a variant of type vtype.
    """
    value = string_to_val(string, vtype)
    return ua.Variant(value, vtype)
114
115
116
def get_node_children(node, nodes=None):
    """
    Get recursively all children of a node.

    :param node: node to start from, must provide get_children()
    :param nodes: list used to accumulate the result. When left to None a
        new list starting with node itself is created; when supplied, the
        descendants are appended to it and node itself is not added.
    :returns: the accumulator list, descendants in depth first order
    """
    if nodes is None:
        nodes = [node]
    for child in node.get_children():
        nodes.append(child)
        # the recursive call fills the same list, its return value is not needed
        get_node_children(child, nodes)
    return nodes
126
127
128
def get_node_subtypes(node, nodes=None):
    """
    Get recursively all subtypes of a node, following HasSubtype references.

    :param node: node to start from, must provide get_children(refs=...)
    :param nodes: list used to accumulate the result. When left to None a
        new list starting with node itself is created; when supplied, the
        subtypes are appended to it and node itself is not added.
    :returns: the accumulator list, subtypes in depth first order
    """
    if nodes is None:
        nodes = [node]
    for child in node.get_children(refs=ua.ObjectIds.HasSubtype):
        nodes.append(child)
        # the recursive call fills the same list, its return value is not needed
        get_node_subtypes(child, nodes)
    return nodes
135
136
137
def get_node_supertypes(node, includeitself=False, skipbase=True):
    """
    Return all supertypes of node, walking up the type hierarchy.

    :param node: node to start from, handed over to _get_node_supertypes()
    :param includeitself: also put node itself at the front of the list
    :param skipbase: drop the last (top level) entry; only applied when
        the list holds more than one entry
    :returns: list of nodes ordered from the closest one to node up to the
        top level one (top level one last)
    """
    parents = [node] if includeitself else []
    parents.extend(_get_node_supertypes(node))
    if skipbase and len(parents) > 1:
        # the top level type sits at the end of the list
        parents = parents[:-1]
    return parents
153
154
155
def _get_node_supertypes(node):
    """
    Implementation of get_node_supertypes.

    Follows the inverse HasSubtype reference from node upwards until a node
    without such a reference is reached.

    :returns: list of nodes, direct supertype first, top level type last
    """
    basetypes = []
    current = node
    while True:
        parents = current.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
        if not parents:
            break
        # TODO: Is it possible to have multiple supertypes ? If so extend support for it
        current = parents[0]
        basetypes.append(current)
    return basetypes
167
168
169
def is_child_present(node, browsename):
    """
    Tell whether a browsename is present among the children of the provided node.

    :param node: node wherein to find the browsename
    :param browsename: browsename to search; compared with ``==`` against
        the BrowseName of every child description
    :returns: True if the browsename is present, else False
    """
    return any(desc.BrowseName == browsename
               for desc in node.get_children_descriptions())
182
183
def _get_var_basetypes(server, datatype):
    """
    Looks up the super datatypes of the provided datatype.
    Special case for builtin datatypes, where ns=0 and i<=30: only the
    datatype itself is returned, no other supertypes.

    Args:
        server: object whose get_node() turns a NodeId into a node
        datatype: NodeId of a datatype of a variable
    Returns:
        List of nodes: the node of the datatype itself followed by its
        supertypes (see get_node_supertypes with includeitself=True and
        skipbase=True)
    """
    # first check if datatype is a simple built in type; the identifier is
    # tested directly so that any NodeId holding a numeric identifier matches
    if datatype.NamespaceIndex == 0 and isinstance(datatype.Identifier, int) and datatype.Identifier <= 30:
        # hand back a node, like the other branch, so callers can rely on .nodeid
        return [server.get_node(datatype)]
    # now handle the other cases
    return get_node_supertypes(server.get_node(datatype), includeitself=True, skipbase=True)
199
200
def dtype_to_vtype(server, dtype_node):
    """
    Given a node datatype, find out the variant type to encode
    data. This is not exactly straightforward...

    The type hierarchy is walked upwards, one supertype at a time, until a
    builtin type is reached. Datatypes deriving directly from Enumeration
    (ns=0, i=29) are resolved by looking at their children.

    :param server: passed along to the recursive calls
    :param dtype_node: node of the datatype
    :returns: a ua.VariantType
    :raises ua.UaError: if no builtin supertype is found, or if an
        enumeration has neither an EnumStrings nor an EnumValues child
    """
    nodeid = dtype_node.nodeid
    # builtin types ns=0, i<=25 map one to one onto variant types; this is
    # also what ends the recursion
    # NOTE(review): assumes ua.VariantType covers identifiers 0..25 - confirm
    if nodeid.NamespaceIndex == 0 and isinstance(nodeid.Identifier, int) and nodeid.Identifier <= 25:
        return ua.VariantType(nodeid.Identifier)

    # the node itself is left out on purpose so every step moves one level up
    parents = get_node_supertypes(dtype_node, includeitself=False, skipbase=False)
    if not parents:
        raise ua.UaError("Datatype must be a subtype of builtin types")
    parent = parents[0]

    if parent.nodeid.NamespaceIndex == 0 and parent.nodeid.Identifier == 29:
        # we have an enumeration, we need to look at child to find type
        descs = dtype_node.get_children_descriptions()
        bnames = [d.BrowseName.Name for d in descs]
        if "EnumStrings" in bnames:
            return ua.VariantType.LocalizedText
        elif "EnumValues" in bnames:
            return ua.VariantType.ExtensionObject
        else:
            raise ua.UaError("Enumeration must have a child node describing its type and values")

    return dtype_to_vtype(server, parent)
223
224
def get_variable_basetype(server, datatype_id):
    """
    Looks up the base datatype of the provided datatype.
    The base datatype is either:
    A primitive type (ns=0, i<=21) or a complex one (ns=0 i>21 and i<=30) like Enum and Struct.

    Args:
        server: handed over to _get_var_basetypes()
        datatype_id: NodeId of a datatype of a variable
    Returns:
        NodeId of the first entry delivered by _get_var_basetypes() that
        lies in namespace 0 with a numeric identifier <= 30
    Raises:
        ua.UaError: if no such entry exists
    """
    for entry in _get_var_basetypes(server, datatype_id):
        # entries are expected to be nodes; an entry without a nodeid
        # attribute is taken to be a NodeId already and used as is
        nodeid = getattr(entry, "nodeid", entry)
        if nodeid.NamespaceIndex == 0 and isinstance(nodeid.Identifier, int) and nodeid.Identifier <= 30:
            return nodeid
    # datatype_id is a NodeId itself, so it is formatted directly
    raise ua.UaError("Datatype must be a subtype of builtin types %s" % datatype_id)
241
242
def get_nodes_of_namespace(server, namespaces=None):
    """
    Get the nodes of one or more namespaces.

    Args:
        server: opc ua server to use
        namespaces: a string uri, an int index, or a list of string uris
            and/or int indexes of the namespaces to export. When empty or
            left out, every namespace except the first one (index 0) is used.
    Returns:
        List of nodes that are part of the provided namespaces. Nodes of
        namespace 0 are never returned.
    Raises:
        ua.UaError: if an index is beyond the namespace array of the server
        ValueError: if a uri is not in the namespace array of the server
    """
    ns_available = server.get_namespace_array()

    if not namespaces:
        namespaces = ns_available[1:]
    elif isinstance(namespaces, (str, int)):
        namespaces = [namespaces]

    # make sure all namespaces are indexes (if needed convert strings to indexes)
    namespace_indexes = {n if isinstance(n, int) else ns_available.index(n) for n in namespaces}
    if not namespace_indexes:
        # the server only knows namespace 0: nothing to export
        return []

    highest = max(namespace_indexes)
    if highest >= len(ns_available):
        raise ua.UaError("Namespace index '%d' is invalid" % highest)

    # filter nodeids based on the provided namespaces and convert the nodeid to a node
    return [server.get_node(nodeid) for nodeid in server.iserver.aspace.keys()
            if nodeid.NamespaceIndex != 0 and nodeid.NamespaceIndex in namespace_indexes]
268