Passed
Push — master ( 5b4bf6...e09c16 )
by Olivier
02:23
created

asyncua.common.methods._format_call_inputs()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
"""
2
High level method related functions
3
"""
4
5
from asyncio import iscoroutinefunction
6
7
from asyncua import ua
8
9
10
async def call_method(parent, methodid, *args):
    """
    Call an OPC-UA method and return its output in the most convenient shape.

    The call itself is delegated to ``call_method_full``; this function only
    unwraps the ``OutputArguments`` of the result it gets back.

    :param parent: `Node` the method is called on
    :param methodid: browse name of a child method, or the nodeid of the
        method as a NodeId object
    :param args: input arguments; variants or python objects convertible
        to variants, possibly of different types
    :return: ``None`` when the method produced no output, the single value
        when it produced exactly one, otherwise the list of output values
    """
    full_result = await call_method_full(parent, methodid, *args)
    outputs = full_result.OutputArguments
    count = len(outputs)

    if count == 0:
        return None
    if count == 1:
        return outputs[0]
    return outputs
27
28
29
async def call_method_full(parent, methodid, *args):
    """
    Call an OPC-UA method and return the complete call result.

    ``methodid`` is resolved to a node id first: a string or QualifiedName is
    looked up as a child of ``parent``; any object exposing a ``nodeid``
    attribute contributes that attribute; anything else is passed through
    unchanged.

    :param parent: `Node` the method is called on
    :param methodid: browse name of a child method, or the nodeid of the
        method as a NodeId object
    :param args: input arguments; variants or python objects convertible
        to variants, possibly of different types
    :return: the CallMethodResult object, with its OutputArguments replaced
        by the plain ``Value`` of each returned variant
    """
    if isinstance(methodid, (str, ua.uatypes.QualifiedName)):
        method_node = await parent.get_child(methodid)
        methodid = method_node.nodeid
    elif hasattr(methodid, 'nodeid'):
        methodid = methodid.nodeid

    call_result = await _call_method(
        parent.server,
        parent.nodeid,
        methodid,
        to_variant(*args),
    )
    # Unwrap the variants so callers get plain values.
    call_result.OutputArguments = [
        variant.Value for variant in call_result.OutputArguments
    ]
    return call_result
46
47
48
async def _call_method(server, parentnodeid, methodid, arguments):
    """
    Build a single CallMethodRequest, send it and return its result.

    :param server: `UaClient` or `InternalSession`
    :param parentnodeid: node id stored as the request's ObjectId
    :param methodid: node id stored as the request's MethodId
    :param arguments: value stored as the request's InputArguments
    :return: the first entry of the results returned by ``server.call``,
        after ``StatusCode.check()`` has been called on it
        (presumably raises on a bad status -- confirm against ``ua.StatusCode``)
    """
    call_request = ua.CallMethodRequest()
    call_request.ObjectId = parentnodeid
    call_request.MethodId = methodid
    call_request.InputArguments = arguments

    # Only one request is sent, so only the first result is of interest.
    responses = await server.call([call_request])
    first_response = responses[0]
    first_response.StatusCode.check()
    return first_response
65
66
67
def uamethod(func):
    """
    Method decorator to automatically convert
    arguments and output to and from variants

    The returned wrapper is a coroutine function when ``func`` is one and a
    plain function otherwise. Incoming arguments are unpacked with
    ``_format_call_inputs`` and the return value is packed with
    ``_format_call_outputs``.

    The wrapper carries the metadata of ``func`` (``__name__``, ``__doc__``,
    ``__qualname__``, ``__module__``, ``__wrapped__``) so that decorated
    methods stay identifiable in logs, tracebacks and introspection.

    :param func: function or coroutine function to wrap
    :return: the wrapping function
    """
    # Local import keeps this block self-contained with respect to the
    # module's import section.
    from functools import wraps

    if iscoroutinefunction(func):
        @wraps(func)
        async def wrapper(parent, *args):
            func_args = _format_call_inputs(parent, *args)
            result = await func(*func_args)
            return _format_call_outputs(result)

    else:
        @wraps(func)
        def wrapper(parent, *args):
            func_args = _format_call_inputs(parent, *args)
            result = func(*func_args)
            return _format_call_outputs(result)
    return wrapper
85
86
87
def _format_call_inputs(parent, *args):
    """
    Turn the raw arguments of a method callback into plain python values.

    When the first argument is a ``ua.NodeId`` it is taken as the parent node
    id and every following argument is unwrapped through its ``Value``
    attribute. Otherwise the first argument is taken as the instance the
    wrapped method is bound to, the second one as the parent, and only the
    arguments after those are unwrapped.

    :param parent: parent node id, or the bound instance (see above)
    :param args: remaining positional arguments
    :return: tuple of arguments to call the wrapped function with
    """
    if isinstance(parent, ua.NodeId):
        leading = (parent,)
        variants = args
    else:
        # Bound-method case: ``parent`` is really the instance and the real
        # parent is the first of the remaining arguments.
        leading = (parent, args[0])
        variants = args[1:]
    return (*leading, *[variant.Value for variant in variants])
95
96
97
def _format_call_outputs(result):
98
        if result is None:
99
            return []
100
        elif isinstance(result, ua.CallMethodResult):
101
            result.OutputArguments = to_variant(*result.OutputArguments)
102
            return result
103
        elif isinstance(result, ua.StatusCode):
104
            return result
105
        elif isinstance(result, tuple):
106
            return to_variant(*result)
107
        else:
108
            return to_variant(result)
109
110
111
def to_variant(*args):
    """
    Return the arguments as a list of ``ua.Variant`` objects.

    Arguments that already are ``ua.Variant`` instances are kept as they
    are; every other argument is wrapped with ``ua.Variant(arg)``.

    :param args: values to convert
    :return: list with one entry per argument, in the same order
    """
    return [
        item if isinstance(item, ua.Variant) else ua.Variant(item)
        for item in args
    ]
118