1
|
|
|
from typing import Any, List, NoReturn, Optional |
2
|
|
|
|
3
|
|
|
import curio |
4
|
|
|
|
5
|
|
|
from ..protocol import Observable, Observer, Subscription, disposable_subscription_on_cancel, rx_observer |
6
|
|
|
from ..subject import rx_subject |
7
|
|
|
from .rx_create import rx_create |
8
|
|
|
from .rx_first import rx_first |
9
|
|
|
|
10
|
|
|
__all__ = ["rx_amb"] |
11
|
|
|
|
12
|
|
|
|
13
|
|
|
def rx_amb(*observables: Observable) -> Observable: |
14
|
|
|
"""Amb operator. |
15
|
|
|
|
16
|
|
|
The Amb operator (stands for ambiguous), alias race, subscribes to a number of observables |
17
|
|
|
and retrieves the first observable that yields a value, closing off all others. |
18
|
|
|
For example, Amb can automatically select the best server to download from: Amb listens to both servers |
19
|
|
|
and the first server that replies is used. |
20
|
|
|
|
21
|
|
|
Args: |
22
|
|
|
observables (Observable): a list of observable instance |
23
|
|
|
|
24
|
|
|
Returns: |
25
|
|
|
(Observable): observable instance |
26
|
|
|
|
27
|
|
|
Raise: |
28
|
|
|
(RuntimeError): if #observables < 1 |
29
|
|
|
|
30
|
|
|
""" |
31
|
|
|
|
32
|
|
|
if len(observables) < 1: |
33
|
|
|
raise RuntimeError("#observables must be greather than 1") |
34
|
|
|
|
35
|
|
|
async def _subscribe(an_observer: Observer) -> Subscription: |
36
|
|
|
|
37
|
|
|
_subject = rx_subject() |
38
|
|
|
|
39
|
|
|
# we send the first |
40
|
|
|
_first_subscription: Subscription = await rx_first(observable=_subject).subscribe(an_observer) |
41
|
|
|
|
42
|
|
|
# subscribe to all observables in parallele |
43
|
|
|
_subscriptions: List[Subscription] = [] |
44
|
|
|
_tasks = [] |
45
|
|
|
async with curio.TaskGroup(wait=all) as g: |
46
|
|
|
for an_observable in observables: |
47
|
|
|
_tasks.append(await g.spawn(_build_observer_and_subscribe, an_observable, _subject)) |
48
|
|
|
_subscriptions = [t.result for t in _tasks] |
49
|
|
|
|
50
|
|
|
async def _subscription_handler(): |
51
|
|
|
nonlocal _first_subscription, _subscriptions |
52
|
|
|
if _first_subscription: |
53
|
|
|
await _first_subscription() |
54
|
|
|
for _unsub in _subscriptions: |
55
|
|
|
if _unsub: |
56
|
|
|
await _unsub() |
57
|
|
|
|
58
|
|
|
return _subscription_handler |
59
|
|
|
|
60
|
|
|
return rx_create(subscribe=_subscribe, max_observer=1) |
61
|
|
|
|
62
|
|
|
|
63
|
|
|
async def _build_observer_and_subscribe(an_observable: Observable, an_observer: Observer): |
64
|
|
|
_observer = await _observer_for(an_observable=an_observable, an_observer=an_observer) |
65
|
|
|
return await disposable_subscription_on_cancel(an_observable=an_observable, an_observer=_observer) |
66
|
|
|
|
67
|
|
|
|
68
|
|
|
async def _observer_for(an_observable: Observable, an_observer: Observer): |
69
|
|
|
"""Build an observer that send observable when respond.""" |
70
|
|
|
|
71
|
|
|
async def _on_next(item: Any) -> None: |
72
|
|
|
await an_observer.on_next(an_observable) |
73
|
|
|
return None |
74
|
|
|
|
75
|
|
|
async def _on_completed() -> None: |
76
|
|
|
await an_observer.on_next(an_observable) |
77
|
|
|
await an_observer.on_completed() |
78
|
|
|
return None |
79
|
|
|
|
80
|
|
|
async def _on_error(err: Any) -> Optional[NoReturn]: |
81
|
|
|
return None |
82
|
|
|
|
83
|
|
|
return rx_observer(on_next=_on_next, on_completed=_on_completed, on_error=_on_error) |
84
|
|
|
|