async_rx.observable.rx_sample.rx_sample()   C

Complexity

Conditions 8

Size

Total Lines 77
Code Lines 46

Duplication

Lines 77
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 46
dl 77
loc 77
rs 6.9006
c 0
b 0
f 0
cc 8
nop 2

How to fix: Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
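
As a hedged illustration of the advice above, here is a minimal sketch of extracting commented blocks into named functions. The functions `parse_readings`, `average`, and `report` are hypothetical and are not part of the analyzed project; each docstring records the comment that might once have labelled the inline block.

```python
from typing import Iterable, List


def parse_readings(lines: Iterable[str]) -> List[float]:
    """Formerly an inline block commented '# parse non-empty lines into floats'."""
    return [float(line) for line in lines if line.strip()]


def average(values: List[float]) -> float:
    """Formerly an inline block commented '# compute the mean, 0.0 when empty'."""
    return sum(values) / len(values) if values else 0.0


def report(lines: Iterable[str]) -> str:
    """The former long method, now a short sequence of named steps."""
    readings = parse_readings(lines)
    return f"{len(readings)} readings, mean {average(readings):.2f}"
```

Each extracted function can be named, tested, and reused on its own, which is the benefit the paragraph above describes.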

Commonly applied refactorings include:

from datetime import timedelta
from typing import Any, Optional

import curio

from ..protocol import Observable, Observer, Subscription, rx_observer
from .rx_create import rx_create

__all__ = ["rx_sample"]


# Issue (Duplication): This code seems to be duplicated in your project.
def rx_sample(observable: Observable, duration: timedelta) -> Observable:
    """Sample operator used to rate-limit the sequence.

    Sample filters out elements based on timing.
    Sample emits the LATEST value on a set interval, or emits nothing if no new value arrived during the last interval.

    Args:
        observable (Observable): an observable instance
        duration (timedelta): timedelta of interval (the duration)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no observable or duration is provided

    """
    if not observable or not duration:
        raise RuntimeError("observable and duration are mandatory")

    async def _subscribe(an_observer: Observer) -> Subscription:

        _receive_value = False
        _lastest_value = None
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _duration = duration.total_seconds()

        async def consumer():
            # Issue (Comprehensibility, Best Practice): The variable _receive_value does not seem to be defined.
            nonlocal _duration, _lastest_value, _receive_value
            try:
                while True:
                    await curio.sleep(_duration)  # wait one interval before emitting the latest value
                    if _receive_value:
                        await an_observer.on_next(_lastest_value)
                        _receive_value = False

            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _on_next(item: Any):
            # Issue (Comprehensibility, Best Practice): The variable _lastest_value does not seem to be defined.
            # Issue (Comprehensibility, Best Practice): The variable _receive_value does not seem to be defined.
            nonlocal _lastest_value, _receive_value
            _lastest_value = item
            _receive_value = True

        async def _cancel_consumer():
            # Issue (Comprehensibility, Best Practice): The variable _consumer_task does not seem to be defined.
            nonlocal _consumer_task
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None

        async def _on_completed():
            nonlocal _consumer_task
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            nonlocal _consumer_task
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        # Unsubscribe callback returned to the subscriber: stops the consumer
        # task and releases the upstream subscription.
        async def _subscribe():
            # Issue (Comprehensibility, Best Practice): The variable _subscription does not seem to be defined.
            nonlocal _consumer_task, _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        _consumer_task = await curio.spawn(consumer())

        _subscription = await observable.subscribe(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed))

        return _subscribe

    return rx_create(subscribe=_subscribe, max_observer=1)

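
The sampling rule stated in the docstring (emit the latest value at each interval, or nothing when no new value arrived) can be sketched as a small deterministic simulation. This is not the curio-based implementation listed above. The function `sample_events` is hypothetical, timestamps are assumed to be integers (for example milliseconds), and counting an event that lands exactly on a tick as belonging to that tick is an assumption made for illustration.

```python
from typing import Any, List, Sequence, Tuple


def sample_events(
    events: Sequence[Tuple[int, Any]], interval: int, end_time: int
) -> List[Tuple[int, Any]]:
    """Simulate sampling over (timestamp, value) pairs sorted by timestamp.

    Returns the (tick_time, value) pairs that would be emitted.
    """
    emitted: List[Tuple[int, Any]] = []
    index = 0
    tick = interval
    while tick <= end_time:
        received = False
        latest = None
        # Consume every event that arrived up to and including this tick
        # (assumed boundary convention).
        while index < len(events) and events[index][0] <= tick:
            latest = events[index][1]
            received = True
            index += 1
        # Emit only when something new arrived during this interval.
        if received:
            emitted.append((tick, latest))
        tick += interval
    return emitted


events = [(10, "a"), (40, "b"), (90, "c"), (250, "d")]
print(sample_events(events, interval=100, end_time=400))
# → [(100, 'c'), (300, 'd')]
```

In this run the ticks at 200 and 400 emit nothing because no value arrived in those intervals, which mirrors the `_receive_value` flag being reset after each emission in the listing.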