rx_repeat_series()   B
last analyzed

Complexity

Conditions 8

Size

Total Lines 51
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 51
rs 7.3333
c 0
b 0
f 0
cc 8
nop 2

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
from typing import Any, Optional
2
3
import curio
4
5
from ..protocol import Observable, Observer, Subscription
6
from .rx_create import rx_create
7
8
__all__ = ["rx_repeat_series"]
9
10
11
def rx_repeat_series(source: Any, ratio: Optional[float] = 1.0) -> Observable:
12
    """Repeat a series (delay, value) as an observable for each subscription.
13
14
    Args:
15
        source (Any): iterable or async iterable source of tuple (duration, value)
16
        ratio (Optional[float]): ratio apply on duration (1.0 per default)
17
18
    Returns:
19
        (Observable): an observable
20
21
    Raise:
22
        (RuntimeError): if source is not iterable (sync or async)
23
24
    """
25
    if not hasattr(source, "__iter__") and not hasattr(source, "__aiter__"):
26
        raise RuntimeError("source must be (async/sync) iterable")
27
28
    async def _subscribe(an_observer: Observer) -> Subscription:
29
        _task = None
30
31
        async def _proceed_item(item: Any):
32
            (duration, value) = item
33
            await curio.sleep(duration * ratio)
34
            await an_observer.on_next(value)
35
36
        async def _producer():
37
            nonlocal _task
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _task does not seem to be defined.
Loading history...
38
            try:
39
                if hasattr(source, "__aiter__"):
40
                    async for item in source:
41
                        await _proceed_item(item)
42
                else:
43
                    for item in source:
44
                        await _proceed_item(item)
45
46
                _task = None  # do not cancel this task if concurrent call to _subscribe occurs
47
                await an_observer.on_completed()
48
            except curio.TaskCancelled:  # pragma: no cover
49
                # it's time to finish
50
                pass
51
52
        _task = await curio.spawn(_producer())
53
54
        async def _subscribe():
55
            nonlocal _task
56
            if _task:  # pragma: no cover
57
                await _task.cancel()
58
59
        return _subscribe
60
61
    return rx_create(subscribe=_subscribe)
62