Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.subject.test_replay_subject   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 68
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 5
eloc 47
dl 0
loc 68
rs 10
c 0
b 0
f 0
1
import pytest
2
3
from async_rx import Observer, Subscription, rx_create, rx_range, rx_subject_replay
4
from async_rx.protocol import default_subscription
5
6
from ..model import ObserverCounterCollector
7
8
9
def test_replay_subject_default():
10
    with pytest.raises(RuntimeError):
11
        rx_subject_replay(0)
12
    with pytest.raises(RuntimeError):
13
        rx_subject_replay(-1)
14
15
16
def test_replay_subject(kernel):
17
18
    seeker_a = ObserverCounterCollector()
19
    seeker_b = ObserverCounterCollector()
20
21
    a_subject = rx_subject_replay(buffer_size=4)
22
    assert a_subject
23
24
    # first registration
25
    sub_a = kernel.run(a_subject.subscribe(seeker_a))
26
    sub_subject = kernel.run(rx_range(start=0, stop=10).subscribe(a_subject))
27
    assert seeker_a.on_next_count == 10
28
    assert seeker_a.on_error_count == 0
29
    assert seeker_a.on_completed_count == 1
30
31
    # second registration
32
    sub_b = kernel.run(a_subject.subscribe(seeker_b))
33
    assert seeker_b.on_next_count == 4  # buffer size
34
    assert seeker_b.on_error_count == 0
35
    assert seeker_b.on_completed_count == 1
36
37
    kernel.run(sub_a())
38
    kernel.run(sub_b())
39
40
41
def test_replay_subject_with_error(kernel):
42
    async def _subscribe(an_observer: Observer) -> Subscription:
43
        await an_observer.on_next("A")
44
        await an_observer.on_error("Args")
45
        return default_subscription
46
47
    seeker_a = ObserverCounterCollector()
48
    seeker_b = ObserverCounterCollector()
49
    a_subject = rx_subject_replay(buffer_size=4)
50
51
    sub_a = kernel.run(a_subject.subscribe(seeker_a))
52
53
    sub_subject = kernel.run(rx_create(subscribe=_subscribe).subscribe(a_subject))
54
55
    assert seeker_a.on_next_count == 1
56
    assert seeker_a.on_error_count == 1
57
    assert seeker_a.on_completed_count == 0
58
    assert seeker_a.items == ["A"]
59
60
    sub_b = kernel.run(a_subject.subscribe(seeker_b))
61
    assert seeker_b.on_next_count == 1
62
    assert seeker_b.on_error_count == 1
63
    assert seeker_b.on_completed_count == 0
64
    assert seeker_b.items == ["A"]
65
66
    kernel.run(sub_a())
67
    kernel.run(sub_b())
68