async_rx.observable.rx_empty   A
last analyzed

Complexity

Total Complexity 1

Size/Duplication

Total Lines 22
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 1
eloc 9
dl 0
loc 22
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_empty() 0 15 1
1
from ..protocol import Observable, Observer, Subscription, default_subscription
2
from .rx_create import rx_create
3
4
# Public API of this module: the names exported by a star-import.
__all__ = ["rx_empty"]
5
6
7
def rx_empty() -> Observable:
    """Build an Observable that only signals completion.

    The subscribe handler passed to ``rx_create`` sends nothing to the
    observer except a single ``on_completed`` notification, then returns
    ``default_subscription``.

    Returns:
        (Observable) observable instance

    """

    async def _complete_on_subscribe(an_observer: Observer) -> Subscription:
        # Nothing to emit: notify completion, then return the shared
        # default subscription object.
        await an_observer.on_completed()
        return default_subscription

    empty_observable = rx_create(subscribe=_complete_on_subscribe)
    return empty_observable
22