async_rx.observable.rx_defer   A
last analyzed

Complexity

Total Complexity 1

Size/Duplication

Total Lines 33
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 1
eloc 9
dl 0
loc 33
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_defer() 0 26 1
1
from ..protocol import Observable, ObservableFactory, Observer, Subscription
2
from .rx_create import rx_create
3
4
__all__ = ["rx_defer"]
5
6
7
def rx_defer(observable_factory: ObservableFactory) -> Observable:
8
    """Create an observable when a subscription occurs.
9
10
    Defer allows you to create the Observable only when the Observer subscribes,
11
    and create a fresh Observable for each Observer.
12
13
    It waits until an Observer subscribes to it, and then it generates an Observable,
14
    typically with an Observable factory function.
15
    It does this afresh for each subscriber, so although each subscriber may think
16
    it is subscribing to the same Observable,
17
    in fact each subscriber gets its own individual Observable.
18
19
    Args:
20
        observable_factory (ObservableFactory): observable factory
21
22
    Returns:
23
        (Observable): an observable instance wich create observable
24
            when subscription occurs.
25
26
    """
27
28
    async def _subscribe(an_observer: Observer) -> Subscription:
29
        _obs = await observable_factory()
30
        return await _obs.subscribe(an_observer)
31
32
    return rx_create(subscribe=_subscribe)
33