async_rx.observable.rx_skip.rx_skip()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 32
rs 9.8
c 0
b 0
f 0
cc 3
nop 2
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, rx_observer_from
4
from .rx_create import rx_create
5
6
__all__ = ["rx_skip"]
7
8
9
def rx_skip(observable: Observable, count: int) -> Observable:
    """Build an observable that drops the first ``count`` items of a source.

    Each subscription keeps its own counter, so every subscriber has its own
    leading ``count`` items suppressed before anything reaches its ``on_next``.

    Args:
        observable (Observable): source observable to subscribe to
        count (int): number of leading items to drop

    Returns:
        (Observable): observable built with ``rx_create``

    Raises:
        (RuntimeError): if count <= 0

    """
    if count <= 0:
        raise RuntimeError('count must be greather than zero')

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Counter lives in the subscription scope: one fresh count per subscriber.
        skipped: int = 0

        async def _forward_after_skip(item: Any):
            nonlocal skipped

            # Still inside the skipped prefix: swallow the item and count it.
            if skipped < count:
                skipped += 1
                return

            await an_observer.on_next(item)

        # Only on_next is overridden here; the rest of the observer is derived
        # from an_observer by rx_observer_from (presumably unchanged - confirm).
        skipping_observer = rx_observer_from(observer=an_observer, on_next=_forward_after_skip)
        return await observable.subscribe(an_observer=skipping_observer)

    return rx_create(subscribe=_subscribe)
41