async_btree.leaf.condition()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 14
rs 10
c 0
b 0
f 0
1
"""Leaf definition."""
2
3
from .decorator import is_success
4
from .definition import AsyncInnerFunction, CallableFunction, ControlFlowException, node_metadata
5
from .utils import to_async
6
7
__all__ = ['action', 'condition']
8
9
10
def action(target: CallableFunction, **kwargs) -> AsyncInnerFunction:
11
    """Declare an action leaf.
12
13
    Action is an awaitable closure of specified function,
14
    (See alias function).
15
16
    Args:
17
        target (CallableFunction): awaitable function
18
        kwargs: optional kwargs argument to pass on target function
19
20
    Returns:
21
        (AsyncInnerFunction): an awaitable function.
22
23
    Raises:
24
        ControlFlowException : if error occurs
25
26
    """
27
28
    _target = to_async(target)
29
30
    @node_metadata(properties=['_target'])
31
    async def _action():
32
        try:
33
            return await _target(**kwargs)
34
        except Exception as e:
35
            raise ControlFlowException.instanciate(e)
36
37
    return _action
38
39
40
def condition(target: CallableFunction, **kwargs) -> AsyncInnerFunction:
41
    """Declare a condition leaf.
42
43
    Condition is an awaitable closure of specified function.
44
45
    Args:
46
        target (CallableFunction):  awaitable function which be evaluated as True/False.
47
        kwargs: optional kwargs argument to pass on target function
48
49
    Returns:
50
        (AsyncInnerFunction): an awaitable function.
51
    """
52
53
    return node_metadata(name='condition', properties=['target'])(is_success(action(target=target, **kwargs)))
54