async_rx.observable.rx_throw   A
last analyzed

Complexity

Total Complexity 1

Size/Duplication

Total Lines 26
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 1
eloc 10
dl 0
loc 26
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_throw() 0 17 1
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, default_subscription
4
from .rx_create import rx_create
5
6
# Names exported by a star-import of this module.
__all__ = ["rx_throw"]
7
8
9
def rx_throw(error: Any) -> Observable:
    """Create an observable which always signals an error.

    On each subscription the observer's ``on_error`` is awaited with the
    given ``error``; no other observer method is called by this function.

    Args:
        error (Any): the value handed to the observer's ``on_error`` as
            its ``err`` keyword argument.

    Returns:
        (Observable): the result of ``rx_create`` for this subscribe
            function.

    """

    async def _emit_error(an_observer: Observer) -> Subscription:
        # Forward the captured error to the subscriber first ...
        await an_observer.on_error(err=error)
        # ... then hand back the shared default subscription object.
        return default_subscription

    throwing_observable = rx_create(subscribe=_emit_error)
    return throwing_observable
26