async_rx.observable.rx_delay   A
last analyzed

Complexity

Total Complexity 12

Size/Duplication

Total Lines 97
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 12
eloc 57
dl 0
loc 97
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
D rx_delay() 0 85 12
1
from datetime import timedelta
2
from typing import Any, Optional
3
4
import curio
5
6
from ..protocol import Observable, Observer, Subscription, rx_observer
7
from .rx_create import rx_create
8
9
__all__ = ["rx_delay"]
10
11
12
def rx_delay(observable: Observable, duration: timedelta, buffer_size: Optional[int] = None, ignore_events_if_full: Optional[bool] = True) -> Observable:
    """Delay operator.

    Delay will project the sequence unmodified, but shifted into the future with a specified
    delay.

    The underlying implementation uses a queue and a dedicated consumer task: items pushed by
    the source are queued, and the consumer sleeps for ``duration`` before forwarding each one
    to the observer.

    Args:
        observable (Observable): an observable instance
        duration (timedelta): timedelta of delay (the duration). A zero timedelta is falsy
            and is therefore rejected like a missing one.
        buffer_size (Optional[int]): optional buffer size, if not specified size is unlimited
            (ignore_events_if_full then has no meaning, but mind your memory...)
        ignore_events_if_full (Optional[bool]): When true, if the internal buffer (here a queue)
            is full, events are ignored until older ones have been consumed.
            Otherwise, the producer is blocked until older ones have been consumed.

    Returns:
        (Observable): observable instance

    Raise:
        (RuntimeError): if no observable or duration are provided or buffer_size <= 0

    """
    if not observable or not duration:
        raise RuntimeError("observable and duration are mandatory")
    # Compare against None explicitly: a plain truthiness test would let 0 slip through
    # even though the documented contract rejects every buffer_size <= 0.
    if buffer_size is not None and buffer_size <= 0:
        raise RuntimeError("buffer_size must be greather than zero or None")

    async def _subscribe(an_observer: Observer) -> Subscription:
        _queue = curio.Queue(buffer_size) if buffer_size else curio.Queue()
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _duration = duration.total_seconds()

        async def consumer():
            try:
                while True:
                    item = await _queue.get()  # retrieve an item (blocks until one is available)
                    await curio.sleep(_duration)  # add duration delay before send
                    await an_observer.on_next(item)
                    await _queue.task_done()  # notify that job is done
            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _cancel_consumer():
            nonlocal _consumer_task
            # Drop the reference before awaiting so overlapping calls cannot cancel twice.
            task, _consumer_task = _consumer_task, None
            if task:
                await task.cancel()

        async def _on_next(item: Any):
            if ignore_events_if_full and _queue.full():
                return  # buffer is full: drop the event instead of blocking the producer
            await _queue.put(item)

        async def _on_completed():
            await _queue.join()  # wait until every queued item has been delivered
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            await curio.sleep(_duration)  # add duration delay on error
            # Once the consumer is cancelled, items still waiting in the queue are not delivered.
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        async def _unsubscribe():
            nonlocal _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        # The consumer must be running before subscribing, because the source may emit
        # (and even complete) while `subscribe` is still executing.
        _consumer_task = await curio.spawn(consumer())

        try:
            _subscription = await observable.subscribe(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed))
        except BaseException:
            # Do not leave the consumer task blocked forever when the subscription fails.
            await _cancel_consumer()
            raise

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
97