asyncua.common.ua_utils   F
last analyzed

Complexity

Total Complexity 77

Size/Duplication

Total Lines 306
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 175
dl 0
loc 306
rs 2.24
c 0
b 0
f 0
wmc 77

16 Functions

Rating   Name   Duplication   Size   Complexity  
F val_to_string() 0 41 16
A get_nodes_of_namespace() 0 27 5
A get_base_data_type() 0 17 5
F string_to_val() 0 59 21
A get_node_supertypes() 0 15 4
A get_node_subtypes() 0 7 3
A is_child_present() 0 12 3
A data_type_to_string() 0 7 3
A value_to_datavalue() 0 13 3
A data_type_to_variant_type() 0 11 2
A get_default_value() 0 7 3
A string_to_variant() 0 5 1
A variant_to_string() 0 7 1
A get_node_supertype() 0 11 2
A get_node_children() 0 10 3
A _get_node_supertypes() 0 11 2

How to fix   Complexity   

Complexity

Complex classes like asyncua.common.ua_utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Usefull method and classes not belonging anywhere and depending on asyncua library
3
"""
4
5
from dateutil import parser
6
from datetime import datetime
7
from enum import Enum, IntEnum
8
import uuid
9
import logging
10
11
from asyncua import ua
12
13
logger = logging.getLogger('__name__')
14
15
16
def value_to_datavalue(val, varianttype=None):
17
    """
18
    convert anyting to a DataValue using varianttype
19
    """
20
    if isinstance(val, ua.DataValue):
21
        datavalue = val
22
    elif isinstance(val, ua.Variant):
23
        datavalue = ua.DataValue(val)
24
        datavalue.SourceTimestamp = datetime.utcnow()
25
    else:
26
        datavalue = ua.DataValue(ua.Variant(val, varianttype))
27
        datavalue.SourceTimestamp = datetime.utcnow()
28
    return datavalue
29
30
31
def val_to_string(val, truncate=False):
32
    """
33
    convert a python object or python-asyncua object to a string
34
    which should be easy to understand for human
35
    easy to modify, and not too hard to parse back ....not easy
36
    meant for UI or command lines
37
    if truncate is true then huge strings or bytes are tuncated
38
39
    """
40
    if isinstance(val, (list, tuple)):
41
        res = []
42
        for v in val:
43
            res.append(val_to_string(v))
44
        return "[{}]".format(", ".join(res))
45
46
    if hasattr(val, "to_string"):
47
        val = val.to_string()
48
    elif isinstance(val, ua.StatusCode):
49
        val = val.name
50
    elif isinstance(val, (Enum, IntEnum)):
51
        val = val.name
52
    elif isinstance(val, ua.DataValue):
53
        val = variant_to_string(val.Value)
54
    elif isinstance(val, ua.XmlElement):
55
        val = val.Value
56
    elif isinstance(val, str):
57
        if truncate and len(val) > 100:
58
            val = val[:10] + "...." + val[-10:]
59
    elif isinstance(val, bytes):
60
        if truncate and len(val) > 100:
61
            val = val[:10].decode("utf-8", errors="replace") + "...." + val[-10:].decode("utf-8", errors="replace")
62
        else:
63
            val = val.decode("utf-8", errors="replace")
64
    elif isinstance(val, datetime):
65
        val = val.isoformat()
66
    elif isinstance(val, (int, float)):
67
        val = str(val)
68
    else:
69
        # FIXME: Some types are probably missing!
70
        val = str(val)
71
    return val
72
73
74
def variant_to_string(var):
75
    """
76
    convert a variant to a string which should be easy to understand for human
77
    easy to modify, and not too hard to parse back ....not easy
78
    meant for UI or command lines
79
    """
80
    return val_to_string(var.Value)
81
82
83
def string_to_val(string, vtype):
84
    """
85
    Convert back a string to a python or python-asyncua object
86
    Note: no error checking is done here, supplying null strings could raise exceptions (datetime and guid)
87
    """
88
    string = string.strip()
89
    if string.startswith("["):
90
        string = string[1:-1]
91
        var = []
92
        for s in string.split(","):
93
            s = s.strip()
94
            val = string_to_val(s, vtype)
95
            var.append(val)
96
        return var
97
98
    if vtype == ua.VariantType.Null:
99
        val = None
100
    elif vtype == ua.VariantType.Boolean:
101
        if string in ("True", "true", "on", "On", "1"):
102
            val = True
103
        else:
104
            val = False
105
    elif vtype in (ua.VariantType.SByte, ua.VariantType.Int16, ua.VariantType.Int32, ua.VariantType.Int64):
106
        if not string:
107
            val = 0
108
        else:
109
            val = int(string)
110
    elif vtype in (ua.VariantType.Byte, ua.VariantType.UInt16, ua.VariantType.UInt32, ua.VariantType.UInt64):
111
        if not string:
112
            val = 0
113
        else:
114
            val = int(string)
115
    elif vtype in (ua.VariantType.Float, ua.VariantType.Double):
116
        if not string:
117
            val = 0.0
118
        else:
119
            val = float(string)
120
    elif vtype == ua.VariantType.XmlElement:
121
        val = ua.XmlElement(string)
122
    elif vtype == ua.VariantType.String:
123
        val = string
124
    elif vtype == ua.VariantType.ByteString:
125
        val = string.encode()
126
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
127
        val = ua.NodeId.from_string(string)
128
    elif vtype == ua.VariantType.QualifiedName:
129
        val = ua.QualifiedName.from_string(string)
130
    elif vtype == ua.VariantType.DateTime:
131
        val = parser.parse(string)
132
    elif vtype == ua.VariantType.LocalizedText:
133
        val = ua.LocalizedText.from_string(string)
134
    elif vtype == ua.VariantType.StatusCode:
135
        val = ua.StatusCode(string)
136
    elif vtype == ua.VariantType.Guid:
137
        val = uuid.UUID(string)
138
    else:
139
        # FIXME: Some types are probably missing!
140
        raise NotImplementedError
141
    return val
142
143
144
def string_to_variant(string, vtype):
145
    """
