async_rx.observable.rx_zip   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 92
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 8
eloc 53
dl 0
loc 92
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
C rx_zip() 0 74 8
1
"""rx_zip module.
2
3
Note on this implementation:
4
 - Initial implementation is not on my own: the idea of using all, enumerate, list of done was found by someone else.
5
 - I was unable to retreive where I see it, and who wrote this heart of this algorithm.
6
   If someone reconize this, I will be really happy to set author and reference on this part.
7
8
"""
9
10
from typing import Any, List, NoReturn, Optional
11
12
from ..protocol import CompleteHandler, NextHandler, Observable, Observer, Subscription, rx_observer
13
from .rx_create import rx_create
14
15
__all__ = ["rx_zip"]
16
17
18
def rx_zip(*observables: Observable) -> Observable:
19
    """Combine multiple Observables to create an Observable.
20
21
    The Obsevable values are calculated from the values, in order,
22
    of each of its input Observables.
23
24
    Args:
25
        (Observable): a list of observable instance
26
27
    Returns:
28
        (Observable): observable instance
29
30
    """
31
32
    async def _subscribe(an_observer: Observer) -> Subscription:
33
34
        subscriptions: List[Subscription] = []
35
        sources = list(observables)
36
        n = len(sources)
37
        queues: List[List] = [[] for _ in range(n)]
38
        _disposable = True  # error or completed not send
39
        _is_done = [False] * n
40
41
        async def _subscription_handler() -> None:
42
            nonlocal subscriptions
43
            for s in subscriptions:
44
                await s()
45
46
        def _on_completed(i: int) -> CompleteHandler:
47
            async def __on_completed():
48
                nonlocal _disposable, _is_done
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _disposable does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable _is_done does not seem to be defined.
Loading history...
49
                _is_done[i] = True
50
                if _disposable and all(_is_done):
51
                    await an_observer.on_completed()
52
                    _disposable = False
53
54
            return __on_completed
55
56
        async def _on_error(err: Any) -> Optional[NoReturn]:
57
            nonlocal _disposable
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _disposable does not seem to be defined.
Loading history...
58
            _disposable = False
59
            return await an_observer.on_error(err=err)
60
61
        async def _on_next_tuple(i: int) -> None:
62
            nonlocal _disposable, queues, _is_done
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _disposable does not seem to be defined.
Loading history...
63
64
            if all(len(q) for q in queues):
65
                try:
66
                    queued_values = [x.pop(0) for x in queues]
67
                    await an_observer.on_next(tuple(queued_values))
68
                except Exception as ex:  # pragma: no cover
69
                    await _on_error(ex)
70
            elif all(x for j, x in enumerate(_is_done) if j != i):
71
                _disposable = False
72
                await an_observer.on_completed()
73
74
        def _on_next(i: int) -> NextHandler:
75
            async def __on_next(item: Any) -> None:
76
                nonlocal _disposable, queues
77
                if _disposable:
78
                    queues[i].append(item)
79
                    await _on_next_tuple(i)
80
81
            return __on_next
82
83
        # observer factory
84
        def _observer_factory(i: int) -> Observer:
85
            return rx_observer(on_next=_on_next(i), on_completed=_on_completed(i), on_error=_on_error)
86
87
        subscriptions = [await an_observable.subscribe(_observer_factory(i)) for i, an_observable in enumerate(sources)]
88
89
        return _subscription_handler
90
91
    return rx_create(subscribe=_subscribe, max_observer=1)
92