Passed
Pull Request — master (#76)
by Olivier
02:44
created

asyncua.common.ua_utils.value_to_datavalue()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 2
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
1
"""
2
Useful methods and classes not belonging anywhere else and depending on the asyncua library
3
"""
4
5
from dateutil import parser
6
from datetime import datetime
7
from enum import Enum, IntEnum
8
import uuid
9
import logging
10
11
from asyncua import ua
12
13
# Module-level logger; named after the module itself (not the literal string
# "__name__") so that it takes part in the package's logger hierarchy.
logger = logging.getLogger(__name__)
14
15
16
def value_to_datavalue(val, varianttype=None):
    """
    wrap anything into a ua.DataValue

    A ua.DataValue is handed back untouched. A ua.Variant is wrapped as is,
    any other value is first converted to a ua.Variant using varianttype.
    Every newly created DataValue gets its SourceTimestamp set to the
    current UTC time.
    """
    if isinstance(val, ua.DataValue):
        return val
    variant = val if isinstance(val, ua.Variant) else ua.Variant(val, varianttype)
    wrapped = ua.DataValue(variant)
    wrapped.SourceTimestamp = datetime.utcnow()
    return wrapped
30
31
32
def val_to_string(val, truncate=False):
    """
    convert a python object or python-asyncua object to a string
    which should be easy to understand for human
    easy to modify, and not too hard to parse back ....not easy
    meant for UI or command lines

    :param val: object to convert; lists and tuples are rendered as
        "[item, item, ...]" with every item converted recursively
    :param truncate: if true, strings and bytes longer than 100 items are
        shortened to their first and last 10 items joined by "....";
        this also applies to items of lists/tuples and to the content of
        a ua.DataValue
    :returns string representation of val
    """
    if isinstance(val, (list, tuple)):
        # forward truncate, otherwise huge items inside a sequence are never shortened
        return "[{}]".format(", ".join(val_to_string(v, truncate) for v in val))

    if hasattr(val, "to_string"):
        val = val.to_string()
    elif isinstance(val, ua.StatusCode):
        val = val.name
    elif isinstance(val, (Enum, IntEnum)):
        val = val.name
    elif isinstance(val, ua.DataValue):
        # val.Value is the variant, its own Value is the wrapped python object
        val = val_to_string(val.Value.Value, truncate)
    elif isinstance(val, ua.XmlElement):
        val = val.Value
    elif isinstance(val, str):
        if truncate and len(val) > 100:
            val = val[:10] + "...." + val[-10:]
    elif isinstance(val, bytes):
        if truncate and len(val) > 100:
            # decode head and tail separately, undecodable bytes are replaced
            val = val[:10].decode("utf-8", errors="replace") + "...." + val[-10:].decode("utf-8", errors="replace")
        else:
            val = val.decode("utf-8", errors="replace")
    elif isinstance(val, datetime):
        val = val.isoformat()
    else:
        # numbers and everything else without a dedicated branch
        # FIXME: Some types are probably missing!
        val = str(val)
    return val
73
74
75
def variant_to_string(var):
    """
    render the value held by a variant as a string

    The Value attribute of var is handed to val_to_string(), so the result
    follows the same rules: readable for humans, meant for UI or command lines.
    """
    wrapped_value = var.Value
    return val_to_string(wrapped_value)
82
83
84
def string_to_val(string, vtype):
    """
    Convert back a string to a python or python-asyncua object

    A string starting with "[" is read as a flat, comma separated list whose
    items are all converted using the same vtype; "[]" gives an empty list.
    Nested lists and items containing a comma are not supported.

    Note: almost no error checking is done here, supplying empty strings could
    raise exceptions (datetime and guid)

    :param string: text to convert, surrounding whitespace is ignored
    :param vtype: ua.VariantType member selecting the conversion
    :returns converted value, or a list of converted values
    :raises NotImplementedError: if vtype has no conversion here
    """
    string = string.strip()
    if string.startswith("["):
        inner = string[1:-1].strip()
        if not inner:
            # "[]" is what val_to_string() emits for an empty sequence; splitting
            # an empty string would otherwise produce one bogus empty item
            return []
        # every item is stripped again by the recursive call
        return [string_to_val(item, vtype) for item in inner.split(",")]

    if vtype == ua.VariantType.Null:
        val = None
    elif vtype == ua.VariantType.Boolean:
        # anything not listed here is read as False
        val = string in ("True", "true", "on", "On", "1")
    elif vtype in (
        ua.VariantType.SByte, ua.VariantType.Int16, ua.VariantType.Int32, ua.VariantType.Int64,
        ua.VariantType.Byte, ua.VariantType.UInt16, ua.VariantType.UInt32, ua.VariantType.UInt64,
    ):
        # empty string stands for zero
        val = int(string) if string else 0
    elif vtype in (ua.VariantType.Float, ua.VariantType.Double):
        val = float(string) if string else 0.0
    elif vtype == ua.VariantType.XmlElement:
        val = ua.XmlElement(string)
    elif vtype == ua.VariantType.String:
        val = string
    elif vtype == ua.VariantType.ByteString:
        val = string.encode()
    elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
        val = ua.NodeId.from_string(string)
    elif vtype == ua.VariantType.QualifiedName:
        val = ua.QualifiedName.from_string(string)
    elif vtype == ua.VariantType.DateTime:
        val = parser.parse(string)
    elif vtype == ua.VariantType.LocalizedText:
        val = ua.LocalizedText.from_string(string)
    elif vtype == ua.VariantType.StatusCode:
        val = ua.StatusCode(string)
    elif vtype == ua.VariantType.Guid:
        val = uuid.UUID(string)
    else:
        # FIXME: Some types are probably missing!
        raise NotImplementedError
    return val
143
144
145
def string_to_variant(string, vtype):
    """
    build an ua.Variant of type vtype from its string representation

    The string is parsed with string_to_val() using the same vtype.
    """
    parsed = string_to_val(string, vtype)
    return ua.Variant(parsed, vtype)
150
151
152
async def get_node_children(node, nodes=None):
    """
    Collect recursively all descendants of a node, depth first

    When nodes is not given the result starts with node itself. When a list
    is given, descendants are appended to that list (node is not added) and
    the same list is returned.
    """
    collected = [node] if nodes is None else nodes
    children = await node.get_children()
    for kid in children:
        collected.append(kid)
        await get_node_children(kid, collected)
    return collected
162
163
164
async def get_node_subtypes(node, nodes=None):
    """
    Collect recursively all subtypes of a node, depth first

    Children are looked up through HasSubtype references. When nodes is not
    given the result starts with node itself; otherwise subtypes are appended
    to the given list, which is also returned.
    """
    found = [node] if nodes is None else nodes
    subtypes = await node.get_children(refs=ua.ObjectIds.HasSubtype)
    for subtype in subtypes:
        found.append(subtype)
        await get_node_subtypes(subtype, found)
    return found
171
172
173
async def get_node_supertypes(node, includeitself=False, skipbase=True):
    """
    return the chain of supertypes of node
    :param node: node whose supertypes are looked up
    :param includeitself: put node itself at the start of the list
    :param skipbase: drop the last (topmost) entry, only done when the list
        holds more than one entry
    :returns list of nodes, nearest supertype first and topmost one last
    """
    chain = [node] if includeitself else []
    chain += await _get_node_supertypes(node)
    if skipbase and len(chain) > 1:
        return chain[:-1]
    return chain
188
189
190
async def _get_node_supertypes(node):
    """
    walk up the supertype chain of node, helper of get_node_supertypes

    Returns the list of supertypes, nearest one first; empty if node has none.
    """
    chain = []
    current = await get_node_supertype(node)
    # climb one level at a time until a node without supertype is reached
    while current:
        chain.append(current)
        current = await get_node_supertype(current)
    return chain
201
202
203
async def get_node_supertype(node):
    """
    return the first node found through an inverse HasSubtype reference,
    or None when there is no such node
    """
    candidates = await node.get_referenced_nodes(
        refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True
    )
    return candidates[0] if candidates else None
214
215
216
async def is_child_present(node, browsename):
    """
    tell whether the provided node has a child with the given browsename
    :param node: node whose children descriptions are inspected
    :param browsename: browsename to search for
    :returns True if a child description carries that BrowseName else False
    """
    descriptions = await node.get_children_descriptions()
    return any(desc.BrowseName == browsename for desc in descriptions)
228
229
230
async def data_type_to_variant_type(dtype_node):
    """
    Given a Node datatype, find out the variant type used to encode
    its data. The base datatype is resolved first; its identifier
    selects the variant type.
    """
    base = await get_base_data_type(dtype_node)
    identifier = base.nodeid.Identifier
    if identifier == 29:
        # we have an enumeration, value is a Int32
        return ua.VariantType.Int32
    return ua.VariantType(identifier)
241
242
243
async def get_base_data_type(datatype):
    """
    Looks up the base datatype of the provided datatype Node
    The base datatype is the first node, starting at datatype and walking up
    its supertypes, living in namespace 0 with an integer identifier <= 30:
    a primitive type or a complex one like Enum and Struct.

    Args:
        datatype: Node of a datatype of a variable
    Returns:
        Node of the base datatype
    Raises:
        ua.UaError: if no such node is found in the supertype chain
    """
    candidate = datatype
    while candidate:
        nid = candidate.nodeid
        is_builtin = nid.NamespaceIndex == 0 and isinstance(nid.Identifier, int) and nid.Identifier <= 30
        if is_builtin:
            return candidate
        candidate = await get_node_supertype(candidate)
    raise ua.UaError("Datatype must be a subtype of builtin types {0!s}".format(datatype))
260
261
262
async def get_nodes_of_namespace(server, namespaces=None):
    """
    Get the nodes of one or more namespaces.
    Args:
        server: opc ua server to use
        namespaces: a string uri or int index, or a list of them, naming the
            namespaces to export; when empty or None every namespace but the
            first one of the server's namespace array is used
    Returns:
        List of nodes that are part of the provided namespaces; nodes of
        namespace 0 are never included
    """
    requested = [] if namespaces is None else namespaces
    known_uris = await server.get_namespace_array()

    if not requested:
        requested = known_uris[1:]
    elif isinstance(requested, (str, int)):
        requested = [requested]

    # resolve every uri to its position in the namespace array, keep ints as they are
    wanted = []
    for ns in requested:
        wanted.append(ns if isinstance(ns, int) else known_uris.index(ns))

    # keep only nodeids of the wanted namespaces and turn them into nodes
    selected = []
    for nodeid in server.iserver.aspace.keys():
        ns_index = nodeid.NamespaceIndex
        if ns_index != 0 and ns_index in wanted:
            selected.append(server.get_node(nodeid))
    return selected
289
290
291
def get_default_value(uatype):
    """
    return a default value for a ua type

    :param uatype: a ua.VariantType member, the name of a ua.VariantType
        member, or the name of a class found in the ua module
    :returns what ua.get_default_value() gives for a variant type, else a
        new instance of the named ua class created without arguments
    """
    if isinstance(uatype, ua.VariantType):
        # same helper as in the by-name branch below
        return ua.get_default_value(uatype)
    elif hasattr(ua.VariantType, uatype):
        return ua.get_default_value(getattr(ua.VariantType, uatype))
    else:
        return getattr(ua, uatype)()
298
299
300
def data_type_to_string(dtype):
    """
    return a readable name for a datatype nodeid

    Namespace 0 identifiers listed in ua.ObjectIdNames are shown by name,
    anything else through the to_string() of the nodeid.
    """
    # we could just display browse name of node but it requires a query
    if dtype.NamespaceIndex == 0 and dtype.Identifier in ua.ObjectIdNames:
        return ua.ObjectIdNames[dtype.Identifier]
    return dtype.to_string()
307