async_rx.observable.rx_forward.rx_forward()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 30
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 30
rs 9.8
c 0
b 0
f 0
cc 3
nop 3
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, rx_observer
4
from .rx_create import rx_create
5
6
# Names exported by a star-import of this module.
__all__ = ["rx_forward"]
7
8
9
def rx_forward(observable: Observable, except_complet: bool = False, except_error: bool = False) -> Observable:
    """Build an observable that relays the events of ``observable``.

    Each subscription to the returned observable subscribes an intermediate
    observer to the source. ``on_next`` is always relayed to the caller's
    observer; ``on_error`` and ``on_completed`` are relayed unless the
    matching flag asks for them to be dropped.

    Args:
        observable (Observable): source observable to subscribe to.
        except_complet (bool): when true, 'on_completed' is not forwarded
            and a no-op handler is used instead (default: False).
        except_error (bool): when true, 'on_error' is not forwarded and a
            no-op handler is used instead (default: False).

    Returns:
        (Observable): the observable produced by ``rx_create``.

    """

    async def _ignore_completed():
        # No-op stand-in used when completion must not be forwarded.
        return None

    async def _ignore_error(err: Any):
        # No-op stand-in used when errors must not be forwarded.
        return None

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Resolve the source's subscribe first, then the handlers in the
        # order on_next, on_error, on_completed.
        source_subscribe = observable.subscribe

        next_handler = an_observer.on_next

        if except_error:
            error_handler = _ignore_error
        else:
            error_handler = an_observer.on_error

        if except_complet:
            completed_handler = _ignore_completed
        else:
            completed_handler = an_observer.on_completed

        relay = rx_observer(
            on_next=next_handler,
            on_error=error_handler,
            on_completed=completed_handler,
        )
        return await source_subscribe(an_observer=relay)

    return rx_create(subscribe=_subscribe)
39