async_rx.observable.rx_repeat.rx_repeat()   C
last analyzed

Complexity

Conditions 9

Size

Total Lines 61
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 31
dl 0
loc 61
rs 6.6666
c 0
b 0
f 0
cc 9
nop 3

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
from datetime import timedelta
2
from inspect import iscoroutinefunction
3
from typing import Callable, Optional
4
5
from curio import TaskCancelled, spawn, time
6
7
from ..protocol import Observable, Observer, Subscription
8
from .rx_create import rx_create
9
10
__all__ = ["rx_repeat"]
11
12
13
def rx_repeat(duration: timedelta, producer: Callable, initial_delay: Optional[timedelta] = None) -> Observable:
14
    """Repeat data.
15
16
    rx_repeat send data generated by producer function at duration rate until observer
17
    dispose his subscription.
18
19
    Args:
20
        duration (timedelta): duration between each sended item
21
        producer (Callable): producer (asyn/sync) function
22
        initial_delay (Optional[timedelta]): initial delay before produce value (default: None)
23
24
    Returns:
25
        (Observable): observable instance
26
27
    Raise:
28
        (RuntimeError): if no producer or duration are provided
29
30
    """
31
    if not producer or not duration:
32
        raise RuntimeError("producer and duration are mandatory")
33
34
    _is_awaitable = iscoroutinefunction(producer)
35
    _duration = duration.total_seconds()
36
37
    async def _subscribe(an_observer: Observer) -> Subscription:
38
        _task = None
39
40
        async def _producer():
41
            nonlocal _duration, _is_awaitable
42
            try:
43
                # initial delay
44
                if initial_delay:
45
                    await time.sleep(initial_delay.total_seconds())
46
                while True:
47
                    start = await time.clock()
48
                    value = await producer() if _is_awaitable else producer()
49
                    await an_observer.on_next(value)
50
                    duration = await time.clock() - start
51
52
                    # adjust wait time
53
                    time_shift = _duration - duration
54
55
                    if time_shift > 0:
56
                        await time.sleep(time_shift)
57
58
            except TaskCancelled:
59
                # it's time to finish
60
                pass
61
62
        _task = await spawn(_producer())
63
64
        async def _subscribe():
65
            nonlocal _task
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _task does not seem to be defined.
Loading history...
66
            if _task:
67
                await an_observer.on_completed()
68
                await _task.cancel()
69
                _task = None
70
71
        return _subscribe
72
73
    return rx_create(subscribe=_subscribe)
74