async_rx.observable.rx_amb.rx_amb()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 48
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 48
rs 7.9759
c 0
b 0
f 0
cc 7
nop 1
1
from typing import Any, List, NoReturn, Optional
2
3
import curio
4
5
from ..protocol import Observable, Observer, Subscription, disposable_subscription_on_cancel, rx_observer
6
from ..subject import rx_subject
7
from .rx_create import rx_create
8
from .rx_first import rx_first
9
10
# Names exported by a star-import of this module.
__all__ = ["rx_amb"]
11
12
13
def rx_amb(*observables: Observable) -> Observable:
    """Amb operator.

    The Amb operator (stands for ambiguous), alias race, subscribes to a number of observables
    and retrieves the first observable that yields a value, closing off all others.
    For example, Amb can automatically select the best server to download from: Amb listens to both servers
    and the first server that replies is used.

    Args:
        observables (Observable): the candidate observables; at least one is required

    Returns:
        (Observable): observable instance built with ``rx_create`` (``max_observer=1``)

    Raises:
        (RuntimeError): if no observable is given

    """
    if not observables:
        raise RuntimeError("#observables must be greater than or equal to 1")

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Every source observable reports into this shared subject.
        _subject = rx_subject()

        # Only the first notification of the subject is meant to reach
        # ``an_observer`` (delegated to rx_first).
        _first_subscription: Subscription = await rx_first(observable=_subject).subscribe(an_observer)

        # Subscribe to all observables in parallel; with ``wait=all`` the task
        # group is left only once every spawned subscription task has finished.
        _tasks = []
        async with curio.TaskGroup(wait=all) as g:
            for an_observable in observables:
                _tasks.append(await g.spawn(_build_observer_and_subscribe, an_observable, _subject))
        _subscriptions: List[Subscription] = [t.result for t in _tasks]

        async def _subscription_handler():
            # Dispose the rx_first subscription first, then each source subscription.
            if _first_subscription:
                await _first_subscription()
            for _unsub in _subscriptions:
                if _unsub:
                    await _unsub()

        return _subscription_handler

    return rx_create(subscribe=_subscribe, max_observer=1)
61
62
63
async def _build_observer_and_subscribe(an_observable: Observable, an_observer: Observer):
    """Wrap ``an_observer`` for ``an_observable`` and subscribe the wrapper to it.

    The wrapper comes from ``_observer_for``; the subscription is made through
    ``disposable_subscription_on_cancel`` and its result is returned as is.
    """
    return await disposable_subscription_on_cancel(
        an_observable=an_observable,
        an_observer=await _observer_for(an_observable=an_observable, an_observer=an_observer),
    )
66
67
68
async def _observer_for(an_observable: Observable, an_observer: Observer):
    """Create an observer that reports ``an_observable`` itself to ``an_observer``.

    The items emitted by the source are never forwarded: each notification
    pushes the source observable, so the receiver learns *which* observable
    responded rather than what it produced.
    """

    async def _announce_source(item: Any) -> None:
        # The emitted item is deliberately ignored; the observable is what gets sent.
        await an_observer.on_next(an_observable)

    async def _announce_source_then_complete() -> None:
        # Completion also counts as a response: announce the source, then complete.
        await an_observer.on_next(an_observable)
        await an_observer.on_completed()

    async def _drop_error(err: Any) -> Optional[NoReturn]:
        # Errors are not propagated to ``an_observer``.
        return None

    return rx_observer(
        on_next=_announce_source,
        on_completed=_announce_source_then_complete,
        on_error=_drop_error,
    )
84