async_rx.observable.rx_defer.rx_defer()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 26
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 26
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
from ..protocol import Observable, ObservableFactory, Observer, Subscription
2
from .rx_create import rx_create
3
4
# Names exported by a star-import of this module.
__all__ = ["rx_defer"]
5
6
7
def rx_defer(observable_factory: ObservableFactory) -> Observable:
    """Build an observable whose underlying source is created lazily.

    Nothing is produced when ``rx_defer`` itself is called. Each time an
    observer subscribes to the returned observable, ``observable_factory``
    is invoked (with no arguments) and awaited, and the observer is then
    subscribed to the observable it yields. The factory runs again for every
    subscription, so subscribers do not share the object it produced unless
    the factory itself hands back the same instance.

    Args:
        observable_factory (ObservableFactory): awaitable factory called on
            every subscription to obtain the observable to subscribe to.

    Returns:
        (Observable): an observable instance which creates the actual
            observable only when a subscription occurs.

    """

    async def _deferred_subscribe(an_observer: Observer) -> Subscription:
        # Obtain a new source for this particular subscription.
        source = await observable_factory()
        # Forward the observer to it and hand back the resulting subscription.
        subscription = await source.subscribe(an_observer)
        return subscription

    deferred = rx_create(subscribe=_deferred_subscribe)
    return deferred
33