Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_buffer   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 50
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 5
eloc 36
dl 0
loc 50
rs 10
c 0
b 0
f 0
1
import pytest
2
3
from async_rx import Observer, Subscription, rx_buffer, rx_create, rx_empty, rx_from, rx_range
4
from async_rx.protocol import default_subscription
5
6
from ..model import ObserverCounterCollector
7
8
9
def test_rx_buffer_default():
10
    with pytest.raises(RuntimeError):
11
        rx_buffer(observable=rx_empty(), buffer_size=0)
12
    with pytest.raises(RuntimeError):
13
        rx_buffer(observable=rx_empty(), buffer_size=-1)
14
15
16
def test_rx_buffer(kernel):
17
18
    seeker = ObserverCounterCollector()
19
20
    obs = rx_buffer(rx_range(start=1, stop=20), buffer_size=5)
21
    sub_a = kernel.run(obs.subscribe(seeker))
22
    kernel.run(sub_a())
23
24
    assert seeker.on_completed_count == 1
25
    assert seeker.on_error_count == 0
26
    assert seeker.on_next_count == 3
27
    assert seeker.items == [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15]]
28
29
30
def test_rx_buffer_with_error(kernel):
31
    async def _subscribe(an_observer: Observer) -> Subscription:
32
        await an_observer.on_next(1)
33
        await an_observer.on_next(2)
34
        await an_observer.on_next(3)
35
        await an_observer.on_next(4)
36
        await an_observer.on_next(5)
37
        await an_observer.on_error("oups")
38
        return default_subscription
39
40
    seeker = ObserverCounterCollector()
41
42
    obs = rx_buffer(rx_create(subscribe=_subscribe), buffer_size=2)
43
    sub_a = kernel.run(obs.subscribe(seeker))
44
    kernel.run(sub_a())
45
46
    assert seeker.on_completed_count == 0
47
    assert seeker.on_error_count == 1
48
    assert seeker.on_next_count == 2
49
    assert seeker.items == [[1, 2], [3, 4]]
50