async_rx.observable.rx_map   A
last analyzed

Complexity

Total Complexity 6

Size/Duplication

Total Lines 50
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 6
eloc 20
dl 0
loc 50
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
B rx_map() 0 40 6
1
from inspect import iscoroutinefunction
2
from typing import Any, Callable, Optional
3
4
from ..protocol import Observable, Observer, Subscription, rx_observer_from
5
from .rx_create import rx_create
6
7
__all__ = ["rx_map"]
8
9
10
def rx_map(
11
    observable: Observable, transform: Callable, expand_arg_parameters: Optional[bool] = False, expand_kwarg_parameters: Optional[bool] = False
12
) -> Observable:
13
    """Map operator.
14
15
    Map operator modifies an Observable<A> into Observable<B> given a function with the type A->B.
16
17
    For example, if we take the function x => 10 ∗ x and a list of 1,2,3. The result is 10,20,30, see figure 4.
18
    Note that this function did not change the type of the Observable but did change the values.
19
20
    Args:
21
        observable (Observable): an observable instance
22
        transform (Callable): transform function (sync or async)
23
        expand_arg_parameters (Optional[bool]): if true each item will be expanded as args before call transform
24
            (implique expand_kwarg_parameters = False).
25
        expand_kwarg_parameters (Optional[bool]): if true each item will be expanded as kwargs before call transform.
26
27
    Returns:
28
        (Observable): observable instance
29
30
    """
31
32
    _is_awaitable = iscoroutinefunction(transform)
33
34
    async def _subscribe(an_observer: Observer) -> Subscription:
35
        async def _on_next(item: Any):
36
            nonlocal _is_awaitable
37
38
            if expand_kwarg_parameters:
39
                _next_item = await transform(**item) if _is_awaitable else transform(**item)
40
            elif expand_arg_parameters:
41
                _next_item = await transform(*item) if _is_awaitable else transform(*item)
42
            else:
43
                _next_item = await transform(item) if _is_awaitable else transform(item)
44
45
            await an_observer.on_next(_next_item)
46
47
        return await observable.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next))
48
49
    return rx_create(subscribe=_subscribe)
50