Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.subject.test_subject.test_subject()   A

Complexity

Conditions 1

Size

Total Lines 39
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 39
rs 9.208
c 0
b 0
f 0
cc 1
nop 1
1
from async_rx import Observer, rx_range, rx_subject
2
3
from ..model import ObserverCounterCollector
4
5
6
class SubjectHandlerCounter:
7
    def __init__(self):
8
        self.on_subscribe_count = 0
9
        self.on_unsubscribe_count = 0
10
        self.current = None
11
12
    async def on_subscribe(self, count: int, source: Observer) -> None:
13
        self.on_subscribe_count += 1
14
        self.current = count
15
16
    async def on_unsubscribe(self, count: int, source: Observer) -> None:
17
        self.on_unsubscribe_count += 1
18
        self.current = count
19
20
21
def test_subject(kernel):
22
23
    seeker_a = ObserverCounterCollector()
24
    seeker_b = ObserverCounterCollector()
25
    subject_handler = SubjectHandlerCounter()
26
27
    a_subject = rx_subject(subject_handler=subject_handler)
28
    assert a_subject
29
30
    sub_a = kernel.run(a_subject.subscribe(seeker_a))
31
    assert subject_handler.on_subscribe_count == 1
32
    assert subject_handler.current == 1
33
34
    sub_b = kernel.run(a_subject.subscribe(seeker_b))
35
    assert subject_handler.on_subscribe_count == 2
36
    assert subject_handler.current == 2
37
38
    sub_subject = kernel.run(rx_range(start=0, stop=10).subscribe(a_subject))
39
40
    # both observer see the same things
41
    assert seeker_a.on_next_count == seeker_b.on_next_count
42
    assert seeker_a.on_error_count == seeker_b.on_error_count
43
    assert seeker_a.on_completed_count == seeker_b.on_completed_count
44
45
    assert seeker_a.on_next_count == 10
46
    assert seeker_a.on_error_count == 0
47
    assert seeker_a.on_completed_count == 1
48
49
    kernel.run(sub_a())
50
    assert subject_handler.on_unsubscribe_count == 1
51
    assert subject_handler.current == 1
52
    # sub are one shot
53
    kernel.run(sub_a())
54
    assert subject_handler.on_unsubscribe_count == 1
55
    assert subject_handler.current == 1
56
57
    kernel.run(sub_b())
58
    assert subject_handler.on_unsubscribe_count == 2
59
    assert subject_handler.current == 0
60