Completed
Push — master ( a1fcd1...4af13f )
by Guibert
13s queued 11s
created

async_btree.utils   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 114
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 37
dl 0
loc 114
rs 10
c 0
b 0
f 0
wmc 15

4 Functions

Rating   Name   Duplication   Size   Complexity  
B afilter() 0 28 6
A to_async() 0 20 3
A amap() 0 26 4
A run() 0 11 1
1
"""Utility function."""
2
from inspect import iscoroutinefunction
3
from typing import Any, AsyncGenerator, AsyncIterable, Awaitable, Callable, Iterable, TypeVar, Union
4
5
from .definition import CallableFunction, node_metadata
6
7
# Public names of this module: the four helpers defined below.
__all__ = ['amap', 'afilter', 'run', 'to_async']
8
9
# Generic type variable used in the annotations of amap and afilter.
T = TypeVar('T')
10
11
12
async def amap(
    corofunc: Callable[[Any], Awaitable[T]], iterable: Union[AsyncIterable, Iterable]
) -> AsyncGenerator[T, None]:
    """Map an async function onto an iterable or an async iterable.

    This lets callers apply a coroutine function to a collection without
    having to choose between 'async for ...' and 'for ...' themselves.

    Args:
        corofunc (Callable[[Any], Awaitable[T]]): coroutine function applied to each item
        iterable (Union[AsyncIterable, Iterable]): iterable or async iterable collection
            whose items are passed to corofunc.

    Returns:
        AsyncGenerator[T]: an async iterator of corofunc(item)

    Example:
        ```[i async for i in amap(inc, afilter(even, [0, 1, 2, 3, 4]))]```

    """
    # Async iterables must be consumed with 'async for'; anything else is
    # walked with a plain 'for'. Items are awaited one at a time, in order.
    if isinstance(iterable, AsyncIterable):
        async for item in iterable:
            yield await corofunc(item)
    else:
        for item in iterable:
            yield await corofunc(item)
38
39
40
async def afilter(
    corofunc: Callable[[Any], Awaitable[bool]], iterable: Union[AsyncIterable[T], Iterable[T]]
) -> AsyncGenerator[T, None]:
    """Filter an iterable or an async iterable with an async function.

    This lets callers filter a collection with a coroutine predicate without
    having to choose between 'async for ...' and 'for ...' themselves.

    Args:
        corofunc (Callable[[Any], Awaitable[bool]]): async predicate called on each item
        iterable (Union[AsyncIterable[T], Iterable[T]]): iterable or async iterable
            collection to filter.

    Returns:
        (AsyncGenerator[T]): an async iterator of the items for which
            ``await corofunc(item)`` is truthy, in their original order

    Example:
        ```[i async for i in amap(inc, afilter(even, [0, 1, 2, 3, 4]))]```

    """
    # Async iterables must be consumed with 'async for'; anything else is
    # walked with a plain 'for'. The item itself is yielded, not the
    # predicate result.
    if isinstance(iterable, AsyncIterable):
        async for item in iterable:
            if await corofunc(item):
                yield item
    else:
        for item in iterable:
            if await corofunc(item):
                yield item
68
69
70
def to_async(target: CallableFunction) -> Callable[..., Awaitable[Any]]:
    """Transform target function in async function if necessary.

    Args:
        target (CallableFunction): function to transform in async if necessary

    Returns:
        (Callable[..., Awaitable[Any]]): target itself when it is already a
            coroutine function, otherwise an async wrapper that calls target
            with the same arguments and returns its result.
    """
    if iscoroutinefunction(target):
        # already a coroutine function: nothing to do
        return target

    # use node_metadata to keep trace of target function name; callables
    # without a __name__ attribute are labelled "anonymous"
    @node_metadata(name=getattr(target, "__name__", "anonymous").lstrip("_"))
    async def _to_async(*args, **kwargs):
        return target(*args, **kwargs)

    return _to_async
90
91
92
try:
    # TODO: this is not necessary with curio 1.4
    import curio  # noqa: F401
    from contextvars import copy_context

    def run(kernel, target, *args):
        """Curio run with independent contextvars.

        This mimics asyncio framework behaviour.

        ```
        copy_context().run(kernel.run, target, *args)
        ```

        Args:
            kernel: object whose ``run`` method is invoked
            target: first argument forwarded to ``kernel.run``
            *args: remaining arguments forwarded to ``kernel.run``

        Returns:
            whatever ``kernel.run(target, *args)`` returns
        """
        return copy_context().run(kernel.run, target, *args)


# Only a failed import means "curio is unavailable"; any other error raised
# while importing should surface instead of being reported as a missing package.
except ImportError:  # pragma: no cover

    def run(kernel, target, *args):
        """Fallback used when curio (or contextvars) cannot be imported.

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError('curio not installed!')
114