Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.multicast.test_publish.test_multicast()   A

Complexity

Conditions 1

Size

Total Lines 37
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 37
rs 9.256
c 0
b 0
f 0
cc 1
nop 1
1
from async_rx import Observer, rx_publish, rx_range, rx_subject
2
3
from ..model import ObserverCounterCollector
4
5
6
class ConnectableObservableCounter:
    """Connection handler test double that records the connection state.

    ``connected`` starts as ``False``; awaiting ``on_connect`` sets it to
    ``True`` and awaiting ``on_disconnect`` resets it to ``False``.
    """

    def __init__(self) -> None:
        self.connected = False

    async def on_connect(self) -> None:
        """Record that a connection has been established."""
        self.connected = True

    async def on_disconnect(self) -> None:
        """Record that the connection has been released."""
        self.connected = False
15
16
17
class SubjectHandlerCounter:
    """Subject handler test double that counts subscribe/unsubscribe callbacks.

    Each callback increments its own counter and stores the ``count`` argument
    it received in ``current``.
    """

    def __init__(self) -> None:
        # Number of times each callback has been awaited.
        self.on_subscribe_count = 0
        self.on_unsubscribe_count = 0
        # ``count`` argument of the most recent callback; None until one runs.
        self.current = None

    async def on_subscribe(self, count: int, source: Observer) -> None:
        """Record one subscription notification.

        Args:
            count: value stored in ``current`` (presumably the number of
                active subscribers -- confirm against the library).
            source: accepted for the handler signature; not used here.
        """
        self.on_subscribe_count += 1
        self.current = count

    async def on_unsubscribe(self, count: int, source: Observer) -> None:
        """Record one unsubscription notification.

        Args:
            count: value stored in ``current`` (presumably the number of
                remaining subscribers -- confirm against the library).
            source: accepted for the handler signature; not used here.
        """
        self.on_unsubscribe_count += 1
        self.current = count
30
31
32
def test_multicast(kernel):
    """Check ``rx_publish`` without ``ref_count``: connection is explicit.

    Two observers subscribe to the published range; neither subscription
    connects the source. After ``connect()`` is run, both observers must have
    received the same 100 items and a single completion.

    Args:
        kernel: fixture exposing ``run(coroutine)`` (defined outside this file).
    """
    seeker_a = ObserverCounterCollector()
    seeker_b = ObserverCounterCollector()
    subject_handler = SubjectHandlerCounter()
    connection_handler = ConnectableObservableCounter()

    a_multicast = rx_publish(
        an_observable=rx_range(start=0, stop=100),
        subject_handler=subject_handler,
        connection_handler=connection_handler,
    )
    assert a_multicast
    assert not connection_handler.connected

    # Subscribing notifies the subject handler but must not connect.
    kernel.run(a_multicast.subscribe(seeker_a))
    assert subject_handler.on_subscribe_count == 1
    assert subject_handler.current == 1
    assert not connection_handler.connected

    kernel.run(a_multicast.subscribe(seeker_b))
    assert subject_handler.on_subscribe_count == 2
    assert subject_handler.current == 2
    assert not connection_handler.connected

    # Without ref_count nothing connects automatically: connect explicitly.
    kernel.run(a_multicast.connect())
    assert connection_handler.connected

    # A second connect() call must leave the connection established.
    kernel.run(a_multicast.connect())
    assert connection_handler.connected

    # Both observers see the same things.
    assert seeker_a.on_next_count == seeker_b.on_next_count
    assert seeker_a.on_error_count == seeker_b.on_error_count
    assert seeker_a.on_completed_count == seeker_b.on_completed_count

    assert seeker_a.on_next_count == 100
    assert seeker_a.on_error_count == 0
    assert seeker_a.on_completed_count == 1
69
70
71
def test_multicast_with_ref_count(kernel):
    """Check ``rx_publish(...).ref_count()``: connection follows subscriptions.

    The first subscription must connect automatically and deliver the whole
    range; unsubscribing that only observer must disconnect.

    Args:
        kernel: fixture exposing ``run(coroutine)`` (defined outside this file).
    """
    seeker_a = ObserverCounterCollector()
    subject_handler = SubjectHandlerCounter()
    connection_handler = ConnectableObservableCounter()

    published = rx_publish(
        an_observable=rx_range(start=0, stop=100),
        subject_handler=subject_handler,
        connection_handler=connection_handler,
    )
    # ref_count() is awaited through the kernel, unlike rx_publish() itself.
    a_multicast = kernel.run(published.ref_count())
    assert a_multicast
    assert not connection_handler.connected

    # subscribe
    sub_a = kernel.run(a_multicast.subscribe(seeker_a))
    assert subject_handler.on_subscribe_count == 1
    assert subject_handler.current == 1

    assert connection_handler.connected  # autoconnect

    assert seeker_a.on_next_count == 100
    assert seeker_a.on_error_count == 0
    assert seeker_a.on_completed_count == 1

    # Running the value returned by subscribe() unsubscribes the observer.
    kernel.run(sub_a())
    assert not connection_handler.connected  # auto disconnect
97
98
99
def test_multicast_with_ref_count_on_subject(kernel):
    """Check ``ref_count()`` multicast driven by a manually fed subject.

    Items pushed into the source subject after subscription must reach the
    observer one by one; once an error was sent, a later completion must not
    be delivered. Unsubscribing the only observer must disconnect.

    Args:
        kernel: fixture exposing ``run(coroutine)`` (defined outside this file).
    """
    seeker_a = ObserverCounterCollector()
    subject_handler = SubjectHandlerCounter()
    connection_handler = ConnectableObservableCounter()

    a_subject = rx_subject()

    published = rx_publish(
        an_observable=a_subject,
        subject_handler=subject_handler,
        connection_handler=connection_handler,
    )
    a_multicast = kernel.run(published.ref_count())
    assert a_multicast
    assert not connection_handler.connected

    # subscribe
    sub_a = kernel.run(a_multicast.subscribe(seeker_a))
    assert subject_handler.on_subscribe_count == 1
    assert subject_handler.current == 1

    assert connection_handler.connected  # autoconnect
    # no item in subject yet
    assert seeker_a.on_next_count == 0
    assert seeker_a.on_error_count == 0
    assert seeker_a.on_completed_count == 0

    kernel.run(a_subject.on_next(item="one"))  # send "one" item
    assert seeker_a.on_next_count == 1
    assert seeker_a.on_error_count == 0
    assert seeker_a.on_completed_count == 0

    kernel.run(a_subject.on_next(item="two"))  # send "two" item
    assert seeker_a.on_next_count == 2
    assert seeker_a.on_error_count == 0
    assert seeker_a.on_completed_count == 0

    kernel.run(a_subject.on_error(err="oups"))  # send error
    assert seeker_a.on_next_count == 2
    assert seeker_a.on_error_count == 1
    assert seeker_a.on_completed_count == 0

    # Contract check: a completion sent after an error must not be counted.
    kernel.run(a_subject.on_completed())
    assert seeker_a.on_next_count == 2
    assert seeker_a.on_error_count == 1
    assert seeker_a.on_completed_count == 0

    # Running the value returned by subscribe() unsubscribes the observer.
    kernel.run(sub_a())
    assert not connection_handler.connected  # auto disconnect
145