async_rx.observable.rx_delay.rx_delay()   D
Complexity

Conditions 12

Size

Total Lines 85
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 50
dl 0
loc 85
rs 4.8
c 0
b 0
f 0
cc 12
nop 4

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex functions like async_rx.observable.rx_delay.rx_delay() often do a lot of different things. To break such a function down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for variables and nested functions that share the same prefix or suffix.

Once you have determined the variables and functions that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
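In the listing that follows, `_consumer_task` and `_cancel_consumer` share the "consumer" name, which suggests one candidate component. The sketch below groups them into a small class. It is only an illustration: `ConsumerHandle` is a hypothetical name, and it uses the standard-library asyncio instead of curio so that it runs without third-party packages.

```python
import asyncio
from typing import Coroutine, Optional


class ConsumerHandle:
    """Owns a background consumer task and its cancellation (hypothetical helper)."""

    def __init__(self) -> None:
        self._task: Optional[asyncio.Task] = None

    @property
    def running(self) -> bool:
        return self._task is not None

    def start(self, coro: Coroutine) -> None:
        # Must be called from inside a running event loop.
        self._task = asyncio.create_task(coro)

    async def cancel(self) -> None:
        # Safe to call several times: only the first call cancels the task.
        if self._task is None:
            return
        task, self._task = self._task, None
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected once the task has been cancelled
```

The callbacks that currently declare `nonlocal _consumer_task` would then call `cancel()` on a shared handle instead of rebinding a closure variable.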

from datetime import timedelta
from typing import Any, Optional

import curio

from ..protocol import Observable, Observer, Subscription, rx_observer
from .rx_create import rx_create

__all__ = ["rx_delay"]


def rx_delay(observable: Observable, duration: timedelta, buffer_size: Optional[int] = None, ignore_events_if_full: Optional[bool] = True) -> Observable:
    """Delay operator.

    Delay will project the sequence unmodified, but shifted into the future with a specified
    delay.

    The underlying implementation uses a queue and a dedicated consumer.

    Args:
        observable (Observable): an observable instance
        duration (timedelta): timedelta of delay (the duration).
        buffer_size (Optional[int]): optional buffer size; if not specified, the size is unlimited
            (ignore_events_if_full then has no effect, but memory is not unlimited...)
        ignore_events_if_full (Optional[bool]): When true, if the internal buffer (here a queue) is full,
            new events are ignored until older ones have been consumed.
            Otherwise, the producer blocks until older ones have been consumed.

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no observable or duration is provided, or if buffer_size <= 0

    """
    if not observable or not duration:
        raise RuntimeError("observable and duration are mandatory")
    # "is not None" so that buffer_size == 0 is rejected too, as documented above.
    if buffer_size is not None and buffer_size <= 0:
        raise RuntimeError("buffer_size must be greater than zero or None")

    async def _subscribe(an_observer: Observer) -> Subscription:
        _queue = curio.Queue(buffer_size) if buffer_size else curio.Queue()
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _duration = duration.total_seconds()

        async def consumer():
            nonlocal _queue, _duration
            try:
                while True:
                    item = await _queue.get()  # retrieve an item (blocks until one is available)
                    await curio.sleep(_duration)  # apply the delay before sending
                    await an_observer.on_next(item)
                    await _queue.task_done()  # notify that the job is done
            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _cancel_consumer():
            # Analyzer note (Comprehensibility, Best Practice): "The variable _consumer_task
            # does not seem to be defined." It is defined in the enclosing _subscribe scope.
            nonlocal _consumer_task
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None

        async def _on_next(item: Any):
            nonlocal _queue
            if ignore_events_if_full and _queue.full():
                return
            await _queue.put(item)

        async def _on_completed():
            nonlocal _queue, _consumer_task
            await _queue.join()  # wait until every queued item has been processed
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            nonlocal _consumer_task
            await curio.sleep(_duration)  # apply the delay to errors as well
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        # Renamed from _subscribe: this is the unsubscribe callback, and the old name
        # shadowed the enclosing function.
        async def _unsubscribe():
            # Analyzer note (Comprehensibility, Best Practice): "The variable _subscription
            # does not seem to be defined." It is defined in the enclosing _subscribe scope.
            nonlocal _consumer_task, _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        _consumer_task = await curio.spawn(consumer())

        _subscription = await observable.subscribe(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed))

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
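The queue-plus-consumer technique described in the docstring can be tried in isolation. The following is a minimal sketch under stated assumptions, not the library's implementation: it uses the standard-library asyncio rather than curio, and `delayed_forward` is a hypothetical name that takes a plain iterable and a callback in place of an Observable and an Observer.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def delayed_forward(
    items: Iterable[Any],
    delay_seconds: float,
    on_next: Callable[[Any], Awaitable[None]],
) -> None:
    """Forward each item to on_next after a delay, using a queue and one consumer."""
    queue: asyncio.Queue = asyncio.Queue()

    async def consumer() -> None:
        while True:
            item = await queue.get()            # wait for the next item
            await asyncio.sleep(delay_seconds)  # apply the delay before sending
            await on_next(item)
            queue.task_done()                   # mark the item as processed

    task = asyncio.create_task(consumer())
    for item in items:
        await queue.put(item)
    await queue.join()  # wait until every queued item has been forwarded
    task.cancel()       # then stop the consumer
    try:
        await task
    except asyncio.CancelledError:
        pass
```

Because the single consumer sleeps once per item, items that arrive in a burst leave the queue one delay apart rather than all together. The listing above has the same structure, so the same spacing would be expected there.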