async_rx.observable.rx_forward   A
last analyzed

Complexity

Total Complexity 3

Size/Duplication

Total Lines 39
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 3
eloc 17
dl 0
loc 39
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_forward() 0 30 3
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, rx_observer
4
from .rx_create import rx_create
5
6
__all__ = ["rx_forward"]
7
8
9
def rx_forward(observable: Observable, except_complet: bool = False, except_error: bool = False) -> Observable:
10
    """Create an observable wich forward event.
11
12
    Args:
13
        observable (Observable): observable source
14
        except_complet (bool): if true then did not forward 'on_complet' (default: {False})
15
        except_error (bool): if true then did not forward 'on_error' (default: {False})
16
17
    Returns:
18
        (Observable): observable instance.
19
20
    """
21
22
    async def _dummy_on_completed():
23
        pass
24
25
    async def _dummy_on_error(err: Any):
26
        pass
27
28
    async def _subscribe(an_observer: Observer) -> Subscription:
29
30
        return await observable.subscribe(
31
            an_observer=rx_observer(
32
                on_next=an_observer.on_next,
33
                on_error=_dummy_on_error if except_error else an_observer.on_error,
34
                on_completed=_dummy_on_completed if except_complet else an_observer.on_completed,
35
            )
36
        )
37
38
    return rx_create(subscribe=_subscribe)
39