async_rx.observable.rx_window.rx_window()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 34
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 34
rs 9.95
c 0
b 0
f 0
cc 1
nop 2
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, rx_observer_from
4
from .rx_buffer import rx_buffer
5
from .rx_create import rx_create
6
from .rx_from import rx_from
7
8
__all__ = ["rx_window"]
9
10
11
def rx_window(observable: Observable, buffer_size: int) -> Observable:
12
    """Window operator.
13
14
    Window collect elements from the source sequence and emit them in groups.
15
    Window emits these elements in nested observables.
16
    It will emit a new inner observable when a window opens and will complete the inner observable when the window closes.
17
    Notice that there can be overlap between multiple windows if the next one opens before the last one closes.
18
19
    For example Window with a count of 2 and a skip of 1 will emit the last 2 elements (count 2) for every element (skip 1),
20
    so the sequence -1-2-3-4-| becomes --[12][23][34][4]|.
21
22
    Args:
23
        observable (Observable): the source
24
        buffer_size (int): buffer size
25
26
    Returns:
27
        (Observable): observable instance
28
29
    Raise:
30
        (RuntimeError): if buffer_size <= 0
31
32
    """
33
    _buffer = rx_buffer(observable=observable, buffer_size=buffer_size)
34
35
    async def _subscribe(an_observer: Observer) -> Subscription:
36
        nonlocal _buffer
37
38
        async def _on_next(item: Any) -> None:
39
            await an_observer.on_next(rx_from(item))
40
            return None
41
42
        return await _buffer.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next))
43
44
    return rx_create(subscribe=_subscribe, max_observer=1)
45