async_rx.observable.rx_empty.rx_empty()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 15
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 15
rs 10
c 0
b 0
f 0
cc 1
nop 0
1
from ..protocol import Observable, Observer, Subscription, default_subscription
2
from .rx_create import rx_create
3
4
__all__ = ["rx_empty"]
5
6
7
def rx_empty() -> Observable:
8
    """Create an empty Observable.
9
10
    An "empty" Observable emits only the complete notification.
11
12
    Returns:
13
        (Observable) observable instance
14
15
    """
16
17
    async def _subscribe(an_observer: Observer) -> Subscription:
18
        await an_observer.on_completed()
19
        return default_subscription
20
21
    return rx_create(subscribe=_subscribe)
22