async_rx.observable.rx_merge   A
last analyzed

Complexity

Total Complexity 9

Size/Duplication

Total Lines 90
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 9
eloc 45
dl 0
loc 90
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
C rx_merge() 0 79 9
1
from typing import Any, List, NoReturn, Optional
2
3
import curio
4
5
from ..protocol import Observable, Observer, Subscription, rx_observer
6
from .rx_create import rx_create
7
8
# Names exported by a star-import of this module.
__all__ = ["rx_merge"]
9
10
11
def rx_merge(*observables: Observable) -> Observable:
    """Flatten several Observables into one by blending their values.

    The returned Observable subscribes its observer to every input Observable
    and forwards each received value, without transformation, to that observer.

    The output completes only once all input Observables have completed.
    The first error delivered by any input is forwarded immediately; once an
    error or the final completion has been forwarded, no further notification
    reaches the observer.

    All merge bookkeeping is created per subscription, so a subscription made
    after a previous one has completed or failed starts from a clean state.

    Args:
        observables (Observable): the observables to merge (at least one)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no observable is given

    """
    if not observables:
        raise RuntimeError("#observables must be greater than or equal to 1")

    expected_completions = len(observables)

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Per-subscription state: keeping it here (rather than in the enclosing
        # rx_merge scope) prevents a finished subscription from muting the
        # next one.
        completed_count = 0
        deliver_next = True  # False once on_error / final on_completed was sent
        subscriptions: List[Subscription] = []

        async def _on_next(item: Any) -> None:
            # Forward the item only while the merged stream is still alive.
            if deliver_next:
                await an_observer.on_next(item)

        async def _on_completed() -> None:
            nonlocal completed_count, deliver_next

            if not deliver_next:  # an error (or completion) was already sent
                return
            completed_count += 1
            if completed_count == expected_completions:
                # Last input completed: lock every handler, then notify.
                deliver_next = False
                await an_observer.on_completed()

        async def _on_error(err: Any) -> Optional[NoReturn]:
            nonlocal deliver_next

            if deliver_next:
                # Lock every handler before notifying, so only the first
                # error is ever forwarded.
                deliver_next = False
                await an_observer.on_error(err)
            return None

        async def _subscription_handler() -> None:
            # Unsubscribe from every input, in subscription order.
            for unsubscribe in subscriptions:
                await unsubscribe()

        # Single local observer shared by all input observables.
        _observer = rx_observer(on_next=_on_next, on_completed=_on_completed, on_error=_on_error)

        # Subscribe to all inputs concurrently; wait=all makes the task group
        # block on exit until every subscribe task has finished.
        _tasks = []
        async with curio.TaskGroup(wait=all) as group:
            for an_observable in observables:
                _tasks.append(await group.spawn(an_observable.subscribe, _observer))
        subscriptions.extend(task.result for task in _tasks)

        return _subscription_handler

    # NOTE(review): max_observer=1 presumably limits the merged Observable to
    # one observer at a time; confirm against rx_create.
    return rx_create(subscribe=_subscribe, max_observer=1)
90