async_rx.observable.rx_take   A
last analyzed

Complexity

Total Complexity 5

Size/Duplication

Total Lines 53
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 5
eloc 26
dl 0
loc 53
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
B rx_take() 0 44 5
1
from typing import Any, Optional
2
3
from ..protocol import Observable, Observer, Subscription, rx_observer_from
4
from .rx_create import rx_create
5
6
# Public API of this module: only the rx_take operator is exported.
__all__ = ["rx_take"]
7
8
9
def rx_take(observable: Observable, count: int) -> Observable:
    """Create an observable which takes only the first ``count`` items at most.

    Items emitted by the source are forwarded to the subscriber until
    ``count`` of them have been delivered. ``on_completed`` is then signalled
    once, and any item the source emits afterwards is silently dropped.
    Fewer than ``count`` items may be delivered.

    The subscription to the source is released only when the returned
    subscription is awaited; reaching ``count`` does not unsubscribe by itself.

    Args:
        observable (Observable): observable source
        count (int): maximum number of items to take

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if count <= 0

    """
    if count <= 0:
        raise RuntimeError('count must be greather than zero')

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Per-subscription state: each subscriber gets its own counter.
        _taken: int = 0
        _subscription: Optional[Subscription] = None

        async def _unsubscribe():
            nonlocal _subscription

            # Release the source subscription at most once.
            if _subscription:
                await _subscription()
                _subscription = None

        async def _on_next(item: Any):
            nonlocal _taken

            if _taken >= count:
                # Quota already reached and completion already signalled:
                # drop the item rather than completing the observer again.
                return

            _taken += 1
            await an_observer.on_next(item)

            if _taken == count:
                await an_observer.on_completed()

        # NOTE(review): rx_observer_from presumably builds an observer that
        # delegates to an_observer with on_next overridden - confirm in protocol.
        _subscription = await observable.subscribe(
            an_observer=rx_observer_from(observer=an_observer, on_next=_on_next)
        )

        return _unsubscribe

    return rx_create(subscribe=_subscribe)
53