Code Duplication    Length = 77-78 lines in 2 locations

async_rx/observable/rx_debounce.py 1 location

@@ 12-89 (lines=78) @@
9
# Names exported by ``from <module> import *``: only the debounce operator.
__all__ = ["rx_debounce"]
10
11
12
def rx_debounce(an_observable: Observable, duration: timedelta) -> Observable:
    """Debounce operator.

    Debounce is used to rate-limit the sequence.
    A value is delayed when it arrives, and only the last value of a burst of events
    is emitted, once the delay is over and no new event arrived during this delay.

    A value that is still pending when the source completes or fails is dropped:
    the consumer task is cancelled before the completion/error is forwarded.

    Args:
        an_observable (Observable): an observable instance
        duration (timedelta): timedelta of interval (the duration)

    Returns:
        (Observable): observable instance

    Raise:
        (RuntimeError): if no observable or duration are provided

    """
    # Function-scope import so this block does not depend on the module's import list.
    import time

    if not an_observable or not duration:
        raise RuntimeError("observable and duration are mandatory")

    async def _subscribe(an_observer: Observer) -> Subscription:

        # Monotonic timestamp of the latest received value; None when nothing is pending.
        _latest_value_time: Optional[float] = None
        _latest_value = None
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _sleep_duration = duration.total_seconds()

        async def consumer():
            """Periodically emit the pending value once it has been quiet for `duration`."""
            nonlocal _latest_value_time
            try:
                while True:
                    await curio.sleep(_sleep_duration)  # add duration delay before process a new one

                    # no value between time delta
                    if _latest_value_time is not None and (_latest_value_time + _sleep_duration <= time.monotonic()):
                        # Snapshot and clear the pending state BEFORE awaiting the observer:
                        # a value received while on_next() is suspended must stay pending
                        # instead of having its timestamp wiped afterwards.
                        value = _latest_value
                        _latest_value_time = None
                        await an_observer.on_next(value)

            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _on_next(item: Any):
            """Record the item and restart its quiet-period clock."""
            nonlocal _latest_value, _latest_value_time
            _latest_value = item
            # Monotonic clock: immune to wall-clock adjustments.
            _latest_value_time = time.monotonic()

        async def _cancel_consumer():
            """Cancel the consumer task if it is still registered."""
            nonlocal _consumer_task
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None

        async def _on_completed():
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        async def _unsubscribe():
            """Stop the consumer and release the upstream subscription."""
            nonlocal _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        _consumer_task = await curio.spawn(consumer())

        try:
            _subscription = await an_observable.subscribe(
                rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)
            )
        except Exception:
            # Do not leave the consumer task running if the upstream subscription failed.
            await _cancel_consumer()
            raise

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
90

async_rx/observable/rx_sample.py 1 location

@@ 12-88 (lines=77) @@
9
# Names exported by ``from <module> import *``: only the sample operator.
__all__ = ["rx_sample"]
10
11
12
def rx_sample(observable: Observable, duration: timedelta) -> Observable:
    """Sample operator used to rate-limit the sequence.

    Sample filters out elements based on timing.
    Sample emits the LATEST value on a set interval, or emits nothing if no new value
    arrived during the last interval.

    A value received after the last tick and before the source completes or fails is
    dropped: the consumer task is cancelled before the completion/error is forwarded.

    Args:
        observable (Observable): an observable instance
        duration (timedelta): timedelta of interval (the duration)

    Returns:
        (Observable): observable instance

    Raise:
        (RuntimeError): if no observable or duration are provided

    """
    if not observable or not duration:
        raise RuntimeError("observable and duration are mandatory")

    async def _subscribe(an_observer: Observer) -> Subscription:

        # True when a value arrived since the last emission.
        _receive_value = False
        _latest_value = None
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _duration = duration.total_seconds()

        async def consumer():
            """On every tick, emit the latest value if one arrived since the last tick."""
            nonlocal _receive_value
            try:
                while True:
                    await curio.sleep(_duration)  # add duration delay before process a new one
                    if _receive_value:
                        # Snapshot and clear the flag BEFORE awaiting the observer:
                        # a value received while on_next() is suspended must be kept
                        # for the next tick instead of being discarded afterwards.
                        value = _latest_value
                        _receive_value = False
                        await an_observer.on_next(value)

            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _on_next(item: Any):
            """Remember the item and flag that something is waiting to be sampled."""
            nonlocal _latest_value, _receive_value
            _latest_value = item
            _receive_value = True

        async def _cancel_consumer():
            """Cancel the consumer task if it is still registered."""
            nonlocal _consumer_task
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None

        async def _on_completed():
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        async def _unsubscribe():
            """Stop the consumer and release the upstream subscription."""
            nonlocal _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        _consumer_task = await curio.spawn(consumer())

        try:
            _subscription = await observable.subscribe(
                rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)
            )
        except Exception:
            # Do not leave the consumer task running if the upstream subscription failed.
            await _cancel_consumer()
            raise

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
89