async_rx.protocol.observer   A
last analyzed

Complexity

Total Complexity 9

Size/Duplication

Total Lines 79
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 9
eloc 22
dl 0
loc 79
rs 10
c 0
b 0
f 0

5 Functions

Rating   Name   Duplication   Size   Complexity  
A default_on_completed() 0 10 1
A ignore_error_handler() 0 3 1
A default_error() 0 9 2
A rx_observer_from() 0 19 4
A rx_observer() 0 16 1
1
"""Observer utilities."""
2
from collections import namedtuple
3
from typing import Any, NoReturn, Optional
4
5
from .definition import CompleteHandler, ErrorHandler, NextHandler, Observer
6
7
# Public API of this module (names exported by ``from ... import *``).
__all__ = [
    "rx_observer",
    "rx_observer_from",
    "default_on_completed",
    "default_error",
    "ignore_error_handler",
]
8
9
10
# Immutable record holding the three observer handlers, in the order
# (on_next, on_error, on_completed); rx_observer() returns instances of it.
ObserverDefinition = namedtuple("ObserverDefinition", ("on_next", "on_error", "on_completed"))
"""Implements Observer Protocol."""
12
13
14
async def default_on_completed() -> None:  # pragma: no cover
    """Default on_completed handler.

    Does nothing; used as the ``on_completed`` default of ``rx_observer``.

    Returns:
        (None): nothing

    """
    return None
24
25
26
async def default_error(err: Any) -> NoReturn:
    """Default on_error handler: always raise.

    Args:
        err (Any): the error to propagate

    Raises:
        BaseException: ``err`` itself when it is an exception instance
        Exception: wrapping ``err`` when it is any other kind of value

    """
    # Anything that is not an exception instance cannot be raised as-is,
    # so it is wrapped in a generic Exception first.
    if not isinstance(err, BaseException):
        raise Exception(err)

    raise err
35
36
37
async def ignore_error_handler(err: Any) -> None:  # pragma: no cover
    """Error handler which silently discards the error.

    Args:
        err (Any): the error to ignore (never inspected)

    Returns:
        (None): nothing

    """
    return None
40
41
42
def rx_observer(on_next: NextHandler, on_error: ErrorHandler = default_error, on_completed: CompleteHandler = default_on_completed) -> Observer:
    """Build an observer from its handlers.

    The observer is an ``ObserverDefinition`` named tuple.

    Args:
        on_next (NextHandler): handler which processes items
        on_error (ErrorHandler): error handler (defaults to default_error,
            which raises)
        on_completed (CompleteHandler): completion handler (defaults to
            default_on_completed, a no-op)

    Returns:
        (Observer): an Observer

    """
    # Positional order matches the named tuple fields:
    # (on_next, on_error, on_completed).
    handlers = (on_next, on_error, on_completed)
    return ObserverDefinition(*handlers)
58
59
60
def rx_observer_from(
    observer: Observer, on_next: Optional[NextHandler] = None, on_error: Optional[ErrorHandler] = None, on_completed: Optional[CompleteHandler] = None
) -> Observer:
    """Build an observer from another one.

    Every handler given explicitly replaces the matching handler of
    ``observer``; a handler left as ``None`` is taken from ``observer``.

    Args:
        observer (Observer): the observer to override
        on_next (Optional[NextHandler]): override on_next handler if set
        on_error (Optional[ErrorHandler]): override on_error handler if set
        on_completed (Optional[CompleteHandler]): override on_completed handler if set

    Returns:
        (Observer): a new Observer built with rx_observer

    """
    # Test against None rather than truthiness: a handler may be a callable
    # object that evaluates to False (defines __bool__ or __len__), and such
    # an explicit override must not be silently dropped.
    return rx_observer(
        on_next=observer.on_next if on_next is None else on_next,
        on_error=observer.on_error if on_error is None else on_error,
        on_completed=observer.on_completed if on_completed is None else on_completed,
    )
80