Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_defer   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 35
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 1
eloc 25
dl 0
loc 35
rs 10
c 0
b 0
f 0
1
import pytest
2
3
from async_rx import Observable, Observer, rx_create, rx_defer
4
from async_rx.protocol import default_subscription
5
6
from ..model import ObserverCounter
7
8
9
def test_rx_defer(kernel):
    """Check that an observable obtained through ``rx_defer`` can be subscribed twice.

    The factory handed to ``rx_defer`` builds its observable with
    ``max_observer=1``. The same counting observer is subscribed two times;
    after each subscription it must have seen one more item and one more
    completion, and never an error.
    """

    async def _emit_one_then_complete(an_observer: Observer):
        # Push a single item, signal completion, then return the default subscription.
        await an_observer.on_next(item=1)
        await an_observer.on_completed()
        return default_subscription

    async def _make_observable():
        return rx_create(subscribe=_emit_one_then_complete, max_observer=1)

    def _counts(observer):
        # (next, completed, error) counters as a single comparable tuple.
        return (
            observer.on_next_count,
            observer.on_completed_count,
            observer.on_error_count,
        )

    deferred = rx_defer(observable_factory=_make_observable)
    counter = ObserverCounter()

    # Nothing has been observed before the first subscription.
    assert _counts(counter) == (0, 0, 0)

    # The values returned by kernel.run are kept but not inspected by this test.
    first_run_result = kernel.run(deferred.subscribe(counter))
    assert _counts(counter) == (1, 1, 0)

    # even with max_observer=1 we can do that because of rx_defer
    second_run_result = kernel.run(deferred.subscribe(counter))
    assert _counts(counter) == (2, 2, 0)
35