Completed
Push — master ( 52074e...41c04b )
by Olivier
03:37
created

get_nodes_of_namespace()   D

Complexity

Conditions 8

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 8
c 1
b 0
f 0
dl 0
loc 23
rs 4.7619
1
"""
2
Usefull method and classes not belonging anywhere and depending on opcua library
3
"""
4
5
from dateutil import parser
6
from datetime import datetime
7
from enum import Enum, IntEnum
8
import uuid
9
10
from opcua import ua
11
from opcua.ua.uaerrors import UaError
12
13
14
def val_to_string(val):
15
    """
16
    convert a python object or python-opcua object to a string
17
    which should be easy to understand for human
18
    easy to modify, and not too hard to parse back ....not easy
19
    meant for UI or command lines
20
21
    """
22
    if isinstance(val, (list, tuple)):
23
        res = []
24
        for v in val:
25
            res.append(val_to_string(v))
26
        return "[" + ", ".join(res) + "]"
27
28
    if hasattr(val, "to_string"):
29
        val = val.to_string()
30
    elif isinstance(val, ua.StatusCode):
31
        val = val.name
32
    elif isinstance(val, (Enum, IntEnum)):
33
        val = val.name
34
    elif isinstance(val, ua.DataValue):
35
        val = variant_to_string(val.Value)
36
    elif isinstance(val, str):
37
        pass
38
    elif isinstance(val, bytes):
39
        val = str(val)
40
    elif isinstance(val, datetime):
41
        val = val.isoformat()
42
    elif isinstance(val, (int, float)):
43
        val = str(val)
44
    else:
45
        # FIXME: Some types are probably missing!
46
        val = str(val)
47
    return val
48
49
50
def variant_to_string(var):
51
    """
52
    convert a variant to a string which should be easy to understand for human
53
    easy to modify, and not too hard to parse back ....not easy
54
    meant for UI or command lines
55
    """
56
    return val_to_string(var.Value)
57
58
59
def string_to_val(string, vtype):
60
    """
61
    Convert back a string to a python or python-opcua object
62
    Note: no error checking is done here, supplying null strings could raise exceptions (datetime and guid)
63
    """
64
    string = string.strip()
65
    if string.startswith("["):
66
        string = string[1:-1]
67
        var = []
68
        for s in string.split(","):
69
            s = s.strip()
70
            val = string_to_val(s, vtype)
71
            var.append(val)
72
        return var
73
74
    if vtype == ua.VariantType.Null:
75
        val = None
76
    elif vtype == ua.VariantType.Boolean:
77
        if string in ("True", "true", "on", "On", "1"):
78
            val = True
79
        else:
80
            val = False
81
    elif vtype in (ua.VariantType.Int16, ua.VariantType.Int32, ua.VariantType.Int64):
82
            val = int(string)
83
    elif vtype in (ua.VariantType.UInt16, ua.VariantType.UInt32, ua.VariantType.UInt64):
84
        val = int(string)
85
    elif vtype in (ua.VariantType.Float, ua.VariantType.Double):
86
        val = float(string)
87
    elif vtype in (ua.VariantType.String, ua.VariantType.XmlElement):
88
        val = string
89
    elif vtype in (ua.VariantType.SByte, ua.VariantType.ByteString):
90
        val = bytes(string)
91
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
92
        val = ua.NodeId.from_string(string)
93
    elif vtype == ua.VariantType.QualifiedName:
94
        val = ua.QualifiedName.from_string(string)
95
    elif vtype == ua.VariantType.DateTime:
96
        val = parser.parse(string)
97
    elif vtype == ua.VariantType.LocalizedText:
98
        val = ua.LocalizedText(string)
99
    elif vtype == ua.VariantType.StatusCode:
100
        val = ua.StatusCode(string)
101
    elif vtype == ua.VariantType.Guid:
102
        val = uuid.UUID(string)
103
    else:
104
        # FIXME: Some types are probably missing!
105
        raise NotImplementedError
106
    return val
107
108
109
def string_to_variant(string, vtype):
110
    """
111
    convert back a string to an ua.Variant
112
    """
113
    return ua.Variant(string_to_val(string, vtype), vtype)
114
115
116
def get_node_children(node, nodes=None):
117
    """
118
    Get recursively all children of a node
119
    """
120
    if nodes is None:
121
        nodes = [node]
122
    for child in node.get_children():
123
        nodes.append(child)
124
        get_node_children(child, nodes)
125
    return nodes
126
127
128
def get_node_subtypes(node, nodes=None):
129
    if nodes is None:
