async_rx.observable.rx_throttle   A
last analyzed

Complexity

Total Complexity 5

Size/Duplication

Total Lines 46
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 5
eloc 19
dl 0
loc 46
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_throttle() 0 36 5
1
from datetime import datetime, timedelta
2
from typing import Any
3
4
from ..protocol import Observable, Observer, Subscription, rx_observer_from
5
from .rx_create import rx_create
6
7
__all__ = ["rx_throttle"]
8
9
10
def rx_throttle(observable: Observable, duration: timedelta) -> Observable:
    """Throttle operator.

    Rate-limit a sequence based on timing: the first item of a burst is
    forwarded to the observer, and every following item that arrives less
    than ``duration`` after the last forwarded item is dropped.

    The subscribing observer is wrapped with ``rx_observer_from``, overriding
    only its ``on_next`` handler.

    Args:
        observable (Observable): the source observable to throttle
        duration (timedelta): minimum delay between two forwarded items

    Returns:
        (Observable): observable instance built with ``rx_create``

    Raises:
        (RuntimeError): if ``observable`` or ``duration`` is missing or falsy
            (a zero ``timedelta`` is falsy and is therefore rejected)

    """
    if not observable or not duration:
        raise RuntimeError("observable and duration are mandatory")

    # Function-scope import keeps this block self-contained.  A monotonic clock
    # is used instead of ``datetime.utcnow()`` (deprecated since Python 3.12)
    # so that wall-clock adjustments can neither extend nor shorten the
    # throttle window.
    from time import monotonic

    _window = duration.total_seconds()

    async def _subscribe(an_observer: Observer) -> Subscription:

        # Monotonic timestamp of the last forwarded item; None until the
        # first item has been sent.  State is per subscription.
        _last_sent_at = None

        async def _on_next(item: Any) -> None:
            nonlocal _last_sent_at
            _now = monotonic()
            # Explicit None check: a float timestamp may legitimately be 0.0.
            if _last_sent_at is None or _now - _last_sent_at >= _window:
                _last_sent_at = _now
                await an_observer.on_next(item)

        return await observable.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next))

    return rx_create(subscribe=_subscribe, max_observer=1)
46