async_btree.control   A
last analyzed

Complexity

Total Complexity 14

Size/Duplication

Total Lines 148
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 55
dl 0
loc 148
rs 10
c 0
b 0
f 0
wmc 14

5 Functions

Rating   Name   Duplication   Size   Complexity  
A selector() 0 3 1
A fallback() 0 14 1
A repeat_until() 0 25 2
A decision() 0 32 4
B sequence() 0 56 6
1
"""Control function definition."""
2
from typing import Any, List, Optional
3
4
from .definition import FAILURE, SUCCESS, AsyncInnerFunction, CallableFunction, node_metadata
5
from .utils import to_async
6
7
# Names exported by ``from <this module> import *``: the control-node factories defined below.
__all__ = ['sequence', 'fallback', 'selector', 'decision', 'repeat_until']
8
9
10
def sequence(children: List[CallableFunction], succes_threshold: Optional[int] = None) -> AsyncInnerFunction:
    """Build an awaitable node that evaluates ``children`` one after another.

    ``succes_threshold`` generalizes the classic sequence/fallback nodes:

    - once ``succes_threshold`` children have returned a truthy value, the
      node stops and returns the list of every result collected so far;
    - once ``len(children) - succes_threshold + 1`` children have returned a
      falsy value, the node stops and returns that last falsy result.

    A ``succes_threshold`` of ``None`` or ``0`` is replaced by
    ``len(children)`` (every child must succeed).

    Args:
        children (List[CallableFunction]): callables to evaluate in order;
            each one is passed through ``to_async`` once, at build time.
        succes_threshold (Optional[int]): number of truthy results required
            for success, expected in ``[0, len(children)]``.

    Returns:
        (AsyncInnerFunction): an awaitable function taking no argument. It
            returns ``FAILURE`` when ``children`` is empty.

    Raises:
        (AssertionError): if the effective threshold is outside
            ``[0, len(children)]``.
    """
    child_count = len(children)
    # Falsy thresholds (None, 0) fall back to "all children must succeed".
    _succes_threshold = succes_threshold if succes_threshold else child_count
    if _succes_threshold < 0 or _succes_threshold > child_count:
        raise AssertionError('succes_threshold')

    # Number of falsy results after which the success threshold is out of reach.
    failure_threshold = child_count - _succes_threshold + 1

    _children = list(map(to_async, children))

    # NOTE(review): the property name below matches the enclosing variable
    # ``_succes_threshold``; presumably node_metadata resolves it by name, so
    # keep both in sync - confirm against node_metadata before renaming.
    @node_metadata(properties=['_succes_threshold'])
    async def _sequence():
        outcomes = []
        succeeded = 0
        failed = 0

        for run_child in _children:
            outcome = await run_child()
            outcomes.append(outcome)

            if outcome:
                succeeded += 1
                if succeeded == _succes_threshold:
                    # Enough successes: report everything evaluated so far.
                    return outcomes
                continue

            failed += 1
            if failed == failure_threshold:
                # Success is no longer reachable: report the failing result.
                return outcome

        # Only reached when there are no children to evaluate.
        return FAILURE

    return _sequence
66
67
68
def fallback(children: List[CallableFunction]) -> AsyncInnerFunction:
    """Build a node that tries children in order until one of them succeeds.

    Also known as 'selector': ``children`` is an ordered list going from the
    highest priority to the lowest priority. The node is a ``sequence`` whose
    success threshold is ``min(1, len(children))``, so it succeeds on the
    first truthy child result and fails once every child has failed.

    Args:
        children (List[CallableFunction]): callables to evaluate in order.

    Returns:
        (AsyncInnerFunction): an awaitable function.
    """
    as_fallback = node_metadata(name='fallback')
    # A single success is enough; min() keeps the threshold within
    # [0, len(children)] when the list is empty.
    required_successes = min(1, len(children))
    return as_fallback(sequence(children, succes_threshold=required_successes))
82
83
84
def selector(children: List[CallableFunction]) -> AsyncInnerFunction:
    """Alias of ``fallback``; the node is passed through ``node_metadata(name='selector')``."""
    as_selector = node_metadata(name='selector')
    return as_selector(fallback(children))
87
88
89
def decision(
    condition: CallableFunction, success_tree: CallableFunction, failure_tree: Optional[CallableFunction] = None
) -> AsyncInnerFunction:
    """Build an if/else node driven by ``condition``.

    When the awaited condition is truthy, the node returns the evaluation of
    ``success_tree``. Otherwise it returns the evaluation of ``failure_tree``
    when one was given, and ``SUCCESS`` when it was not.

    Args:
        condition (CallableFunction): condition evaluated first.
        success_tree (CallableFunction): tree evaluated when the condition
            result is truthy.
        failure_tree (Optional[CallableFunction]): tree evaluated when the
            condition result is falsy (``None`` by default).

    Returns:
        (AsyncInnerFunction): an awaitable function.
    """
    _condition = to_async(condition)
    _success_tree = to_async(success_tree)
    _failure_tree = None
    if failure_tree:
        _failure_tree = to_async(failure_tree)

    # NOTE(review): the edge names match the enclosing variables above;
    # presumably node_metadata resolves them by name - confirm before renaming.
    @node_metadata(edges=['_condition', '_success_tree', '_failure_tree'])
    async def _decision():
        if await _condition():
            branch = _success_tree
        elif _failure_tree:
            branch = _failure_tree
        else:
            # Falsy condition and no failure tree: nothing to do counts as success.
            return SUCCESS
        return await branch()

    return _decision
121
122
123
def repeat_until(condition: CallableFunction, child: CallableFunction) -> AsyncInnerFunction:
    """Build a node that re-evaluates ``child`` for as long as ``condition`` is truthy.

    The condition is awaited before each child evaluation; the loop stops the
    first time it yields a falsy value. The node returns the result of the
    last child evaluation, or ``FAILURE`` if the child was never evaluated.

    Args:
        condition (CallableFunction): condition checked before each iteration.
        child (CallableFunction): child evaluated on each iteration.

    Returns:
        (AsyncInnerFunction): an awaitable function.
    """
    _child = to_async(child)
    _condition = to_async(condition)

    # NOTE(review): the edge names match the enclosing variables above;
    # presumably node_metadata resolves them by name - confirm before renaming.
    @node_metadata(edges=['_condition', '_child'])
    async def _repeat_until():
        result: Any = FAILURE
        while True:
            if not await _condition():
                return result
            result = await _child()

    return _repeat_until
148