130
        nodes = [node]
131
    for child in node.get_children(refs=ua.ObjectIds.HasSubtype):
132
        nodes.append(child)
133
        get_node_subtypes(child, nodes)
134
    return nodes
135
136
137
def get_node_supertypes(node, includeitself=False, skipbase=True):
138
    """
139
    return get all subtype parents of node recursive
140
    :param node: can be a ua.Node or ua.NodeId
141
    :param includeitself: include also node to the list
142
    :param skipbase don't include the toplevel one
143
    :returns list of ua.Node, top parent first 
144
    """
145
    parents = []
146
    if includeitself:
147
        parents.append(node)
148
    parents.extend(_get_node_supertypes(node))
149
    if skipbase and len(parents) > 1:
150
        parents = parents[:-1]
151
152
    return parents
153
154
155
def _get_node_supertypes(node):
156
    """
157
    recursive implementation of get_node_derived_from_types
158
    """
159
    basetypes = []
160
    parent = get_node_supertype(node)
161
    if parent:
162
        basetypes.append(parent)
163
        basetypes.extend(_get_node_supertypes(parent))
164
165
    return basetypes
166
167
168
def get_node_supertype(node):
169
    """
170
    return node supertype or None
171
    """
172
    supertypes = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype,
173
                                           direction=ua.BrowseDirection.Inverse,
174
                                           includesubtypes=True)
175
    if supertypes:
176
        return supertypes[0]
177
    else:
178
        return None
179
180
181
def is_child_present(node, browsename):
182
    """
183
    return if a browsename is present a child from the provide node
184
    :param node: node wherein to find the browsename
185
    :param browsename: browsename to search
186
    :returns returne True if the browsename is present else False 
187
    """
188
    child_descs = node.get_children_descriptions()
189
    for child_desc in child_descs:
190
        if child_desc.BrowseName == browsename:
191
            return True
192
193
    return False
194
195
196
def data_type_to_variant_type(dtype_node):
197
    """
198
    Given a Node datatype, find out the variant type to encode
199
    data. This is not exactly straightforward...
200
    """
201
    base = get_base_data_type(dtype_node)
202
203
    if base.nodeid.Identifier != 29:
204
        return ua.VariantType(base.nodeid.Identifier)
205
    else:
206
        # we have an enumeration, we need to look at child to find type
207
        descs = dtype_node.get_children_descriptions()
208
        bnames = [d.BrowseName.Name for d in descs]
209
        if "EnumStrings" in bnames:
210
            return ua.VariantType.LocalizedText
211
        elif "EnumValues" in bnames:
212
            return ua.VariantType.ExtensionObject
213
        else:
214
            raise ua.UaError("Enumeration must have a child node describing its type and values")
215
216
217
def get_base_data_type(datatype):
218
    """
219
    Looks up the base datatype of the provided datatype Node
220
    The base datatype is either:
221
    A primitive type (ns=0, i<=21) or a complex one (ns=0 i>21 and i<=30) like Enum and Struct.
222
    
223
    Args:
224
        datatype: NodeId of a datype of a variable
225
    Returns:
226
        NodeId of datatype base or None in case base datype can not be determined
227
    """
228
    base = datatype
229
    while base:
230
        if base.nodeid.NamespaceIndex == 0 and isinstance(base.nodeid.Identifier, int) and base.nodeid.Identifier <= 30:
231
            return base
232
        base = get_node_supertype(base)
233
    raise ua.UaError("Datatype must be a subtype of builtin types %s" % datatype)
234
235
236
def get_nodes_of_namespace(server, namespaces=[]):
237
    """
238
    Get the nodes of one or more namespaces .      
239
    Args:
240
        server: opc ua server to use
241
        namespaces: list of string uri or int indexes of the namespace to export
242
    Returns:
243
        List of nodes that are part of the provided namespaces
244
    """
245
    ns_available = server.get_namespace_array()
246
247
    if not namespaces:
248
        namespaces = ns_available[1:]
249
    elif isinstance(namespaces, (str, int)):
250
        namespaces = [namespaces]
251
252
    # make sure all namespace are indexes (if needed convert strings to indexes)
253
    namespace_indexes = [n if isinstance(n, int) else ns_available.index(n) for n in namespaces]
254
255
    # filter nodeis based on the provide namespaces and convert the nodeid to a node
256
    nodes = [server.get_node(nodeid) for nodeid in server.iserver.aspace.keys()
257
             if nodeid.NamespaceIndex != 0 and nodeid.NamespaceIndex in namespace_indexes]
258
    return nodes
259