@@ 12-89 (lines=78) @@ | ||
9 | __all__ = ["rx_debounce"] |
|
10 | ||
11 | ||
def rx_debounce(an_observable: Observable, duration: timedelta) -> Observable:
    """Debounce operator.

    Debounce is used to rate-limit a sequence: every incoming value is held
    back and only the last value of a burst is emitted, once at least
    ``duration`` has elapsed without a newer value arriving.

    A background task wakes up every ``duration`` and forwards the pending
    value when it is old enough, so a value is delivered somewhere between
    one and roughly two ``duration`` periods after it was received. A value
    that is still pending when the source completes or fails is discarded.

    Args:
        an_observable (Observable): an observable instance
        duration (timedelta): quiet period required before a value is emitted

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no observable or duration are provided (a zero
            ``timedelta`` is falsy and is rejected as well)

    """
    # Local import keeps this block self-contained; the monotonic clock cannot
    # jump backwards/forwards the way the wall clock can (NTP, manual changes).
    import time

    if not an_observable or not duration:
        raise RuntimeError("observable and duration are mandatory")

    async def _subscribe(an_observer: Observer) -> Subscription:

        _pending_since: Optional[float] = None  # monotonic time of the pending value, None if nothing pending
        _latest_value: Any = None
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _sleep_duration = duration.total_seconds()

        async def consumer():
            nonlocal _pending_since
            try:
                while True:
                    await curio.sleep(_sleep_duration)  # wait one full period between checks

                    # Emit only when no newer value arrived during the last period.
                    if _pending_since is not None and _pending_since + _sleep_duration <= time.monotonic():
                        value = _latest_value
                        # Clear the marker BEFORE awaiting downstream: a value received
                        # while on_next is suspended must stay pending, not be erased.
                        _pending_since = None
                        await an_observer.on_next(value)

            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _on_next(item: Any):
            nonlocal _latest_value, _pending_since
            _latest_value = item
            _pending_since = time.monotonic()

        async def _cancel_consumer():
            nonlocal _consumer_task
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None

        async def _on_completed():
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        async def _unsubscribe():
            # Stop the timer task first, then release the upstream subscription.
            nonlocal _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        _consumer_task = await curio.spawn(consumer())

        try:
            _subscription = await an_observable.subscribe(
                rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)
            )
        except BaseException:
            # Do not leak the background task when the upstream subscription fails.
            await _cancel_consumer()
            raise

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
|
90 |
@@ 12-88 (lines=77) @@ | ||
9 | __all__ = ["rx_sample"] |
|
10 | ||
11 | ||
def rx_sample(observable: Observable, duration: timedelta) -> Observable:
    """Sample operator used to rate-limit the sequence.

    Sample filters out elements based on timing: a background task wakes up
    every ``duration`` and emits the LATEST value received since the previous
    tick, or emits nothing if no new value arrived during that interval.
    A value still waiting for the next tick when the source completes or
    fails is discarded.

    Args:
        observable (Observable): an observable instance
        duration (timedelta): timedelta of interval (the duration)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no observable or duration are provided (a zero
            ``timedelta`` is falsy and is rejected as well)

    """
    if not observable or not duration:
        raise RuntimeError("observable and duration are mandatory")

    async def _subscribe(an_observer: Observer) -> Subscription:

        _has_new_value = False  # True when a value arrived since the last emission
        _latest_value: Any = None
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _interval = duration.total_seconds()

        async def consumer():
            nonlocal _has_new_value
            try:
                while True:
                    await curio.sleep(_interval)  # wait one sampling interval
                    if _has_new_value:
                        # Reset the flag BEFORE awaiting downstream: a value received
                        # while on_next is suspended must be sampled on the next tick.
                        _has_new_value = False
                        await an_observer.on_next(_latest_value)

            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _on_next(item: Any):
            nonlocal _latest_value, _has_new_value
            _latest_value = item
            _has_new_value = True

        async def _cancel_consumer():
            nonlocal _consumer_task
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None

        async def _on_completed():
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        async def _unsubscribe():
            # Stop the timer task first, then release the upstream subscription.
            nonlocal _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        _consumer_task = await curio.spawn(consumer())

        try:
            _subscription = await observable.subscribe(
                rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)
            )
        except BaseException:
            # Do not leak the background task when the upstream subscription fails.
            await _cancel_consumer()
            raise

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
|
89 |