async_rx.observable.rx_merge_map   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 42
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 2
eloc 19
dl 0
loc 42
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_merge_map() 0 31 2
1
from inspect import iscoroutinefunction
2
from typing import Any, Callable
3
4
from ..protocol import Observable, Observer, Subscription, rx_observer_from
5
from .rx_create import rx_create
6
from .rx_merge import rx_merge
7
8
__all__ = ["rx_merge_map"]
9
10
11
def rx_merge_map(*observables: Observable, transform: Callable) -> Observable:
12
    """Merge map operator.
13
14
    rx_merge_map allows asynchronous queries, resulting in an observable of observables and it flattens the results.
15
    There may be multiple inner observables that run simultaneously, so the results from these inner observables may be intertwined.
16
17
    Args:
18
        observables (Observable): a list of observable instance
19
        transform (Callable): transform function (sync or async)
20
21
    Returns:
22
        (Observable): observable instance
23
24
    """
25
26
    _is_awaitable = iscoroutinefunction(transform)
27
    _source = rx_merge(*observables)
28
29
    async def _subscribe(an_observer: Observer) -> Subscription:
30
        nonlocal _source
31
32
        async def _on_next(item: Any):
33
            nonlocal _is_awaitable
34
            if _is_awaitable:
35
                await an_observer.on_next(await transform(item))
36
            else:
37
                await an_observer.on_next(transform(item))
38
39
        return await _source.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next))
40
41
    return rx_create(subscribe=_subscribe, max_observer=1)
42