1
|
|
|
from datetime import timedelta |
2
|
|
|
|
3
|
|
|
import curio |
4
|
|
|
import pytest |
5
|
|
|
|
6
|
|
|
from async_rx import rx_concat, rx_from, rx_repeat, rx_repeat_series, rx_throw |
7
|
|
|
|
8
|
|
|
from ..model import ObserverCounterCollectorWithTime |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
def test_rx_repeat_default():
    """Check that rx_repeat raises RuntimeError for each invalid argument pair.

    Covered cases, in order: both arguments None, an observable passed as the
    first argument with no second argument, and a callable second argument
    with None as the first.
    """
    # Each case is a thunk so that the arguments are built inside the
    # ``pytest.raises`` context, exactly where the call itself happens.
    failing_calls = (
        lambda: rx_repeat(None, None),
        lambda: rx_repeat(rx_from([1, 2]), None),
        lambda: rx_repeat(None, lambda a: not a),
    )

    for call in failing_calls:
        with pytest.raises(RuntimeError):
            call()
18
|
|
|
|
19
|
|
|
|
20
|
|
|
def test_rx_repeat(kernel):
    """Run rx_repeat with a plain (synchronous) producer and check the counts.

    A collector is subscribed to ``rx_repeat`` configured with a 0.2 second
    duration, the kernel sleeps for 1.1 seconds, and the callable returned by
    the subscription is then awaited before the collector is inspected.
    """
    collector = ObserverCounterCollectorWithTime()
    period = timedelta(seconds=0.2)
    observable = rx_repeat(duration=period, producer=lambda: True)

    subscription = kernel.run(observable.subscribe(collector))
    kernel.run(curio.sleep(1.1))
    # Awaiting the subscription's result; the test expects exactly one
    # completion to have been recorded once this returns.
    kernel.run(subscription())

    expected_deltas = [0.2, 0.2, 0.2, 0.2]

    assert collector.on_completed_count == 1
    assert collector.on_next_count == 5
    assert collector.on_error_count == 0
    assert collector.get_delta() == expected_deltas
31
|
|
|
|
32
|
|
|
|
33
|
|
|
def test_rx_repeat_async(kernel):
    """Run rx_repeat with a coroutine-function producer and check the counts.

    Same scenario as the synchronous-producer test, except the producer is an
    ``async def`` returning True: 0.2 second duration, 1.1 second sleep, then
    the callable returned by the subscription is awaited.
    """

    async def _producer():
        return True

    collector = ObserverCounterCollectorWithTime()
    period = timedelta(seconds=0.2)
    observable = rx_repeat(duration=period, producer=_producer)

    subscription = kernel.run(observable.subscribe(collector))
    kernel.run(curio.sleep(1.1))
    # Awaiting the subscription's result; the test expects exactly one
    # completion to have been recorded once this returns.
    kernel.run(subscription())

    expected_deltas = [0.2, 0.2, 0.2, 0.2]

    assert collector.on_completed_count == 1
    assert collector.on_next_count == 5
    assert collector.on_error_count == 0
    assert collector.get_delta() == expected_deltas
46
|
|
|
|