async_rx.observable.rx_merge_map.rx_merge_map()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 31
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 31
rs 9.8
c 0
b 0
f 0
cc 2
nop 2
1
from inspect import iscoroutinefunction
2
from typing import Any, Callable
3
4
from ..protocol import Observable, Observer, Subscription, rx_observer_from
5
from .rx_create import rx_create
6
from .rx_merge import rx_merge
7
8
__all__ = ["rx_merge_map"]
9
10
11
def rx_merge_map(*observables: Observable, transform: Callable) -> Observable:
12
    """Merge map operator.
13
14
    rx_merge_map allows asynchronous queries, resulting in an observable of observables and it flattens the results.
15
    There may be multiple inner observables that run simultaneously, so the results from these inner observables may be intertwined.
16
17
    Args:
18
        observables (Observable): a list of observable instance
19
        transform (Callable): transform function (sync or async)
20
21
    Returns:
22
        (Observable): observable instance
23
24
    """
25
26
    _is_awaitable = iscoroutinefunction(transform)
27
    _source = rx_merge(*observables)
28
29
    async def _subscribe(an_observer: Observer) -> Subscription:
30
        nonlocal _source
31
32
        async def _on_next(item: Any):
33
            nonlocal _is_awaitable
34
            if _is_awaitable:
35
                await an_observer.on_next(await transform(item))
36
            else:
37
                await an_observer.on_next(transform(item))
38
39
        return await _source.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next))
40
41
    return rx_create(subscribe=_subscribe, max_observer=1)
42