async_btree.parallele.parallele()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 36
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 13
nop 2
dl 0
loc 36
rs 9.75
c 0
b 0
f 0
1
"""Curiosity module define special construct with curio framework."""
2
3
from typing import List, Optional
4
5
from .definition import AsyncInnerFunction, CallableFunction, node_metadata
6
from .utils import to_async
7
8
__all__ = ['parallele']
9
10
11
try:
    from curio import TaskGroup

    def parallele(children: List[CallableFunction], succes_threshold: Optional[int] = None) -> AsyncInnerFunction:
        """Return an awaitable function which runs children in parallel.

        Every child is spawned in a curio ``TaskGroup``; once the group has
        exited, the number of truthy results is compared to
        ``succes_threshold``.

        The ``succes_threshold`` parameter generalizes the traditional
        sequence/fallback constructs and must be in ``[0, len(children)]``:

        - ``len(children)`` (the default) requires every child to succeed,
        - ``0`` is accepted and makes the node succeed whatever the children
          return, since the success count is always ``>= 0``.

        Args:
            children (List[CallableFunction]): callables to run; each one is
                wrapped with ``to_async`` before being spawned.
            succes_threshold (Optional[int]): minimum number of truthy child
                results required to report a success. ``None`` (default)
                means ``len(children)``.

        Returns:
            (AsyncInnerFunction): an awaitable function which returns ``True``
                when at least ``succes_threshold`` children returned a truthy
                value, ``False`` otherwise.

        Raises:
            AssertionError: if ``succes_threshold`` is outside
                ``[0, len(children)]``.

        """
        # Compare against None explicitly: 0 is a documented, legal threshold
        # and a truthiness test (`x or default`) would silently replace it
        # with len(children).
        _succes_threshold = len(children) if succes_threshold is None else succes_threshold
        if not (0 <= _succes_threshold <= len(children)):
            raise AssertionError('succes_threshold')

        _children = [to_async(child) for child in children]

        # NOTE(review): 'properties' names the closure variable
        # `_succes_threshold` as a string; presumably node_metadata resolves it
        # by name, so the local variable must keep this exact name.
        @node_metadata(properties=['_succes_threshold'])
        async def _parallele():

            # wait=all: curio's policy to wait for every spawned task before
            # the group exits (see curio TaskGroup documentation).
            async with TaskGroup(wait=all) as g:
                for child in _children:
                    await g.spawn(child)

            # Count the children whose result is truthy.
            success = sum(1 for result in g.results if result)

            return success >= _succes_threshold

        return _parallele


except Exception:  # pragma: no cover
    # default to a simple sequence
    # Deliberately broad best-effort: any failure while importing curio or
    # defining the curio-based implementation falls back to `sequence`.
    from .control import sequence as parallele
55