| Conditions | 1 |
| Total Lines | 31 |
| Code Lines | 24 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
| 1 | from async_rx import rx_range, rx_subject_behavior |
||
| 7 | def test_replay_subject(kernel): |
||
| 8 | |||
| 9 | current = None |
||
| 10 | |||
| 11 | async def _on_unsub_subscribe(count: int, source: Observer): |
||
| 12 | nonlocal current |
||
| 13 | current = count |
||
| 14 | |||
| 15 | seeker_a = ObserverCounterCollector() |
||
| 16 | seeker_b = ObserverCounterCollector() |
||
| 17 | |||
| 18 | a_subject = rx_subject_behavior(subject_handler=subject_handler(on_subscribe=_on_unsub_subscribe, on_unsubscribe=_on_unsub_subscribe)) |
||
| 19 | assert a_subject |
||
| 20 | |||
| 21 | # first registration |
||
| 22 | sub_a = kernel.run(a_subject.subscribe(seeker_a)) |
||
| 23 | sub_subject = kernel.run(rx_range(start=0, stop=10).subscribe(a_subject)) |
||
| 24 | assert seeker_a.on_next_count == 10 |
||
| 25 | assert seeker_a.on_error_count == 0 |
||
| 26 | assert seeker_a.on_completed_count == 1 |
||
| 27 | assert current == 1 |
||
| 28 | # second registration |
||
| 29 | sub_b = kernel.run(a_subject.subscribe(seeker_b)) |
||
| 30 | assert seeker_b.on_next_count == 1 # buffer size |
||
| 31 | assert seeker_b.on_error_count == 0 |
||
| 32 | assert seeker_b.on_completed_count == 1 |
||
| 33 | assert current == 2 |
||
| 34 | kernel.run(sub_a()) |
||
| 35 | assert current == 1 |
||
| 36 | kernel.run(sub_b()) |
||
| 37 | assert current == 0 |
||
| 38 |