async_rx.observable.rx_merge.rx_merge()   C

Complexity

Conditions 9

Size

Total Lines 79
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric  Value
eloc    39
dl      0
loc     79
rs      6.6106
c       0
b       0
f       0
cc      9
nop     1

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
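As a minimal sketch of that advice (the function names below are hypothetical and not taken from the analyzed project), a commented step inside a longer function can be moved into a helper whose name is derived from the comment:

```python
def summarize_before(values):
    # validate input
    if not values:
        raise ValueError("values must not be empty")
    # compute the mean
    total = 0
    for v in values:
        total += v
    mean = total / len(values)
    return f"n={len(values)} mean={mean:.2f}"


def _require_non_empty(values):
    """Extracted from the 'validate input' comment."""
    if not values:
        raise ValueError("values must not be empty")


def _mean(values):
    """Extracted from the 'compute the mean' comment."""
    return sum(values) / len(values)


def summarize_after(values):
    # Same behavior as summarize_before; each step now has a name.
    _require_non_empty(values)
    return f"n={len(values)} mean={_mean(values):.2f}"
```

Both versions return the same string for the same input; the second simply replaces each comment with a named call.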

Commonly applied refactorings include:

from typing import Any, List, NoReturn, Optional

import curio

from ..protocol import Observable, Observer, Subscription, rx_observer
from .rx_create import rx_create

__all__ = ["rx_merge"]


def rx_merge(*observables: Observable) -> Observable:
    """Flattens multiple Observables together by blending their values into one Observable.

    Creates an output Observable which concurrently emits all values
    from every given input Observable.
    'merge' subscribes to each given input Observable (either the source or
    an Observable given as argument), and simply forwards (without doing any
    transformation) all the values from all the input Observables to the output
    Observable.
    The output Observable only completes once all input Observables have completed.
    Any error delivered by an input Observable will be immediately emitted on
    the output Observable.

    Args:
        observables (Observable): a list of observable instances

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if #observables < 1

    """
    if len(observables) < 1:
        raise RuntimeError("#observables must be at least 1")

    terminated_observable = 0
    deliver_next = True
    subscriptions: List[Subscription] = []

    async def _subscription_handler() -> None:
        nonlocal subscriptions
        for s in subscriptions:
            await s()

    async def _subscribe(an_observer: Observer) -> Subscription:
        nonlocal subscriptions
        # Analyzer note (Comprehensibility, Best Practice): The variable subscriptions does not seem to be defined.

        async def _on_next(item: Any) -> None:
            # filter item according to deliver_next
            nonlocal deliver_next

            if deliver_next:  # if no previous error
                await an_observer.on_next(item)
            return None

        async def _on_completed() -> None:
            nonlocal terminated_observable, deliver_next
            # Analyzer note (Comprehensibility, Best Practice): The variable deliver_next does not seem to be defined.
            # Analyzer note (Comprehensibility, Best Practice): The variable terminated_observable does not seem to be defined.

            if deliver_next:  # if no previous error
                terminated_observable += 1
                if terminated_observable == len(observables):  # and all observable complete
                    # lock on_next, on_error handler call and other on_completed call.
                    deliver_next = False
                    await an_observer.on_completed()
            return None

        async def _on_error(err: Any) -> Optional[NoReturn]:
            nonlocal deliver_next
            # Analyzer note (Comprehensibility, Best Practice): The variable deliver_next does not seem to be defined.

            if deliver_next:
                # lock on_next, on_completed handler call and other on_error call.
                deliver_next = False
                await an_observer.on_error(err)
            return None

        # local observer definition
        _observer = rx_observer(on_next=_on_next, on_completed=_on_completed, on_error=_on_error)

        # local observer subscribes to all observables in parallel
        _tasks = []
        async with curio.TaskGroup(wait=all) as g:
            for an_observable in observables:
                _tasks.append(await g.spawn(an_observable.subscribe, _observer))
        subscriptions = [t.result for t in _tasks]

        return _subscription_handler

    return rx_create(subscribe=_subscribe, max_observer=1)
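One possible way to shorten the function above is to move the three nested handlers and their shared `nonlocal` state into a small class. The sketch below is only an illustration under stated assumptions: `MergeGate` and `Recorder` are hypothetical names that do not exist in async_rx, the observer is assumed to expose async `on_next`, `on_completed` and `on_error` methods, and `asyncio` is used purely to drive the demo (the original code runs on curio).

```python
import asyncio
from typing import Any, List, Tuple


class MergeGate:
    """Hypothetical helper holding the state shared by all merged sources."""

    def __init__(self, observer: Any, source_count: int) -> None:
        self._observer = observer
        self._remaining = source_count  # sources that have not completed yet
        self._open = True  # False once the output has completed or errored

    async def on_next(self, item: Any) -> None:
        # Forward items only while the output is still open.
        if self._open:
            await self._observer.on_next(item)

    async def on_completed(self) -> None:
        if not self._open:
            return
        self._remaining -= 1
        if self._remaining == 0:
            # Every source has completed: close the gate, then notify once.
            self._open = False
            await self._observer.on_completed()

    async def on_error(self, err: Any) -> None:
        if self._open:
            # First error wins: close the gate, then forward the error.
            self._open = False
            await self._observer.on_error(err)


class Recorder:
    """Hypothetical observer that records every event it receives."""

    def __init__(self) -> None:
        self.events: List[Tuple[Any, ...]] = []

    async def on_next(self, item: Any) -> None:
        self.events.append(("next", item))

    async def on_completed(self) -> None:
        self.events.append(("completed",))

    async def on_error(self, err: Any) -> None:
        self.events.append(("error", err))


async def demo_completion() -> List[Tuple[Any, ...]]:
    rec = Recorder()
    gate = MergeGate(rec, source_count=2)
    await gate.on_next(1)
    await gate.on_completed()  # first source done, output stays open
    await gate.on_next(2)
    await gate.on_completed()  # second source done, output completes
    await gate.on_next(3)  # dropped: the gate is closed
    return rec.events


async def demo_error() -> List[Tuple[Any, ...]]:
    rec = Recorder()
    gate = MergeGate(rec, source_count=2)
    await gate.on_next("a")
    await gate.on_error("boom")  # forwarded immediately
    await gate.on_completed()  # ignored after the error
    return rec.events
```

With such a class, `_subscribe` would only need to build one gate per subscription and pass its three methods to `rx_observer`, which would reduce both the method length and the number of `nonlocal` declarations flagged by the analyzer. Whether that trade-off suits the project is a design decision for its maintainers.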