| Total Complexity | 1 |
| Total Lines | 35 |
| Duplicated Lines | 0 % |
| Changes | 0 | ||
| 1 | import pytest |
||
| 2 | |||
| 3 | from async_rx import Observable, Observer, rx_create, rx_defer |
||
| 4 | from async_rx.protocol import default_subscription |
||
| 5 | |||
| 6 | from ..model import ObserverCounter |
||
| 7 | |||
| 8 | |||
def test_rx_defer(kernel):
    """Subscribe twice through ``rx_defer`` to a factory-made observable.

    The factory returns an observable created with ``max_observer=1``. The
    same counting observer is subscribed twice via the deferred observable,
    and each subscription is expected to deliver exactly one item and one
    completion, with no error.

    Args:
        kernel: object exposing ``run()``, used here to drive the
            ``subscribe`` coroutine to completion (presumably a pytest
            fixture defined outside this file).
    """

    async def _subscribe(an_observer: Observer):
        # Emit a single item, then complete immediately.
        await an_observer.on_next(item=1)
        await an_observer.on_completed()
        return default_subscription

    async def _observable_factory():
        return rx_create(subscribe=_subscribe, max_observer=1)

    obs = rx_defer(observable_factory=_observable_factory)

    seeker = ObserverCounter()

    # Nothing has been observed before the first subscription.
    assert seeker.on_next_count == 0
    assert seeker.on_completed_count == 0
    assert seeker.on_error_count == 0

    # The returned subscription is not used by this test, so it is not
    # bound to a name.
    kernel.run(obs.subscribe(seeker))
    assert seeker.on_next_count == 1
    assert seeker.on_completed_count == 1
    assert seeker.on_error_count == 0

    # even with max_observer=1 we can do that because of rx_defer
    kernel.run(obs.subscribe(seeker))
    assert seeker.on_next_count == 2
    assert seeker.on_completed_count == 2
    assert seeker.on_error_count == 0
||
| 35 |