146
    convert back a string to an ua.Variant
147
    """
148
    return ua.Variant(string_to_val(string, vtype), vtype)
149
150
151
async def get_node_children(node, nodes=None):
152
    """
153
    Get recursively all children of a node
154
    """
155
    if nodes is None:
156
        nodes = [node]
157
    for child in await node.get_children():
158
        nodes.append(child)
159
        await get_node_children(child, nodes)
160
    return nodes
161
162
163
async def get_node_subtypes(node, nodes=None):
164
    if nodes is None:
165
        nodes = [node]
166
    for child in await node.get_children(refs=ua.ObjectIds.HasSubtype):
167
        nodes.append(child)
168
        await get_node_subtypes(child, nodes)
169
    return nodes
170
171
172
async def get_node_supertypes(node, includeitself=False, skipbase=True):
173
    """
174
    return get all subtype parents of node recursive
175
    :param node: can be a ua.Node or ua.NodeId
176
    :param includeitself: include also node to the list
177
    :param skipbase don't include the toplevel one
178
    :returns list of ua.Node, top parent first
179
    """
180
    parents = []
181
    if includeitself:
182
        parents.append(node)
183
    parents.extend(await _get_node_supertypes(node))
184
    if skipbase and len(parents) > 1:
185
        parents = parents[:-1]
186
    return parents
187
188
189
async def _get_node_supertypes(node):
190
    """
191
    recursive implementation of get_node_derived_from_types
192
    """
193
    basetypes = []
194
    parent = await get_node_supertype(node)
195
    if parent:
196
        basetypes.append(parent)
197
        basetypes.extend(await _get_node_supertypes(parent))
198
199
    return basetypes
200
201
202
async def get_node_supertype(node):
203
    """
204
    return node supertype or None
205
    """
206
    supertypes = await node.get_referenced_nodes(
207
        refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True
208
    )
209
    if supertypes:
210
        return supertypes[0]
211
    else:
212
        return None
213
214
215
async def is_child_present(node, browsename):
216
    """
217
    return if a browsename is present a child from the provide node
218
    :param node: node wherein to find the browsename
219
    :param browsename: browsename to search
220
    :returns returne True if the browsename is present else False
221
    """
222
    child_descs = await node.get_children_descriptions()
223
    for child_desc in child_descs:
224
        if child_desc.BrowseName == browsename:
225
            return True
226
    return False
227
228
229
async def data_type_to_variant_type(dtype_node):
230
    """
231
    Given a Node datatype, find out the variant type to encode
232
    data. This is not exactly straightforward...
233
    """
234
    base = await get_base_data_type(dtype_node)
235
    if base.nodeid.Identifier != 29:
236
        return ua.VariantType(base.nodeid.Identifier)
237
    else:
238
        # we have an enumeration, value is a Int32
239
        return ua.VariantType.Int32
240
241
242
async def get_base_data_type(datatype):
243
    """
244
    Looks up the base datatype of the provided datatype Node
245
    The base datatype is either:
246
    A primitive type (ns=0, i<=21) or a complex one (ns=0 i>21 and i<=30) like Enum and Struct.
247
248
    Args:
249
        datatype: NodeId of a datype of a variable
250
    Returns:
251
        NodeId of datatype base or None in case base datype can not be determined
252
    """
253
    base = datatype
254
    while base:
255
        if base.nodeid.NamespaceIndex == 0 and isinstance(base.nodeid.Identifier, int) and base.nodeid.Identifier <= 30:
256
            return base
257
        base = await get_node_supertype(base)
258
    raise ua.UaError(f"Datatype must be a subtype of builtin types {str(datatype)}")
259
260
261
async def get_nodes_of_namespace(server, namespaces=None):
262
    """
263
    Get the nodes of one or more namespaces .
264
    Args:
265
        server: opc ua server to use
266
        namespaces: list of string uri or int indexes of the namespace to export
267
    Returns:
268
        List of nodes that are part of the provided namespaces
269
    """
270
    if namespaces is None:
271
        namespaces = []
272
    ns_available = await server.get_namespace_array()
273
274
    if not namespaces:
275
        namespaces = ns_available[1:]
276
    elif isinstance(namespaces, (str, int)):
277
        namespaces = [namespaces]
278
279
    # make sure all namespace are indexes (if needed convert strings to indexes)
280
    namespace_indexes = [n if isinstance(n, int) else ns_available.index(n) for n in namespaces]
281
282
    # filter nodeis based on the provide namespaces and convert the nodeid to a node
283
    nodes = [
284
        server.get_node(nodeid) for nodeid in server.iserver.aspace.keys()
285
        if nodeid.NamespaceIndex != 0 and nodeid.NamespaceIndex in namespace_indexes
286
    ]
287
    return nodes
288
289
290
def get_default_value(uatype):
291
    if isinstance(uatype, ua.VariantType):
292
        return ua.get_default_values(uatype)
293
    elif hasattr(ua.VariantType, uatype):
294
        return ua.get_default_value(getattr(ua.VariantType, uatype))
295
    else:
296
        return getattr(ua, uatype)()
297
298
299
def data_type_to_string(dtype):
300
    # we could just display browse name of node but it requires a query
301
    if dtype.NamespaceIndex == 0 and dtype.Identifier in ua.ObjectIdNames:
302
        string = ua.ObjectIdNames[dtype.Identifier]
303
    else:
304
        string = dtype.to_string()
305
    return string
306