async_rx.observable.rx_map.rx_map()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 40
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 40
rs 8.6666
c 0
b 0
f 0
cc 6
nop 4
1
from inspect import iscoroutinefunction
2
from typing import Any, Callable, Optional
3
4
from ..protocol import Observable, Observer, Subscription, rx_observer_from
5
from .rx_create import rx_create
6
7
# Names exported by a star-import of this module.
__all__ = ["rx_map"]
8
9
10
def rx_map(
    observable: Observable, transform: Callable, expand_arg_parameters: Optional[bool] = False, expand_kwarg_parameters: Optional[bool] = False
) -> Observable:
    """Map operator.

    Map operator modifies an Observable<A> into Observable<B> given a function with the type A->B.

    For example, with the function x => 10 * x and a source emitting 1, 2, 3, the result emits 10, 20, 30.
    Note that in this example the function did not change the type of the Observable but did change the values.

    Args:
        observable (Observable): an observable instance
        transform (Callable): transform function (sync or async). Its result is awaited only when
            ``inspect.iscoroutinefunction(transform)`` is true.
        expand_arg_parameters (Optional[bool]): if true each item is expanded as positional arguments
            (``transform(*item)``). Ignored when expand_kwarg_parameters is also true.
        expand_kwarg_parameters (Optional[bool]): if true each item is expanded as keyword arguments
            (``transform(**item)``). Takes precedence over expand_arg_parameters.

    Returns:
        (Observable): observable instance

    """
    # Checked once, when the operator is built, rather than on every item.
    _is_awaitable = iscoroutinefunction(transform)

    def _call_transform(item: Any) -> Any:
        # kwargs expansion is tested first, so it wins when both flags are set.
        if expand_kwarg_parameters:
            return transform(**item)
        if expand_arg_parameters:
            return transform(*item)
        return transform(item)

    async def _subscribe(an_observer: Observer) -> Subscription:
        async def _on_next(item: Any) -> None:
            _next_item = _call_transform(item)
            if _is_awaitable:
                # Calling a coroutine function only creates the coroutine; await it to get the value.
                _next_item = await _next_item

            await an_observer.on_next(_next_item)

        # Only on_next is overridden here; the rest of the observer is derived from an_observer.
        return await observable.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next))

    return rx_create(subscribe=_subscribe)
50