async_rx.observable.rx_window   A
last analyzed

Complexity

Total Complexity 1

Size/Duplication

Total Lines 45
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 1
eloc 16
dl 0
loc 45
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_window() 0 34 1
1
from typing import Any
2
3
from ..protocol import Observable, Observer, Subscription, rx_observer_from
4
from .rx_buffer import rx_buffer
5
from .rx_create import rx_create
6
from .rx_from import rx_from
7
8
__all__ = ["rx_window"]
9
10
11
def rx_window(observable: Observable, buffer_size: int) -> Observable:
12
    """Window operator.
13
14
    Window collect elements from the source sequence and emit them in groups.
15
    Window emits these elements in nested observables.
16
    It will emit a new inner observable when a window opens and will complete the inner observable when the window closes.
17
    Notice that there can be overlap between multiple windows if the next one opens before the last one closes.
18
19
    For example Window with a count of 2 and a skip of 1 will emit the last 2 elements (count 2) for every element (skip 1),
20
    so the sequence -1-2-3-4-| becomes --[12][23][34][4]|.
21
22
    Args:
23
        observable (Observable): the source
24
        buffer_size (int): buffer size
25
26
    Returns:
27
        (Observable): observable instance
28
29
    Raise:
30
        (RuntimeError): if buffer_size <= 0
31
32
    """
33
    _buffer = rx_buffer(observable=observable, buffer_size=buffer_size)
34
35
    async def _subscribe(an_observer: Observer) -> Subscription:
36
        nonlocal _buffer
37
38
        async def _on_next(item: Any) -> None:
39
            await an_observer.on_next(rx_from(item))
40
            return None
41
42
        return await _buffer.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next))
43
44
    return rx_create(subscribe=_subscribe, max_observer=1)
45