Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.subject.test_subject_behavior.test_replay_subject()   A

Complexity

Conditions 1

Size

Total Lines 31
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 31
rs 9.304
c 0
b 0
f 0
cc 1
nop 1
1
from async_rx import rx_range, rx_subject_behavior
2
from async_rx.protocol import Observer, subject_handler
3
4
from ..model import ObserverCounterCollector
5
6
7
def test_replay_subject(kernel):
    """Exercise a behavior subject with an early and a late subscriber.

    The subject is wired with a handler whose subscribe and unsubscribe
    hooks both record the count they are given. The test then checks:

    * an observer subscribed before the source range runs receives all ten
      items and one completion;
    * an observer subscribed afterwards receives a single item and one
      completion;
    * the recorded count goes 1 -> 2 as observers subscribe and 1 -> 0 as
      they unsubscribe.

    Args:
        kernel: fixture exposing ``run`` to drive the awaitables to
            completion (presumably a curio kernel -- confirm in conftest).
    """
    subscriber_count = None

    async def _track_subscriber_count(count: int, source: Observer):
        # Same hook serves both subscribe and unsubscribe notifications;
        # only the reported count is kept.
        nonlocal subscriber_count
        subscriber_count = count

    early_observer = ObserverCounterCollector()
    late_observer = ObserverCounterCollector()

    handler = subject_handler(
        on_subscribe=_track_subscriber_count,
        on_unsubscribe=_track_subscriber_count,
    )
    a_subject = rx_subject_behavior(subject_handler=handler)
    assert a_subject

    # First registration: subscribe before the source starts emitting.
    unsubscribe_early = kernel.run(a_subject.subscribe(early_observer))
    source_subscription = kernel.run(rx_range(start=0, stop=10).subscribe(a_subject))
    assert (
        early_observer.on_next_count,
        early_observer.on_error_count,
        early_observer.on_completed_count,
    ) == (10, 0, 1)
    assert subscriber_count == 1

    # Second registration: subscribe once the source has already completed.
    unsubscribe_late = kernel.run(a_subject.subscribe(late_observer))
    assert (
        late_observer.on_next_count,  # buffer size
        late_observer.on_error_count,
        late_observer.on_completed_count,
    ) == (1, 0, 1)
    assert subscriber_count == 2

    # Unsubscribing one observer at a time walks the count back down to zero.
    kernel.run(unsubscribe_early())
    assert subscriber_count == 1
    kernel.run(unsubscribe_late())
    assert subscriber_count == 0
38