async_rx.observable.rx_debounce   A
last analyzed

Complexity

Total Complexity 9

Size/Duplication

Total Lines 90
Duplicated Lines 86.67 %

Importance

Changes 0
Metric Value
wmc 9
eloc 53
dl 78
loc 90
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
C rx_debounce() 78 78 9

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
from datetime import datetime, timedelta
2
from typing import Any, Optional
3
4
import curio
5
6
from ..protocol import Observable, Observer, Subscription, rx_observer
7
from .rx_create import rx_create
8
9
__all__ = ["rx_debounce"]
10
11
12 View Code Duplication
def rx_debounce(an_observable: Observable, duration: timedelta) -> Observable:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
13
    """Debounce operator.
14
15
    Debounce are used to rate-limit the sequence.
16
    Debounce will delay a value when it arrives and only emits the last value in a burst of events
17
    after the set delay is over and no new event arrives during this delay.
18
19
    Args:
20
        an_observable (Observable): an observable instance
21
        duration (timedelta): timedelta of interval (the duration)
22
23
    Returns:
24
        (Observable): observable instance
25
26
    Raise:
27
        (RuntimeError): if no observable or duration are provided
28
29
    """
30
    if not an_observable or not duration:
31
        raise RuntimeError("observable and duration are mandatory")
32
33
    async def _subscribe(an_observer: Observer) -> Subscription:
34
35
        _lastest_value_time = None
36
        _lastest_value = None
37
        _consumer_task = None
38
        _subscription: Optional[Subscription] = None
39
        _sleep_duration = duration.total_seconds()
40
41
        async def consumer():
42
            nonlocal _sleep_duration, _lastest_value, _lastest_value_time
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _lastest_value_time does not seem to be defined.
Loading history...
43
            try:
44
                while True:
45
                    await curio.sleep(_sleep_duration)  # add duration delay before process a new one
46
47
                    if _lastest_value_time and (_lastest_value_time + duration <= datetime.utcnow()):  # no value between time delta
48
                        await an_observer.on_next(_lastest_value)
49
                        _lastest_value_time = None
50
51
            except curio.TaskCancelled:
52
                # it's time to finish
53
                pass
54
55
        async def _on_next(item: Any):
56
            nonlocal _lastest_value, _lastest_value_time
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _lastest_value_time does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable _lastest_value does not seem to be defined.
Loading history...
57
            _lastest_value = item
58
            _lastest_value_time = datetime.utcnow()
59
60
        async def _cancel_consumer():
61
            nonlocal _consumer_task
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _consumer_task does not seem to be defined.
Loading history...
62
            if _consumer_task:
63
                await _consumer_task.cancel()
64
                _consumer_task = None
65
66
        async def _on_completed():
67
            nonlocal _consumer_task
68
            await _cancel_consumer()
69
            await an_observer.on_completed()
70
71
        async def _on_error(err: Any):
72
            nonlocal _consumer_task
73
            await _cancel_consumer()
74
            await an_observer.on_error(err=err)
75
76
        async def _subscribe():
77
            nonlocal _consumer_task, _subscription
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _subscription does not seem to be defined.
Loading history...
78
            await _cancel_consumer()
79
            if _subscription:
80
                await _subscription()
81
                _subscription = None
82
83
        _consumer_task = await curio.spawn(consumer())
84
85
        _subscription = await an_observable.subscribe(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed))
86
87
        return _subscribe
88
89
    return rx_create(subscribe=_subscribe, max_observer=1)
90