async_rx.observable.rx_skip   A
last analyzed

Complexity

Total Complexity 3

Size/Duplication

Total Lines 41
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 3
eloc 17
dl 0
loc 41
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_skip() 0 32 3
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, rx_observer_from
4
from .rx_create import rx_create
5
6
__all__ = ["rx_skip"]
7
8
9
def rx_skip(observable: Observable, count: int) -> Observable:
    """Create an observable which skips the first ``count`` items of the source.

    The skip counter is created inside the subscribe function, so every
    subscription starts counting from zero. Only the ``on_next`` handler is
    replaced; the rest of the observer is built by ``rx_observer_from`` from
    the subscribing observer (presumably delegating the other handlers
    unchanged -- confirm against ``rx_observer_from``).

    Args:
        observable (Observable): observable source
        count (int): number of items to skip, must be strictly positive

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if count <= 0

    """
    if count <= 0:
        raise RuntimeError("count must be greather than zero")

    async def _subscribe_with_skip(an_observer: Observer) -> Subscription:
        # Number of items dropped so far for this subscription.
        skipped: int = 0

        async def _forward_after_skip(item: Any) -> None:
            # The counter is bound in the enclosing subscribe scope; static
            # analyzers may wrongly report it as undefined at this statement.
            nonlocal skipped

            if skipped < count:
                # Still inside the skipped prefix: drop the item.
                skipped += 1
                return

            await an_observer.on_next(item)

        return await observable.subscribe(
            an_observer=rx_observer_from(
                observer=an_observer,
                on_next=_forward_after_skip,
            )
        )

    return rx_create(subscribe=_subscribe_with_skip)
41