async_rx.observable.rx_repeat_series   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 62
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 8
eloc 34
dl 0
loc 62
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
B rx_repeat_series() 0 51 8
1
from typing import Any, Optional
2
3
import curio
4
5
from ..protocol import Observable, Observer, Subscription
6
from .rx_create import rx_create
7
8
# Names exported when this module is star-imported.
__all__ = ["rx_repeat_series"]
9
10
11
def rx_repeat_series(source: Any, ratio: Optional[float] = 1.0) -> Observable:
    """Repeat a series of (delay, value) as an observable for each subscription.

    Every subscription spawns a curio task that iterates over ``source``.
    For each ``(duration, value)`` item the task sleeps ``duration * ratio``
    and then forwards ``value`` to the observer with ``on_next``. When the
    source is exhausted the observer receives ``on_completed``.

    The same ``source`` object is iterated again on every subscription, so a
    one-shot iterator (a generator for instance) only replays its items once.

    Args:
        source (Any): iterable or async iterable source of tuple (duration, value)
        ratio (Optional[float]): ratio applied on duration (1.0 per default,
            ``None`` is treated as 1.0)

    Returns:
        (Observable): an observable

    Raises:
        (RuntimeError): if source is not iterable (sync or async)

    """
    # Probe the iteration protocol once; the producer reuses the result.
    is_async_source = hasattr(source, "__aiter__")
    if not is_async_source and not hasattr(source, "__iter__"):
        raise RuntimeError("source must be (async/sync) iterable")

    # The annotation allows None: use the neutral ratio instead of failing
    # later with a TypeError inside the background task.
    delay_ratio = 1.0 if ratio is None else ratio

    async def _subscribe(an_observer: Observer) -> Subscription:
        _task = None

        async def _proceed_item(item: Any) -> None:
            # Wait for the scaled delay, then emit the value.
            (duration, value) = item
            await curio.sleep(duration * delay_ratio)
            await an_observer.on_next(value)

        async def _producer() -> None:
            nonlocal _task
            try:
                if is_async_source:
                    async for item in source:
                        await _proceed_item(item)
                else:
                    for item in source:
                        await _proceed_item(item)

                # Clear the handle before notifying completion, so that an
                # unsubscribe triggered from on_completed does not try to
                # cancel the task that is currently running.
                _task = None
                await an_observer.on_completed()
            except curio.TaskCancelled:  # pragma: no cover
                # it's time to finish
                pass

        _task = await curio.spawn(_producer())

        async def _unsubscribe() -> None:
            # Stop the producer if it is still registered as running.
            if _task:  # pragma: no cover
                await _task.cancel()

        return _unsubscribe

    return rx_create(subscribe=_subscribe)
62