async_rx.observable.rx_avg   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 42
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 2
eloc 21
dl 0
loc 42
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_avg() 0 32 2
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, rx_observer_from
4
from .rx_create import rx_create
5
from .rx_reduce import rx_reduce
6
7
__all__ = ["rx_avg"]
8
9
10
def rx_avg(observable: Observable) -> Observable:
    """Create an observable which emits the average of the source items.

    The sum is delegated to ``rx_reduce`` (seeded with ``0``); this function
    counts the items seen by the accumulator and divides the value emitted by
    the reducer by that count.

    The item counter is kept per subscription, so subscribing several times
    to the returned observable yields an independent average each time.

    Args:
        observable (Observable): the observable source

    Returns:
        (Observable): observable instance. If the reducer emits while no item
            has been counted, the observer receives
            ``on_error('No value emitted')`` instead of a value.

    """

    async def _subscribe(an_observer: Observer) -> Subscription:
        # State lives here (not in the enclosing rx_avg scope) so that each
        # subscription starts counting from zero instead of sharing one
        # counter across every subscriber of the returned observable.
        _count = 0

        async def _accumulator(current, item):
            nonlocal _count
            _count += 1
            return current + item

        # NOTE(review): assumes rx_reduce restarts from ``seed`` for every
        # subscription - confirm against rx_reduce.
        reducer = rx_reduce(observable=observable, accumulator=_accumulator, seed=0)

        async def _on_next(item: Any):
            # ``item`` is the value emitted by the reducer (the running sum).
            if _count == 0:
                # Nothing went through the accumulator: avoid dividing by zero.
                await an_observer.on_error('No value emitted')
            else:
                await an_observer.on_next(item / _count)

        return await reducer.subscribe(an_observer=rx_observer_from(observer=an_observer, on_next=_on_next))

    return rx_create(subscribe=_subscribe)
42