Issues (58)

async_rx/protocol/observable.py (2 issues)

1
"""Observable utilities."""
2
3
from collections import namedtuple
4
from typing import Any, NoReturn, Optional
5
6
from .definition import Observable, Observer, Subscribe, Subscription
7
from .observer import rx_observer
8
9
__all__ = ["observable", "ensure_observable_contract_operator"]
10
11
ObservableDefinition = namedtuple("ObservableDefinition", "subscribe")
12
"""Implements Observable Protocol."""
13
14
15
def observable(subscribe: Subscribe, ensure_contract: Optional[bool] = True) -> Observable:
16
    """Build an observable.
17
18
    The underlying implementation use an named tuple.
19
20
    Args:
21
        subscribe (Subscribe): subcribe function to use on observable
22
        ensure_contract (bool): boolean flag (default True) to ensure that
23
            this observable will follow Observable contract.
24
25
    Returns:
26
        (Observable): an observable
27
28
    Raise:
29
        (RuntimeError): if subscribe parameter is undefined
30
31
    """
32
    if subscribe is None:
33
        raise RuntimeError('a subscribe function must be provided')
34
35
    async def _subscribe(an_observer: Observer) -> Subscription:
36
        if ensure_contract:
37
            return await subscribe(ensure_observable_contract_operator(an_observer))
38
        return await subscribe(an_observer)
39
40
    return ObservableDefinition(subscribe=_subscribe)
41
42
43
def ensure_observable_contract_operator(an_observer: Observer) -> Observer:
44
    """Ensure Observable Grammar or Contract.
45
46
    ```next*(error|complete)?```
47
48
    In an Observable Execution, zero to infinite Next notifications may be delivered.
49
    If either an Error or Complete notification is delivered, then nothing else
50
    can be delivered afterwards.
51
52
    Args;
53
        an_observer (Observer): the observer which must be ensure that subscribed
54
            observable follow contract.
55
56
    Returns:
57
        (Observer): an Observer which follow the contract.
58
59
    """
60
    _deliver_next = True
61
62
    async def _on_next(item: Any) -> None:
63
        nonlocal _deliver_next
64
65
        if _deliver_next:
66
            await an_observer.on_next(item)
67
68
    async def _on_error(err: Any) -> Optional[NoReturn]:  # type: ignore
69
        nonlocal _deliver_next
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _deliver_next does not seem to be defined.
Loading history...
70
71
        if _deliver_next:
72
            _deliver_next = False
73
            await an_observer.on_error(err)
74
75
    async def _on_completed() -> None:
76
        nonlocal _deliver_next
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _deliver_next does not seem to be defined.
Loading history...
77
78
        if _deliver_next:
79
            _deliver_next = False
80
            await an_observer.on_completed()
81
82
    return rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)
83