async_rx.observable.rx_group_by.rx_group_by()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 50
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 27
dl 0
loc 50
rs 8.2986
c 0
b 0
f 0
cc 6
nop 2
1
from inspect import iscoroutinefunction
2
from typing import Any, Callable, Dict
3
4
from ..protocol import Observable, Observer, Subject, Subscription, rx_observer
5
from ..subject import rx_subject
6
from .rx_create import rx_create
7
8
__all__ = ["rx_group_by"]
9
10
11
def rx_group_by(observable: Observable, key_selector: Callable) -> Observable:
    """Group by operator.

    Similar to Window, GroupBy projects the sequence onto a number of inner observables
    but, as opposed to Window where all windows receive the same sequence,
    GroupBy emits each element only to the one inner observable that is associated
    with that element by the key selector function.
    The observer receives a tuple (key, subject) the first time a key is seen;
    every item (including that first one) is then forwarded to the subject of its key.

    Args:
        observable (Observable): observable instance
        key_selector (Callable): key selector function (sync/async) [Any]->[Any];
            the returned key is used as a dict key, so it must be hashable

    Returns:
        (Observable): observable instance

    """
    # Decided once, up front: an async selector must be awaited on every item.
    _is_awaitable = iscoroutinefunction(key_selector)

    async def _subscribe(an_observer: Observer) -> Subscription:
        # One inner subject per distinct key, private to this subscription.
        _observables: Dict[Any, Subject] = {}

        async def _on_next(item: Any):
            # No ``nonlocal`` here: the closed-over names are only read or
            # mutated in place, never rebound.
            key = await key_selector(item) if _is_awaitable else key_selector(item)
            if key not in _observables:
                _observables[key] = rx_subject()
                # Announce the new group before pushing its first item.
                await an_observer.on_next((key, _observables[key]))
            await _observables[key].on_next(item)

        async def _on_completed():
            # Complete every group first, then the outer observer.
            for o in _observables.values():
                await o.on_completed()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            # Best-effort propagation: a group that fails while handling the
            # error must not prevent the remaining groups, or the outer
            # observer, from being notified.
            for o in _observables.values():
                try:
                    await o.on_error(err=err)
                except Exception:  # pragma: no cover
                    pass
            await an_observer.on_error(err=err)

        return await observable.subscribe(rx_observer(on_next=_on_next, on_completed=_on_completed, on_error=_on_error))

    # NOTE(review): max_observer=1 presumably limits the result to a single
    # subscriber -- confirm against rx_create.
    return rx_create(subscribe=_subscribe, max_observer=1)
61