Conditions | 1 |
Total Lines | 31 |
Code Lines | 24 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
1 | from async_rx import rx_range, rx_subject_behavior |
||
def test_replay_subject(kernel):
    """Check subscriber accounting and late-subscriber delivery on a subject.

    The subject is built with ``rx_subject_behavior`` and a handler whose
    ``on_subscribe`` and ``on_unsubscribe`` hooks both route to the same
    callback, which records the count it is given. The test then asserts:

    * an observer subscribed before the source runs sees 10 items,
      no error and one completion;
    * an observer subscribed afterwards sees 1 item, no error and one
      completion;
    * the recorded count goes 1 -> 2 on the subscriptions and 1 -> 0 as
      each subscription is released.
    """
    # NOTE(review): the test name says "replay" but the subject under test is
    # created with rx_subject_behavior -- confirm the naming is intentional.
    subscriber_count = None

    async def _track_subscribers(count: int, source: Observer):
        # Shared by both hooks: remember the most recent count reported.
        nonlocal subscriber_count
        subscriber_count = count

    early_observer = ObserverCounterCollector()
    late_observer = ObserverCounterCollector()

    handler = subject_handler(on_subscribe=_track_subscribers, on_unsubscribe=_track_subscribers)
    subject = rx_subject_behavior(subject_handler=handler)
    assert subject

    # First registration: subscribe before the source feeds the subject.
    release_early = kernel.run(subject.subscribe(early_observer))
    number_source = rx_range(start=0, stop=10)
    feed_subscription = kernel.run(number_source.subscribe(subject))
    early_counters = (
        early_observer.on_next_count,
        early_observer.on_error_count,
        early_observer.on_completed_count,
    )
    assert early_counters == (10, 0, 1)
    assert subscriber_count == 1

    # Second registration: subscribe once the source has already completed.
    release_late = kernel.run(subject.subscribe(late_observer))
    late_counters = (
        late_observer.on_next_count,  # original note: buffer size
        late_observer.on_error_count,
        late_observer.on_completed_count,
    )
    assert late_counters == (1, 0, 1)
    assert subscriber_count == 2

    # Releasing each subscription must decrement the reported count.
    kernel.run(release_early())
    assert subscriber_count == 1

    kernel.run(release_late())
    assert subscriber_count == 0
||
38 |