1
|
|
|
from datetime import timedelta |
2
|
|
|
from inspect import iscoroutinefunction |
3
|
|
|
from typing import Callable, Optional |
4
|
|
|
|
5
|
|
|
from curio import TaskCancelled, spawn, time |
6
|
|
|
|
7
|
|
|
from ..protocol import Observable, Observer, Subscription |
8
|
|
|
from .rx_create import rx_create |
9
|
|
|
|
10
|
|
|
# Public API: rx_repeat is the only name this module exports.
__all__ = ["rx_repeat"] |
11
|
|
|
|
12
|
|
|
|
13
|
|
|
def rx_repeat(duration: timedelta, producer: Callable, initial_delay: Optional[timedelta] = None) -> Observable:
    """Create an observable that repeatedly emits values built by ``producer``.

    The subscribe callback handed to ``rx_create`` spawns one curio task per
    observer. That task calls ``producer`` and forwards the result to
    ``an_observer.on_next`` once per ``duration`` period, until the returned
    subscription is disposed.

    Args:
        duration (timedelta): period between two emitted items
        producer (Callable): sync or async function, called without argument, building each item
        initial_delay (Optional[timedelta]): delay before the first item is produced (default: None)

    Returns:
        (Observable): observable instance built by ``rx_create``

    Raises:
        (RuntimeError): if producer or duration is missing (a zero duration is falsy, hence rejected)

    """
    if not producer or not duration:
        raise RuntimeError("producer and duration are mandatory")

    # Resolved once: both values are invariant for every subscription.
    is_async_producer = iscoroutinefunction(producer)
    period = duration.total_seconds()

    async def _subscribe(an_observer: Observer) -> Subscription:
        task = None

        async def _emit_loop():
            # NOTE(review): only TaskCancelled is handled here; an exception raised by
            # producer() or on_next() ends the task without notifying the observer.
            try:
                if initial_delay:
                    await time.sleep(initial_delay.total_seconds())
                while True:
                    start = await time.clock()
                    value = await producer() if is_async_producer else producer()
                    await an_observer.on_next(value)
                    elapsed = await time.clock() - start

                    # Sleep only for what is left of the period. When the iteration
                    # overran it, sleep(0) still hands control back to the scheduler,
                    # so a slow synchronous producer cannot starve other tasks or
                    # make this task impossible to cancel.
                    await time.sleep(max(period - elapsed, 0))
            except TaskCancelled:
                # Disposal cancels the task: leave the loop quietly.
                pass

        task = await spawn(_emit_loop())

        async def _dispose():
            nonlocal task
            if task is None:
                return
            # Drop the reference before awaiting anything, so a re-entrant or
            # repeated disposal cannot signal completion twice.
            running, task = task, None
            await an_observer.on_completed()
            await running.cancel()

        return _dispose

    return rx_create(subscribe=_subscribe)
74
|
|
|
|