1
|
|
|
from async_rx import Observer, rx_range, rx_subject |
2
|
|
|
|
3
|
|
|
from ..model import ObserverCounterCollector |
4
|
|
|
|
5
|
|
|
|
6
|
|
|
class SubjectHandlerCounter:
    """Subject handler that records subscribe/unsubscribe notifications.

    The instance only keeps counters, so a test can hand it to
    ``rx_subject(subject_handler=...)`` and assert on what was reported.
    """

    def __init__(self) -> None:
        # How many times each callback has been invoked.
        self.on_subscribe_count = 0
        self.on_unsubscribe_count = 0
        # Last ``count`` passed to either callback; None until one has fired.
        self.current = None

    async def on_subscribe(self, count: int, source: Observer) -> None:
        """Record one subscribe notification and remember ``count``.

        Args:
            count: Value reported by the caller; presumably the number of
                observers subscribed once this one has been added.
            source: Not used by this handler.
        """
        self.on_subscribe_count += 1
        self.current = count

    async def on_unsubscribe(self, count: int, source: Observer) -> None:
        """Record one unsubscribe notification and remember ``count``.

        Args:
            count: Value reported by the caller; presumably the number of
                observers still subscribed once this one has been removed.
            source: Not used by this handler.
        """
        self.on_unsubscribe_count += 1
        self.current = count
19
|
|
|
|
20
|
|
|
|
21
|
|
|
def test_subject(kernel):
    """Check subject fan-out and subject-handler notifications.

    Two collectors subscribe to one subject, the subject is then fed a
    ten-item range, and both subscriptions are released. Along the way the
    handler's counters and last reported ``count`` are asserted.

    Args:
        kernel: Fixture exposing ``run(coroutine)`` to execute a coroutine
            to completion and return its result.
    """
    seeker_a = ObserverCounterCollector()
    seeker_b = ObserverCounterCollector()
    subject_handler = SubjectHandlerCounter()

    a_subject = rx_subject(subject_handler=subject_handler)
    assert a_subject

    sub_a = kernel.run(a_subject.subscribe(seeker_a))
    assert subject_handler.on_subscribe_count == 1
    assert subject_handler.current == 1

    sub_b = kernel.run(a_subject.subscribe(seeker_b))
    assert subject_handler.on_subscribe_count == 2
    assert subject_handler.current == 2

    # Feed the subject from a source; the returned subscription is not needed.
    kernel.run(rx_range(start=0, stop=10).subscribe(a_subject))

    # Both observers see the same events.
    assert seeker_a.on_next_count == seeker_b.on_next_count
    assert seeker_a.on_error_count == seeker_b.on_error_count
    assert seeker_a.on_completed_count == seeker_b.on_completed_count

    assert seeker_a.on_next_count == 10
    assert seeker_a.on_error_count == 0
    assert seeker_a.on_completed_count == 1

    kernel.run(sub_a())
    assert subject_handler.on_unsubscribe_count == 1
    assert subject_handler.current == 1

    # Subscriptions are one-shot: releasing the same one again changes nothing.
    kernel.run(sub_a())
    assert subject_handler.on_unsubscribe_count == 1
    assert subject_handler.current == 1

    kernel.run(sub_b())
    assert subject_handler.on_unsubscribe_count == 2
    assert subject_handler.current == 0
60
|
|
|
|