async_rx.observable.rx_throw.rx_throw()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 17
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 17
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, default_subscription
4
from .rx_create import rx_create
5
6
__all__ = ["rx_throw"]
7
8
9
def rx_throw(error: Any) -> Observable:
    """Create an observable which always calls ``on_error``.

    On every subscription, the observer's ``on_error`` is awaited with the
    given ``error`` as the first and only notification issued by this
    subscribe function, then ``default_subscription`` is returned.

    Args:
        error (Any): the error to observe; forwarded unchanged as ``err``.

    Returns:
        (Observable): observable instance built by ``rx_create``.

    """

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Signal the error straight away; no other observer method is called.
        await an_observer.on_error(err=error)

        # Nothing was acquired here, so the shared default subscription is
        # handed back rather than a dedicated one.
        return default_subscription

    return rx_create(subscribe=_subscribe)
26