async_rx.observable.rx_reduce   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 45
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 2
eloc 21
dl 0
loc 45
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_reduce() 0 33 2
1
from inspect import iscoroutinefunction
2
from typing import Any, Optional, TypeVar
3
4
from ..protocol import AccumulatorOperator, Observable, Observer, Subscription, rx_observer_from
5
from .rx_create import rx_create
6
7
# Names exported by a star-import of this module: only the reduce operator.
__all__ = ["rx_reduce"]
8
9
# Generic type variable; NOTE(review): not referenced by the code visible in this module.
T = TypeVar('T')
10
11
12
def rx_reduce(observable: Observable, accumulator: AccumulatorOperator, seed: Optional[Any] = None) -> Observable:
    """Build an observable that folds the items of a source into a single value.

    For every item received from ``observable`` the running result is replaced
    by ``accumulator(running_result, item)``; the item itself is not passed on
    to the subscriber. When the source signals completion, the running result
    is sent to the subscriber through ``on_next`` and then ``on_completed`` is
    called. If the source completes without any item, the value sent is
    ``seed`` unchanged.

    Args:
        observable (Observable): source whose items are folded.
        accumulator (AccumulatorOperator): callable taking the running result
            and the current item and returning the new running result. It may
            be a plain function or a coroutine function; coroutine functions
            are awaited.
        seed (Optional[Any]): initial running result (default None). It is
            handed to ``accumulator`` as-is together with the first item.

    Returns:
        (Observable): the observable produced by ``rx_create`` around the
            reducing subscription logic.

    """
    # Checked once up front: a coroutine function has to be awaited on each call.
    accumulate_async = iscoroutinefunction(accumulator)

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Each call of this subscribe function starts over from the seed.
        _running = seed

        async def _fold(item: Any):
            # ``nonlocal`` is required: the running result owned by
            # ``_subscribe`` is rebound here, not merely read.
            nonlocal _running

            if accumulate_async:
                _running = await accumulator(_running, item)
            else:
                _running = accumulator(_running, item)

        async def _flush():
            # Emit the folded value first, then propagate completion.
            await an_observer.on_next(_running)
            await an_observer.on_completed()

        reducing_observer = rx_observer_from(observer=an_observer, on_next=_fold, on_completed=_flush)
        return await observable.subscribe(an_observer=reducing_observer)

    return rx_create(subscribe=_subscribe)
45